package r2 import ( "context" "fmt" "io" "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/jeffemmett/upload-service/internal/config" ) const ( multipartThreshold = 100 * 1024 * 1024 // 100MB partSize = 64 * 1024 * 1024 // 64MB chunks presignExpiry = 5 * time.Minute ) type Client struct { s3 *s3.Client presign *s3.PresignClient bucket string } func NewClient(cfg *config.Config) *Client { s3Client := s3.New(s3.Options{ BaseEndpoint: aws.String(cfg.R2Endpoint), Region: "auto", Credentials: credentials.NewStaticCredentialsProvider(cfg.R2AccessKeyID, cfg.R2SecretAccessKey, ""), }) return &Client{ s3: s3Client, presign: s3.NewPresignClient(s3Client), bucket: cfg.R2BucketName, } } // Upload streams the reader to R2. For files > 100MB, uses multipart upload. func (c *Client) Upload(ctx context.Context, key, contentType string, size int64, body io.Reader) error { if size > multipartThreshold { return c.uploadMultipart(ctx, key, contentType, body) } return c.uploadSimple(ctx, key, contentType, body) } func (c *Client) uploadSimple(ctx context.Context, key, contentType string, body io.Reader) error { _, err := c.s3.PutObject(ctx, &s3.PutObjectInput{ Bucket: &c.bucket, Key: &key, Body: body, ContentType: &contentType, }) return err } func (c *Client) uploadMultipart(ctx context.Context, key, contentType string, body io.Reader) error { create, err := c.s3.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{ Bucket: &c.bucket, Key: &key, ContentType: &contentType, }) if err != nil { return fmt.Errorf("create multipart: %w", err) } uploadID := create.UploadId var parts []types.CompletedPart buf := make([]byte, partSize) partNum := int32(1) for { n, readErr := io.ReadFull(body, buf) if n == 0 && readErr != nil { break } upload, err := c.s3.UploadPart(ctx, &s3.UploadPartInput{ Bucket: &c.bucket, Key: &key, UploadId: uploadID, PartNumber: &partNum, Body: io.NopCloser(io.LimitReader(bytesReader(buf[:n]), int64(n))), }) if err != nil { c.s3.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{ Bucket: &c.bucket, Key: &key, UploadId: uploadID, }) return fmt.Errorf("upload part %d: %w", partNum, err) } parts = append(parts, types.CompletedPart{ PartNumber: &partNum, ETag: upload.ETag, }) partNum++ if readErr != nil { break } } _, err = c.s3.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ Bucket: &c.bucket, Key: &key, UploadId: uploadID, MultipartUpload: &types.CompletedMultipartUpload{ Parts: parts, }, }) if err != nil { return fmt.Errorf("complete multipart: %w", err) } return nil } // PresignGet generates a presigned download URL valid for 5 minutes. func (c *Client) PresignGet(ctx context.Context, key, filename string) (string, error) { resp, err := c.presign.PresignGetObject(ctx, &s3.GetObjectInput{ Bucket: &c.bucket, Key: &key, ResponseContentDisposition: aws.String(fmt.Sprintf(`attachment; filename="%s"`, filename)), }, s3.WithPresignExpires(presignExpiry)) if err != nil { return "", err } return resp.URL, nil } // Delete removes an object from R2. func (c *Client) Delete(ctx context.Context, key string) error { _, err := c.s3.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: &c.bucket, Key: &key, }) return err } // bytesReader wraps a byte slice as an io.Reader. type bytesReaderImpl struct { data []byte pos int } func bytesReader(b []byte) io.Reader { // Make a copy so the buffer can be reused cp := make([]byte, len(b)) copy(cp, b) return &bytesReaderImpl{data: cp} } func (r *bytesReaderImpl) Read(p []byte) (int, error) { if r.pos >= len(r.data) { return 0, io.EOF } n := copy(p, r.data[r.pos:]) r.pos += n return n, nil }