feat(upload): add resumable chunk configuration and file validation
Some checks failed
Build and Publish Docker Image / deploy (push) Failing after 56s

- Add `WARPBOX_RESUMABLE_CHUNK_MODE` and `WARPBOX_RESUMABLE_CHUNK_PATH` environment variables to configure temporary chunk storage.
- Implement strict file validation for resuming uploads to ensure selected files match the pending session's metadata.
- Add `PLANS.md` to document development stages, roadmap, and API specifications (including batching and resumable flows).
This commit is contained in:
2026-06-02 22:13:54 +03:00
parent 5cd476e7f3
commit 313c89483c
22 changed files with 1809 additions and 324 deletions

View File

@@ -2,6 +2,9 @@ package services
import (
"context"
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
"encoding/json"
"fmt"
"io"
@@ -17,9 +20,10 @@ import (
var resumableUploadsBucket = []byte("resumable_uploads")
const (
ResumableStatusUploading = "uploading"
ResumableStatusCompleted = "completed"
ResumableStatusCancelled = "cancelled"
ResumableStatusUploading = "uploading"
ResumableStatusProcessing = "processing"
ResumableStatusCompleted = "completed"
ResumableStatusCancelled = "cancelled"
)
type ResumableFileInput struct {
@@ -30,15 +34,18 @@ type ResumableFileInput struct {
}
type ResumableSession struct {
ID string `json:"id"`
Options UploadOptions `json:"options"`
Files []ResumableFile `json:"files"`
ChunkSize int64 `json:"chunkSize"`
Status string `json:"status"`
BoxID string `json:"boxId,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
ExpiresAt time.Time `json:"expiresAt"`
ID string `json:"id"`
Options UploadOptions `json:"options"`
Files []ResumableFile `json:"files"`
ChunkSize int64 `json:"chunkSize"`
Status string `json:"status"`
BoxID string `json:"boxId,omitempty"`
ResumeTokenHash string `json:"resumeTokenHash,omitempty"`
ResumeToken string `json:"-"`
ChunkRoot string `json:"chunkRoot,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
ExpiresAt time.Time `json:"expiresAt"`
}
type ResumableFile struct {
@@ -58,7 +65,7 @@ func (s *UploadService) ensureResumableBucket() error {
})
}
func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts UploadOptions, chunkSize int64, retention time.Duration) (ResumableSession, error) {
func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts UploadOptions, chunkSize int64, retention time.Duration, chunkRoot string) (ResumableSession, error) {
if len(files) == 0 {
return ResumableSession{}, fmt.Errorf("no files were uploaded")
}
@@ -77,15 +84,20 @@ func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts
return ResumableSession{}, err
}
now := time.Now().UTC()
resumeToken := randomID(32)
sessionID := randomID(12)
session := ResumableSession{
ID: randomID(12),
Options: opts,
Files: sessionFiles,
ChunkSize: chunkSize,
Status: ResumableStatusUploading,
CreatedAt: now,
UpdatedAt: now,
ExpiresAt: now.Add(retention),
ID: sessionID,
Options: opts,
Files: sessionFiles,
ChunkSize: chunkSize,
Status: ResumableStatusUploading,
ResumeTokenHash: resumableTokenHash(sessionID, resumeToken),
ResumeToken: resumeToken,
ChunkRoot: strings.TrimSpace(chunkRoot),
CreatedAt: now,
UpdatedAt: now,
ExpiresAt: now.Add(retention),
}
if err := s.saveResumableSession(session); err != nil {
return ResumableSession{}, err
@@ -93,6 +105,14 @@ func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts
return session, nil
}
func (s *UploadService) VerifyResumableToken(session ResumableSession, token string) bool {
if session.ResumeTokenHash == "" || strings.TrimSpace(token) == "" {
return false
}
hash := resumableTokenHash(session.ID, token)
return subtle.ConstantTimeCompare([]byte(hash), []byte(session.ResumeTokenHash)) == 1
}
func (s *UploadService) AddResumableFiles(sessionID string, files []ResumableFileInput) (ResumableSession, error) {
if len(files) == 0 {
return s.GetResumableSession(sessionID)
@@ -165,11 +185,11 @@ func (s *UploadService) PutResumableChunk(ctx context.Context, sessionID, fileID
return ResumableSession{}, fmt.Errorf("chunk index is invalid")
}
expectedSize := expectedChunkSize(file.Size, session.ChunkSize, index)
chunkDir := s.resumableFileDir(session.ID, file.ID)
chunkDir := s.resumableFileDirFor(session, file.ID)
if err := os.MkdirAll(chunkDir, 0o755); err != nil {
return ResumableSession{}, err
}
chunkPath := s.resumableChunkPath(session.ID, file.ID, index)
chunkPath := s.resumableChunkPathFor(session, file.ID, index)
tempPath := chunkPath + ".tmp"
target, err := os.OpenFile(tempPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
@@ -206,20 +226,26 @@ func (s *UploadService) CompleteResumableSession(ctx context.Context, sessionID
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
if (session.Status == ResumableStatusCompleted || session.Status == ResumableStatusProcessing) && session.BoxID != "" {
box, err := s.GetBox(session.BoxID)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
return s.resultForBox(box, ""), session, nil
}
if err := resumableSessionWritable(session); err != nil {
return UploadResult{}, ResumableSession{}, err
}
staged, cleanup, err := s.assembleResumableFiles(ctx, session)
staged, err := s.resumableIncomingFiles(session)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
defer cleanup()
result, err := s.CreateBoxFromIncoming(staged, session.Options)
result, err := s.CreateBoxFromIncomingContext(ctx, staged, session.Options)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
if err := os.RemoveAll(s.resumableSessionDir(session.ID)); err != nil {
if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil {
return UploadResult{}, ResumableSession{}, err
}
session.Status = ResumableStatusCompleted
@@ -231,17 +257,205 @@ func (s *UploadService) CompleteResumableSession(ctx context.Context, sessionID
return result, session, nil
}
func (s *UploadService) CreateProcessingBoxFromResumable(sessionID string) (UploadResult, ResumableSession, error) {
session, err := s.GetResumableSession(sessionID)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
if (session.Status == ResumableStatusCompleted || session.Status == ResumableStatusProcessing) && session.BoxID != "" {
box, err := s.GetBox(session.BoxID)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
return s.resultForBox(box, ""), session, nil
}
if err := resumableSessionWritable(session); err != nil {
return UploadResult{}, ResumableSession{}, err
}
if _, err := s.resumableIncomingFiles(session); err != nil {
return UploadResult{}, ResumableSession{}, err
}
now := time.Now().UTC()
expiresAt := now.AddDate(0, 0, 7)
if session.Options.ExpiresInMinutes < 0 || session.Options.MaxDays < 0 {
expiresAt = now.AddDate(100, 0, 0)
} else if session.Options.ExpiresInMinutes > 0 {
expiresAt = now.Add(time.Duration(session.Options.ExpiresInMinutes) * time.Minute)
} else if session.Options.MaxDays > 0 {
expiresAt = now.Add(time.Duration(session.Options.MaxDays) * 24 * time.Hour)
}
box := Box{
ID: randomID(10),
OwnerID: strings.TrimSpace(session.Options.OwnerID),
CollectionID: strings.TrimSpace(session.Options.CollectionID),
CreatorIP: strings.TrimSpace(session.Options.CreatorIP),
StorageBackendID: normalizeBackendID(session.Options.StorageBackendID),
CreatedAt: now,
ExpiresAt: expiresAt,
MaxDownloads: session.Options.MaxDownloads,
Obfuscate: session.Options.ObfuscateMetadata && (strings.TrimSpace(session.Options.Password) != "" || strings.TrimSpace(session.Options.PasswordHash) != ""),
Files: make([]File, 0, len(session.Files)),
}
deleteToken := randomID(32)
box.DeleteTokenHash = deleteTokenHash(box.ID, deleteToken)
if strings.TrimSpace(session.Options.PasswordHash) != "" {
box.PasswordSalt = session.Options.PasswordSalt
box.PasswordHash = session.Options.PasswordHash
} else if strings.TrimSpace(session.Options.Password) != "" {
salt, hash := hashPassword(session.Options.Password)
box.PasswordSalt = salt
box.PasswordHash = hash
}
for _, incoming := range session.Files {
fileID := randomID(8)
storedName := "@each@" + fileID + strings.ToLower(filepath.Ext(incoming.Name))
objectKey := boxObjectKey(box.ID, storedName)
contentType := incoming.ContentType
if contentType == "" {
contentType = "application/octet-stream"
}
box.Files = append(box.Files, File{
ID: fileID,
Name: filepath.Base(incoming.Name),
StoredName: storedName,
Size: incoming.Size,
ContentType: contentType,
PreviewKind: previewKind(contentType),
ObjectKey: objectKey,
Processing: true,
UploadedAt: now,
})
}
if err := s.saveBoxRecord(box); err != nil {
return UploadResult{}, ResumableSession{}, err
}
session.Status = ResumableStatusProcessing
session.BoxID = box.ID
session.UpdatedAt = time.Now().UTC()
if err := s.saveResumableSession(session); err != nil {
return UploadResult{}, ResumableSession{}, err
}
return s.resultForBox(box, deleteToken), session, nil
}
func (s *UploadService) FinalizeProcessingResumableSession(ctx context.Context, sessionID string) (UploadResult, error) {
session, err := s.GetResumableSession(sessionID)
if err != nil {
return UploadResult{}, err
}
if session.Status == ResumableStatusCompleted && session.BoxID != "" {
box, err := s.GetBox(session.BoxID)
if err != nil {
return UploadResult{}, err
}
return s.resultForBox(box, ""), nil
}
if session.Status != ResumableStatusProcessing || session.BoxID == "" {
return UploadResult{}, fmt.Errorf("upload session is not processing")
}
box, err := s.GetBox(session.BoxID)
if err != nil {
return UploadResult{}, err
}
staged, err := s.resumableIncomingFiles(session)
if err != nil {
return UploadResult{}, err
}
if len(staged) != len(box.Files) {
return UploadResult{}, fmt.Errorf("processing file count mismatch")
}
backend, err := s.storage.Backend(box.StorageBackendID)
if err != nil {
return UploadResult{}, err
}
for i, incoming := range staged {
source, err := incoming.Open()
if err != nil {
return UploadResult{}, err
}
file := box.Files[i]
if err := s.writeUploadedObject(ctx, backend, file.ObjectKey, source, incoming.Size(), 0, incoming.ContentType()); err != nil {
source.Close()
_ = backend.Delete(context.Background(), file.ObjectKey)
box.Files[i].ProcessingError = err.Error()
_ = s.saveBoxRecord(box)
return UploadResult{}, err
}
source.Close()
box.Files[i].Processing = false
box.Files[i].ProcessingError = ""
box.Files[i].UploadedAt = time.Now().UTC()
if err := s.saveBoxRecord(box); err != nil {
return UploadResult{}, err
}
}
if err := s.writeBoxMetadata(box); err != nil {
s.logger.Warn("box metadata write failed after resumable processing", "source", "storage", "severity", "warn", "code", 4020, "box_id", box.ID, "error", err.Error())
}
if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil {
return UploadResult{}, err
}
session.Status = ResumableStatusCompleted
session.UpdatedAt = time.Now().UTC()
if err := s.saveResumableSession(session); err != nil {
return UploadResult{}, err
}
return s.resultForBox(box, ""), nil
}
func (s *UploadService) CompleteUploadedResumableSession(ctx context.Context, sessionID string) (UploadResult, ResumableSession, error) {
session, err := s.GetResumableSession(sessionID)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
if err := resumableSessionWritable(session); err != nil {
return UploadResult{}, ResumableSession{}, err
}
completeFiles := make([]ResumableFile, 0, len(session.Files))
for _, file := range session.Files {
if resumableFileComplete(file) {
completeFiles = append(completeFiles, file)
}
}
if len(completeFiles) == 0 {
return UploadResult{}, ResumableSession{}, fmt.Errorf("no fully uploaded files to finish")
}
partial := session
partial.Files = completeFiles
staged, err := s.resumableIncomingFiles(partial)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
result, err := s.CreateBoxFromIncomingContext(ctx, staged, session.Options)
if err != nil {
return UploadResult{}, ResumableSession{}, err
}
if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil {
return UploadResult{}, ResumableSession{}, err
}
session.Status = ResumableStatusCompleted
session.BoxID = result.BoxID
session.Files = completeFiles
session.UpdatedAt = time.Now().UTC()
if err := s.deleteResumableSession(session.ID); err != nil {
return UploadResult{}, ResumableSession{}, err
}
return result, session, nil
}
func (s *UploadService) CancelResumableSession(sessionID string) error {
session, err := s.GetResumableSession(sessionID)
if err != nil {
return err
}
session.Status = ResumableStatusCancelled
session.UpdatedAt = time.Now().UTC()
if err := s.saveResumableSession(session); err != nil {
if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil {
return err
}
return os.RemoveAll(s.resumableSessionDir(session.ID))
return s.deleteResumableSession(session.ID)
}
func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, error) {
@@ -256,7 +470,9 @@ func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, err
if err := json.Unmarshal(value, &session); err != nil {
return err
}
if !session.ExpiresAt.After(now) || session.Status != ResumableStatusUploading {
if session.Status == ResumableStatusCompleted ||
session.Status == ResumableStatusCancelled ||
(session.Status == ResumableStatusUploading && !session.ExpiresAt.After(now)) {
candidates = append(candidates, session)
}
return nil
@@ -266,7 +482,7 @@ func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, err
return 0, err
}
for _, session := range candidates {
if err := os.RemoveAll(s.resumableSessionDir(session.ID)); err != nil {
if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil {
return 0, err
}
}
@@ -285,6 +501,16 @@ func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, err
return len(candidates), err
}
func (s *UploadService) deleteResumableSession(sessionID string) error {
return s.db.Update(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(resumableUploadsBucket)
if bucket == nil {
return nil
}
return bucket.Delete([]byte(sessionID))
})
}
func (s *UploadService) saveResumableSession(session ResumableSession) error {
if err := s.ensureResumableBucket(); err != nil {
return err
@@ -341,71 +567,102 @@ func resumableFileKey(name string, size int64, fingerprint string) string {
return strings.TrimSpace(fingerprint) + "|" + filepath.Base(strings.TrimSpace(name)) + "|" + fmt.Sprintf("%d", size)
}
func (s *UploadService) assembleResumableFiles(ctx context.Context, session ResumableSession) ([]IncomingFile, func(), error) {
assembledDir := filepath.Join(s.resumableSessionDir(session.ID), "assembled")
if err := os.MkdirAll(assembledDir, 0o755); err != nil {
return nil, func() {}, err
type resumableIncomingFile struct {
service *UploadService
session ResumableSession
file ResumableFile
}
func (f resumableIncomingFile) Name() string {
return f.file.Name
}
func (f resumableIncomingFile) Size() int64 {
return f.file.Size
}
func (f resumableIncomingFile) ContentType() string {
return f.file.ContentType
}
func (f resumableIncomingFile) Open() (io.ReadCloser, error) {
return &resumableChunkReader{
service: f.service,
session: f.session,
file: f.file,
}, nil
}
type resumableChunkReader struct {
service *UploadService
session ResumableSession
file ResumableFile
index int
current *os.File
}
func (r *resumableChunkReader) Read(p []byte) (int, error) {
for {
if r.current == nil {
if r.index >= r.file.ChunkCount {
return 0, io.EOF
}
chunk, err := os.Open(r.service.resumableChunkPathFor(r.session, r.file.ID, r.index))
if err != nil {
return 0, err
}
r.current = chunk
}
n, err := r.current.Read(p)
if err == io.EOF {
if closeErr := r.current.Close(); closeErr != nil {
r.current = nil
return n, closeErr
}
r.current = nil
r.index++
if n > 0 {
return n, nil
}
continue
}
return n, err
}
cleanup := func() {
_ = os.RemoveAll(assembledDir)
}
func (r *resumableChunkReader) Close() error {
if r.current == nil {
return nil
}
err := r.current.Close()
r.current = nil
return err
}
func (s *UploadService) resumableIncomingFiles(session ResumableSession) ([]IncomingFile, error) {
staged := make([]IncomingFile, 0, len(session.Files))
for _, file := range session.Files {
if len(file.UploadedChunks) != file.ChunkCount {
cleanup()
return nil, func() {}, fmt.Errorf("file %s is missing chunks", file.Name)
}
assembledPath := filepath.Join(assembledDir, file.ID)
target, err := os.OpenFile(assembledPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
cleanup()
return nil, func() {}, err
return nil, fmt.Errorf("file %s is missing chunks", file.Name)
}
var written int64
for i := 0; i < file.ChunkCount; i++ {
select {
case <-ctx.Done():
_ = target.Close()
cleanup()
return nil, func() {}, ctx.Err()
default:
}
chunk, err := os.Open(s.resumableChunkPath(session.ID, file.ID, i))
info, err := os.Stat(s.resumableChunkPathFor(session, file.ID, i))
if err != nil {
_ = target.Close()
cleanup()
return nil, func() {}, fmt.Errorf("file %s is missing chunks", file.Name)
return nil, fmt.Errorf("file %s is missing chunks", file.Name)
}
n, copyErr := io.Copy(target, chunk)
closeErr := chunk.Close()
if copyErr != nil {
_ = target.Close()
cleanup()
return nil, func() {}, copyErr
}
if closeErr != nil {
_ = target.Close()
cleanup()
return nil, func() {}, closeErr
}
written += n
}
if err := target.Close(); err != nil {
cleanup()
return nil, func() {}, err
written += info.Size()
}
if written != file.Size {
cleanup()
return nil, func() {}, fmt.Errorf("assembled file size mismatch")
return nil, fmt.Errorf("chunk size total mismatch")
}
staged = append(staged, StagedUploadFile{
Filename: file.Name,
FileSize: file.Size,
MIMEType: file.ContentType,
Path: assembledPath,
staged = append(staged, resumableIncomingFile{
service: s,
session: session,
file: file,
})
}
return staged, cleanup, nil
return staged, nil
}
func resumableSessionWritable(session ResumableSession) error {
@@ -418,6 +675,10 @@ func resumableSessionWritable(session ResumableSession) error {
return nil
}
func resumableFileComplete(file ResumableFile) bool {
return file.ChunkCount > 0 && len(file.UploadedChunks) == file.ChunkCount
}
func expectedChunkSize(fileSize, chunkSize int64, index int) int64 {
offset := int64(index) * chunkSize
remaining := fileSize - offset
@@ -441,14 +702,34 @@ func addChunkIndex(chunks []int, index int) []int {
return chunks
}
func resumableTokenHash(sessionID, token string) string {
sum := sha256.Sum256([]byte("warpbox-resumable:" + sessionID + ":" + token))
return hex.EncodeToString(sum[:])
}
func (s *UploadService) resumableSessionDir(sessionID string) string {
return filepath.Join(s.dataDir, "tmp", "uploads", sessionID)
}
func (s *UploadService) resumableSessionDirFor(session ResumableSession) string {
if strings.TrimSpace(session.ChunkRoot) != "" {
return filepath.Join(session.ChunkRoot, session.ID)
}
return s.resumableSessionDir(session.ID)
}
func (s *UploadService) resumableFileDir(sessionID, fileID string) string {
return filepath.Join(s.resumableSessionDir(sessionID), fileID)
}
func (s *UploadService) resumableFileDirFor(session ResumableSession, fileID string) string {
return filepath.Join(s.resumableSessionDirFor(session), fileID)
}
func (s *UploadService) resumableChunkPath(sessionID, fileID string, index int) string {
return filepath.Join(s.resumableFileDir(sessionID, fileID), fmt.Sprintf("%06d.part", index))
}
func (s *UploadService) resumableChunkPathFor(session ResumableSession, fileID string, index int) string {
return filepath.Join(s.resumableFileDirFor(session, fileID), fmt.Sprintf("%06d.part", index))
}

View File

@@ -37,6 +37,11 @@ type UploadPolicySettings struct {
ShortWindowSeconds int `json:"shortWindowSeconds"`
AnonymousStorageBackend string `json:"anonymousStorageBackend"`
UserStorageBackend string `json:"userStorageBackend"`
ResumableUploadsEnabled bool `json:"resumableUploadsEnabled"`
ResumableChunkSizeMB float64 `json:"resumableChunkSizeMb"`
ResumableRetentionHours int `json:"resumableRetentionHours"`
ResumableChunkMode string `json:"resumableChunkMode"`
ResumableChunkPath string `json:"resumableChunkPath"`
}
type UsageRecord struct {
@@ -89,6 +94,11 @@ func NewSettingsService(db *bbolt.DB, defaults config.SettingsDefaults) (*Settin
ShortWindowSeconds: defaults.ShortWindowSeconds,
AnonymousStorageBackend: defaults.AnonymousStorageBackend,
UserStorageBackend: defaults.UserStorageBackend,
ResumableUploadsEnabled: defaults.ResumableUploadsEnabled,
ResumableChunkSizeMB: defaults.ResumableChunkSizeMB,
ResumableRetentionHours: defaults.ResumableRetentionHours,
ResumableChunkMode: defaults.ResumableChunkMode,
ResumableChunkPath: defaults.ResumableChunkPath,
},
}
service.defaults = service.withBuiltinDefaultGaps(service.defaults)
@@ -143,6 +153,15 @@ func (s *SettingsService) withBuiltinDefaultGaps(settings UploadPolicySettings)
if strings.TrimSpace(settings.UserStorageBackend) == "" {
settings.UserStorageBackend = StorageBackendLocal
}
if settings.ResumableChunkSizeMB <= 0 {
settings.ResumableChunkSizeMB = 8
}
if settings.ResumableRetentionHours <= 0 {
settings.ResumableRetentionHours = 24
}
if strings.TrimSpace(settings.ResumableChunkMode) == "" {
settings.ResumableChunkMode = "same"
}
return settings
}
@@ -156,6 +175,13 @@ func (s *SettingsService) UploadPolicy() (UploadPolicySettings, error) {
if err := json.Unmarshal(data, &settings); err != nil {
return err
}
var raw map[string]json.RawMessage
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
if _, ok := raw["resumableUploadsEnabled"]; !ok {
settings.ResumableUploadsEnabled = s.defaults.ResumableUploadsEnabled
}
settings = s.withDefaultGaps(settings)
return nil
})
@@ -217,6 +243,15 @@ func (s *SettingsService) withDefaultGaps(settings UploadPolicySettings) UploadP
if strings.TrimSpace(settings.UserStorageBackend) == "" {
settings.UserStorageBackend = s.defaults.UserStorageBackend
}
if settings.ResumableChunkSizeMB <= 0 {
settings.ResumableChunkSizeMB = s.defaults.ResumableChunkSizeMB
}
if settings.ResumableRetentionHours <= 0 {
settings.ResumableRetentionHours = s.defaults.ResumableRetentionHours
}
if strings.TrimSpace(settings.ResumableChunkMode) == "" {
settings.ResumableChunkMode = s.defaults.ResumableChunkMode
}
return settings
}
@@ -422,6 +457,18 @@ func (s *SettingsService) validate(settings UploadPolicySettings) error {
if settings.ShortWindowRequests <= 0 || settings.ShortWindowSeconds <= 0 {
return fmt.Errorf("short-window rate limits must be positive")
}
if settings.ResumableChunkSizeMB <= 0 {
return fmt.Errorf("resumable chunk size must be positive")
}
if settings.ResumableRetentionHours <= 0 {
return fmt.Errorf("resumable retention must be positive")
}
if settings.ResumableChunkMode != "same" && settings.ResumableChunkMode != "custom" {
return fmt.Errorf("resumable chunk storage mode is invalid")
}
if settings.ResumableChunkMode == "custom" && strings.TrimSpace(settings.ResumableChunkPath) == "" {
return fmt.Errorf("custom resumable chunk path is required")
}
return nil
}

View File

@@ -130,6 +130,8 @@ type File struct {
Thumbnail string `json:"thumbnail,omitempty"`
ObjectKey string `json:"objectKey,omitempty"`
ThumbnailObjectKey string `json:"thumbnailObjectKey,omitempty"`
Processing bool `json:"processing,omitempty"`
ProcessingError string `json:"processingError,omitempty"`
UploadedAt time.Time `json:"uploadedAt"`
}
@@ -150,6 +152,7 @@ type ResultFile struct {
Size string `json:"size"`
URL string `json:"url"`
ThumbnailURL string `json:"thumbnailUrl"`
Processing bool `json:"processing,omitempty"`
}
type AdminStats struct {
@@ -254,6 +257,10 @@ func (s *UploadService) CreateBox(files []*multipart.FileHeader, opts UploadOpti
}
func (s *UploadService) CreateBoxFromIncoming(files []IncomingFile, opts UploadOptions) (UploadResult, error) {
return s.CreateBoxFromIncomingContext(context.Background(), files, opts)
}
func (s *UploadService) CreateBoxFromIncomingContext(ctx context.Context, files []IncomingFile, opts UploadOptions) (UploadResult, error) {
if len(files) == 0 {
return UploadResult{}, fmt.Errorf("no files were uploaded")
}
@@ -297,7 +304,7 @@ func (s *UploadService) CreateBoxFromIncoming(files []IncomingFile, opts UploadO
box.PasswordHash = hash
}
if err := s.writeIncomingFilesToBox(&box, files, opts); err != nil {
if err := s.writeIncomingFilesToBox(ctx, &box, files, opts); err != nil {
return UploadResult{}, err
}
@@ -331,7 +338,7 @@ func (s *UploadService) AppendIncomingFiles(boxID string, files []IncomingFile,
if err != nil {
return UploadResult{}, err
}
if err := s.writeIncomingFilesToBox(&box, files, opts); err != nil {
if err := s.writeIncomingFilesToBox(context.Background(), &box, files, opts); err != nil {
return UploadResult{}, err
}
if err := s.SaveBox(box); err != nil {
@@ -352,7 +359,7 @@ func (s *UploadService) AppendIncomingFiles(boxID string, files []IncomingFile,
// appends the file metadata to box.Files. The box's StorageBackendID determines
// where files land, so it works for both new and existing boxes.
func (s *UploadService) writeFilesToBox(box *Box, files []*multipart.FileHeader, opts UploadOptions) error {
return s.writeIncomingFilesToBox(box, multipartIncomingFiles(files), opts)
return s.writeIncomingFilesToBox(context.Background(), box, multipartIncomingFiles(files), opts)
}
func multipartIncomingFiles(files []*multipart.FileHeader) []IncomingFile {
@@ -363,7 +370,7 @@ func multipartIncomingFiles(files []*multipart.FileHeader) []IncomingFile {
return incoming
}
func (s *UploadService) writeIncomingFilesToBox(box *Box, files []IncomingFile, opts UploadOptions) error {
func (s *UploadService) writeIncomingFilesToBox(ctx context.Context, box *Box, files []IncomingFile, opts UploadOptions) error {
backend, err := s.storage.Backend(box.StorageBackendID)
if err != nil {
return err
@@ -399,8 +406,9 @@ func (s *UploadService) writeIncomingFilesToBox(box *Box, files []IncomingFile,
}
}
if err := s.writeUploadedObject(context.Background(), backend, objectKey, file, incoming.Size(), maxSize, contentType); err != nil {
if err := s.writeUploadedObject(ctx, backend, objectKey, file, incoming.Size(), maxSize, contentType); err != nil {
file.Close()
_ = backend.Delete(context.Background(), objectKey)
return err
}
file.Close()
@@ -811,6 +819,9 @@ func (s *UploadService) ThumbnailObjectKey(box Box, file File) string {
}
func (s *UploadService) OpenFileObject(ctx context.Context, box Box, file File) (StorageObject, error) {
if file.Processing {
return StorageObject{}, fmt.Errorf("file is still processing")
}
backend, err := s.storage.Backend(s.BoxStorageBackendID(box))
if err != nil {
return StorageObject{}, err
@@ -932,6 +943,13 @@ func (s *UploadService) WriteZip(w io.Writer, box Box) error {
}
func (s *UploadService) SaveBox(box Box) error {
if err := s.saveBoxRecord(box); err != nil {
return err
}
return s.writeBoxMetadata(box)
}
func (s *UploadService) saveBoxRecord(box Box) error {
if box.StorageBackendID == "" {
box.StorageBackendID = StorageBackendLocal
}
@@ -941,10 +959,7 @@ func (s *UploadService) SaveBox(box Box) error {
}
return s.db.Update(func(tx *bbolt.Tx) error {
if err := tx.Bucket(boxesBucket).Put([]byte(box.ID), data); err != nil {
return err
}
return s.writeBoxMetadata(box)
return tx.Bucket(boxesBucket).Put([]byte(box.ID), data)
})
}
@@ -957,6 +972,7 @@ func (s *UploadService) resultForBox(box Box, deleteToken string) UploadResult {
Size: helpers.FormatBytes(file.Size),
URL: fmt.Sprintf("%s/d/%s/f/%s", s.baseURL, box.ID, file.ID),
ThumbnailURL: fmt.Sprintf("%s/d/%s/thumb/%s", s.baseURL, box.ID, file.ID),
Processing: file.Processing,
})
}
@@ -1016,9 +1032,26 @@ func (s *UploadService) writeUploadedObject(ctx context.Context, backend Storage
reader = io.LimitReader(source, maxSize)
putSize = size
}
if ctx != nil {
reader = contextReader{ctx: ctx, reader: reader}
}
return backend.Put(ctx, key, reader, putSize, contentType)
}
type contextReader struct {
ctx context.Context
reader io.Reader
}
func (r contextReader) Read(p []byte) (int, error) {
select {
case <-r.ctx.Done():
return 0, r.ctx.Err()
default:
return r.reader.Read(p)
}
}
func boxObjectKey(boxID, name string) string {
return filepath.ToSlash(filepath.Join(boxID, name))
}

View File

@@ -133,10 +133,32 @@ func TestResumableSessionUploadOutOfOrderAndComplete(t *testing.T) {
Size: 11,
ContentType: "text/plain",
Fingerprint: "sha256:first-chunk",
}}, UploadOptions{MaxDays: 1, Password: "secret"}, 4, time.Hour)
}}, UploadOptions{MaxDays: 1, Password: "secret"}, 4, time.Hour, "")
if err != nil {
t.Fatalf("CreateResumableSession returned error: %v", err)
}
if session.ResumeToken == "" || session.ResumeTokenHash == "" {
t.Fatalf("resumable session did not create resume token: %+v", session)
}
if !service.VerifyResumableToken(session, session.ResumeToken) {
t.Fatalf("VerifyResumableToken rejected correct token")
}
if service.VerifyResumableToken(session, "wrong-token") {
t.Fatalf("VerifyResumableToken accepted wrong token")
}
stored, err := service.GetResumableSession(session.ID)
if err != nil {
t.Fatalf("GetResumableSession returned error: %v", err)
}
if stored.ResumeToken != "" {
t.Fatalf("stored session leaked raw resume token")
}
if strings.Contains(stored.ResumeTokenHash, session.ResumeToken) {
t.Fatalf("stored token hash contains raw token")
}
if !service.VerifyResumableToken(stored, session.ResumeToken) {
t.Fatalf("stored session rejected correct token")
}
if session.Options.Password != "" || session.Options.PasswordHash == "" || session.Options.PasswordSalt == "" {
t.Fatalf("resumable session did not hash password before storage: %+v", session.Options)
}
@@ -181,6 +203,13 @@ func TestResumableSessionUploadOutOfOrderAndComplete(t *testing.T) {
if _, err := os.Stat(service.resumableSessionDir(session.ID)); !os.IsNotExist(err) {
t.Fatalf("resumable temp dir after complete error = %v, want os.ErrNotExist", err)
}
replayed, replayedSession, err := service.CompleteResumableSession(testContext(), session.ID)
if err != nil {
t.Fatalf("CompleteResumableSession replay returned error: %v", err)
}
if replayed.BoxID != result.BoxID || replayedSession.Status != ResumableStatusCompleted {
t.Fatalf("replayed result = %+v, session = %+v, want box %s completed", replayed, replayedSession, result.BoxID)
}
}
func TestResumableCompleteRejectsMissingChunks(t *testing.T) {
@@ -189,7 +218,7 @@ func TestResumableCompleteRejectsMissingChunks(t *testing.T) {
Name: "note.txt",
Size: 8,
ContentType: "text/plain",
}}, UploadOptions{MaxDays: 1}, 4, time.Hour)
}}, UploadOptions{MaxDays: 1}, 4, time.Hour, "")
if err != nil {
t.Fatalf("CreateResumableSession returned error: %v", err)
}
@@ -201,6 +230,73 @@ func TestResumableCompleteRejectsMissingChunks(t *testing.T) {
}
}
func TestResumablePartialCompleteKeepsOnlyFinishedFiles(t *testing.T) {
service := newTestUploadService(t)
session, err := service.CreateResumableSession([]ResumableFileInput{
{Name: "done.txt", Size: 4, ContentType: "text/plain", Fingerprint: "done"},
{Name: "partial.txt", Size: 8, ContentType: "text/plain", Fingerprint: "partial"},
}, UploadOptions{MaxDays: 1}, 4, time.Hour, "")
if err != nil {
t.Fatalf("CreateResumableSession returned error: %v", err)
}
if _, err := service.PutResumableChunk(testContext(), session.ID, session.Files[0].ID, 0, strings.NewReader("done")); err != nil {
t.Fatalf("PutResumableChunk done returned error: %v", err)
}
if _, err := service.PutResumableChunk(testContext(), session.ID, session.Files[1].ID, 0, strings.NewReader("part")); err != nil {
t.Fatalf("PutResumableChunk partial returned error: %v", err)
}
result, completed, err := service.CompleteUploadedResumableSession(testContext(), session.ID)
if err != nil {
t.Fatalf("CompleteUploadedResumableSession returned error: %v", err)
}
if completed.Status != ResumableStatusCompleted || completed.BoxID != result.BoxID || len(completed.Files) != 1 {
t.Fatalf("completed session = %+v, result = %+v", completed, result)
}
box := getTestBox(t, service, result.BoxID)
if len(box.Files) != 1 || box.Files[0].Name != "done.txt" {
t.Fatalf("partial completion box files = %+v", box.Files)
}
object, err := service.OpenFileObject(testContext(), box, box.Files[0])
if err != nil {
t.Fatalf("OpenFileObject returned error: %v", err)
}
data, err := io.ReadAll(object.Body)
object.Body.Close()
if err != nil {
t.Fatalf("ReadAll returned error: %v", err)
}
if string(data) != "done" {
t.Fatalf("partial completion object = %q", string(data))
}
if _, err := service.GetResumableSession(session.ID); !os.IsNotExist(err) {
t.Fatalf("GetResumableSession after partial complete error = %v, want os.ErrNotExist", err)
}
if _, err := os.Stat(service.resumableSessionDir(session.ID)); !os.IsNotExist(err) {
t.Fatalf("resumable temp dir after partial complete error = %v, want os.ErrNotExist", err)
}
}
func TestResumablePartialCompleteRejectsNoFinishedFiles(t *testing.T) {
service := newTestUploadService(t)
session, err := service.CreateResumableSession([]ResumableFileInput{{
Name: "partial.txt",
Size: 8,
ContentType: "text/plain",
}}, UploadOptions{MaxDays: 1}, 4, time.Hour, "")
if err != nil {
t.Fatalf("CreateResumableSession returned error: %v", err)
}
if _, err := service.PutResumableChunk(testContext(), session.ID, session.Files[0].ID, 0, strings.NewReader("part")); err != nil {
t.Fatalf("PutResumableChunk returned error: %v", err)
}
if _, _, err := service.CompleteUploadedResumableSession(testContext(), session.ID); err == nil {
t.Fatalf("CompleteUploadedResumableSession accepted no completed files")
}
if _, err := service.GetResumableSession(session.ID); err != nil {
t.Fatalf("GetResumableSession after failed partial complete returned error: %v", err)
}
}
func TestResumableSessionCanAddFilesBeforeComplete(t *testing.T) {
service := newTestUploadService(t)
session, err := service.CreateResumableSession([]ResumableFileInput{{
@@ -208,7 +304,7 @@ func TestResumableSessionCanAddFilesBeforeComplete(t *testing.T) {
Size: 4,
ContentType: "text/plain",
Fingerprint: "one",
}}, UploadOptions{MaxDays: 1}, 4, time.Hour)
}}, UploadOptions{MaxDays: 1}, 4, time.Hour, "")
if err != nil {
t.Fatalf("CreateResumableSession returned error: %v", err)
}
@@ -264,7 +360,7 @@ func TestResumableCleanupRemovesExpiredSessionsAndChunks(t *testing.T) {
Name: "note.txt",
Size: 4,
ContentType: "text/plain",
}}, UploadOptions{MaxDays: 1}, 4, time.Millisecond)
}}, UploadOptions{MaxDays: 1}, 4, time.Millisecond, "")
if err != nil {
t.Fatalf("CreateResumableSession returned error: %v", err)
}