upload-service/internal/r2/r2.go

package r2

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"

	"github.com/jeffemmett/upload-service/internal/config"
)

const (
	multipartThreshold = 100 * 1024 * 1024 // 100 MiB
	partSize           = 64 * 1024 * 1024  // 64 MiB chunks
	presignExpiry      = 5 * time.Minute
)
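
// Sizing note: R2 mirrors S3's multipart limits (at most 10,000 parts, each
// part at least 5 MiB except the last) and expects all parts but the last to
// be the same size, which the fixed 64 MiB buffer below satisfies. At 64 MiB
// per part, a single object tops out around 640 GiB.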

// Client wraps the AWS S3 client for Cloudflare R2's S3-compatible API.
type Client struct {
	s3      *s3.Client
	presign *s3.PresignClient
	bucket  string
}

// NewClient constructs an R2 client from the service configuration.
func NewClient(cfg *config.Config) *Client {
	s3Client := s3.New(s3.Options{
		BaseEndpoint: aws.String(cfg.R2Endpoint),
		Region:       "auto",
		Credentials:  credentials.NewStaticCredentialsProvider(cfg.R2AccessKeyID, cfg.R2SecretAccessKey, ""),
	})
	return &Client{
		s3:      s3Client,
		presign: s3.NewPresignClient(s3Client),
		bucket:  cfg.R2BucketName,
	}
}
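
// A minimal usage sketch (the caller shown is hypothetical; only the config
// fields read above are assumed to exist):
//
//	client := r2.NewClient(cfg)
//	if err := client.Upload(ctx, "uploads/"+id, "application/pdf", size, file); err != nil {
//		// handle upload failure
//	}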

// Upload streams the reader to R2. Files larger than multipartThreshold
// (100 MiB) are sent as a multipart upload; smaller files use a single PutObject.
func (c *Client) Upload(ctx context.Context, key, contentType string, size int64, body io.Reader) error {
	if size > multipartThreshold {
		return c.uploadMultipart(ctx, key, contentType, body)
	}
	return c.uploadSimple(ctx, key, contentType, body)
}

func (c *Client) uploadSimple(ctx context.Context, key, contentType string, body io.Reader) error {
	_, err := c.s3.PutObject(ctx, &s3.PutObjectInput{
		Bucket:      &c.bucket,
		Key:         &key,
		Body:        body,
		ContentType: &contentType,
	})
	return err
}

// uploadMultipart streams the body to R2 in partSize chunks, aborting the
// upload on any failure so no orphaned parts are left behind.
func (c *Client) uploadMultipart(ctx context.Context, key, contentType string, body io.Reader) error {
	create, err := c.s3.CreateMultipartUpload(ctx, &s3.CreateMultipartUploadInput{
		Bucket:      &c.bucket,
		Key:         &key,
		ContentType: &contentType,
	})
	if err != nil {
		return fmt.Errorf("create multipart: %w", err)
	}
	uploadID := create.UploadId

	// Best-effort cleanup: the upload has already failed at this point, so the
	// abort error is deliberately ignored.
	abort := func() {
		_, _ = c.s3.AbortMultipartUpload(ctx, &s3.AbortMultipartUploadInput{
			Bucket:   &c.bucket,
			Key:      &key,
			UploadId: uploadID,
		})
	}

	var parts []types.CompletedPart
	buf := make([]byte, partSize)
	partNum := int32(1)
	for {
		// io.ReadFull returns io.EOF when nothing was read and
		// io.ErrUnexpectedEOF for a short final chunk; any other error is a
		// real read failure and must not silently truncate the object.
		n, readErr := io.ReadFull(body, buf)
		if readErr != nil && readErr != io.EOF && readErr != io.ErrUnexpectedEOF {
			abort()
			return fmt.Errorf("read part %d: %w", partNum, readErr)
		}
		if n == 0 {
			break // clean EOF on a part boundary
		}
		// bytes.NewReader is seekable, so the SDK can rewind the body on retry.
		upload, err := c.s3.UploadPart(ctx, &s3.UploadPartInput{
			Bucket:     &c.bucket,
			Key:        &key,
			UploadId:   uploadID,
			PartNumber: aws.Int32(partNum),
			Body:       bytes.NewReader(buf[:n]),
		})
		if err != nil {
			abort()
			return fmt.Errorf("upload part %d: %w", partNum, err)
		}
		parts = append(parts, types.CompletedPart{
			// aws.Int32 allocates a fresh pointer per part; taking &partNum
			// would alias the loop variable and stamp every part with its
			// final value.
			PartNumber: aws.Int32(partNum),
			ETag:       upload.ETag,
		})
		partNum++
		if readErr != nil {
			break // io.ErrUnexpectedEOF: short final part, stream exhausted
		}
	}
	_, err = c.s3.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{
		Bucket:   &c.bucket,
		Key:      &key,
		UploadId: uploadID,
		MultipartUpload: &types.CompletedMultipartUpload{
			Parts: parts,
		},
	})
	if err != nil {
		return fmt.Errorf("complete multipart: %w", err)
	}
	return nil
}

// PresignGet generates a presigned download URL valid for presignExpiry
// (5 minutes). If inline is true, the Content-Disposition is inline (view in
// browser) rather than attachment (download). The filename is assumed to be
// sanitized upstream: a raw `"` in it would break the quoted header value.
func (c *Client) PresignGet(ctx context.Context, key, filename string, inline bool) (string, error) {
	disposition := fmt.Sprintf(`attachment; filename="%s"`, filename)
	if inline {
		disposition = fmt.Sprintf(`inline; filename="%s"`, filename)
	}
	resp, err := c.presign.PresignGetObject(ctx, &s3.GetObjectInput{
		Bucket:                     &c.bucket,
		Key:                        &key,
		ResponseContentDisposition: aws.String(disposition),
	}, s3.WithPresignExpires(presignExpiry))
	if err != nil {
		return "", err
	}
	return resp.URL, nil
}
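
// A sketch of serving a download through the presigned URL (the handler
// wiring shown is illustrative, not part of this package):
//
//	url, err := client.PresignGet(r.Context(), objectKey, upload.Filename, false)
//	if err != nil {
//		http.Error(w, "presign failed", http.StatusInternalServerError)
//		return
//	}
//	http.Redirect(w, r, url, http.StatusFound)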

// Delete removes an object from R2.
func (c *Client) Delete(ctx context.Context, key string) error {
	_, err := c.s3.DeleteObject(ctx, &s3.DeleteObjectInput{
		Bucket: &c.bucket,
		Key:    &key,
	})
	return err
}