From 5cd476e7f36390c42421bd156dcecc0625af58c9 Mon Sep 17 00:00:00 2001 From: Daniel Legt Date: Tue, 2 Jun 2026 17:41:41 +0300 Subject: [PATCH] feat(uploads): add native resumable upload support Implement a native chunked resumable upload API and frontend integration to support reliable large file uploads. Changes include: - Added a 3-step resumable upload API flow (create session, upload chunks, complete session). - Introduced configuration options for chunk size, retention hours, and toggling the feature. - Updated the frontend to utilize resumable uploads with progress tracking. - Configured temporary chunk storage under `data/tmp/uploads` with automatic cleanup. - Documented the API flow and configuration in the README. --- .env.example | 3 + README.md | 33 ++ backend/libs/config/config.go | 94 ++-- backend/libs/config/config_test.go | 9 + backend/libs/handlers/app.go | 6 + backend/libs/handlers/resumable.go | 328 ++++++++++++++ backend/libs/handlers/upload.go | 15 +- backend/libs/handlers/upload_stage3_test.go | 210 ++++++++- backend/libs/jobs/cleanup.go | 15 +- backend/libs/services/resumable.go | 454 ++++++++++++++++++++ backend/libs/services/upload.go | 113 ++++- backend/libs/services/upload_test.go | 160 +++++++ backend/static/css/20-upload.css | 22 + backend/static/js/40-upload.js | 399 ++++++++++++++++- backend/templates/pages/api.html | 26 ++ scripts/env/dev.env.example | 3 + 16 files changed, 1805 insertions(+), 85 deletions(-) create mode 100644 backend/libs/handlers/resumable.go create mode 100644 backend/libs/services/resumable.go diff --git a/.env.example b/.env.example index b33b71a..169e5ab 100644 --- a/.env.example +++ b/.env.example @@ -9,6 +9,9 @@ WARPBOX_CLEANUP_ENABLED=true WARPBOX_CLEANUP_EVERY=1h WARPBOX_THUMBNAIL_ENABLED=true WARPBOX_THUMBNAIL_EVERY=1m +WARPBOX_RESUMABLE_UPLOADS_ENABLED=true +WARPBOX_RESUMABLE_CHUNK_MB=64 +WARPBOX_RESUMABLE_RETENTION_HOURS=1 WARPBOX_MAX_UPLOAD_SIZE_MB=16384 WARPBOX_ANONYMOUS_UPLOADS_ENABLED=true WARPBOX_ANONYMOUS_MAX_UPLOAD_MB=512 diff --git a/README.md b/README.md index 34d5f6c..007eaef 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,9 @@ Upload policy defaults are also configured in megabytes and can later be changed - `WARPBOX_SHORT_WINDOW_SECONDS=60` - `WARPBOX_ANONYMOUS_STORAGE_BACKEND=local` - `WARPBOX_USER_STORAGE_BACKEND=local` +- `WARPBOX_RESUMABLE_UPLOADS_ENABLED=true` +- `WARPBOX_RESUMABLE_CHUNK_MB=8` +- `WARPBOX_RESUMABLE_RETENTION_HOURS=24` - `WARPBOX_TRUSTED_PROXIES=` controls whether forwarded client IP headers are accepted only from specific proxy IPs/CIDRs. See [SECURITY_PROXY.md](./SECURITY_PROXY.md). Runtime data is configured with `WARPBOX_DATA_DIR` and defaults to `./data` in the dev environment. @@ -43,6 +46,10 @@ Large uploads are expected to take minutes on normal home/server connections. Ke mid-upload; `WARPBOX_READ_HEADER_TIMEOUT=15s` still protects header reads from slowloris-style connections. +Browser uploads use Warpbox-native resumable uploads by default. Chunks are stored temporarily under +`data/tmp/uploads/{session_id}` and then streamed into the selected storage backend when the upload +is completed. Stale sessions are cleaned by the cleanup job after `WARPBOX_RESUMABLE_RETENTION_HOURS`. + Background jobs are enabled with `WARPBOX_JOBS_ENABLED=true`. Individual jobs can be toggled with `WARPBOX_CLEANUP_ENABLED` and `WARPBOX_THUMBNAIL_ENABLED`, and their schedules are configured with `WARPBOX_CLEANUP_EVERY` and `WARPBOX_THUMBNAIL_EVERY`. @@ -242,6 +249,31 @@ ShareX selection — which ShareX sends as separate back-to-back requests — la link. The shipped `.sxcu` sets `X-Warpbox-Batch: sharex`; remove that header for one box per file. Requests without the header behave exactly as before. +### Resumable API flow + +Custom clients can use the resumable JSON API for large files: + +```bash +# 1. Create a resumable session from file metadata. +curl -s http://localhost:8080/api/v1/uploads/resumable \ + -H 'Accept: application/json' \ + -H 'Content-Type: application/json' \ + -d '{"files":[{"name":"large.bin","size":1048576,"contentType":"application/octet-stream"}],"expiresMinutes":1440}' + +# 2. Upload exact-sized chunks using the returned sessionId, file id, and chunkSize. +dd if=./large.bin bs=8388608 count=1 skip=0 2>/dev/null | \ + curl -X PUT --data-binary @- \ + http://localhost:8080/api/v1/uploads/resumable/SESSION_ID/files/FILE_ID/chunks/0 + +# 3. Complete the session after all chunks are present. +curl -X POST -H 'Accept: application/json' \ + http://localhost:8080/api/v1/uploads/resumable/SESSION_ID/complete +``` + +The complete response is the same JSON shape as `POST /api/v1/upload`, including `boxUrl`, +`manageUrl`, `deleteUrl`, and file URLs. Send `Authorization: Bearer ` on every resumable +request to upload as an account. + ## Stage 4 Accounts + Personal Boxes - `/register` bootstraps the first admin account only when no users exist. @@ -275,6 +307,7 @@ Warpbox keeps local runtime data under the configured data directory: - `data/files/{box_id}/@each@{file_id}.ext` - uploaded file contents when the local backend is selected. - `data/files/{box_id}/@thumb@{file_id}.jpg` - generated previews when the local backend is selected. +- `data/tmp/uploads/{session_id}` - temporary chunks for unfinished resumable uploads. - `data/db/warpbox.bbolt` - bbolt metadata database for boxes and file records. - `data/db/warpbox.bbolt` also stores users, sessions, invites, and collections. - `data/db/warpbox.bbolt` stores upload policy settings and daily usage records keyed by plain IP diff --git a/backend/libs/config/config.go b/backend/libs/config/config.go index 7dbf7bf..6cb7cdd 100644 --- a/backend/libs/config/config.go +++ b/backend/libs/config/config.go @@ -11,27 +11,30 @@ import ( ) type Config struct { - AppName string - AppVersion string - Environment string - Addr string - BaseURL string - DataDir string - AdminToken string - StaticDir string - TemplateDir string - ReadHeaderTimeout time.Duration - ReadTimeout time.Duration - WriteTimeout time.Duration - IdleTimeout time.Duration - TrustedProxies []string - JobsEnabled bool - CleanupEnabled bool - CleanupEvery time.Duration - ThumbnailEnabled bool - ThumbnailEvery time.Duration - MaxUploadSize int64 - DefaultSettings SettingsDefaults + AppName string + AppVersion string + Environment string + Addr string + BaseURL string + DataDir string + AdminToken string + StaticDir string + TemplateDir string + ReadHeaderTimeout time.Duration + ReadTimeout time.Duration + WriteTimeout time.Duration + IdleTimeout time.Duration + TrustedProxies []string + JobsEnabled bool + CleanupEnabled bool + CleanupEvery time.Duration + ThumbnailEnabled bool + ThumbnailEvery time.Duration + ResumableUploadsEnabled bool + ResumableChunkSize int64 + ResumableRetention time.Duration + MaxUploadSize int64 + DefaultSettings SettingsDefaults } type SettingsDefaults struct { @@ -56,26 +59,29 @@ type SettingsDefaults struct { func Load() (Config, error) { cfg := Config{ - AppName: envString("WARPBOX_APP_NAME", "warpbox.dev"), - AppVersion: envString("APP_VERSION", "dev"), - Environment: envString("WARPBOX_ENV", "development"), - Addr: envString("WARPBOX_ADDR", ":8080"), - BaseURL: strings.TrimRight(envString("WARPBOX_BASE_URL", "http://localhost:8080"), "/"), - DataDir: envString("WARPBOX_DATA_DIR", defaultPath("data")), - AdminToken: envString("WARPBOX_ADMIN_TOKEN", ""), - StaticDir: envString("WARPBOX_STATIC_DIR", defaultPath("static")), - TemplateDir: envString("WARPBOX_TEMPLATE_DIR", defaultPath("templates")), - ReadHeaderTimeout: envDuration("WARPBOX_READ_HEADER_TIMEOUT", 15*time.Second), - ReadTimeout: envDuration("WARPBOX_READ_TIMEOUT", 0), - WriteTimeout: envDuration("WARPBOX_WRITE_TIMEOUT", 0), - IdleTimeout: envDuration("WARPBOX_IDLE_TIMEOUT", 120*time.Second), - TrustedProxies: envCSV("WARPBOX_TRUSTED_PROXIES"), - JobsEnabled: envBool("WARPBOX_JOBS_ENABLED", true), - CleanupEnabled: envBool("WARPBOX_CLEANUP_ENABLED", true), - CleanupEvery: envDuration("WARPBOX_CLEANUP_EVERY", time.Hour), - ThumbnailEnabled: envBool("WARPBOX_THUMBNAIL_ENABLED", true), - ThumbnailEvery: envDuration("WARPBOX_THUMBNAIL_EVERY", time.Minute), - MaxUploadSize: envMegabytes("WARPBOX_MAX_UPLOAD_SIZE_MB", 2048), // 2 GiB default. + AppName: envString("WARPBOX_APP_NAME", "warpbox.dev"), + AppVersion: envString("APP_VERSION", "dev"), + Environment: envString("WARPBOX_ENV", "development"), + Addr: envString("WARPBOX_ADDR", ":8080"), + BaseURL: strings.TrimRight(envString("WARPBOX_BASE_URL", "http://localhost:8080"), "/"), + DataDir: envString("WARPBOX_DATA_DIR", defaultPath("data")), + AdminToken: envString("WARPBOX_ADMIN_TOKEN", ""), + StaticDir: envString("WARPBOX_STATIC_DIR", defaultPath("static")), + TemplateDir: envString("WARPBOX_TEMPLATE_DIR", defaultPath("templates")), + ReadHeaderTimeout: envDuration("WARPBOX_READ_HEADER_TIMEOUT", 15*time.Second), + ReadTimeout: envDuration("WARPBOX_READ_TIMEOUT", 0), + WriteTimeout: envDuration("WARPBOX_WRITE_TIMEOUT", 0), + IdleTimeout: envDuration("WARPBOX_IDLE_TIMEOUT", 120*time.Second), + TrustedProxies: envCSV("WARPBOX_TRUSTED_PROXIES"), + JobsEnabled: envBool("WARPBOX_JOBS_ENABLED", true), + CleanupEnabled: envBool("WARPBOX_CLEANUP_ENABLED", true), + CleanupEvery: envDuration("WARPBOX_CLEANUP_EVERY", time.Hour), + ThumbnailEnabled: envBool("WARPBOX_THUMBNAIL_ENABLED", true), + ThumbnailEvery: envDuration("WARPBOX_THUMBNAIL_EVERY", time.Minute), + ResumableUploadsEnabled: envBool("WARPBOX_RESUMABLE_UPLOADS_ENABLED", true), + ResumableChunkSize: envMegabytes("WARPBOX_RESUMABLE_CHUNK_MB", 8), + ResumableRetention: time.Duration(envInt("WARPBOX_RESUMABLE_RETENTION_HOURS", 24)) * time.Hour, + MaxUploadSize: envMegabytes("WARPBOX_MAX_UPLOAD_SIZE_MB", 2048), // 2 GiB default. DefaultSettings: SettingsDefaults{ AnonymousUploadsEnabled: envBool("WARPBOX_ANONYMOUS_UPLOADS_ENABLED", true), AnonymousMaxUploadMB: envMegabytesLimitFloat("WARPBOX_ANONYMOUS_MAX_UPLOAD_MB", 512), @@ -103,6 +109,12 @@ func Load() (Config, error) { if cfg.MaxUploadSize <= 0 { return Config{}, fmt.Errorf("WARPBOX_MAX_UPLOAD_SIZE_MB must be positive") } + if cfg.ResumableChunkSize <= 0 { + return Config{}, fmt.Errorf("WARPBOX_RESUMABLE_CHUNK_MB must be positive") + } + if cfg.ResumableRetention <= 0 { + return Config{}, fmt.Errorf("WARPBOX_RESUMABLE_RETENTION_HOURS must be positive") + } if !validUnlimitedMegabyteLimit(cfg.DefaultSettings.AnonymousMaxUploadMB) || !validUnlimitedMegabyteLimit(cfg.DefaultSettings.AnonymousDailyUploadMB) || !validUnlimitedMegabyteLimit(cfg.DefaultSettings.UserDailyUploadMB) || diff --git a/backend/libs/config/config_test.go b/backend/libs/config/config_test.go index 1eff696..a8d15bf 100644 --- a/backend/libs/config/config_test.go +++ b/backend/libs/config/config_test.go @@ -68,4 +68,13 @@ func TestLoadDefaultsUseLargeUploadFriendlyTimeouts(t *testing.T) { if cfg.WriteTimeout != 0 { t.Fatalf("WriteTimeout = %s, want 0 for long uploads", cfg.WriteTimeout) } + if !cfg.ResumableUploadsEnabled { + t.Fatalf("ResumableUploadsEnabled = false, want true") + } + if cfg.ResumableChunkSize != 8*1024*1024 { + t.Fatalf("ResumableChunkSize = %d, want 8 MiB", cfg.ResumableChunkSize) + } + if cfg.ResumableRetention != 24*time.Hour { + t.Fatalf("ResumableRetention = %s, want 24h", cfg.ResumableRetention) + } } diff --git a/backend/libs/handlers/app.go b/backend/libs/handlers/app.go index c982e72..c3ab120 100644 --- a/backend/libs/handlers/app.go +++ b/backend/libs/handlers/app.go @@ -141,6 +141,12 @@ func (a *App) RegisterRoutes(mux *http.ServeMux) { mux.HandleFunc("GET /api/v1/schemas/upload-request.json", a.UploadRequestSchema) mux.HandleFunc("GET /api/v1/schemas/upload-response.json", a.UploadResponseSchema) mux.HandleFunc("POST /api/v1/upload", a.Upload) + mux.HandleFunc("POST /api/v1/uploads/resumable", a.CreateResumableUpload) + mux.HandleFunc("GET /api/v1/uploads/resumable/{sessionID}", a.ResumableUploadStatus) + mux.HandleFunc("POST /api/v1/uploads/resumable/{sessionID}/files", a.AddResumableFiles) + mux.HandleFunc("PUT /api/v1/uploads/resumable/{sessionID}/files/{fileID}/chunks/{index}", a.PutResumableChunk) + mux.HandleFunc("POST /api/v1/uploads/resumable/{sessionID}/complete", a.CompleteResumableUpload) + mux.HandleFunc("DELETE /api/v1/uploads/resumable/{sessionID}", a.CancelResumableUpload) mux.HandleFunc("GET /emoji/{pack}/{file}", a.EmojiAsset) mux.Handle("GET /static/", a.Static()) } diff --git a/backend/libs/handlers/resumable.go b/backend/libs/handlers/resumable.go new file mode 100644 index 0000000..61ad676 --- /dev/null +++ b/backend/libs/handlers/resumable.go @@ -0,0 +1,328 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "warpbox.dev/backend/libs/helpers" + "warpbox.dev/backend/libs/jobs" + "warpbox.dev/backend/libs/services" +) + +type resumableCreateRequest struct { + Files []services.ResumableFileInput `json:"files"` + MaxDays int `json:"maxDays"` + ExpiresMinutes int `json:"expiresMinutes"` + MaxDownloads int `json:"maxDownloads"` + Password string `json:"password"` + ObfuscateMetadata bool `json:"obfuscateMetadata"` + CollectionID string `json:"collectionId"` +} + +type resumableSessionResponse struct { + SessionID string `json:"sessionId"` + ChunkSize int64 `json:"chunkSize"` + Status string `json:"status"` + BoxID string `json:"boxId,omitempty"` + ExpiresAt string `json:"expiresAt"` + Files []services.ResumableFile `json:"files"` +} + +func (a *App) CreateResumableUpload(w http.ResponseWriter, r *http.Request) { + if !a.cfg.ResumableUploadsEnabled { + helpers.WriteJSONError(w, http.StatusForbidden, "resumable uploads are disabled") + return + } + user, loggedIn, authErr := a.currentUserWithAuthError(r) + if authErr != nil { + a.logger.Warn("resumable upload rejected invalid bearer token", withRequestLogAttrs(r, "source", "user-upload", "severity", "warn", "code", 4011)...) + helpers.WriteJSONError(w, http.StatusUnauthorized, "invalid bearer token") + return + } + isAdminUpload := loggedIn && user.Role == services.UserRoleAdmin + settings, policy, ok := a.loadUploadPolicyForAPI(w, r, user, loggedIn) + if !ok { + return + } + if !loggedIn && !settings.AnonymousUploadsEnabled { + a.logger.Warn("resumable anonymous upload rejected disabled", withRequestLogAttrs(r, "source", "user-upload", "severity", "warn", "code", 4013)...) + helpers.WriteJSONError(w, http.StatusForbidden, "anonymous uploads are disabled") + return + } + rateKey := uploadRateKey(r, user, loggedIn) + if !isAdminUpload && policy.ShortRequests > 0 && !a.rateLimiter.Allow("upload:"+rateKey, policy.ShortRequests, policy.ShortWindow, time.Now().UTC()) { + a.logger.Warn("resumable upload rate limited", withRequestLogAttrs(r, "source", "user-upload", "severity", "warn", "code", 4291, "user_id", user.ID)...) + helpers.WriteJSONError(w, http.StatusTooManyRequests, "too many upload requests, please slow down") + return + } + + var payload resumableCreateRequest + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + helpers.WriteJSONError(w, http.StatusBadRequest, "upload session request could not be read") + return + } + fileSizes := make([]int64, 0, len(payload.Files)) + var totalBytes int64 + for _, file := range payload.Files { + fileSizes = append(fileSizes, file.Size) + totalBytes += file.Size + } + if !isAdminUpload { + if status, message := a.checkUploadPolicyForSizes(r, user, loggedIn, settings, policy, fileSizes, totalBytes); message != "" { + a.logger.Warn("resumable upload rejected by policy", withRequestLogAttrs(r, "source", "quota", "severity", "warn", "code", status, "user_id", user.ID, "message", message, "bytes", totalBytes, "files", len(payload.Files))...) + helpers.WriteJSONError(w, status, message) + return + } + if status, message := a.checkBoxCreationPolicy(r, user, loggedIn, policy); message != "" { + a.logger.Warn("resumable upload rejected by box policy", withRequestLogAttrs(r, "source", "quota", "severity", "warn", "code", status, "user_id", user.ID, "message", message, "bytes", totalBytes, "files", len(payload.Files))...) + helpers.WriteJSONError(w, status, message) + return + } + } + + opts, err := a.resumableUploadOptions(r, payload, user, loggedIn, isAdminUpload, policy) + if err != nil { + helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, err.Error()) + return + } + session, err := a.uploadService.CreateResumableSession(payload.Files, opts, a.cfg.ResumableChunkSize, a.cfg.ResumableRetention) + if err != nil { + a.logger.Warn("resumable session create failed", withRequestLogAttrs(r, "source", "user-upload", "severity", "warn", "code", 4002, "user_id", user.ID, "error", err.Error())...) + helpers.WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + a.logger.Info("resumable upload session created", withRequestLogAttrs(r, "source", "user-upload", "severity", "user_activity", "code", 2002, "user_id", user.ID, "session_id", session.ID, "files", len(session.Files), "bytes", totalBytes, "anonymous", !loggedIn)...) + helpers.WriteJSON(w, http.StatusCreated, resumableResponse(session)) +} + +func (a *App) ResumableUploadStatus(w http.ResponseWriter, r *http.Request) { + session, ok := a.authorizedResumableSession(w, r) + if !ok { + return + } + helpers.WriteJSON(w, http.StatusOK, resumableResponse(session)) +} + +func (a *App) AddResumableFiles(w http.ResponseWriter, r *http.Request) { + session, ok := a.authorizedResumableSession(w, r) + if !ok { + return + } + user, loggedIn, _ := a.currentUserWithAuthError(r) + isAdminUpload := loggedIn && user.Role == services.UserRoleAdmin + settings, policy, ok := a.loadUploadPolicyForAPI(w, r, user, loggedIn) + if !ok { + return + } + var payload struct { + Files []services.ResumableFileInput `json:"files"` + } + if err := json.NewDecoder(r.Body).Decode(&payload); err != nil { + helpers.WriteJSONError(w, http.StatusBadRequest, "upload files request could not be read") + return + } + fileSizes := make([]int64, 0, len(session.Files)+len(payload.Files)) + var totalBytes int64 + for _, file := range session.Files { + fileSizes = append(fileSizes, file.Size) + totalBytes += file.Size + } + for _, file := range payload.Files { + fileSizes = append(fileSizes, file.Size) + totalBytes += file.Size + } + if !isAdminUpload { + if status, message := a.checkUploadPolicyForSizes(r, user, loggedIn, settings, policy, fileSizes, totalBytes); message != "" { + helpers.WriteJSONError(w, status, message) + return + } + } + updated, err := a.uploadService.AddResumableFiles(session.ID, payload.Files) + if err != nil { + helpers.WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + a.logger.Info("resumable upload files added", withRequestLogAttrs(r, "source", "user-upload", "severity", "user_activity", "code", 2006, "session_id", session.ID, "added", len(updated.Files)-len(session.Files), "files", len(updated.Files))...) + helpers.WriteJSON(w, http.StatusOK, resumableResponse(updated)) +} + +func (a *App) PutResumableChunk(w http.ResponseWriter, r *http.Request) { + session, ok := a.authorizedResumableSession(w, r) + if !ok { + return + } + fileID := r.PathValue("fileID") + index, err := strconv.Atoi(r.PathValue("index")) + if err != nil { + helpers.WriteJSONError(w, http.StatusBadRequest, "chunk index is invalid") + return + } + updated, err := a.uploadService.PutResumableChunk(r.Context(), session.ID, fileID, index, r.Body) + if err != nil { + a.logger.Warn("resumable chunk failed", withRequestLogAttrs(r, "source", "user-upload", "severity", "warn", "code", 4003, "session_id", session.ID, "file_id", fileID, "chunk", index, "error", err.Error())...) + helpers.WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + a.logger.Info("resumable chunk uploaded", withRequestLogAttrs(r, "source", "user-upload", "severity", "user_activity", "code", 2003, "session_id", session.ID, "file_id", fileID, "chunk", index)...) + helpers.WriteJSON(w, http.StatusOK, resumableResponse(updated)) +} + +func (a *App) CompleteResumableUpload(w http.ResponseWriter, r *http.Request) { + session, ok := a.authorizedResumableSession(w, r) + if !ok { + return + } + user, loggedIn, _ := a.currentUserWithAuthError(r) + isAdminUpload := loggedIn && user.Role == services.UserRoleAdmin + settings, policy, ok := a.loadUploadPolicyForAPI(w, r, user, loggedIn) + if !ok { + return + } + fileSizes := make([]int64, 0, len(session.Files)) + var totalBytes int64 + for _, file := range session.Files { + fileSizes = append(fileSizes, file.Size) + totalBytes += file.Size + } + if !isAdminUpload { + if status, message := a.checkUploadPolicyForSizes(r, user, loggedIn, settings, policy, fileSizes, totalBytes); message != "" { + helpers.WriteJSONError(w, status, message) + return + } + if status, message := a.checkBoxCreationPolicy(r, user, loggedIn, policy); message != "" { + helpers.WriteJSONError(w, status, message) + return + } + if status, message := a.checkStorageBackendCapacity(session.Options.StorageBackendID, settings, totalBytes); message != "" { + helpers.WriteJSONError(w, status, message) + return + } + } + result, completed, err := a.uploadService.CompleteResumableSession(r.Context(), session.ID) + if err != nil { + a.logger.Warn("resumable upload complete failed", withRequestLogAttrs(r, "source", "user-upload", "severity", "warn", "code", 4004, "session_id", session.ID, "error", err.Error())...) + helpers.WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + if !isAdminUpload { + if err := a.recordUploadUsage(r, user, loggedIn, totalBytes, 1); err != nil { + a.logger.Warn("failed to record resumable upload usage", "source", "quota", "severity", "warn", "code", 4404, "error", err.Error()) + } + if err := a.settingsService.CleanupUsage(time.Now().UTC(), settings.UsageRetentionDays); err != nil { + a.logger.Warn("failed to cleanup upload usage", "source", "quota", "severity", "warn", "code", 4405, "error", err.Error()) + } + } + jobs.GenerateThumbnailsForBoxAsync(a.uploadService, a.logger, result.BoxID) + a.logger.Info("resumable upload completed", withRequestLogAttrs(r, "source", "user-upload", "severity", "user_activity", "code", 2004, "user_id", user.ID, "session_id", completed.ID, "box_id", result.BoxID, "files", len(result.Files), "bytes", totalBytes, "admin", isAdminUpload, "anonymous", !loggedIn)...) + helpers.WriteJSON(w, http.StatusCreated, result) +} + +func (a *App) CancelResumableUpload(w http.ResponseWriter, r *http.Request) { + session, ok := a.authorizedResumableSession(w, r) + if !ok { + return + } + if err := a.uploadService.CancelResumableSession(session.ID); err != nil { + helpers.WriteJSONError(w, http.StatusBadRequest, err.Error()) + return + } + a.logger.Info("resumable upload cancelled", withRequestLogAttrs(r, "source", "user-upload", "severity", "user_activity", "code", 2005, "session_id", session.ID)...) + w.WriteHeader(http.StatusNoContent) +} + +func (a *App) authorizedResumableSession(w http.ResponseWriter, r *http.Request) (services.ResumableSession, bool) { + user, loggedIn, authErr := a.currentUserWithAuthError(r) + if authErr != nil { + helpers.WriteJSONError(w, http.StatusUnauthorized, "invalid bearer token") + return services.ResumableSession{}, false + } + session, err := a.uploadService.GetResumableSession(r.PathValue("sessionID")) + if err != nil { + helpers.WriteJSONError(w, http.StatusNotFound, "upload session not found") + return services.ResumableSession{}, false + } + if loggedIn { + if session.Options.OwnerID != user.ID { + helpers.WriteJSONError(w, http.StatusForbidden, "upload session not found") + return services.ResumableSession{}, false + } + return session, true + } + if session.Options.OwnerID != "" || session.Options.CreatorIP != uploadClientIP(r) { + helpers.WriteJSONError(w, http.StatusForbidden, "upload session not found") + return services.ResumableSession{}, false + } + return session, true +} + +func (a *App) loadUploadPolicyForAPI(w http.ResponseWriter, r *http.Request, user services.User, loggedIn bool) (services.UploadPolicySettings, services.EffectiveUploadPolicy, bool) { + settings, err := a.settingsService.UploadPolicy() + if err != nil { + a.logger.Error("failed to load upload policy", "source", "settings", "severity", "error", "code", 5006, "error", err.Error()) + helpers.WriteJSONError(w, http.StatusInternalServerError, "upload policy could not be loaded") + return services.UploadPolicySettings{}, services.EffectiveUploadPolicy{}, false + } + return settings, a.effectiveUploadPolicy(settings, user, loggedIn), true +} + +func (a *App) resumableUploadOptions(r *http.Request, payload resumableCreateRequest, user services.User, loggedIn, isAdminUpload bool, policy services.EffectiveUploadPolicy) (services.UploadOptions, error) { + var ownerID string + var collectionID string + if loggedIn { + ownerID = user.ID + collectionID = strings.TrimSpace(payload.CollectionID) + if !a.authService.CollectionOwnedBy(collectionID, user.ID) { + return services.UploadOptions{}, fmt.Errorf("collection not found") + } + } + unlimitedExpiry := isAdminUpload || policy.MaxDays < 0 + rawMaxDays := payload.MaxDays + maxDays := rawMaxDays + if maxDays <= 0 { + maxDays = 7 + if policy.MaxDays > 0 && policy.MaxDays < maxDays { + maxDays = policy.MaxDays + } + } + expiresMinutes := payload.ExpiresMinutes + if expiresMinutes < 0 || rawMaxDays < 0 { + if !unlimitedExpiry { + return services.UploadOptions{}, fmt.Errorf("expiration cannot exceed %d days", policy.MaxDays) + } + expiresMinutes = -1 + } else if !unlimitedExpiry { + if maxDays > policy.MaxDays { + return services.UploadOptions{}, fmt.Errorf("expiration cannot exceed %d days", policy.MaxDays) + } + if expiresMinutes > 0 && expiresMinutes > policy.MaxDays*24*60 { + return services.UploadOptions{}, fmt.Errorf("expiration cannot exceed %d days", policy.MaxDays) + } + } + return services.UploadOptions{ + MaxDays: maxDays, + ExpiresInMinutes: expiresMinutes, + MaxDownloads: payload.MaxDownloads, + Password: payload.Password, + ObfuscateMetadata: payload.ObfuscateMetadata, + OwnerID: ownerID, + CollectionID: collectionID, + SkipSizeLimit: isAdminUpload || policy.MaxUploadMB < 0, + CreatorIP: uploadClientIP(r), + StorageBackendID: policy.StorageBackendID, + }, nil +} + +func resumableResponse(session services.ResumableSession) resumableSessionResponse { + return resumableSessionResponse{ + SessionID: session.ID, + ChunkSize: session.ChunkSize, + Status: session.Status, + BoxID: session.BoxID, + ExpiresAt: session.ExpiresAt.Format(time.RFC3339), + Files: session.Files, + } +} diff --git a/backend/libs/handlers/upload.go b/backend/libs/handlers/upload.go index 2db6dd2..8ce7602 100644 --- a/backend/libs/handlers/upload.go +++ b/backend/libs/handlers/upload.go @@ -228,11 +228,22 @@ func (a *App) checkUploadPolicy(r *http.Request, user services.User, loggedIn bo if len(files) == 0 { return 0, "" } + sizes := make([]int64, 0, len(files)) + for _, file := range files { + sizes = append(sizes, file.Size) + } + return a.checkUploadPolicyForSizes(r, user, loggedIn, settings, policy, sizes, totalBytes) +} + +func (a *App) checkUploadPolicyForSizes(r *http.Request, user services.User, loggedIn bool, settings services.UploadPolicySettings, policy services.EffectiveUploadPolicy, fileSizes []int64, totalBytes int64) (int, string) { + if len(fileSizes) == 0 { + return 0, "" + } now := time.Now().UTC() if policy.MaxUploadMB > 0 { maxBytes := services.MegabytesToBytes(policy.MaxUploadMB) - for _, file := range files { - if file.Size > maxBytes { + for _, fileSize := range fileSizes { + if fileSize > maxBytes { return http.StatusRequestEntityTooLarge, "file exceeds upload size limit" } } diff --git a/backend/libs/handlers/upload_stage3_test.go b/backend/libs/handlers/upload_stage3_test.go index 09d7128..f3e64de 100644 --- a/backend/libs/handlers/upload_stage3_test.go +++ b/backend/libs/handlers/upload_stage3_test.go @@ -2,6 +2,7 @@ package handlers import ( "bytes" + "context" "encoding/json" "io" "log/slog" @@ -10,8 +11,10 @@ import ( "net/http/httptest" "os" "path/filepath" + "strconv" "strings" "testing" + "time" "warpbox.dev/backend/libs/config" "warpbox.dev/backend/libs/services" @@ -103,6 +106,196 @@ func TestUploadTextResponseReturnsOnlyBoxURL(t *testing.T) { } } +func TestResumableUploadFlowCreatesNormalBox(t *testing.T) { + app, cleanup := newTestApp(t) + defer cleanup() + + createBody := `{"files":[{"name":"note.txt","size":11,"contentType":"text/plain"}],"expiresMinutes":60}` + createRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable", strings.NewReader(createBody)) + createRequest.Header.Set("Accept", "application/json") + createResponse := httptest.NewRecorder() + app.CreateResumableUpload(createResponse, createRequest) + if createResponse.Code != http.StatusCreated { + t.Fatalf("create status = %d, body = %s", createResponse.Code, createResponse.Body.String()) + } + var session struct { + SessionID string `json:"sessionId"` + ChunkSize int64 `json:"chunkSize"` + Files []struct { + ID string `json:"id"` + ChunkCount int `json:"chunkCount"` + UploadedChunks []int `json:"uploadedChunks"` + } `json:"files"` + } + if err := json.Unmarshal(createResponse.Body.Bytes(), &session); err != nil { + t.Fatalf("json.Unmarshal session returned error: %v", err) + } + if session.SessionID == "" || session.ChunkSize != 4 || len(session.Files) != 1 || session.Files[0].ChunkCount != 3 { + t.Fatalf("unexpected session response: %+v", session) + } + + chunks := map[int]string{1: "o wo", 0: "hell", 2: "rld"} + for index, body := range chunks { + request := httptest.NewRequest(http.MethodPut, "/api/v1/uploads/resumable/"+session.SessionID+"/files/"+session.Files[0].ID+"/chunks/"+strconv.Itoa(index), strings.NewReader(body)) + request.SetPathValue("sessionID", session.SessionID) + request.SetPathValue("fileID", session.Files[0].ID) + request.SetPathValue("index", strconv.Itoa(index)) + response := httptest.NewRecorder() + app.PutResumableChunk(response, request) + if response.Code != http.StatusOK { + t.Fatalf("chunk %d status = %d, body = %s", index, response.Code, response.Body.String()) + } + } + + completeRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable/"+session.SessionID+"/complete", nil) + completeRequest.SetPathValue("sessionID", session.SessionID) + completeResponse := httptest.NewRecorder() + app.CompleteResumableUpload(completeResponse, completeRequest) + if completeResponse.Code != http.StatusCreated { + t.Fatalf("complete status = %d, body = %s", completeResponse.Code, completeResponse.Body.String()) + } + var payload services.UploadResult + if err := json.Unmarshal(completeResponse.Body.Bytes(), &payload); err != nil { + t.Fatalf("json.Unmarshal result returned error: %v", err) + } + box, err := app.uploadService.GetBox(payload.BoxID) + if err != nil { + t.Fatalf("GetBox returned error: %v", err) + } + if len(box.Files) != 1 || box.Files[0].Name != "note.txt" || box.Files[0].Size != 11 { + t.Fatalf("unexpected box files: %+v", box.Files) + } + object, err := app.uploadService.OpenFileObject(context.Background(), box, box.Files[0]) + if err != nil { + t.Fatalf("OpenFileObject returned error: %v", err) + } + data, err := io.ReadAll(object.Body) + object.Body.Close() + if err != nil { + t.Fatalf("ReadAll returned error: %v", err) + } + if string(data) != "hello world" { + t.Fatalf("uploaded body = %q", string(data)) + } +} + +func TestResumableUploadRequiresAllChunks(t *testing.T) { + app, cleanup := newTestApp(t) + defer cleanup() + + createRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable", strings.NewReader(`{"files":[{"name":"note.txt","size":8,"contentType":"text/plain"}]}`)) + createResponse := httptest.NewRecorder() + app.CreateResumableUpload(createResponse, createRequest) + if createResponse.Code != http.StatusCreated { + t.Fatalf("create status = %d, body = %s", createResponse.Code, createResponse.Body.String()) + } + var session struct { + SessionID string `json:"sessionId"` + Files []struct { + ID string `json:"id"` + } `json:"files"` + } + if err := json.Unmarshal(createResponse.Body.Bytes(), &session); err != nil { + t.Fatalf("json.Unmarshal session returned error: %v", err) + } + chunkRequest := httptest.NewRequest(http.MethodPut, "/api/v1/uploads/resumable/"+session.SessionID+"/files/"+session.Files[0].ID+"/chunks/0", strings.NewReader("hell")) + chunkRequest.SetPathValue("sessionID", session.SessionID) + chunkRequest.SetPathValue("fileID", session.Files[0].ID) + chunkRequest.SetPathValue("index", "0") + chunkResponse := httptest.NewRecorder() + app.PutResumableChunk(chunkResponse, chunkRequest) + if chunkResponse.Code != http.StatusOK { + t.Fatalf("chunk status = %d, body = %s", chunkResponse.Code, chunkResponse.Body.String()) + } + + completeRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable/"+session.SessionID+"/complete", nil) + completeRequest.SetPathValue("sessionID", session.SessionID) + completeResponse := httptest.NewRecorder() + app.CompleteResumableUpload(completeResponse, completeRequest) + if completeResponse.Code != http.StatusBadRequest { + t.Fatalf("complete status = %d, body = %s", completeResponse.Code, completeResponse.Body.String()) + } +} + +func TestResumableUploadCanAddFilesToExistingSession(t *testing.T) { + app, cleanup := newTestApp(t) + defer cleanup() + + createBody := `{"files":[{"name":"one.txt","size":4,"contentType":"text/plain","fingerprint":"one"}],"expiresMinutes":60}` + createRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable", strings.NewReader(createBody)) + createResponse := httptest.NewRecorder() + app.CreateResumableUpload(createResponse, createRequest) + if createResponse.Code != http.StatusCreated { + t.Fatalf("create status = %d, body = %s", createResponse.Code, createResponse.Body.String()) + } + var session struct { + SessionID string `json:"sessionId"` + Files []struct { + ID string `json:"id"` + } `json:"files"` + } + if err := json.Unmarshal(createResponse.Body.Bytes(), &session); err != nil { + t.Fatalf("json.Unmarshal session returned error: %v", err) + } + firstChunk := httptest.NewRequest(http.MethodPut, "/api/v1/uploads/resumable/"+session.SessionID+"/files/"+session.Files[0].ID+"/chunks/0", strings.NewReader("one!")) + firstChunk.SetPathValue("sessionID", session.SessionID) + firstChunk.SetPathValue("fileID", session.Files[0].ID) + firstChunk.SetPathValue("index", "0") + firstChunkResponse := httptest.NewRecorder() + app.PutResumableChunk(firstChunkResponse, firstChunk) + if firstChunkResponse.Code != http.StatusOK { + t.Fatalf("first chunk status = %d, body = %s", firstChunkResponse.Code, firstChunkResponse.Body.String()) + } + + addRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable/"+session.SessionID+"/files", strings.NewReader(`{"files":[{"name":"two.txt","size":4,"contentType":"text/plain","fingerprint":"two"}]}`)) + addRequest.SetPathValue("sessionID", session.SessionID) + addResponse := httptest.NewRecorder() + app.AddResumableFiles(addResponse, addRequest) + if addResponse.Code != http.StatusOK { + t.Fatalf("add status = %d, body = %s", addResponse.Code, addResponse.Body.String()) + } + var updated struct { + Files []struct { + ID string `json:"id"` + Name string `json:"name"` + UploadedChunks []int `json:"uploadedChunks"` + } `json:"files"` + } + if err := json.Unmarshal(addResponse.Body.Bytes(), &updated); err != nil { + t.Fatalf("json.Unmarshal updated returned error: %v", err) + } + if len(updated.Files) != 2 || len(updated.Files[0].UploadedChunks) != 1 { + t.Fatalf("unexpected updated session: %+v", updated) + } + secondChunk := httptest.NewRequest(http.MethodPut, "/api/v1/uploads/resumable/"+session.SessionID+"/files/"+updated.Files[1].ID+"/chunks/0", strings.NewReader("two!")) + secondChunk.SetPathValue("sessionID", session.SessionID) + secondChunk.SetPathValue("fileID", updated.Files[1].ID) + secondChunk.SetPathValue("index", "0") + secondChunkResponse := httptest.NewRecorder() + app.PutResumableChunk(secondChunkResponse, secondChunk) + if secondChunkResponse.Code != http.StatusOK { + t.Fatalf("second chunk status = %d, body = %s", secondChunkResponse.Code, secondChunkResponse.Body.String()) + } + completeRequest := httptest.NewRequest(http.MethodPost, "/api/v1/uploads/resumable/"+session.SessionID+"/complete", nil) + completeRequest.SetPathValue("sessionID", session.SessionID) + completeResponse := httptest.NewRecorder() + app.CompleteResumableUpload(completeResponse, completeRequest) + if completeResponse.Code != http.StatusCreated { + t.Fatalf("complete status = %d, body = %s", completeResponse.Code, completeResponse.Body.String()) + } + var payload services.UploadResult + if err := json.Unmarshal(completeResponse.Body.Bytes(), &payload); err != nil { + t.Fatalf("json.Unmarshal result returned error: %v", err) + } + box, err := app.uploadService.GetBox(payload.BoxID) + if err != nil { + t.Fatalf("GetBox returned error: %v", err) + } + if len(box.Files) != 2 { + t.Fatalf("box file count = %d, want 2", len(box.Files)) + } +} + func TestManageBoxAndDeleteFlow(t *testing.T) { app, cleanup := newTestApp(t) defer cleanup() @@ -214,13 +407,16 @@ func newTestApp(t *testing.T) (*App, func()) { logger := slog.New(slog.NewTextHandler(io.Discard, nil)) cfg := config.Config{ - AppName: "warpbox.dev", - AppVersion: "test", - BaseURL: "http://example.test", - DataDir: filepath.Join(root, "data"), - StaticDir: staticDir, - TemplateDir: templateDir, - MaxUploadSize: 1024 * 1024, + AppName: "warpbox.dev", + AppVersion: "test", + BaseURL: "http://example.test", + DataDir: filepath.Join(root, "data"), + StaticDir: staticDir, + TemplateDir: templateDir, + MaxUploadSize: 1024 * 1024, + ResumableUploadsEnabled: true, + ResumableChunkSize: 4, + ResumableRetention: time.Hour, DefaultSettings: config.SettingsDefaults{ AnonymousUploadsEnabled: true, AnonymousMaxUploadMB: 1, diff --git a/backend/libs/jobs/cleanup.go b/backend/libs/jobs/cleanup.go index 23fc016..8061588 100644 --- a/backend/libs/jobs/cleanup.go +++ b/backend/libs/jobs/cleanup.go @@ -22,6 +22,14 @@ func newCleanupJob(cfg config.Config, logger *slog.Logger, uploadService *servic if cleaned > 0 { logger.Info("cleanup job complete", "source", "housekeeping", "severity", "user_activity", "code", 2202, "cleaned", cleaned) } + cleanedUploads, err := uploadService.CleanupExpiredResumableSessions(time.Now().UTC()) + if err != nil { + logger.Warn("resumable upload cleanup failed", "source", "housekeeping", "severity", "warn", "code", 4204, "error", err.Error()) + return + } + if cleanedUploads > 0 { + logger.Info("resumable uploads cleaned", "source", "housekeeping", "severity", "user_activity", "code", 2204, "cleaned", cleanedUploads) + } if banService != nil { cleanedEvents, err := banService.CleanupAbuseEvents(time.Now().UTC()) if err != nil { @@ -37,7 +45,12 @@ func newCleanupJob(cfg config.Config, logger *slog.Logger, uploadService *servic } func RunCleanupNow(uploadService *services.UploadService, logger *slog.Logger) (int, error) { - return cleanupUnavailableBoxes(uploadService, logger) + cleaned, err := cleanupUnavailableBoxes(uploadService, logger) + if err != nil { + return cleaned, err + } + cleanedUploads, err := uploadService.CleanupExpiredResumableSessions(time.Now().UTC()) + return cleaned + cleanedUploads, err } func cleanupUnavailableBoxes(uploadService *services.UploadService, logger *slog.Logger) (int, error) { diff --git a/backend/libs/services/resumable.go b/backend/libs/services/resumable.go new file mode 100644 index 0000000..295e82e --- /dev/null +++ b/backend/libs/services/resumable.go @@ -0,0 +1,454 @@ +package services + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "path/filepath" + "sort" + "strings" + "time" + + "go.etcd.io/bbolt" +) + +var resumableUploadsBucket = []byte("resumable_uploads") + +const ( + ResumableStatusUploading = "uploading" + ResumableStatusCompleted = "completed" + ResumableStatusCancelled = "cancelled" +) + +type ResumableFileInput struct { + Name string `json:"name"` + Size int64 `json:"size"` + ContentType string `json:"contentType"` + Fingerprint string `json:"fingerprint,omitempty"` +} + +type ResumableSession struct { + ID string `json:"id"` + Options UploadOptions `json:"options"` + Files []ResumableFile `json:"files"` + ChunkSize int64 `json:"chunkSize"` + Status string `json:"status"` + BoxID string `json:"boxId,omitempty"` + CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` + ExpiresAt time.Time `json:"expiresAt"` +} + +type ResumableFile struct { + ID string `json:"id"` + Name string `json:"name"` + Size int64 `json:"size"` + ContentType string `json:"contentType"` + Fingerprint string `json:"fingerprint,omitempty"` + ChunkCount int `json:"chunkCount"` + UploadedChunks []int `json:"uploadedChunks"` +} + +func (s *UploadService) ensureResumableBucket() error { + return s.db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(resumableUploadsBucket) + return err + }) +} + +func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts UploadOptions, chunkSize int64, retention time.Duration) (ResumableSession, error) { + if len(files) == 0 { + return ResumableSession{}, fmt.Errorf("no files were uploaded") + } + if chunkSize <= 0 { + return ResumableSession{}, fmt.Errorf("chunk size must be positive") + } + if retention <= 0 { + return ResumableSession{}, fmt.Errorf("retention must be positive") + } + if strings.TrimSpace(opts.Password) != "" { + opts.PasswordSalt, opts.PasswordHash = hashPassword(opts.Password) + opts.Password = "" + } + sessionFiles, err := s.resumableFilesFromInput(files, opts, chunkSize, nil) + if err != nil { + return ResumableSession{}, err + } + now := time.Now().UTC() + session := ResumableSession{ + ID: randomID(12), + Options: opts, + Files: sessionFiles, + ChunkSize: chunkSize, + Status: ResumableStatusUploading, + CreatedAt: now, + UpdatedAt: now, + ExpiresAt: now.Add(retention), + } + if err := s.saveResumableSession(session); err != nil { + return ResumableSession{}, err + } + return session, nil +} + +func (s *UploadService) AddResumableFiles(sessionID string, files []ResumableFileInput) (ResumableSession, error) { + if len(files) == 0 { + return s.GetResumableSession(sessionID) + } + session, err := s.GetResumableSession(sessionID) + if err != nil { + return ResumableSession{}, err + } + if err := resumableSessionWritable(session); err != nil { + return ResumableSession{}, err + } + existing := make(map[string]bool) + for _, file := range session.Files { + existing[resumableFileKey(file.Name, file.Size, file.Fingerprint)] = true + } + newFiles, err := s.resumableFilesFromInput(files, session.Options, session.ChunkSize, existing) + if err != nil { + return ResumableSession{}, err + } + if len(newFiles) == 0 { + return session, nil + } + session.Files = append(session.Files, newFiles...) + session.UpdatedAt = time.Now().UTC() + if err := s.saveResumableSession(session); err != nil { + return ResumableSession{}, err + } + return session, nil +} + +func (s *UploadService) GetResumableSession(id string) (ResumableSession, error) { + var session ResumableSession + err := s.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(resumableUploadsBucket) + if bucket == nil { + return os.ErrNotExist + } + data := bucket.Get([]byte(id)) + if data == nil { + return os.ErrNotExist + } + return json.Unmarshal(data, &session) + }) + if err != nil { + return ResumableSession{}, err + } + return session, nil +} + +func (s *UploadService) PutResumableChunk(ctx context.Context, sessionID, fileID string, index int, body io.Reader) (ResumableSession, error) { + session, err := s.GetResumableSession(sessionID) + if err != nil { + return ResumableSession{}, err + } + if err := resumableSessionWritable(session); err != nil { + return ResumableSession{}, err + } + fileIndex := -1 + for i, file := range session.Files { + if file.ID == fileID { + fileIndex = i + break + } + } + if fileIndex < 0 { + return ResumableSession{}, os.ErrNotExist + } + file := session.Files[fileIndex] + if index < 0 || index >= file.ChunkCount { + return ResumableSession{}, fmt.Errorf("chunk index is invalid") + } + expectedSize := expectedChunkSize(file.Size, session.ChunkSize, index) + chunkDir := s.resumableFileDir(session.ID, file.ID) + if err := os.MkdirAll(chunkDir, 0o755); err != nil { + return ResumableSession{}, err + } + chunkPath := s.resumableChunkPath(session.ID, file.ID, index) + tempPath := chunkPath + ".tmp" + target, err := os.OpenFile(tempPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + return ResumableSession{}, err + } + written, copyErr := io.Copy(target, io.LimitReader(body, expectedSize+1)) + closeErr := target.Close() + if copyErr != nil { + _ = os.Remove(tempPath) + return ResumableSession{}, copyErr + } + if closeErr != nil { + _ = os.Remove(tempPath) + return ResumableSession{}, closeErr + } + if written != expectedSize { + _ = os.Remove(tempPath) + return ResumableSession{}, fmt.Errorf("chunk size mismatch") + } + if err := os.Rename(tempPath, chunkPath); err != nil { + _ = os.Remove(tempPath) + return ResumableSession{}, err + } + session.Files[fileIndex].UploadedChunks = addChunkIndex(session.Files[fileIndex].UploadedChunks, index) + session.UpdatedAt = time.Now().UTC() + if err := s.saveResumableSession(session); err != nil { + return ResumableSession{}, err + } + return session, nil +} + +func (s *UploadService) CompleteResumableSession(ctx context.Context, sessionID string) (UploadResult, ResumableSession, error) { + session, err := s.GetResumableSession(sessionID) + if err != nil { + return UploadResult{}, ResumableSession{}, err + } + if err := resumableSessionWritable(session); err != nil { + return UploadResult{}, ResumableSession{}, err + } + staged, cleanup, err := s.assembleResumableFiles(ctx, session) + if err != nil { + return UploadResult{}, ResumableSession{}, err + } + defer cleanup() + + result, err := s.CreateBoxFromIncoming(staged, session.Options) + if err != nil { + return UploadResult{}, ResumableSession{}, err + } + if err := os.RemoveAll(s.resumableSessionDir(session.ID)); err != nil { + return UploadResult{}, ResumableSession{}, err + } + session.Status = ResumableStatusCompleted + session.BoxID = result.BoxID + session.UpdatedAt = time.Now().UTC() + if err := s.saveResumableSession(session); err != nil { + return UploadResult{}, ResumableSession{}, err + } + return result, session, nil +} + +func (s *UploadService) CancelResumableSession(sessionID string) error { + session, err := s.GetResumableSession(sessionID) + if err != nil { + return err + } + session.Status = ResumableStatusCancelled + session.UpdatedAt = time.Now().UTC() + if err := s.saveResumableSession(session); err != nil { + return err + } + return os.RemoveAll(s.resumableSessionDir(session.ID)) +} + +func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, error) { + candidates := make([]ResumableSession, 0) + err := s.db.View(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(resumableUploadsBucket) + if bucket == nil { + return nil + } + return bucket.ForEach(func(_, value []byte) error { + var session ResumableSession + if err := json.Unmarshal(value, &session); err != nil { + return err + } + if !session.ExpiresAt.After(now) || session.Status != ResumableStatusUploading { + candidates = append(candidates, session) + } + return nil + }) + }) + if err != nil { + return 0, err + } + for _, session := range candidates { + if err := os.RemoveAll(s.resumableSessionDir(session.ID)); err != nil { + return 0, err + } + } + err = s.db.Update(func(tx *bbolt.Tx) error { + bucket := tx.Bucket(resumableUploadsBucket) + if bucket == nil { + return nil + } + for _, session := range candidates { + if err := bucket.Delete([]byte(session.ID)); err != nil { + return err + } + } + return nil + }) + return len(candidates), err +} + +func (s *UploadService) saveResumableSession(session ResumableSession) error { + if err := s.ensureResumableBucket(); err != nil { + return err + } + return s.db.Update(func(tx *bbolt.Tx) error { + data, err := json.Marshal(session) + if err != nil { + return err + } + return tx.Bucket(resumableUploadsBucket).Put([]byte(session.ID), data) + }) +} + +func (s *UploadService) resumableFilesFromInput(files []ResumableFileInput, opts UploadOptions, chunkSize int64, existing map[string]bool) ([]ResumableFile, error) { + sessionFiles := make([]ResumableFile, 0, len(files)) + for _, file := range files { + file.Name = filepath.Base(strings.TrimSpace(file.Name)) + if file.Name == "." || file.Name == "" { + return nil, fmt.Errorf("file name is required") + } + if file.Size < 0 { + return nil, fmt.Errorf("file size is invalid") + } + fingerprint := strings.TrimSpace(file.Fingerprint) + key := resumableFileKey(file.Name, file.Size, fingerprint) + if existing != nil && existing[key] { + continue + } + if !opts.SkipSizeLimit { + if err := s.ValidateSize(file.Size); err != nil { + return nil, err + } + } + chunks := int((file.Size + chunkSize - 1) / chunkSize) + if chunks == 0 { + chunks = 1 + } + sessionFiles = append(sessionFiles, ResumableFile{ + ID: randomID(8), + Name: file.Name, + Size: file.Size, + ContentType: strings.TrimSpace(file.ContentType), + Fingerprint: fingerprint, + ChunkCount: chunks, + }) + if existing != nil { + existing[key] = true + } + } + return sessionFiles, nil +} + +func resumableFileKey(name string, size int64, fingerprint string) string { + return strings.TrimSpace(fingerprint) + "|" + filepath.Base(strings.TrimSpace(name)) + "|" + fmt.Sprintf("%d", size) +} + +func (s *UploadService) assembleResumableFiles(ctx context.Context, session ResumableSession) ([]IncomingFile, func(), error) { + assembledDir := filepath.Join(s.resumableSessionDir(session.ID), "assembled") + if err := os.MkdirAll(assembledDir, 0o755); err != nil { + return nil, func() {}, err + } + cleanup := func() { + _ = os.RemoveAll(assembledDir) + } + staged := make([]IncomingFile, 0, len(session.Files)) + for _, file := range session.Files { + if len(file.UploadedChunks) != file.ChunkCount { + cleanup() + return nil, func() {}, fmt.Errorf("file %s is missing chunks", file.Name) + } + assembledPath := filepath.Join(assembledDir, file.ID) + target, err := os.OpenFile(assembledPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) + if err != nil { + cleanup() + return nil, func() {}, err + } + var written int64 + for i := 0; i < file.ChunkCount; i++ { + select { + case <-ctx.Done(): + _ = target.Close() + cleanup() + return nil, func() {}, ctx.Err() + default: + } + chunk, err := os.Open(s.resumableChunkPath(session.ID, file.ID, i)) + if err != nil { + _ = target.Close() + cleanup() + return nil, func() {}, fmt.Errorf("file %s is missing chunks", file.Name) + } + n, copyErr := io.Copy(target, chunk) + closeErr := chunk.Close() + if copyErr != nil { + _ = target.Close() + cleanup() + return nil, func() {}, copyErr + } + if closeErr != nil { + _ = target.Close() + cleanup() + return nil, func() {}, closeErr + } + written += n + } + if err := target.Close(); err != nil { + cleanup() + return nil, func() {}, err + } + if written != file.Size { + cleanup() + return nil, func() {}, fmt.Errorf("assembled file size mismatch") + } + staged = append(staged, StagedUploadFile{ + Filename: file.Name, + FileSize: file.Size, + MIMEType: file.ContentType, + Path: assembledPath, + }) + } + return staged, cleanup, nil +} + +func resumableSessionWritable(session ResumableSession) error { + if session.Status != ResumableStatusUploading { + return fmt.Errorf("upload session is not active") + } + if !session.ExpiresAt.After(time.Now().UTC()) { + return fmt.Errorf("upload session expired") + } + return nil +} + +func expectedChunkSize(fileSize, chunkSize int64, index int) int64 { + offset := int64(index) * chunkSize + remaining := fileSize - offset + if remaining < 0 { + return 0 + } + if remaining > chunkSize { + return chunkSize + } + return remaining +} + +func addChunkIndex(chunks []int, index int) []int { + for _, chunk := range chunks { + if chunk == index { + return chunks + } + } + chunks = append(chunks, index) + sort.Ints(chunks) + return chunks +} + +func (s *UploadService) resumableSessionDir(sessionID string) string { + return filepath.Join(s.dataDir, "tmp", "uploads", sessionID) +} + +func (s *UploadService) resumableFileDir(sessionID, fileID string) string { + return filepath.Join(s.resumableSessionDir(sessionID), fileID) +} + +func (s *UploadService) resumableChunkPath(sessionID, fileID string, index int) string { + return filepath.Join(s.resumableFileDir(sessionID, fileID), fmt.Sprintf("%06d.part", index)) +} diff --git a/backend/libs/services/upload.go b/backend/libs/services/upload.go index afbea2c..1e492c5 100644 --- a/backend/libs/services/upload.go +++ b/backend/libs/services/upload.go @@ -42,6 +42,8 @@ type UploadOptions struct { ExpiresInMinutes int MaxDownloads int Password string + PasswordSalt string + PasswordHash string ObfuscateMetadata bool OwnerID string CollectionID string @@ -50,6 +52,56 @@ type UploadOptions struct { StorageBackendID string } +type IncomingFile interface { + Name() string + Size() int64 + ContentType() string + Open() (io.ReadCloser, error) +} + +type multipartIncomingFile struct { + header *multipart.FileHeader +} + +func (f multipartIncomingFile) Name() string { + return f.header.Filename +} + +func (f multipartIncomingFile) Size() int64 { + return f.header.Size +} + +func (f multipartIncomingFile) ContentType() string { + return f.header.Header.Get("Content-Type") +} + +func (f multipartIncomingFile) Open() (io.ReadCloser, error) { + return f.header.Open() +} + +type StagedUploadFile struct { + Filename string + FileSize int64 + MIMEType string + Path string +} + +func (f StagedUploadFile) Name() string { + return f.Filename +} + +func (f StagedUploadFile) Size() int64 { + return f.FileSize +} + +func (f StagedUploadFile) ContentType() string { + return f.MIMEType +} + +func (f StagedUploadFile) Open() (io.ReadCloser, error) { + return os.Open(f.Path) +} + type Box struct { ID string `json:"id"` OwnerID string `json:"ownerId,omitempty"` @@ -198,6 +250,10 @@ func (s *UploadService) ValidateSize(size int64) error { } func (s *UploadService) CreateBox(files []*multipart.FileHeader, opts UploadOptions) (UploadResult, error) { + return s.CreateBoxFromIncoming(multipartIncomingFiles(files), opts) +} + +func (s *UploadService) CreateBoxFromIncoming(files []IncomingFile, opts UploadOptions) (UploadResult, error) { if len(files) == 0 { return UploadResult{}, fmt.Errorf("no files were uploaded") } @@ -232,13 +288,16 @@ func (s *UploadService) CreateBox(files []*multipart.FileHeader, opts UploadOpti } deleteToken := randomID(32) box.DeleteTokenHash = deleteTokenHash(box.ID, deleteToken) - if strings.TrimSpace(opts.Password) != "" { + if strings.TrimSpace(opts.PasswordHash) != "" { + box.PasswordSalt = opts.PasswordSalt + box.PasswordHash = opts.PasswordHash + } else if strings.TrimSpace(opts.Password) != "" { salt, hash := hashPassword(opts.Password) box.PasswordSalt = salt box.PasswordHash = hash } - if err := s.writeFilesToBox(&box, files, opts); err != nil { + if err := s.writeIncomingFilesToBox(&box, files, opts); err != nil { return UploadResult{}, err } @@ -261,6 +320,10 @@ func (s *UploadService) CreateBox(files []*multipart.FileHeader, opts UploadOpti // selection into a single box). The box keeps its original expiry, password and // other settings; only the new files are written. func (s *UploadService) AppendFiles(boxID string, files []*multipart.FileHeader, opts UploadOptions) (UploadResult, error) { + return s.AppendIncomingFiles(boxID, multipartIncomingFiles(files), opts) +} + +func (s *UploadService) AppendIncomingFiles(boxID string, files []IncomingFile, opts UploadOptions) (UploadResult, error) { if len(files) == 0 { return UploadResult{}, fmt.Errorf("no files were uploaded") } @@ -268,7 +331,7 @@ func (s *UploadService) AppendFiles(boxID string, files []*multipart.FileHeader, if err != nil { return UploadResult{}, err } - if err := s.writeFilesToBox(&box, files, opts); err != nil { + if err := s.writeIncomingFilesToBox(&box, files, opts); err != nil { return UploadResult{}, err } if err := s.SaveBox(box); err != nil { @@ -289,14 +352,26 @@ func (s *UploadService) AppendFiles(boxID string, files []*multipart.FileHeader, // appends the file metadata to box.Files. The box's StorageBackendID determines // where files land, so it works for both new and existing boxes. func (s *UploadService) writeFilesToBox(box *Box, files []*multipart.FileHeader, opts UploadOptions) error { + return s.writeIncomingFilesToBox(box, multipartIncomingFiles(files), opts) +} + +func multipartIncomingFiles(files []*multipart.FileHeader) []IncomingFile { + incoming := make([]IncomingFile, 0, len(files)) + for _, file := range files { + incoming = append(incoming, multipartIncomingFile{header: file}) + } + return incoming +} + +func (s *UploadService) writeIncomingFilesToBox(box *Box, files []IncomingFile, opts UploadOptions) error { backend, err := s.storage.Backend(box.StorageBackendID) if err != nil { return err } - for _, header := range files { + for _, incoming := range files { if !opts.SkipSizeLimit { - if err := s.ValidateSize(header.Size); err != nil { + if err := s.ValidateSize(incoming.Size()); err != nil { return err } } @@ -306,15 +381,15 @@ func (s *UploadService) writeFilesToBox(box *Box, files []*multipart.FileHeader, maxSize = 0 } - file, err := header.Open() + file, err := incoming.Open() if err != nil { return err } fileID := randomID(8) - storedName := "@each@" + fileID + strings.ToLower(filepath.Ext(header.Filename)) + storedName := "@each@" + fileID + strings.ToLower(filepath.Ext(incoming.Name())) objectKey := boxObjectKey(box.ID, storedName) - contentType := header.Header.Get("Content-Type") + contentType := incoming.ContentType() if contentType == "" { buffer := make([]byte, 512) n, _ := file.Read(buffer) @@ -324,7 +399,7 @@ func (s *UploadService) writeFilesToBox(box *Box, files []*multipart.FileHeader, } } - if err := s.writeUploadedObject(context.Background(), backend, objectKey, file, header.Size, maxSize, contentType); err != nil { + if err := s.writeUploadedObject(context.Background(), backend, objectKey, file, incoming.Size(), maxSize, contentType); err != nil { file.Close() return err } @@ -332,9 +407,9 @@ func (s *UploadService) writeFilesToBox(box *Box, files []*multipart.FileHeader, box.Files = append(box.Files, File{ ID: fileID, - Name: filepath.Base(header.Filename), + Name: filepath.Base(incoming.Name()), StoredName: storedName, - Size: header.Size, + Size: incoming.Size(), ContentType: contentType, PreviewKind: previewKind(contentType), ObjectKey: objectKey, @@ -931,21 +1006,17 @@ func writeUploadedFile(path string, source multipart.File, maxSize int64) error return nil } -func (s *UploadService) writeUploadedObject(ctx context.Context, backend StorageBackend, key string, source multipart.File, size, maxSize int64, contentType string) error { +func (s *UploadService) writeUploadedObject(ctx context.Context, backend StorageBackend, key string, source io.Reader, size, maxSize int64, contentType string) error { var reader io.Reader = source + putSize := size if maxSize > 0 { - reader = io.LimitReader(source, maxSize+1) - var buffer bytes.Buffer - written, err := io.Copy(&buffer, reader) - if err != nil { - return err - } - if written > maxSize { + if size > maxSize { return fmt.Errorf("file exceeds max upload size") } - return backend.Put(ctx, key, bytes.NewReader(buffer.Bytes()), written, contentType) + reader = io.LimitReader(source, maxSize) + putSize = size } - return backend.Put(ctx, key, reader, size, contentType) + return backend.Put(ctx, key, reader, putSize, contentType) } func boxObjectKey(boxID, name string) string { diff --git a/backend/libs/services/upload_test.go b/backend/libs/services/upload_test.go index 4a58fa3..313919c 100644 --- a/backend/libs/services/upload_test.go +++ b/backend/libs/services/upload_test.go @@ -126,6 +126,166 @@ func TestLocalStorageBackendAndLegacyFallback(t *testing.T) { object.Body.Close() } +func TestResumableSessionUploadOutOfOrderAndComplete(t *testing.T) { + service := newTestUploadService(t) + session, err := service.CreateResumableSession([]ResumableFileInput{{ + Name: "note.txt", + Size: 11, + ContentType: "text/plain", + Fingerprint: "sha256:first-chunk", + }}, UploadOptions{MaxDays: 1, Password: "secret"}, 4, time.Hour) + if err != nil { + t.Fatalf("CreateResumableSession returned error: %v", err) + } + if session.Options.Password != "" || session.Options.PasswordHash == "" || session.Options.PasswordSalt == "" { + t.Fatalf("resumable session did not hash password before storage: %+v", session.Options) + } + if session.Files[0].ChunkCount != 3 { + t.Fatalf("ChunkCount = %d, want 3", session.Files[0].ChunkCount) + } + if session.Files[0].Fingerprint != "sha256:first-chunk" { + t.Fatalf("Fingerprint = %q", session.Files[0].Fingerprint) + } + for index, body := range map[int]string{2: "rld", 0: "hell", 1: "o wo"} { + updated, err := service.PutResumableChunk(testContext(), session.ID, session.Files[0].ID, index, strings.NewReader(body)) + if err != nil { + t.Fatalf("PutResumableChunk(%d) returned error: %v", index, err) + } + if len(updated.Files[0].UploadedChunks) == 0 { + t.Fatalf("UploadedChunks was not updated") + } + } + result, completed, err := service.CompleteResumableSession(testContext(), session.ID) + if err != nil { + t.Fatalf("CompleteResumableSession returned error: %v", err) + } + if completed.Status != ResumableStatusCompleted || completed.BoxID != result.BoxID { + t.Fatalf("completed session = %+v, result = %+v", completed, result) + } + box := getTestBox(t, service, result.BoxID) + if box.PasswordHash == "" || box.PasswordSalt == "" || box.PasswordHash != session.Options.PasswordHash { + t.Fatalf("completed box did not preserve hashed password") + } + object, err := service.OpenFileObject(testContext(), box, box.Files[0]) + if err != nil { + t.Fatalf("OpenFileObject returned error: %v", err) + } + data, err := io.ReadAll(object.Body) + object.Body.Close() + if err != nil { + t.Fatalf("ReadAll returned error: %v", err) + } + if string(data) != "hello world" { + t.Fatalf("object body = %q", string(data)) + } + if _, err := os.Stat(service.resumableSessionDir(session.ID)); !os.IsNotExist(err) { + t.Fatalf("resumable temp dir after complete error = %v, want os.ErrNotExist", err) + } +} + +func TestResumableCompleteRejectsMissingChunks(t *testing.T) { + service := newTestUploadService(t) + session, err := service.CreateResumableSession([]ResumableFileInput{{ + Name: "note.txt", + Size: 8, + ContentType: "text/plain", + }}, UploadOptions{MaxDays: 1}, 4, time.Hour) + if err != nil { + t.Fatalf("CreateResumableSession returned error: %v", err) + } + if _, err := service.PutResumableChunk(testContext(), session.ID, session.Files[0].ID, 0, strings.NewReader("hell")); err != nil { + t.Fatalf("PutResumableChunk returned error: %v", err) + } + if _, _, err := service.CompleteResumableSession(testContext(), session.ID); err == nil { + t.Fatalf("CompleteResumableSession accepted missing chunks") + } +} + +func TestResumableSessionCanAddFilesBeforeComplete(t *testing.T) { + service := newTestUploadService(t) + session, err := service.CreateResumableSession([]ResumableFileInput{{ + Name: "one.txt", + Size: 4, + ContentType: "text/plain", + Fingerprint: "one", + }}, UploadOptions{MaxDays: 1}, 4, time.Hour) + if err != nil { + t.Fatalf("CreateResumableSession returned error: %v", err) + } + if _, err := service.PutResumableChunk(testContext(), session.ID, session.Files[0].ID, 0, strings.NewReader("one!")); err != nil { + t.Fatalf("PutResumableChunk one returned error: %v", err) + } + updated, err := service.AddResumableFiles(session.ID, []ResumableFileInput{{ + Name: "two.txt", + Size: 4, + ContentType: "text/plain", + Fingerprint: "two", + }}) + if err != nil { + t.Fatalf("AddResumableFiles returned error: %v", err) + } + if len(updated.Files) != 2 { + t.Fatalf("files after add = %d, want 2", len(updated.Files)) + } + if updated.Files[0].UploadedChunks[0] != 0 { + t.Fatalf("existing uploaded chunk was not preserved: %+v", updated.Files[0]) + } + if _, err := service.AddResumableFiles(session.ID, []ResumableFileInput{{ + Name: "two.txt", + Size: 4, + ContentType: "text/plain", + Fingerprint: "two", + }}); err != nil { + t.Fatalf("duplicate AddResumableFiles returned error: %v", err) + } + updated, err = service.GetResumableSession(session.ID) + if err != nil { + t.Fatalf("GetResumableSession returned error: %v", err) + } + if len(updated.Files) != 2 { + t.Fatalf("duplicate add changed file count to %d", len(updated.Files)) + } + if _, err := service.PutResumableChunk(testContext(), session.ID, updated.Files[1].ID, 0, strings.NewReader("two!")); err != nil { + t.Fatalf("PutResumableChunk two returned error: %v", err) + } + result, _, err := service.CompleteResumableSession(testContext(), session.ID) + if err != nil { + t.Fatalf("CompleteResumableSession returned error: %v", err) + } + box := getTestBox(t, service, result.BoxID) + if len(box.Files) != 2 { + t.Fatalf("completed box file count = %d, want 2", len(box.Files)) + } +} + +func TestResumableCleanupRemovesExpiredSessionsAndChunks(t *testing.T) { + service := newTestUploadService(t) + session, err := service.CreateResumableSession([]ResumableFileInput{{ + Name: "note.txt", + Size: 4, + ContentType: "text/plain", + }}, UploadOptions{MaxDays: 1}, 4, time.Millisecond) + if err != nil { + t.Fatalf("CreateResumableSession returned error: %v", err) + } + if _, err := service.PutResumableChunk(testContext(), session.ID, session.Files[0].ID, 0, strings.NewReader("hell")); err != nil { + t.Fatalf("PutResumableChunk returned error: %v", err) + } + cleaned, err := service.CleanupExpiredResumableSessions(time.Now().UTC().Add(time.Hour)) + if err != nil { + t.Fatalf("CleanupExpiredResumableSessions returned error: %v", err) + } + if cleaned != 1 { + t.Fatalf("cleaned = %d, want 1", cleaned) + } + if _, err := service.GetResumableSession(session.ID); !os.IsNotExist(err) { + t.Fatalf("GetResumableSession after cleanup error = %v, want os.ErrNotExist", err) + } + if _, err := os.Stat(service.resumableSessionDir(session.ID)); !os.IsNotExist(err) { + t.Fatalf("resumable temp dir after cleanup error = %v, want os.ErrNotExist", err) + } +} + func TestContaboStorageConfigAllowsDisplayNamesWithSpaces(t *testing.T) { service := newTestUploadService(t) cfg, err := service.Storage().CreateS3Backend(StorageBackendConfig{ diff --git a/backend/static/css/20-upload.css b/backend/static/css/20-upload.css index e3873d4..d2145d6 100644 --- a/backend/static/css/20-upload.css +++ b/backend/static/css/20-upload.css @@ -335,10 +335,13 @@ button { .file-progress-side { width: min(10rem, 32vw); display: grid; + grid-template-columns: minmax(0, 1fr) auto; gap: 0.35rem; + align-items: center; } .file-progress-percent { + grid-column: 1 / -1; color: var(--muted-foreground); font-size: 0.75rem; text-align: right; @@ -349,6 +352,25 @@ button { margin-top: 0; } +.upload-file-remove { + width: 1.65rem; + height: 1.65rem; + min-height: 1.65rem; + padding: 0; + border-color: var(--border); + border-radius: 999px; + color: var(--muted-foreground); + background: var(--surface-1); + font-size: 1rem; + line-height: 1; +} + +.upload-file-remove:hover { + color: var(--foreground); + border-color: var(--primary); + background: var(--surface-1-hover); +} + .result-item small, .download-item small, .result-item code, diff --git a/backend/static/js/40-upload.js b/backend/static/js/40-upload.js index 2bba240..e48643c 100644 --- a/backend/static/js/40-upload.js +++ b/backend/static/js/40-upload.js @@ -13,6 +13,7 @@ const copyURL = document.querySelector("#copy-url"); const openBox = document.querySelector("#open-box"); const manageLink = document.querySelector("#manage-link"); + const RESUMABLE_SESSIONS_KEY = "warpbox-resumable-sessions"; if (!form || !dropZone || !fileInput) { return; @@ -42,6 +43,7 @@ let latestBoxURL = ""; let selectedFiles = []; + let uploadLocked = false; ["dragenter", "dragover"].forEach((eventName) => { dropZone.addEventListener(eventName, (event) => { @@ -57,33 +59,52 @@ }); }); - dropZone.addEventListener("drop", (event) => { - if (event.dataTransfer && event.dataTransfer.files.length > 0) { - fileInput.files = event.dataTransfer.files; - updateSelectedState(event.dataTransfer.files); + document.addEventListener("dragover", (event) => { + if (event.dataTransfer && Array.from(event.dataTransfer.types || []).includes("Files")) { + event.preventDefault(); } }); - fileInput.addEventListener("change", () => updateSelectedState(fileInput.files)); + document.addEventListener("drop", (event) => { + if (!event.dataTransfer || !event.dataTransfer.files.length) { + return; + } + event.preventDefault(); + if (!dropZone.contains(event.target)) { + addSelectedFiles(event.dataTransfer.files); + } + }); + + dropZone.addEventListener("drop", (event) => { + if (event.dataTransfer && event.dataTransfer.files.length > 0) { + addSelectedFiles(event.dataTransfer.files); + } + }); + + fileInput.addEventListener("change", () => { + addSelectedFiles(fileInput.files); + fileInput.value = ""; + }); form.addEventListener("submit", async (event) => { event.preventDefault(); - if (!fileInput.files || fileInput.files.length === 0) { + if (selectedFiles.length === 0) { updateStatus("Choose at least one file first."); return; } const submit = form.querySelector("button[type='submit']"); - const formData = new FormData(form); - selectedFiles = Array.from(fileInput.files); + const formData = uploadFormData(); renderQueue(selectedFiles, "queued"); setLoading(true, submit); try { - const payload = await uploadWithProgress(form.action, formData, selectedFiles); + const payload = await uploadResumable(form.action, formData, selectedFiles); renderResult(payload); form.reset(); - updateSelectedState([]); + selectedFiles = []; + fileInput.value = ""; + updateSelectedState(); } catch (error) { updateStatus(error.message || "Upload failed"); } finally { @@ -97,8 +118,27 @@ }); } - function updateSelectedState(files) { - selectedFiles = Array.from(files || []); + function addSelectedFiles(files) { + if (uploadLocked) { + return; + } + Array.from(files || []).forEach((file) => { + if (!selectedFiles.some((existing) => fileIdentity(existing) === fileIdentity(file))) { + selectedFiles.push(file); + } + }); + updateSelectedState(); + } + + function removeSelectedFile(index) { + if (uploadLocked) { + return; + } + selectedFiles.splice(index, 1); + updateSelectedState(); + } + + function updateSelectedState() { const count = selectedFiles.length || 0; const title = dropZone.querySelector(".drop-title"); if (title) { @@ -116,6 +156,7 @@ } function setLoading(isLoading, submit) { + uploadLocked = isLoading; if (progress) { progress.hidden = !isLoading; } @@ -201,18 +242,294 @@ }); } + async function uploadResumable(fallbackUrl, formData, files) { + if (!window.fetch || typeof Blob === "undefined") { + return uploadWithProgress(fallbackUrl, formData, files); + } + + updateStatus("Fingerprinting files..."); + const fingerprints = await Promise.all(files.map((file) => fileFingerprint(file))); + const createPayload = { + files: files.map((file, index) => ({ + name: file.name, + size: file.size, + contentType: file.type || "application/octet-stream", + fingerprint: fingerprints[index], + })), + expiresMinutes: parseInt(formData.get("expires_minutes") || "0", 10) || 0, + maxDownloads: parseInt(formData.get("max_downloads") || "0", 10) || 0, + password: formData.get("password") || "", + obfuscateMetadata: formData.get("obfuscate_metadata") === "on", + collectionId: formData.get("collection_id") || "", + }; + const persistable = !createPayload.password; + let session = persistable ? await findResumableSession(createPayload) : null; + if (session) { + session = await addMissingResumableFiles(session, createPayload); + } + if (!session || session.status !== "uploading") { + try { + session = await createResumableSession(createPayload); + } catch (error) { + if ((error.message || "").toLowerCase().includes("resumable uploads are disabled")) { + return uploadWithProgress(fallbackUrl, formData, files); + } + throw error; + } + if (persistable) { + saveResumableSession(session, createPayload); + } + } + const sessionFiles = files.map((file, index) => matchSessionFile(session, createPayload.files[index])); + if (sessionFiles.some((file) => !file)) { + throw new Error("Upload session could not match the selected files"); + } + + updateStatus("Uploading..."); + const totalBytes = files.reduce((sum, file) => sum + file.size, 0); + const completedByFile = new Array(files.length).fill(0); + sessionFiles.forEach((sessionFile, index) => { + completedByFile[index] = uploadedBytesForSessionFile(sessionFile, session.chunkSize); + setSingleFileProgress(index, files[index], percentForBytes(completedByFile[index], files[index].size)); + }); + setTotalProgress(percentForBytes(completedByFile.reduce((sum, bytes) => sum + bytes, 0), totalBytes)); + + for (let fileIndex = 0; fileIndex < files.length; fileIndex++) { + const file = files[fileIndex]; + const sessionFile = sessionFiles[fileIndex]; + const uploaded = new Set(sessionFile.uploadedChunks || []); + for (let chunkIndex = 0; chunkIndex < sessionFile.chunkCount; chunkIndex++) { + if (uploaded.has(chunkIndex)) { + continue; + } + const start = chunkIndex * session.chunkSize; + const end = Math.min(file.size, start + session.chunkSize); + await uploadChunk(session.sessionId, sessionFile.id, chunkIndex, file.slice(start, end), (loaded) => { + const currentTotal = completedByFile.reduce((sum, bytes) => sum + bytes, 0) + loaded; + setTotalProgress(percentForBytes(currentTotal, totalBytes)); + setSingleFileProgress(fileIndex, file, percentForBytes(completedByFile[fileIndex] + loaded, file.size)); + updateStatus(`${percentForBytes(currentTotal, totalBytes)}%`); + }); + completedByFile[fileIndex] += end - start; + uploaded.add(chunkIndex); + sessionFile.uploadedChunks = Array.from(uploaded).sort((a, b) => a - b); + if (persistable) { + saveResumableSession(session, createPayload); + } + } + setSingleFileProgress(fileIndex, file, 100); + } + + const resultPayload = await completeResumableSession(session.sessionId); + if (persistable) { + removeResumableSession(session.sessionId); + } + setTotalProgress(100); + setFileProgress(files, 100); + return resultPayload; + } + + async function createResumableSession(payload) { + const response = await fetch("/api/v1/uploads/resumable", { + method: "POST", + headers: { + "Accept": "application/json", + "Content-Type": "application/json", + }, + body: JSON.stringify(payload), + }); + return readUploadJSON(response, "Upload session could not be created"); + } + + async function fetchResumableStatus(sessionID) { + const response = await fetch(`/api/v1/uploads/resumable/${encodeURIComponent(sessionID)}`, { + headers: { "Accept": "application/json" }, + }); + return readUploadJSON(response, "Upload session could not be resumed"); + } + + async function addResumableFiles(sessionID, files) { + const response = await fetch(`/api/v1/uploads/resumable/${encodeURIComponent(sessionID)}/files`, { + method: "POST", + headers: { + "Accept": "application/json", + "Content-Type": "application/json", + }, + body: JSON.stringify({ files }), + }); + return readUploadJSON(response, "Upload session files could not be added"); + } + + function uploadChunk(sessionID, fileID, chunkIndex, chunk, onProgress) { + return new Promise((resolve, reject) => { + const request = new XMLHttpRequest(); + request.open("PUT", `/api/v1/uploads/resumable/${encodeURIComponent(sessionID)}/files/${encodeURIComponent(fileID)}/chunks/${chunkIndex}`); + request.setRequestHeader("Accept", "application/json"); + request.upload.addEventListener("progress", (event) => { + if (event.lengthComputable && onProgress) { + onProgress(event.loaded); + } + }); + request.addEventListener("load", () => { + if (request.status < 200 || request.status >= 300) { + let payload = {}; + try { + payload = JSON.parse(request.responseText || "{}"); + } catch (error) { + payload = {}; + } + reject(new Error(payload.error || "Chunk upload failed")); + return; + } + resolve(); + }); + request.addEventListener("error", () => reject(new Error("Network error during chunk upload"))); + request.addEventListener("abort", () => reject(new Error("Chunk upload aborted"))); + request.send(chunk); + }); + } + + async function completeResumableSession(sessionID) { + const response = await fetch(`/api/v1/uploads/resumable/${encodeURIComponent(sessionID)}/complete`, { + method: "POST", + headers: { "Accept": "application/json" }, + }); + return readUploadJSON(response, "Upload could not be completed"); + } + + async function readUploadJSON(response, fallback) { + let payload = {}; + try { + payload = await response.json(); + } catch (error) { + payload = {}; + } + if (!response.ok) { + throw new Error(payload.error || fallback); + } + return payload; + } + + async function findResumableSession(payload) { + const records = loadResumableSessions(); + const optionKey = resumableOptionKey(payload); + const selectedKeys = new Set(payload.files.map((file) => resumableFileKey(file))); + for (const record of records) { + if (record.optionKey !== optionKey) { + continue; + } + if (!record.files || !record.files.some((file) => selectedKeys.has(resumableFileKey(file)))) { + continue; + } + const session = await fetchResumableStatus(record.sessionId).catch(() => null); + if (!session || session.status !== "uploading") { + removeResumableSession(record.sessionId); + continue; + } + const sessionKeys = new Set(session.files.map((file) => resumableFileKey(file))); + const sessionHasOnlySelectedFiles = session.files.every((file) => selectedKeys.has(resumableFileKey(file))); + const selectedContainsSessionFile = Array.from(sessionKeys).some((key) => selectedKeys.has(key)); + if (sessionHasOnlySelectedFiles && selectedContainsSessionFile) { + return session; + } + } + return null; + } + + async function addMissingResumableFiles(session, payload) { + const existing = new Set(session.files.map((file) => resumableFileKey(file))); + const missing = payload.files.filter((file) => !existing.has(resumableFileKey(file))); + if (missing.length === 0) { + return session; + } + return addResumableFiles(session.sessionId, missing); + } + + function matchSessionFile(session, file) { + const key = resumableFileKey(file); + return session.files.find((sessionFile) => resumableFileKey(sessionFile) === key) || null; + } + + function resumableOptionKey(payload) { + return [ + payload.expiresMinutes, + payload.maxDownloads, + payload.obfuscateMetadata ? "1" : "0", + payload.collectionId || "", + ].join(":"); + } + + function resumableFileKey(file) { + return [file.name, file.size, file.fingerprint || ""].join(":"); + } + + function loadResumableSessions() { + try { + const value = localStorage.getItem(RESUMABLE_SESSIONS_KEY); + const records = value ? JSON.parse(value) : []; + return Array.isArray(records) ? records : []; + } catch (error) { + return []; + } + } + + function saveResumableSession(session, payload) { + try { + const records = loadResumableSessions().filter((record) => record.sessionId !== session.sessionId); + records.push({ + sessionId: session.sessionId, + optionKey: resumableOptionKey(payload), + files: session.files.map((file) => ({ + name: file.name, + size: file.size, + fingerprint: file.fingerprint || "", + })), + updatedAt: new Date().toISOString(), + }); + localStorage.setItem(RESUMABLE_SESSIONS_KEY, JSON.stringify(records.slice(-25))); + } catch (error) { + /* ignore persistence failures */ + } + } + + function removeResumableSession(sessionID) { + try { + const records = loadResumableSessions().filter((record) => record.sessionId !== sessionID); + localStorage.setItem(RESUMABLE_SESSIONS_KEY, JSON.stringify(records)); + } catch (error) { + /* ignore persistence failures */ + } + } + + function uploadedBytesForSessionFile(file, chunkSize) { + return (file.uploadedChunks || []).reduce((sum, index) => { + const start = index * chunkSize; + const end = Math.min(file.size, start + chunkSize); + return sum + Math.max(0, end - start); + }, 0); + } + + function percentForBytes(bytes, total) { + if (!total) { + return 100; + } + return Math.max(0, Math.min(100, Math.round((bytes / total) * 100))); + } + function renderQueue(files, status) { if (!uploadQueue) { return; } uploadQueue.hidden = files.length === 0; uploadQueue.replaceChildren(); - files.forEach((file) => { + files.forEach((file, index) => { uploadQueue.append(createFileRow({ name: file.name, meta: window.Warpbox.formatBytes(file.size), progress: status === "queued" ? 0 : 100, status, + index, + removable: status === "queued", })); }); } @@ -221,6 +538,7 @@ const row = document.createElement("div"); row.className = "result-item upload-file-row"; row.dataset.fileName = file.name; + row.dataset.fileIndex = file.index || 0; const body = document.createElement("span"); const name = document.createElement("strong"); @@ -242,11 +560,47 @@ fill.style.transform = `scaleX(${file.progress / 100})`; bar.append(fill); side.append(percent, bar); + if (file.removable) { + const remove = document.createElement("button"); + remove.className = "upload-file-remove"; + remove.type = "button"; + remove.setAttribute("aria-label", `Remove ${file.name}`); + remove.textContent = "×"; + remove.addEventListener("click", () => removeSelectedFile(file.index || 0)); + side.append(remove); + } row.append(body, side); return row; } + function uploadFormData() { + const formData = new FormData(form); + formData.delete("file"); + selectedFiles.forEach((file) => { + formData.append("file", file, file.name); + }); + return formData; + } + + function fileIdentity(file) { + return [file.name, file.size, file.lastModified || 0].join(":"); + } + + async function fileFingerprint(file) { + if (!window.crypto || !window.crypto.subtle || !file.slice || typeof TextEncoder === "undefined") { + return fileIdentity(file); + } + const sampleSize = Math.min(file.size, 1024 * 1024); + const sample = await file.slice(0, sampleSize).arrayBuffer(); + const metadata = new TextEncoder().encode([file.name, file.size, file.lastModified || 0, sampleSize].join(":")); + const combined = new Uint8Array(metadata.byteLength + sample.byteLength); + combined.set(metadata, 0); + combined.set(new Uint8Array(sample), metadata.byteLength); + const digest = await window.crypto.subtle.digest("SHA-256", combined); + return Array.from(new Uint8Array(digest)).map((byte) => byte.toString(16).padStart(2, "0")).join(""); + } + function setTotalProgress(percent) { if (totalProgressBar) { totalProgressBar.style.transform = `scaleX(${Math.max(0, Math.min(100, percent)) / 100})`; @@ -271,4 +625,23 @@ } }); } + + function setSingleFileProgress(index, file, progress) { + if (!uploadQueue) { + return; + } + const row = uploadQueue.querySelector(`.upload-file-row[data-file-index="${index}"]`); + if (!row) { + return; + } + const percent = row.querySelector(".file-progress-percent"); + const fill = row.querySelector(".file-progress span"); + const normalized = Math.max(0, Math.min(100, progress)); + if (percent) { + percent.textContent = `${normalized}%`; + } + if (fill) { + fill.style.transform = `scaleX(${normalized / 100})`; + } + } })(); diff --git a/backend/templates/pages/api.html b/backend/templates/pages/api.html index 2de754c..a3c94b4 100644 --- a/backend/templates/pages/api.html +++ b/backend/templates/pages/api.html @@ -14,6 +14,10 @@

Endpoints

Upload
POST /api/v1/upload
+
Resumable create
POST /api/v1/uploads/resumable
+
Resumable status
GET /api/v1/uploads/resumable/{sessionID}
+
Resumable chunk
PUT /api/v1/uploads/resumable/{sessionID}/files/{fileID}/chunks/{index}
+
Resumable complete
POST /api/v1/uploads/resumable/{sessionID}/complete
Health
GET /health
Request schema
/api/v1/schemas/upload-request.json
Response schema
/api/v1/schemas/upload-response.json
@@ -21,6 +25,28 @@ +
+
+

Resumable uploads

+

Browser uploads use the resumable API by default. Custom clients can use the same flow: create a session with file metadata, upload exact-sized chunks, then complete the session. Chunks are temporary and are cleaned if the session expires.

+
# 1. Create a session.
+curl -s {{.Data.BaseURL}}/api/v1/uploads/resumable \
+  -H 'Accept: application/json' \
+  -H 'Content-Type: application/json' \
+  -d '{"files":[{"name":"report.pdf","size":1048576,"contentType":"application/pdf"}],"expiresMinutes":1440}'
+
+# 2. Upload each chunk using the returned sessionId, file id, and chunkSize.
+dd if=./report.pdf bs=8388608 count=1 skip=0 2>/dev/null | \
+  curl -X PUT --data-binary @- \
+  {{.Data.BaseURL}}/api/v1/uploads/resumable/SESSION_ID/files/FILE_ID/chunks/0
+
+# 3. Complete after all chunks are present. The response is the normal upload JSON.
+curl -X POST -H 'Accept: application/json' \
+  {{.Data.BaseURL}}/api/v1/uploads/resumable/SESSION_ID/complete
+

For authenticated uploads, send the same Authorization: Bearer <token> header on every resumable request. Incomplete chunks are stored under data/tmp/uploads before finalizing into the selected storage backend.

+
+
+

Curl upload

diff --git a/scripts/env/dev.env.example b/scripts/env/dev.env.example index b33b71a..98ca4f7 100644 --- a/scripts/env/dev.env.example +++ b/scripts/env/dev.env.example @@ -9,6 +9,9 @@ WARPBOX_CLEANUP_ENABLED=true WARPBOX_CLEANUP_EVERY=1h WARPBOX_THUMBNAIL_ENABLED=true WARPBOX_THUMBNAIL_EVERY=1m +WARPBOX_RESUMABLE_UPLOADS_ENABLED=true +WARPBOX_RESUMABLE_CHUNK_MB=16 +WARPBOX_RESUMABLE_RETENTION_HOURS=1 WARPBOX_MAX_UPLOAD_SIZE_MB=16384 WARPBOX_ANONYMOUS_UPLOADS_ENABLED=true WARPBOX_ANONYMOUS_MAX_UPLOAD_MB=512