// Package r2 uploads, presigns and deletes objects in an R2 bucket through
// the S3-compatible AWS SDK client.
package r2

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"

	"github.com/jeffemmett/upload-service/internal/config"
)

const (
	multipartThreshold = 100 * 1024 * 1024 // 100MB
	partSize           = 64 * 1024 * 1024  // 64MB chunks
	presignExpiry      = 5 * time.Minute

	// abortTimeout bounds the best-effort cleanup of a failed multipart upload.
	abortTimeout = 30 * time.Second
)

// Client bundles the S3 API client, its presign client and the name of the
// bucket every operation targets.
type Client struct {
	s3      *s3.Client
	presign *s3.PresignClient
	bucket  string
}

// NewClient builds a Client from cfg, using the configured R2 endpoint,
// static access key credentials and bucket name.
func NewClient(cfg *config.Config) *Client {
	s3Client := s3.New(s3.Options{
		BaseEndpoint: aws.String(cfg.R2Endpoint),
		Region:       "auto",
		Credentials:  credentials.NewStaticCredentialsProvider(cfg.R2AccessKeyID, cfg.R2SecretAccessKey, ""),
	})
	return &Client{
		s3:      s3Client,
		presign: s3.NewPresignClient(s3Client),
		bucket:  cfg.R2BucketName,
	}
}

// Upload streams the reader to R2. For files > 100MB, uses multipart upload.
//
// size is only used to choose between the two strategies; the bytes actually
// sent are whatever body yields.
func (c *Client) Upload(ctx context.Context, key, contentType string, size int64, body io.Reader) error {
	if size > multipartThreshold {
		return c.uploadMultipart(ctx, key, contentType, body)
	}
	return c.uploadSimple(ctx, key, contentType, body)
}

// uploadSimple sends body as a single PutObject request.
func (c *Client) uploadSimple(ctx context.Context, key, contentType string, body io.Reader) error {
	// R2 requires Content-Length; buffer the body so we can set it.
	// Only used for files < 100MB, so memory usage is bounded.
	data, err := io.ReadAll(body)
	if err != nil {
		return fmt.Errorf("read body: %w", err)
	}
	size := int64(len(data))
	if _, err := c.s3.PutObject(ctx, &s3.PutObjectInput{
		Bucket:        &c.bucket,
		Key:           &key,
		Body:          bytes.NewReader(data),
		ContentType:   &contentType,
		ContentLength: &size,
	}); err != nil {
		return fmt.Errorf("put object: %w", err)
	}
	return nil
}

// uploadMultipart sends body as a multipart upload in partSize chunks.
//
// On any failure after the upload has been created (read error, part upload
// error, empty body, or a failed completion) the upload is aborted so no
// orphaned parts are left behind, and the triggering error is returned.
func (c *Client) uploadMultipart(ctx context.Context, key, contentType string, body io.Reader) error {
	create, err := c.s3.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:      &c.bucket,
		Key:         &key,
		ContentType: &contentType,
	})
	if err != nil {
		return fmt.Errorf("create multipart: %w", err)
	}
	uploadID := create.UploadId

	var parts []types.CompletedPart
	// One buffer is reused for every part: UploadPart is synchronous, so the
	// previous part has been sent before the next read overwrites it.
	buf := make([]byte, partSize)
	for partNum := int32(1); ; partNum++ {
		n, readErr := io.ReadFull(body, buf)
		// io.EOF (nothing read) and io.ErrUnexpectedEOF (short final chunk)
		// mean the stream ended; anything else is a genuine read failure and
		// must not be mistaken for end-of-input.
		eof := errors.Is(readErr, io.EOF) || errors.Is(readErr, io.ErrUnexpectedEOF)
		if readErr != nil && !eof {
			c.abortMultipart(key, uploadID)
			return fmt.Errorf("read body: %w", readErr)
		}
		if n == 0 {
			break
		}

		upload, err := c.s3.UploadPart(ctx, &s3.UploadPartInput{
			Bucket:        &c.bucket,
			Key:           &key,
			UploadId:      uploadID,
			PartNumber:    aws.Int32(partNum),
			Body:          bytes.NewReader(buf[:n]),
			ContentLength: aws.Int64(int64(n)),
		})
		if err != nil {
			c.abortMultipart(key, uploadID)
			return fmt.Errorf("upload part %d: %w", partNum, err)
		}

		// Each completed part needs its own copy of the part number; sharing
		// a pointer to the loop counter would make every part report the
		// counter's final value.
		parts = append(parts, types.CompletedPart{
			PartNumber: aws.Int32(partNum),
			ETag:       upload.ETag,
		})

		if eof {
			break
		}
	}

	if len(parts) == 0 {
		c.abortMultipart(key, uploadID)
		return errors.New("complete multipart: body is empty")
	}

	if _, err := c.s3.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   &c.bucket,
		Key:      &key,
		UploadId: uploadID,
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: parts,
		},
	}); err != nil {
		c.abortMultipart(key, uploadID)
		return fmt.Errorf("complete multipart: %w", err)
	}
	return nil
}

// abortMultipart asks the bucket to discard an unfinished multipart upload.
//
// It runs on its own short-lived context rather than the caller's, because
// the caller's context is often already cancelled by the time cleanup is
// needed. The result is ignored on purpose: this is best-effort cleanup and
// the caller is already returning the error that triggered it.
func (c *Client) abortMultipart(key string, uploadID *string) {
	ctx, cancel := context.WithTimeout(context.Background(), abortTimeout)
	defer cancel()
	_, _ = c.s3.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
		Bucket:   &c.bucket,
		Key:      &key,
		UploadId: uploadID,
	})
}

// PresignGet generates a presigned download URL valid for 5 minutes.
// If inline is true, uses inline content-disposition (view in browser).
func (c *Client) PresignGet(ctx context.Context, key, filename string, inline bool) (string, error) { disposition := fmt.Sprintf(`attachment; filename="%s"`, filename) if inline { disposition = fmt.Sprintf(`inline; filename="%s"`, filename) } resp, err := c.presign.PresignGetObject(ctx, &s3.GetObjectInput{ Bucket: &c.bucket, Key: &key, ResponseContentDisposition: aws.String(disposition), }, s3.WithPresignExpires(presignExpiry)) if err != nil { return "", err } return resp.URL, nil } // Delete removes an object from R2. func (c *Client) Delete(ctx context.Context, key string) error { _, err := c.s3.DeleteObject(ctx, &s3.DeleteObjectInput{ Bucket: &c.bucket, Key: &key, }) return err }