// upload-service/internal/r2/r2.go
package r2
import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"

	"github.com/jeffemmett/upload-service/internal/config"
)
// Upload tuning knobs and presigned-URL lifetime.
const (
	multipartThreshold = 100 * 1024 * 1024 // 100MB: uploads above this size switch to multipart
	partSize           = 64 * 1024 * 1024  // 64MB per multipart chunk
	presignExpiry      = 5 * time.Minute   // validity window for presigned download URLs
)
// Client wraps an S3-compatible client configured for a single R2 bucket.
type Client struct {
	s3      *s3.Client        // low-level S3 API client pointed at the R2 endpoint
	presign *s3.PresignClient // presigner sharing the same endpoint and credentials
	bucket  string            // bucket all operations target
}
// NewClient constructs an R2 client from the service configuration, using
// static credentials and a presigner backed by the same underlying S3 client.
func NewClient(cfg *config.Config) *Client {
	creds := credentials.NewStaticCredentialsProvider(cfg.R2AccessKeyID, cfg.R2SecretAccessKey, "")
	api := s3.New(s3.Options{
		BaseEndpoint: aws.String(cfg.R2Endpoint),
		Region:       "auto", // R2 ignores region; "auto" is the documented value
		Credentials:  creds,
	})
	return &Client{
		s3:      api,
		presign: s3.NewPresignClient(api),
		bucket:  cfg.R2BucketName,
	}
}
// Upload streams the reader to R2. For files > 100MB, uses multipart upload.
func (c *Client) Upload(ctx context.Context, key, contentType string, size int64, body io.Reader) error {
	if size <= multipartThreshold {
		return c.uploadSimple(ctx, key, contentType, body)
	}
	return c.uploadMultipart(ctx, key, contentType, body)
}
// uploadSimple uploads the whole body with a single PutObject call.
//
// R2 requires Content-Length, so the body is buffered in memory first; this
// path is only taken for payloads at or below multipartThreshold, keeping
// memory usage bounded.
func (c *Client) uploadSimple(ctx context.Context, key, contentType string, body io.Reader) error {
	data, err := io.ReadAll(body)
	if err != nil {
		return fmt.Errorf("read body: %w", err)
	}
	size := int64(len(data))
	_, err = c.s3.PutObject(ctx, &s3.PutObjectInput{
		Bucket:        &c.bucket,
		Key:           &key,
		Body:          bytes.NewReader(data),
		ContentType:   &contentType,
		ContentLength: &size,
	})
	if err != nil {
		// Wrap for consistency with the read-body and multipart paths.
		return fmt.Errorf("put object: %w", err)
	}
	return nil
}
// uploadMultipart streams body to R2 in partSize chunks via the S3 multipart
// protocol. On any failure the upload is aborted (best-effort) so R2 does not
// retain billable orphaned parts.
func (c *Client) uploadMultipart(ctx context.Context, key, contentType string, body io.Reader) error {
	create, err := c.s3.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:      &c.bucket,
		Key:         &key,
		ContentType: &contentType,
	})
	if err != nil {
		return fmt.Errorf("create multipart: %w", err)
	}
	uploadID := create.UploadId

	// Best-effort cleanup; the original error is what callers care about.
	abort := func() {
		_, _ = c.s3.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   &c.bucket,
			Key:      &key,
			UploadId: uploadID,
		})
	}

	var parts []types.CompletedPart
	buf := make([]byte, partSize)
	for partNum := int32(1); ; partNum++ {
		n, readErr := io.ReadFull(body, buf)
		// io.EOF / io.ErrUnexpectedEOF just mean the final part is shorter
		// than partSize. Any other read error must abort: the original code
		// broke out of the loop and completed the upload, silently persisting
		// truncated data.
		if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
			abort()
			return fmt.Errorf("read part %d: %w", partNum, readErr)
		}
		if n == 0 {
			break // clean EOF on a part boundary
		}
		partData := make([]byte, n)
		copy(partData, buf[:n])
		partLen := int64(n)
		// pn must be a per-iteration variable: the SDK and the CompletedPart
		// slice keep the pointer. The original stored &partNum in every
		// CompletedPart, so all parts aliased one variable and reported the
		// final (mutated) part number to CompleteMultipartUpload.
		pn := partNum
		upload, err := c.s3.UploadPart(ctx, &s3.UploadPartInput{
			Bucket:        &c.bucket,
			Key:           &key,
			UploadId:      uploadID,
			PartNumber:    &pn,
			Body:          bytes.NewReader(partData),
			ContentLength: &partLen,
		})
		if err != nil {
			abort()
			return fmt.Errorf("upload part %d: %w", partNum, err)
		}
		parts = append(parts, types.CompletedPart{
			PartNumber: &pn,
			ETag:       upload.ETag,
		})
		if readErr != nil {
			break // short final part: end of stream reached
		}
	}

	_, err = c.s3.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   &c.bucket,
		Key:      &key,
		UploadId: uploadID,
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: parts,
		},
	})
	if err != nil {
		return fmt.Errorf("complete multipart: %w", err)
	}
	return nil
}
// PresignGet generates a presigned download URL valid for 5 minutes.
// If inline is true, uses inline content-disposition (view in browser).
//
// filename is caller-supplied and ends up in the response's
// Content-Disposition header, so it is sanitized: quotes and backslashes are
// escaped and CR/LF stripped to prevent breaking out of the quoted-string or
// injecting response headers.
func (c *Client) PresignGet(ctx context.Context, key, filename string, inline bool) (string, error) {
	safe := strings.NewReplacer(`\`, `\\`, `"`, `\"`, "\r", "", "\n", "").Replace(filename)
	kind := "attachment"
	if inline {
		kind = "inline"
	}
	disposition := fmt.Sprintf(`%s; filename="%s"`, kind, safe)
	resp, err := c.presign.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket:                     &c.bucket,
		Key:                        &key,
		ResponseContentDisposition: aws.String(disposition),
	}, s3.WithPresignExpires(presignExpiry))
	if err != nil {
		return "", err
	}
	return resp.URL, nil
}
// Delete removes an object from R2.
func (c *Client) Delete(ctx context.Context, key string) error {
	input := &s3.DeleteObjectInput{
		Bucket: &c.bucket,
		Key:    &key,
	}
	_, err := c.s3.DeleteObject(ctx, input)
	return err
}