feat(uploads): add native resumable upload support
Implement a native chunked resumable upload API and frontend integration to support reliable large file uploads. Changes include: - Added a 3-step resumable upload API flow (create session, upload chunks, complete session). - Introduced configuration options for chunk size, retention hours, and toggling the feature. - Updated the frontend to utilize resumable uploads with progress tracking. - Configured temporary chunk storage under `data/tmp/uploads` with automatic cleanup. - Documented the API flow and configuration in the README.
This commit is contained in:
454
backend/libs/services/resumable.go
Normal file
454
backend/libs/services/resumable.go
Normal file
@@ -0,0 +1,454 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var resumableUploadsBucket = []byte("resumable_uploads")
|
||||
|
||||
const (
|
||||
ResumableStatusUploading = "uploading"
|
||||
ResumableStatusCompleted = "completed"
|
||||
ResumableStatusCancelled = "cancelled"
|
||||
)
|
||||
|
||||
type ResumableFileInput struct {
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
ContentType string `json:"contentType"`
|
||||
Fingerprint string `json:"fingerprint,omitempty"`
|
||||
}
|
||||
|
||||
type ResumableSession struct {
|
||||
ID string `json:"id"`
|
||||
Options UploadOptions `json:"options"`
|
||||
Files []ResumableFile `json:"files"`
|
||||
ChunkSize int64 `json:"chunkSize"`
|
||||
Status string `json:"status"`
|
||||
BoxID string `json:"boxId,omitempty"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
ExpiresAt time.Time `json:"expiresAt"`
|
||||
}
|
||||
|
||||
type ResumableFile struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Size int64 `json:"size"`
|
||||
ContentType string `json:"contentType"`
|
||||
Fingerprint string `json:"fingerprint,omitempty"`
|
||||
ChunkCount int `json:"chunkCount"`
|
||||
UploadedChunks []int `json:"uploadedChunks"`
|
||||
}
|
||||
|
||||
func (s *UploadService) ensureResumableBucket() error {
|
||||
return s.db.Update(func(tx *bbolt.Tx) error {
|
||||
_, err := tx.CreateBucketIfNotExists(resumableUploadsBucket)
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts UploadOptions, chunkSize int64, retention time.Duration) (ResumableSession, error) {
|
||||
if len(files) == 0 {
|
||||
return ResumableSession{}, fmt.Errorf("no files were uploaded")
|
||||
}
|
||||
if chunkSize <= 0 {
|
||||
return ResumableSession{}, fmt.Errorf("chunk size must be positive")
|
||||
}
|
||||
if retention <= 0 {
|
||||
return ResumableSession{}, fmt.Errorf("retention must be positive")
|
||||
}
|
||||
if strings.TrimSpace(opts.Password) != "" {
|
||||
opts.PasswordSalt, opts.PasswordHash = hashPassword(opts.Password)
|
||||
opts.Password = ""
|
||||
}
|
||||
sessionFiles, err := s.resumableFilesFromInput(files, opts, chunkSize, nil)
|
||||
if err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
session := ResumableSession{
|
||||
ID: randomID(12),
|
||||
Options: opts,
|
||||
Files: sessionFiles,
|
||||
ChunkSize: chunkSize,
|
||||
Status: ResumableStatusUploading,
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
ExpiresAt: now.Add(retention),
|
||||
}
|
||||
if err := s.saveResumableSession(session); err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (s *UploadService) AddResumableFiles(sessionID string, files []ResumableFileInput) (ResumableSession, error) {
|
||||
if len(files) == 0 {
|
||||
return s.GetResumableSession(sessionID)
|
||||
}
|
||||
session, err := s.GetResumableSession(sessionID)
|
||||
if err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
if err := resumableSessionWritable(session); err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
existing := make(map[string]bool)
|
||||
for _, file := range session.Files {
|
||||
existing[resumableFileKey(file.Name, file.Size, file.Fingerprint)] = true
|
||||
}
|
||||
newFiles, err := s.resumableFilesFromInput(files, session.Options, session.ChunkSize, existing)
|
||||
if err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
if len(newFiles) == 0 {
|
||||
return session, nil
|
||||
}
|
||||
session.Files = append(session.Files, newFiles...)
|
||||
session.UpdatedAt = time.Now().UTC()
|
||||
if err := s.saveResumableSession(session); err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (s *UploadService) GetResumableSession(id string) (ResumableSession, error) {
|
||||
var session ResumableSession
|
||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket(resumableUploadsBucket)
|
||||
if bucket == nil {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
data := bucket.Get([]byte(id))
|
||||
if data == nil {
|
||||
return os.ErrNotExist
|
||||
}
|
||||
return json.Unmarshal(data, &session)
|
||||
})
|
||||
if err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (s *UploadService) PutResumableChunk(ctx context.Context, sessionID, fileID string, index int, body io.Reader) (ResumableSession, error) {
|
||||
session, err := s.GetResumableSession(sessionID)
|
||||
if err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
if err := resumableSessionWritable(session); err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
fileIndex := -1
|
||||
for i, file := range session.Files {
|
||||
if file.ID == fileID {
|
||||
fileIndex = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if fileIndex < 0 {
|
||||
return ResumableSession{}, os.ErrNotExist
|
||||
}
|
||||
file := session.Files[fileIndex]
|
||||
if index < 0 || index >= file.ChunkCount {
|
||||
return ResumableSession{}, fmt.Errorf("chunk index is invalid")
|
||||
}
|
||||
expectedSize := expectedChunkSize(file.Size, session.ChunkSize, index)
|
||||
chunkDir := s.resumableFileDir(session.ID, file.ID)
|
||||
if err := os.MkdirAll(chunkDir, 0o755); err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
chunkPath := s.resumableChunkPath(session.ID, file.ID, index)
|
||||
tempPath := chunkPath + ".tmp"
|
||||
target, err := os.OpenFile(tempPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
written, copyErr := io.Copy(target, io.LimitReader(body, expectedSize+1))
|
||||
closeErr := target.Close()
|
||||
if copyErr != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return ResumableSession{}, copyErr
|
||||
}
|
||||
if closeErr != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return ResumableSession{}, closeErr
|
||||
}
|
||||
if written != expectedSize {
|
||||
_ = os.Remove(tempPath)
|
||||
return ResumableSession{}, fmt.Errorf("chunk size mismatch")
|
||||
}
|
||||
if err := os.Rename(tempPath, chunkPath); err != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
session.Files[fileIndex].UploadedChunks = addChunkIndex(session.Files[fileIndex].UploadedChunks, index)
|
||||
session.UpdatedAt = time.Now().UTC()
|
||||
if err := s.saveResumableSession(session); err != nil {
|
||||
return ResumableSession{}, err
|
||||
}
|
||||
return session, nil
|
||||
}
|
||||
|
||||
func (s *UploadService) CompleteResumableSession(ctx context.Context, sessionID string) (UploadResult, ResumableSession, error) {
|
||||
session, err := s.GetResumableSession(sessionID)
|
||||
if err != nil {
|
||||
return UploadResult{}, ResumableSession{}, err
|
||||
}
|
||||
if err := resumableSessionWritable(session); err != nil {
|
||||
return UploadResult{}, ResumableSession{}, err
|
||||
}
|
||||
staged, cleanup, err := s.assembleResumableFiles(ctx, session)
|
||||
if err != nil {
|
||||
return UploadResult{}, ResumableSession{}, err
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
result, err := s.CreateBoxFromIncoming(staged, session.Options)
|
||||
if err != nil {
|
||||
return UploadResult{}, ResumableSession{}, err
|
||||
}
|
||||
if err := os.RemoveAll(s.resumableSessionDir(session.ID)); err != nil {
|
||||
return UploadResult{}, ResumableSession{}, err
|
||||
}
|
||||
session.Status = ResumableStatusCompleted
|
||||
session.BoxID = result.BoxID
|
||||
session.UpdatedAt = time.Now().UTC()
|
||||
if err := s.saveResumableSession(session); err != nil {
|
||||
return UploadResult{}, ResumableSession{}, err
|
||||
}
|
||||
return result, session, nil
|
||||
}
|
||||
|
||||
func (s *UploadService) CancelResumableSession(sessionID string) error {
|
||||
session, err := s.GetResumableSession(sessionID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
session.Status = ResumableStatusCancelled
|
||||
session.UpdatedAt = time.Now().UTC()
|
||||
if err := s.saveResumableSession(session); err != nil {
|
||||
return err
|
||||
}
|
||||
return os.RemoveAll(s.resumableSessionDir(session.ID))
|
||||
}
|
||||
|
||||
func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, error) {
|
||||
candidates := make([]ResumableSession, 0)
|
||||
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket(resumableUploadsBucket)
|
||||
if bucket == nil {
|
||||
return nil
|
||||
}
|
||||
return bucket.ForEach(func(_, value []byte) error {
|
||||
var session ResumableSession
|
||||
if err := json.Unmarshal(value, &session); err != nil {
|
||||
return err
|
||||
}
|
||||
if !session.ExpiresAt.After(now) || session.Status != ResumableStatusUploading {
|
||||
candidates = append(candidates, session)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
for _, session := range candidates {
|
||||
if err := os.RemoveAll(s.resumableSessionDir(session.ID)); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
err = s.db.Update(func(tx *bbolt.Tx) error {
|
||||
bucket := tx.Bucket(resumableUploadsBucket)
|
||||
if bucket == nil {
|
||||
return nil
|
||||
}
|
||||
for _, session := range candidates {
|
||||
if err := bucket.Delete([]byte(session.ID)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return len(candidates), err
|
||||
}
|
||||
|
||||
func (s *UploadService) saveResumableSession(session ResumableSession) error {
|
||||
if err := s.ensureResumableBucket(); err != nil {
|
||||
return err
|
||||
}
|
||||
return s.db.Update(func(tx *bbolt.Tx) error {
|
||||
data, err := json.Marshal(session)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return tx.Bucket(resumableUploadsBucket).Put([]byte(session.ID), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *UploadService) resumableFilesFromInput(files []ResumableFileInput, opts UploadOptions, chunkSize int64, existing map[string]bool) ([]ResumableFile, error) {
|
||||
sessionFiles := make([]ResumableFile, 0, len(files))
|
||||
for _, file := range files {
|
||||
file.Name = filepath.Base(strings.TrimSpace(file.Name))
|
||||
if file.Name == "." || file.Name == "" {
|
||||
return nil, fmt.Errorf("file name is required")
|
||||
}
|
||||
if file.Size < 0 {
|
||||
return nil, fmt.Errorf("file size is invalid")
|
||||
}
|
||||
fingerprint := strings.TrimSpace(file.Fingerprint)
|
||||
key := resumableFileKey(file.Name, file.Size, fingerprint)
|
||||
if existing != nil && existing[key] {
|
||||
continue
|
||||
}
|
||||
if !opts.SkipSizeLimit {
|
||||
if err := s.ValidateSize(file.Size); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
chunks := int((file.Size + chunkSize - 1) / chunkSize)
|
||||
if chunks == 0 {
|
||||
chunks = 1
|
||||
}
|
||||
sessionFiles = append(sessionFiles, ResumableFile{
|
||||
ID: randomID(8),
|
||||
Name: file.Name,
|
||||
Size: file.Size,
|
||||
ContentType: strings.TrimSpace(file.ContentType),
|
||||
Fingerprint: fingerprint,
|
||||
ChunkCount: chunks,
|
||||
})
|
||||
if existing != nil {
|
||||
existing[key] = true
|
||||
}
|
||||
}
|
||||
return sessionFiles, nil
|
||||
}
|
||||
|
||||
func resumableFileKey(name string, size int64, fingerprint string) string {
|
||||
return strings.TrimSpace(fingerprint) + "|" + filepath.Base(strings.TrimSpace(name)) + "|" + fmt.Sprintf("%d", size)
|
||||
}
|
||||
|
||||
func (s *UploadService) assembleResumableFiles(ctx context.Context, session ResumableSession) ([]IncomingFile, func(), error) {
|
||||
assembledDir := filepath.Join(s.resumableSessionDir(session.ID), "assembled")
|
||||
if err := os.MkdirAll(assembledDir, 0o755); err != nil {
|
||||
return nil, func() {}, err
|
||||
}
|
||||
cleanup := func() {
|
||||
_ = os.RemoveAll(assembledDir)
|
||||
}
|
||||
staged := make([]IncomingFile, 0, len(session.Files))
|
||||
for _, file := range session.Files {
|
||||
if len(file.UploadedChunks) != file.ChunkCount {
|
||||
cleanup()
|
||||
return nil, func() {}, fmt.Errorf("file %s is missing chunks", file.Name)
|
||||
}
|
||||
assembledPath := filepath.Join(assembledDir, file.ID)
|
||||
target, err := os.OpenFile(assembledPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
cleanup()
|
||||
return nil, func() {}, err
|
||||
}
|
||||
var written int64
|
||||
for i := 0; i < file.ChunkCount; i++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
_ = target.Close()
|
||||
cleanup()
|
||||
return nil, func() {}, ctx.Err()
|
||||
default:
|
||||
}
|
||||
chunk, err := os.Open(s.resumableChunkPath(session.ID, file.ID, i))
|
||||
if err != nil {
|
||||
_ = target.Close()
|
||||
cleanup()
|
||||
return nil, func() {}, fmt.Errorf("file %s is missing chunks", file.Name)
|
||||
}
|
||||
n, copyErr := io.Copy(target, chunk)
|
||||
closeErr := chunk.Close()
|
||||
if copyErr != nil {
|
||||
_ = target.Close()
|
||||
cleanup()
|
||||
return nil, func() {}, copyErr
|
||||
}
|
||||
if closeErr != nil {
|
||||
_ = target.Close()
|
||||
cleanup()
|
||||
return nil, func() {}, closeErr
|
||||
}
|
||||
written += n
|
||||
}
|
||||
if err := target.Close(); err != nil {
|
||||
cleanup()
|
||||
return nil, func() {}, err
|
||||
}
|
||||
if written != file.Size {
|
||||
cleanup()
|
||||
return nil, func() {}, fmt.Errorf("assembled file size mismatch")
|
||||
}
|
||||
staged = append(staged, StagedUploadFile{
|
||||
Filename: file.Name,
|
||||
FileSize: file.Size,
|
||||
MIMEType: file.ContentType,
|
||||
Path: assembledPath,
|
||||
})
|
||||
}
|
||||
return staged, cleanup, nil
|
||||
}
|
||||
|
||||
func resumableSessionWritable(session ResumableSession) error {
|
||||
if session.Status != ResumableStatusUploading {
|
||||
return fmt.Errorf("upload session is not active")
|
||||
}
|
||||
if !session.ExpiresAt.After(time.Now().UTC()) {
|
||||
return fmt.Errorf("upload session expired")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func expectedChunkSize(fileSize, chunkSize int64, index int) int64 {
|
||||
offset := int64(index) * chunkSize
|
||||
remaining := fileSize - offset
|
||||
if remaining < 0 {
|
||||
return 0
|
||||
}
|
||||
if remaining > chunkSize {
|
||||
return chunkSize
|
||||
}
|
||||
return remaining
|
||||
}
|
||||
|
||||
func addChunkIndex(chunks []int, index int) []int {
|
||||
for _, chunk := range chunks {
|
||||
if chunk == index {
|
||||
return chunks
|
||||
}
|
||||
}
|
||||
chunks = append(chunks, index)
|
||||
sort.Ints(chunks)
|
||||
return chunks
|
||||
}
|
||||
|
||||
func (s *UploadService) resumableSessionDir(sessionID string) string {
|
||||
return filepath.Join(s.dataDir, "tmp", "uploads", sessionID)
|
||||
}
|
||||
|
||||
func (s *UploadService) resumableFileDir(sessionID, fileID string) string {
|
||||
return filepath.Join(s.resumableSessionDir(sessionID), fileID)
|
||||
}
|
||||
|
||||
func (s *UploadService) resumableChunkPath(sessionID, fileID string, index int) string {
|
||||
return filepath.Join(s.resumableFileDir(sessionID, fileID), fmt.Sprintf("%06d.part", index))
|
||||
}
|
||||
Reference in New Issue
Block a user