Files
warpbox-dev/backend/libs/services/storage_s3.go

123 lines
3.9 KiB
Go
Raw Normal View History

package services
import (
"bytes"
"context"
"fmt"
"io"
"net/url"
"strings"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
)
type s3StorageBackend struct {
cfg StorageBackendConfig
client *minio.Client
}
func newS3StorageBackend(cfg StorageBackendConfig) (*s3StorageBackend, error) {
endpoint := normalizeS3Endpoint(cfg.Endpoint)
client, err := minio.New(endpoint, &minio.Options{
Creds: credentials.NewStaticV4(cfg.AccessKey, cfg.SecretKey, ""),
Secure: cfg.UseSSL,
Region: cfg.Region,
BucketLookup: s3BucketLookup(cfg.PathStyle),
})
if err != nil {
return nil, err
}
return &s3StorageBackend{cfg: cfg, client: client}, nil
}
func (b *s3StorageBackend) ID() string { return b.cfg.ID }
func (b *s3StorageBackend) Type() string { return StorageBackendS3 }
func (b *s3StorageBackend) Put(ctx context.Context, key string, body io.Reader, size int64, contentType string) error {
cleanKey := cleanObjectKey(key)
opts := minio.PutObjectOptions{ContentType: contentType}
_, err := b.client.PutObject(ctx, b.cfg.Bucket, cleanKey, body, size, opts)
if err != nil {
return fmt.Errorf("s3 put object %q in bucket %q failed: %w", cleanKey, b.cfg.Bucket, err)
}
return nil
}
func (b *s3StorageBackend) Get(ctx context.Context, key string) (StorageObject, error) {
cleanKey := cleanObjectKey(key)
object, err := b.client.GetObject(ctx, b.cfg.Bucket, cleanKey, minio.GetObjectOptions{})
if err != nil {
return StorageObject{}, fmt.Errorf("s3 get object %q from bucket %q failed: %w", cleanKey, b.cfg.Bucket, err)
}
info, err := object.Stat()
if err != nil {
object.Close()
return StorageObject{}, fmt.Errorf("s3 stat object %q in bucket %q failed: %w", cleanKey, b.cfg.Bucket, err)
}
return StorageObject{Key: key, Size: info.Size, ContentType: info.ContentType, ModTime: info.LastModified, Body: object}, nil
}
func (b *s3StorageBackend) Delete(ctx context.Context, key string) error {
cleanKey := cleanObjectKey(key)
if err := b.client.RemoveObject(ctx, b.cfg.Bucket, cleanKey, minio.RemoveObjectOptions{}); err != nil {
return fmt.Errorf("s3 delete object %q from bucket %q failed: %w", cleanKey, b.cfg.Bucket, err)
}
return nil
}
func (b *s3StorageBackend) DeletePrefix(ctx context.Context, prefix string) error {
prefix = strings.TrimSuffix(cleanObjectKey(prefix), "/") + "/"
objects := b.client.ListObjects(ctx, b.cfg.Bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: true})
for object := range objects {
if object.Err != nil {
return fmt.Errorf("s3 list prefix %q in bucket %q failed: %w", prefix, b.cfg.Bucket, object.Err)
}
if err := b.Delete(ctx, object.Key); err != nil {
return err
}
}
return nil
}
func (b *s3StorageBackend) Usage(ctx context.Context) (int64, error) {
var total int64
for object := range b.client.ListObjects(ctx, b.cfg.Bucket, minio.ListObjectsOptions{Recursive: true}) {
if object.Err != nil {
return 0, fmt.Errorf("s3 usage list bucket %q failed: %w", b.cfg.Bucket, object.Err)
}
total += object.Size
}
return total, nil
}
func (b *s3StorageBackend) Test(ctx context.Context) error {
exists, err := b.client.BucketExists(ctx, b.cfg.Bucket)
if err != nil {
return fmt.Errorf("s3 bucket check for %q failed: %w", b.cfg.Bucket, err)
}
if !exists {
return fmt.Errorf("bucket %q does not exist", b.cfg.Bucket)
}
key := ".warpbox-storage-test-" + randomID(6)
if err := b.Put(ctx, key, bytes.NewReader([]byte("ok")), 2, "text/plain"); err != nil {
return err
}
return b.Delete(ctx, key)
}
func s3BucketLookup(pathStyle bool) minio.BucketLookupType {
if pathStyle {
return minio.BucketLookupPath
}
return minio.BucketLookupAuto
}
func normalizeS3Endpoint(endpoint string) string {
endpoint = strings.TrimSpace(endpoint)
if parsed, err := url.Parse(endpoint); err == nil && parsed.Host != "" {
return parsed.Host
}
return strings.TrimPrefix(strings.TrimPrefix(endpoint, "https://"), "http://")
}