package services import ( "context" "crypto/sha256" "crypto/subtle" "encoding/hex" "encoding/json" "fmt" "io" "os" "path/filepath" "sort" "strings" "time" "go.etcd.io/bbolt" ) var resumableUploadsBucket = []byte("resumable_uploads") const ( ResumableStatusUploading = "uploading" ResumableStatusProcessing = "processing" ResumableStatusCompleted = "completed" ResumableStatusCancelled = "cancelled" ) type ResumableFileInput struct { Name string `json:"name"` Size int64 `json:"size"` ContentType string `json:"contentType"` Fingerprint string `json:"fingerprint,omitempty"` } type ResumableSession struct { ID string `json:"id"` Options UploadOptions `json:"options"` Files []ResumableFile `json:"files"` ChunkSize int64 `json:"chunkSize"` Status string `json:"status"` BoxID string `json:"boxId,omitempty"` ResumeTokenHash string `json:"resumeTokenHash,omitempty"` ResumeToken string `json:"-"` ChunkRoot string `json:"chunkRoot,omitempty"` CreatedAt time.Time `json:"createdAt"` UpdatedAt time.Time `json:"updatedAt"` ExpiresAt time.Time `json:"expiresAt"` } type ResumableFile struct { ID string `json:"id"` Name string `json:"name"` Size int64 `json:"size"` ContentType string `json:"contentType"` Fingerprint string `json:"fingerprint,omitempty"` ChunkCount int `json:"chunkCount"` UploadedChunks []int `json:"uploadedChunks"` } func (s *UploadService) ensureResumableBucket() error { return s.db.Update(func(tx *bbolt.Tx) error { _, err := tx.CreateBucketIfNotExists(resumableUploadsBucket) return err }) } func (s *UploadService) CreateResumableSession(files []ResumableFileInput, opts UploadOptions, chunkSize int64, retention time.Duration, chunkRoot string) (ResumableSession, error) { if len(files) == 0 { return ResumableSession{}, fmt.Errorf("no files were uploaded") } if chunkSize <= 0 { return ResumableSession{}, fmt.Errorf("chunk size must be positive") } if retention <= 0 { return ResumableSession{}, fmt.Errorf("retention must be positive") } if strings.TrimSpace(opts.Password) != "" { opts.PasswordSalt, opts.PasswordHash = hashPassword(opts.Password) opts.Password = "" } sessionFiles, err := s.resumableFilesFromInput(files, opts, chunkSize, nil) if err != nil { return ResumableSession{}, err } now := time.Now().UTC() resumeToken := randomID(32) sessionID := randomID(12) session := ResumableSession{ ID: sessionID, Options: opts, Files: sessionFiles, ChunkSize: chunkSize, Status: ResumableStatusUploading, ResumeTokenHash: resumableTokenHash(sessionID, resumeToken), ResumeToken: resumeToken, ChunkRoot: strings.TrimSpace(chunkRoot), CreatedAt: now, UpdatedAt: now, ExpiresAt: now.Add(retention), } if err := s.saveResumableSession(session); err != nil { return ResumableSession{}, err } return session, nil } func (s *UploadService) VerifyResumableToken(session ResumableSession, token string) bool { if session.ResumeTokenHash == "" || strings.TrimSpace(token) == "" { return false } hash := resumableTokenHash(session.ID, token) return subtle.ConstantTimeCompare([]byte(hash), []byte(session.ResumeTokenHash)) == 1 } func (s *UploadService) AddResumableFiles(sessionID string, files []ResumableFileInput) (ResumableSession, error) { if len(files) == 0 { return s.GetResumableSession(sessionID) } session, err := s.GetResumableSession(sessionID) if err != nil { return ResumableSession{}, err } if err := resumableSessionWritable(session); err != nil { return ResumableSession{}, err } existing := make(map[string]bool) for _, file := range session.Files { existing[resumableFileKey(file.Name, file.Size, file.Fingerprint)] = true } newFiles, err := s.resumableFilesFromInput(files, session.Options, session.ChunkSize, existing) if err != nil { return ResumableSession{}, err } if len(newFiles) == 0 { return session, nil } session.Files = append(session.Files, newFiles...) session.UpdatedAt = time.Now().UTC() if err := s.saveResumableSession(session); err != nil { return ResumableSession{}, err } return session, nil } func (s *UploadService) GetResumableSession(id string) (ResumableSession, error) { var session ResumableSession err := s.db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket(resumableUploadsBucket) if bucket == nil { return os.ErrNotExist } data := bucket.Get([]byte(id)) if data == nil { return os.ErrNotExist } return json.Unmarshal(data, &session) }) if err != nil { return ResumableSession{}, err } return session, nil } func (s *UploadService) PutResumableChunk(ctx context.Context, sessionID, fileID string, index int, body io.Reader) (ResumableSession, error) { session, err := s.GetResumableSession(sessionID) if err != nil { return ResumableSession{}, err } if err := resumableSessionWritable(session); err != nil { return ResumableSession{}, err } fileIndex := -1 for i, file := range session.Files { if file.ID == fileID { fileIndex = i break } } if fileIndex < 0 { return ResumableSession{}, os.ErrNotExist } file := session.Files[fileIndex] if index < 0 || index >= file.ChunkCount { return ResumableSession{}, fmt.Errorf("chunk index is invalid") } expectedSize := expectedChunkSize(file.Size, session.ChunkSize, index) chunkDir := s.resumableFileDirFor(session, file.ID) if err := os.MkdirAll(chunkDir, 0o755); err != nil { return ResumableSession{}, err } chunkPath := s.resumableChunkPathFor(session, file.ID, index) tempPath := chunkPath + ".tmp" target, err := os.OpenFile(tempPath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600) if err != nil { return ResumableSession{}, err } written, copyErr := io.Copy(target, io.LimitReader(body, expectedSize+1)) closeErr := target.Close() if copyErr != nil { _ = os.Remove(tempPath) return ResumableSession{}, copyErr } if closeErr != nil { _ = os.Remove(tempPath) return ResumableSession{}, closeErr } if written != expectedSize { _ = os.Remove(tempPath) return ResumableSession{}, fmt.Errorf("chunk size mismatch") } if err := os.Rename(tempPath, chunkPath); err != nil { _ = os.Remove(tempPath) return ResumableSession{}, err } session.Files[fileIndex].UploadedChunks = addChunkIndex(session.Files[fileIndex].UploadedChunks, index) session.UpdatedAt = time.Now().UTC() if err := s.saveResumableSession(session); err != nil { return ResumableSession{}, err } return session, nil } func (s *UploadService) CompleteResumableSession(ctx context.Context, sessionID string) (UploadResult, ResumableSession, error) { session, err := s.GetResumableSession(sessionID) if err != nil { return UploadResult{}, ResumableSession{}, err } if (session.Status == ResumableStatusCompleted || session.Status == ResumableStatusProcessing) && session.BoxID != "" { box, err := s.GetBox(session.BoxID) if err != nil { return UploadResult{}, ResumableSession{}, err } return s.resultForBox(box, ""), session, nil } if err := resumableSessionWritable(session); err != nil { return UploadResult{}, ResumableSession{}, err } staged, err := s.resumableIncomingFiles(session) if err != nil { return UploadResult{}, ResumableSession{}, err } result, err := s.CreateBoxFromIncomingContext(ctx, staged, session.Options) if err != nil { return UploadResult{}, ResumableSession{}, err } if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil { return UploadResult{}, ResumableSession{}, err } session.Status = ResumableStatusCompleted session.BoxID = result.BoxID session.UpdatedAt = time.Now().UTC() if err := s.saveResumableSession(session); err != nil { return UploadResult{}, ResumableSession{}, err } return result, session, nil } func (s *UploadService) CreateProcessingBoxFromResumable(sessionID string) (UploadResult, ResumableSession, error) { session, err := s.GetResumableSession(sessionID) if err != nil { return UploadResult{}, ResumableSession{}, err } if (session.Status == ResumableStatusCompleted || session.Status == ResumableStatusProcessing) && session.BoxID != "" { box, err := s.GetBox(session.BoxID) if err != nil { return UploadResult{}, ResumableSession{}, err } return s.resultForBox(box, ""), session, nil } if err := resumableSessionWritable(session); err != nil { return UploadResult{}, ResumableSession{}, err } if _, err := s.resumableIncomingFiles(session); err != nil { return UploadResult{}, ResumableSession{}, err } now := time.Now().UTC() expiresAt := now.AddDate(0, 0, 7) if session.Options.ExpiresInMinutes < 0 || session.Options.MaxDays < 0 { expiresAt = now.AddDate(100, 0, 0) } else if session.Options.ExpiresInMinutes > 0 { expiresAt = now.Add(time.Duration(session.Options.ExpiresInMinutes) * time.Minute) } else if session.Options.MaxDays > 0 { expiresAt = now.Add(time.Duration(session.Options.MaxDays) * 24 * time.Hour) } box := Box{ ID: randomID(10), OwnerID: strings.TrimSpace(session.Options.OwnerID), CollectionID: strings.TrimSpace(session.Options.CollectionID), CreatorIP: strings.TrimSpace(session.Options.CreatorIP), StorageBackendID: normalizeBackendID(session.Options.StorageBackendID), CreatedAt: now, ExpiresAt: expiresAt, MaxDownloads: session.Options.MaxDownloads, Obfuscate: session.Options.ObfuscateMetadata && (strings.TrimSpace(session.Options.Password) != "" || strings.TrimSpace(session.Options.PasswordHash) != ""), Files: make([]File, 0, len(session.Files)), } deleteToken := randomID(32) box.DeleteTokenHash = deleteTokenHash(box.ID, deleteToken) if strings.TrimSpace(session.Options.PasswordHash) != "" { box.PasswordSalt = session.Options.PasswordSalt box.PasswordHash = session.Options.PasswordHash } else if strings.TrimSpace(session.Options.Password) != "" { salt, hash := hashPassword(session.Options.Password) box.PasswordSalt = salt box.PasswordHash = hash } for _, incoming := range session.Files { fileID := randomID(8) storedName := "@each@" + fileID + strings.ToLower(filepath.Ext(incoming.Name)) objectKey := boxObjectKey(box.ID, storedName) contentType := incoming.ContentType if contentType == "" { contentType = "application/octet-stream" } box.Files = append(box.Files, File{ ID: fileID, Name: cleanUploadDisplayName(incoming.Name), StoredName: storedName, Size: incoming.Size, ContentType: contentType, PreviewKind: previewKind(contentType), ObjectKey: objectKey, Processing: true, UploadedAt: now, }) } if err := s.saveBoxRecord(box); err != nil { return UploadResult{}, ResumableSession{}, err } session.Status = ResumableStatusProcessing session.BoxID = box.ID session.UpdatedAt = time.Now().UTC() if err := s.saveResumableSession(session); err != nil { return UploadResult{}, ResumableSession{}, err } return s.resultForBox(box, deleteToken), session, nil } func (s *UploadService) FinalizeProcessingResumableSession(ctx context.Context, sessionID string) (UploadResult, error) { session, err := s.GetResumableSession(sessionID) if err != nil { return UploadResult{}, err } if session.Status == ResumableStatusCompleted && session.BoxID != "" { box, err := s.GetBox(session.BoxID) if err != nil { return UploadResult{}, err } return s.resultForBox(box, ""), nil } if session.Status != ResumableStatusProcessing || session.BoxID == "" { return UploadResult{}, fmt.Errorf("upload session is not processing") } box, err := s.GetBox(session.BoxID) if err != nil { return UploadResult{}, err } staged, err := s.resumableIncomingFiles(session) if err != nil { return UploadResult{}, err } if len(staged) != len(box.Files) { return UploadResult{}, fmt.Errorf("processing file count mismatch") } backend, err := s.storage.Backend(box.StorageBackendID) if err != nil { _ = s.markProcessingBoxFailed(box, err) return UploadResult{}, err } for i, incoming := range staged { source, err := incoming.Open() if err != nil { _ = s.markProcessingBoxFailed(box, err) return UploadResult{}, err } file := box.Files[i] if err := s.writeUploadedObject(ctx, backend, file.ObjectKey, source, incoming.Size(), 0, incoming.ContentType()); err != nil { source.Close() _ = backend.Delete(context.Background(), file.ObjectKey) _ = s.markProcessingBoxFailed(box, err) return UploadResult{}, err } source.Close() box.Files[i].Processing = false box.Files[i].ProcessingError = "" box.Files[i].UploadedAt = time.Now().UTC() if err := s.saveBoxRecord(box); err != nil { return UploadResult{}, err } } if err := s.writeBoxMetadata(box); err != nil { s.logger.Warn("box metadata write failed after resumable processing", "source", "storage", "severity", "warn", "code", 4020, "box_id", box.ID, "error", err.Error()) } if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil { return UploadResult{}, err } session.Status = ResumableStatusCompleted session.UpdatedAt = time.Now().UTC() if err := s.saveResumableSession(session); err != nil { return UploadResult{}, err } return s.resultForBox(box, ""), nil } func (s *UploadService) markProcessingBoxFailed(box Box, cause error) error { message := "upload processing failed" if cause != nil && strings.TrimSpace(cause.Error()) != "" { message = cause.Error() } s.logger.Warn("resumable upload box marked failed", "source", "user-upload", "severity", "warn", "code", 4021, "box_id", box.ID, "backend_id", s.BoxStorageBackendID(box), "files", len(box.Files), "error", message) now := time.Now().UTC() box.Trouble = true box.TroubleReason = message for i := range box.Files { if box.Files[i].Processing || box.Files[i].ProcessingError == "" { box.Files[i].Processing = false box.Files[i].ProcessingError = message if box.Files[i].UploadedAt.IsZero() { box.Files[i].UploadedAt = now } } } if err := s.saveBoxRecord(box); err != nil { s.logger.Warn("failed to save failed upload box state", "source", "user-upload", "severity", "warn", "code", 4022, "box_id", box.ID, "backend_id", s.BoxStorageBackendID(box), "error", err.Error()) return err } if err := s.writeBoxMetadata(box); err != nil { s.logger.Warn("failed to write failed upload box metadata", "source", "user-upload", "severity", "warn", "code", 4023, "box_id", box.ID, "backend_id", s.BoxStorageBackendID(box), "error", err.Error()) return err } return nil } func (s *UploadService) CompleteUploadedResumableSession(ctx context.Context, sessionID string) (UploadResult, ResumableSession, error) { session, err := s.GetResumableSession(sessionID) if err != nil { return UploadResult{}, ResumableSession{}, err } if err := resumableSessionWritable(session); err != nil { return UploadResult{}, ResumableSession{}, err } completeFiles := make([]ResumableFile, 0, len(session.Files)) for _, file := range session.Files { if resumableFileComplete(file) { completeFiles = append(completeFiles, file) } } if len(completeFiles) == 0 { return UploadResult{}, ResumableSession{}, fmt.Errorf("no fully uploaded files to finish") } partial := session partial.Files = completeFiles staged, err := s.resumableIncomingFiles(partial) if err != nil { return UploadResult{}, ResumableSession{}, err } result, err := s.CreateBoxFromIncomingContext(ctx, staged, session.Options) if err != nil { return UploadResult{}, ResumableSession{}, err } if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil { return UploadResult{}, ResumableSession{}, err } session.Status = ResumableStatusCompleted session.BoxID = result.BoxID session.Files = completeFiles session.UpdatedAt = time.Now().UTC() if err := s.deleteResumableSession(session.ID); err != nil { return UploadResult{}, ResumableSession{}, err } return result, session, nil } func (s *UploadService) CancelResumableSession(sessionID string) error { session, err := s.GetResumableSession(sessionID) if err != nil { return err } if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil { return err } return s.deleteResumableSession(session.ID) } func (s *UploadService) CleanupExpiredResumableSessions(now time.Time) (int, error) { candidates := make([]ResumableSession, 0) err := s.db.View(func(tx *bbolt.Tx) error { bucket := tx.Bucket(resumableUploadsBucket) if bucket == nil { return nil } return bucket.ForEach(func(_, value []byte) error { var session ResumableSession if err := json.Unmarshal(value, &session); err != nil { return err } if session.Status == ResumableStatusCompleted || session.Status == ResumableStatusCancelled || (session.Status == ResumableStatusUploading && !session.ExpiresAt.After(now)) { candidates = append(candidates, session) } return nil }) }) if err != nil { return 0, err } for _, session := range candidates { if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil { return 0, err } } err = s.db.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(resumableUploadsBucket) if bucket == nil { return nil } for _, session := range candidates { if err := bucket.Delete([]byte(session.ID)); err != nil { return err } } return nil }) return len(candidates), err } func (s *UploadService) deleteResumableSession(sessionID string) error { return s.db.Update(func(tx *bbolt.Tx) error { bucket := tx.Bucket(resumableUploadsBucket) if bucket == nil { return nil } return bucket.Delete([]byte(sessionID)) }) } func (s *UploadService) saveResumableSession(session ResumableSession) error { if err := s.ensureResumableBucket(); err != nil { return err } return s.db.Update(func(tx *bbolt.Tx) error { data, err := json.Marshal(session) if err != nil { return err } return tx.Bucket(resumableUploadsBucket).Put([]byte(session.ID), data) }) } func (s *UploadService) resumableFilesFromInput(files []ResumableFileInput, opts UploadOptions, chunkSize int64, existing map[string]bool) ([]ResumableFile, error) { sessionFiles := make([]ResumableFile, 0, len(files)) for _, file := range files { file.Name = cleanUploadDisplayName(file.Name) if file.Name == "." || file.Name == "" { return nil, fmt.Errorf("file name is required") } if file.Size < 0 { return nil, fmt.Errorf("file size is invalid") } fingerprint := strings.TrimSpace(file.Fingerprint) key := resumableFileKey(file.Name, file.Size, fingerprint) if existing != nil && existing[key] { continue } if !opts.SkipSizeLimit { if err := s.ValidateSize(file.Size); err != nil { return nil, err } } chunks := int((file.Size + chunkSize - 1) / chunkSize) if chunks == 0 { chunks = 1 } sessionFiles = append(sessionFiles, ResumableFile{ ID: randomID(8), Name: file.Name, Size: file.Size, ContentType: strings.TrimSpace(file.ContentType), Fingerprint: fingerprint, ChunkCount: chunks, }) if existing != nil { existing[key] = true } } return sessionFiles, nil } func resumableFileKey(name string, size int64, fingerprint string) string { return strings.TrimSpace(fingerprint) + "|" + cleanUploadDisplayName(name) + "|" + fmt.Sprintf("%d", size) } type resumableIncomingFile struct { service *UploadService session ResumableSession file ResumableFile } func (f resumableIncomingFile) Name() string { return f.file.Name } func (f resumableIncomingFile) Size() int64 { return f.file.Size } func (f resumableIncomingFile) ContentType() string { return f.file.ContentType } func (f resumableIncomingFile) Open() (io.ReadCloser, error) { return &resumableChunkReader{ service: f.service, session: f.session, file: f.file, }, nil } type resumableChunkReader struct { service *UploadService session ResumableSession file ResumableFile index int current *os.File } func (r *resumableChunkReader) Read(p []byte) (int, error) { for { if r.current == nil { if r.index >= r.file.ChunkCount { return 0, io.EOF } chunk, err := os.Open(r.service.resumableChunkPathFor(r.session, r.file.ID, r.index)) if err != nil { return 0, err } r.current = chunk } n, err := r.current.Read(p) if err == io.EOF { if closeErr := r.current.Close(); closeErr != nil { r.current = nil return n, closeErr } r.current = nil r.index++ if n > 0 { return n, nil } continue } return n, err } } func (r *resumableChunkReader) Close() error { if r.current == nil { return nil } err := r.current.Close() r.current = nil return err } func (s *UploadService) resumableIncomingFiles(session ResumableSession) ([]IncomingFile, error) { staged := make([]IncomingFile, 0, len(session.Files)) for _, file := range session.Files { if len(file.UploadedChunks) != file.ChunkCount { return nil, fmt.Errorf("file %s is missing chunks", file.Name) } var written int64 for i := 0; i < file.ChunkCount; i++ { info, err := os.Stat(s.resumableChunkPathFor(session, file.ID, i)) if err != nil { return nil, fmt.Errorf("file %s is missing chunks", file.Name) } written += info.Size() } if written != file.Size { return nil, fmt.Errorf("chunk size total mismatch") } staged = append(staged, resumableIncomingFile{ service: s, session: session, file: file, }) } return staged, nil } func resumableSessionWritable(session ResumableSession) error { if session.Status != ResumableStatusUploading { return fmt.Errorf("upload session is not active") } if !session.ExpiresAt.After(time.Now().UTC()) { return fmt.Errorf("upload session expired") } return nil } func resumableFileComplete(file ResumableFile) bool { return file.ChunkCount > 0 && len(file.UploadedChunks) == file.ChunkCount } func expectedChunkSize(fileSize, chunkSize int64, index int) int64 { offset := int64(index) * chunkSize remaining := fileSize - offset if remaining < 0 { return 0 } if remaining > chunkSize { return chunkSize } return remaining } func addChunkIndex(chunks []int, index int) []int { for _, chunk := range chunks { if chunk == index { return chunks } } chunks = append(chunks, index) sort.Ints(chunks) return chunks } func resumableTokenHash(sessionID, token string) string { sum := sha256.Sum256([]byte("warpbox-resumable:" + sessionID + ":" + token)) return hex.EncodeToString(sum[:]) } func (s *UploadService) resumableSessionDir(sessionID string) string { return filepath.Join(s.dataDir, "tmp", "uploads", sessionID) } func (s *UploadService) resumableSessionDirFor(session ResumableSession) string { if strings.TrimSpace(session.ChunkRoot) != "" { return filepath.Join(session.ChunkRoot, session.ID) } return s.resumableSessionDir(session.ID) } func (s *UploadService) resumableFileDir(sessionID, fileID string) string { return filepath.Join(s.resumableSessionDir(sessionID), fileID) } func (s *UploadService) resumableFileDirFor(session ResumableSession, fileID string) string { return filepath.Join(s.resumableSessionDirFor(session), fileID) } func (s *UploadService) resumableChunkPath(sessionID, fileID string, index int) string { return filepath.Join(s.resumableFileDir(sessionID, fileID), fmt.Sprintf("%06d.part", index)) } func (s *UploadService) resumableChunkPathFor(session ResumableSession, fileID string, index int) string { return filepath.Join(s.resumableFileDirFor(session, fileID), fmt.Sprintf("%06d.part", index)) }