2026-06-02 17:41:41 +03:00
package services
import (
"context"
2026-06-02 22:13:54 +03:00
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
2026-06-02 17:41:41 +03:00
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
"go.etcd.io/bbolt"
)
// resumableUploadsBucket is the bbolt bucket name under which resumable
// upload sessions are stored, keyed by session ID.
var resumableUploadsBucket = []byte("resumable_uploads")

// Status values stored in ResumableSession.Status.
const (
	// ResumableStatusUploading marks a session that still accepts files and chunks.
	ResumableStatusUploading = "uploading"
	// ResumableStatusProcessing marks a session whose box record exists while
	// its files are still being written to storage.
	ResumableStatusProcessing = "processing"
	// ResumableStatusCompleted marks a session whose box has been created.
	ResumableStatusCompleted = "completed"
	// ResumableStatusCancelled marks a cancelled session.
	ResumableStatusCancelled = "cancelled"
)
// ResumableFileInput describes one file a client announces for a resumable
// upload session.
type ResumableFileInput struct {
	// Name is the client-supplied file name.
	Name string `json:"name"`
	// Size is the announced total size in bytes.
	Size int64 `json:"size"`
	// ContentType is the client-supplied MIME type; it may be empty.
	ContentType string `json:"contentType"`
	// Fingerprint is an optional client-supplied identifier used together
	// with name and size to detect already-registered files.
	Fingerprint string `json:"fingerprint,omitempty"`
}
type ResumableSession struct {
2026-06-02 22:13:54 +03:00
ID string ` json:"id" `
Options UploadOptions ` json:"options" `
Files [ ] ResumableFile ` json:"files" `
ChunkSize int64 ` json:"chunkSize" `
Status string ` json:"status" `
BoxID string ` json:"boxId,omitempty" `
ResumeTokenHash string ` json:"resumeTokenHash,omitempty" `
ResumeToken string ` json:"-" `
ChunkRoot string ` json:"chunkRoot,omitempty" `
CreatedAt time . Time ` json:"createdAt" `
UpdatedAt time . Time ` json:"updatedAt" `
ExpiresAt time . Time ` json:"expiresAt" `
2026-06-02 17:41:41 +03:00
}
// ResumableFile tracks the upload progress of a single file within a
// resumable session.
type ResumableFile struct {
	// ID identifies the file within its session.
	ID string `json:"id"`
	// Name is the cleaned display name of the file.
	Name string `json:"name"`
	// Size is the expected total size in bytes.
	Size int64 `json:"size"`
	// ContentType is the announced MIME type; it may be empty.
	ContentType string `json:"contentType"`
	// Fingerprint is the optional client-supplied identifier.
	Fingerprint string `json:"fingerprint,omitempty"`
	// ChunkCount is the number of chunks the file is split into.
	ChunkCount int `json:"chunkCount"`
	// UploadedChunks holds the indexes of the chunks received so far.
	UploadedChunks []int `json:"uploadedChunks"`
}
// ensureResumableBucket creates the resumable-uploads bucket if it does not
// exist yet.
func (s *UploadService) ensureResumableBucket() error {
	createBucket := func(tx *bbolt.Tx) error {
		if _, err := tx.CreateBucketIfNotExists(resumableUploadsBucket); err != nil {
			return err
		}
		return nil
	}
	return s.db.Update(createBucket)
}
2026-06-02 22:13:54 +03:00
func ( s * UploadService ) CreateResumableSession ( files [ ] ResumableFileInput , opts UploadOptions , chunkSize int64 , retention time . Duration , chunkRoot string ) ( ResumableSession , error ) {
2026-06-02 17:41:41 +03:00
if len ( files ) == 0 {
return ResumableSession { } , fmt . Errorf ( "no files were uploaded" )
}
if chunkSize <= 0 {
return ResumableSession { } , fmt . Errorf ( "chunk size must be positive" )
}
if retention <= 0 {
return ResumableSession { } , fmt . Errorf ( "retention must be positive" )
}
if strings . TrimSpace ( opts . Password ) != "" {
opts . PasswordSalt , opts . PasswordHash = hashPassword ( opts . Password )
opts . Password = ""
}
sessionFiles , err := s . resumableFilesFromInput ( files , opts , chunkSize , nil )
if err != nil {
return ResumableSession { } , err
}
now := time . Now ( ) . UTC ( )
2026-06-02 22:13:54 +03:00
resumeToken := randomID ( 32 )
sessionID := randomID ( 12 )
2026-06-02 17:41:41 +03:00
session := ResumableSession {
2026-06-02 22:13:54 +03:00
ID : sessionID ,
Options : opts ,
Files : sessionFiles ,
ChunkSize : chunkSize ,
Status : ResumableStatusUploading ,
ResumeTokenHash : resumableTokenHash ( sessionID , resumeToken ) ,
ResumeToken : resumeToken ,
ChunkRoot : strings . TrimSpace ( chunkRoot ) ,
CreatedAt : now ,
UpdatedAt : now ,
ExpiresAt : now . Add ( retention ) ,
2026-06-02 17:41:41 +03:00
}
if err := s . saveResumableSession ( session ) ; err != nil {
return ResumableSession { } , err
}
return session , nil
}
2026-06-02 22:13:54 +03:00
// VerifyResumableToken reports whether token matches the resume token hash
// stored on session. A session without a stored hash or a blank token never
// verifies. The hashes are compared in constant time.
func (s *UploadService) VerifyResumableToken(session ResumableSession, token string) bool {
	if session.ResumeTokenHash == "" {
		return false
	}
	if strings.TrimSpace(token) == "" {
		return false
	}
	candidate := []byte(resumableTokenHash(session.ID, token))
	stored := []byte(session.ResumeTokenHash)
	return subtle.ConstantTimeCompare(candidate, stored) == 1
}
2026-06-02 17:41:41 +03:00
// AddResumableFiles registers additional files on an existing session.
//
// With an empty files list the stored session is returned unchanged, without
// checking that it is still writable. Files whose (fingerprint, name, size)
// key is already registered are skipped; if nothing new remains, the session
// is returned without being saved.
func (s *UploadService) AddResumableFiles(sessionID string, files []ResumableFileInput) (ResumableSession, error) {
	session, err := s.GetResumableSession(sessionID)
	if err != nil {
		return ResumableSession{}, err
	}
	if len(files) == 0 {
		return session, nil
	}
	if err := resumableSessionWritable(session); err != nil {
		return ResumableSession{}, err
	}

	known := make(map[string]bool, len(session.Files))
	for i := range session.Files {
		registered := session.Files[i]
		known[resumableFileKey(registered.Name, registered.Size, registered.Fingerprint)] = true
	}

	added, err := s.resumableFilesFromInput(files, session.Options, session.ChunkSize, known)
	if err != nil {
		return ResumableSession{}, err
	}
	if len(added) == 0 {
		return session, nil
	}

	session.Files = append(session.Files, added...)
	session.UpdatedAt = time.Now().UTC()
	if err := s.saveResumableSession(session); err != nil {
		return ResumableSession{}, err
	}
	return session, nil
}
// GetResumableSession loads the session stored under id. It returns
// os.ErrNotExist when the bucket or the record is missing, and the JSON
// decoding error when the stored record cannot be decoded.
func (s *UploadService) GetResumableSession(id string) (ResumableSession, error) {
	var loaded ResumableSession
	read := func(tx *bbolt.Tx) error {
		bucket := tx.Bucket(resumableUploadsBucket)
		if bucket == nil {
			return os.ErrNotExist
		}
		raw := bucket.Get([]byte(id))
		if raw == nil {
			return os.ErrNotExist
		}
		return json.Unmarshal(raw, &loaded)
	}
	if err := s.db.View(read); err != nil {
		return ResumableSession{}, err
	}
	return loaded, nil
}
func ( s * UploadService ) PutResumableChunk ( ctx context . Context , sessionID , fileID string , index int , body io . Reader ) ( ResumableSession , error ) {
session , err := s . GetResumableSession ( sessionID )
if err != nil {
return ResumableSession { } , err
}
if err := resumableSessionWritable ( session ) ; err != nil {
return ResumableSession { } , err
}
fileIndex := - 1
for i , file := range session . Files {
if file . ID == fileID {
fileIndex = i
break
}
}
if fileIndex < 0 {
return ResumableSession { } , os . ErrNotExist
}
file := session . Files [ fileIndex ]
if index < 0 || index >= file . ChunkCount {
return ResumableSession { } , fmt . Errorf ( "chunk index is invalid" )
}
expectedSize := expectedChunkSize ( file . Size , session . ChunkSize , index )
2026-06-02 22:13:54 +03:00
chunkDir := s . resumableFileDirFor ( session , file . ID )
2026-06-02 17:41:41 +03:00
if err := os . MkdirAll ( chunkDir , 0 o755 ) ; err != nil {
return ResumableSession { } , err
}
2026-06-02 22:13:54 +03:00
chunkPath := s . resumableChunkPathFor ( session , file . ID , index )
2026-06-02 17:41:41 +03:00
tempPath := chunkPath + ".tmp"
target , err := os . OpenFile ( tempPath , os . O_CREATE | os . O_TRUNC | os . O_WRONLY , 0 o600 )
if err != nil {
return ResumableSession { } , err
}
written , copyErr := io . Copy ( target , io . LimitReader ( body , expectedSize + 1 ) )
closeErr := target . Close ( )
if copyErr != nil {
_ = os . Remove ( tempPath )
return ResumableSession { } , copyErr
}
if closeErr != nil {
_ = os . Remove ( tempPath )
return ResumableSession { } , closeErr
}
if written != expectedSize {
_ = os . Remove ( tempPath )
return ResumableSession { } , fmt . Errorf ( "chunk size mismatch" )
}
if err := os . Rename ( tempPath , chunkPath ) ; err != nil {
_ = os . Remove ( tempPath )
return ResumableSession { } , err
}
session . Files [ fileIndex ] . UploadedChunks = addChunkIndex ( session . Files [ fileIndex ] . UploadedChunks , index )
session . UpdatedAt = time . Now ( ) . UTC ( )
if err := s . saveResumableSession ( session ) ; err != nil {
return ResumableSession { } , err
}
return session , nil
}
func ( s * UploadService ) CompleteResumableSession ( ctx context . Context , sessionID string ) ( UploadResult , ResumableSession , error ) {
session , err := s . GetResumableSession ( sessionID )
if err != nil {
return UploadResult { } , ResumableSession { } , err
}
2026-06-02 22:13:54 +03:00
if ( session . Status == ResumableStatusCompleted || session . Status == ResumableStatusProcessing ) && session . BoxID != "" {
box , err := s . GetBox ( session . BoxID )
if err != nil {
return UploadResult { } , ResumableSession { } , err
}
return s . resultForBox ( box , "" ) , session , nil
}
2026-06-02 17:41:41 +03:00
if err := resumableSessionWritable ( session ) ; err != nil {
return UploadResult { } , ResumableSession { } , err
}
2026-06-02 22:13:54 +03:00
staged , err := s . resumableIncomingFiles ( session )
2026-06-02 17:41:41 +03:00
if err != nil {
return UploadResult { } , ResumableSession { } , err
}
2026-06-02 22:13:54 +03:00
result , err := s . CreateBoxFromIncomingContext ( ctx , staged , session . Options )
2026-06-02 17:41:41 +03:00
if err != nil {
return UploadResult { } , ResumableSession { } , err
}
2026-06-02 22:13:54 +03:00
if err := os . RemoveAll ( s . resumableSessionDirFor ( session ) ) ; err != nil {
2026-06-02 17:41:41 +03:00
return UploadResult { } , ResumableSession { } , err
}
session . Status = ResumableStatusCompleted
session . BoxID = result . BoxID
session . UpdatedAt = time . Now ( ) . UTC ( )
if err := s . saveResumableSession ( session ) ; err != nil {
return UploadResult { } , ResumableSession { } , err
}
return result , session , nil
}
2026-06-02 22:13:54 +03:00
func ( s * UploadService ) CreateProcessingBoxFromResumable ( sessionID string ) ( UploadResult , ResumableSession , error ) {
2026-06-02 17:41:41 +03:00
session , err := s . GetResumableSession ( sessionID )
if err != nil {
2026-06-02 22:13:54 +03:00
return UploadResult { } , ResumableSession { } , err
}
if ( session . Status == ResumableStatusCompleted || session . Status == ResumableStatusProcessing ) && session . BoxID != "" {
box , err := s . GetBox ( session . BoxID )
if err != nil {
return UploadResult { } , ResumableSession { } , err
}
return s . resultForBox ( box , "" ) , session , nil
}
if err := resumableSessionWritable ( session ) ; err != nil {
return UploadResult { } , ResumableSession { } , err
}
if _ , err := s . resumableIncomingFiles ( session ) ; err != nil {
return UploadResult { } , ResumableSession { } , err
}
now := time . Now ( ) . UTC ( )
expiresAt := now . AddDate ( 0 , 0 , 7 )
if session . Options . ExpiresInMinutes < 0 || session . Options . MaxDays < 0 {
expiresAt = now . AddDate ( 100 , 0 , 0 )
} else if session . Options . ExpiresInMinutes > 0 {
expiresAt = now . Add ( time . Duration ( session . Options . ExpiresInMinutes ) * time . Minute )
} else if session . Options . MaxDays > 0 {
expiresAt = now . Add ( time . Duration ( session . Options . MaxDays ) * 24 * time . Hour )
}
box := Box {
ID : randomID ( 10 ) ,
OwnerID : strings . TrimSpace ( session . Options . OwnerID ) ,
CollectionID : strings . TrimSpace ( session . Options . CollectionID ) ,
CreatorIP : strings . TrimSpace ( session . Options . CreatorIP ) ,
StorageBackendID : normalizeBackendID ( session . Options . StorageBackendID ) ,
CreatedAt : now ,
ExpiresAt : expiresAt ,
MaxDownloads : session . Options . MaxDownloads ,
Obfuscate : session . Options . ObfuscateMetadata && ( strings . TrimSpace ( session . Options . Password ) != "" || strings . TrimSpace ( session . Options . PasswordHash ) != "" ) ,
Files : make ( [ ] File , 0 , len ( session . Files ) ) ,
}
deleteToken := randomID ( 32 )
box . DeleteTokenHash = deleteTokenHash ( box . ID , deleteToken )
if strings . TrimSpace ( session . Options . PasswordHash ) != "" {
box . PasswordSalt = session . Options . PasswordSalt
box . PasswordHash = session . Options . PasswordHash
} else if strings . TrimSpace ( session . Options . Password ) != "" {
salt , hash := hashPassword ( session . Options . Password )
box . PasswordSalt = salt
box . PasswordHash = hash
}
for _ , incoming := range session . Files {
fileID := randomID ( 8 )
storedName := "@each@" + fileID + strings . ToLower ( filepath . Ext ( incoming . Name ) )
objectKey := boxObjectKey ( box . ID , storedName )
contentType := incoming . ContentType
if contentType == "" {
contentType = "application/octet-stream"
}
box . Files = append ( box . Files , File {
ID : fileID ,
2026-06-10 18:14:29 +03:00
Name : cleanUploadDisplayName ( incoming . Name ) ,
2026-06-02 22:13:54 +03:00
StoredName : storedName ,
Size : incoming . Size ,
ContentType : contentType ,
PreviewKind : previewKind ( contentType ) ,
ObjectKey : objectKey ,
Processing : true ,
UploadedAt : now ,
} )
}
if err := s . saveBoxRecord ( box ) ; err != nil {
return UploadResult { } , ResumableSession { } , err
2026-06-02 17:41:41 +03:00
}
2026-06-02 22:13:54 +03:00
session . Status = ResumableStatusProcessing
session . BoxID = box . ID
2026-06-02 17:41:41 +03:00
session . UpdatedAt = time . Now ( ) . UTC ( )
if err := s . saveResumableSession ( session ) ; err != nil {
2026-06-02 22:13:54 +03:00
return UploadResult { } , ResumableSession { } , err
}
return s . resultForBox ( box , deleteToken ) , session , nil
}
func ( s * UploadService ) FinalizeProcessingResumableSession ( ctx context . Context , sessionID string ) ( UploadResult , error ) {
session , err := s . GetResumableSession ( sessionID )
if err != nil {
return UploadResult { } , err
}
if session . Status == ResumableStatusCompleted && session . BoxID != "" {
box , err := s . GetBox ( session . BoxID )
if err != nil {
return UploadResult { } , err
}
return s . resultForBox ( box , "" ) , nil
}
if session . Status != ResumableStatusProcessing || session . BoxID == "" {
return UploadResult { } , fmt . Errorf ( "upload session is not processing" )
}
box , err := s . GetBox ( session . BoxID )
if err != nil {
return UploadResult { } , err
}
staged , err := s . resumableIncomingFiles ( session )
if err != nil {
return UploadResult { } , err
}
if len ( staged ) != len ( box . Files ) {
return UploadResult { } , fmt . Errorf ( "processing file count mismatch" )
}
backend , err := s . storage . Backend ( box . StorageBackendID )
if err != nil {
2026-06-08 11:53:37 +03:00
_ = s . markProcessingBoxFailed ( box , err )
2026-06-02 22:13:54 +03:00
return UploadResult { } , err
}
for i , incoming := range staged {
source , err := incoming . Open ( )
if err != nil {
2026-06-08 11:53:37 +03:00
_ = s . markProcessingBoxFailed ( box , err )
2026-06-02 22:13:54 +03:00
return UploadResult { } , err
}
file := box . Files [ i ]
if err := s . writeUploadedObject ( ctx , backend , file . ObjectKey , source , incoming . Size ( ) , 0 , incoming . ContentType ( ) ) ; err != nil {
source . Close ( )
_ = backend . Delete ( context . Background ( ) , file . ObjectKey )
2026-06-08 11:53:37 +03:00
_ = s . markProcessingBoxFailed ( box , err )
2026-06-02 22:13:54 +03:00
return UploadResult { } , err
}
source . Close ( )
box . Files [ i ] . Processing = false
box . Files [ i ] . ProcessingError = ""
box . Files [ i ] . UploadedAt = time . Now ( ) . UTC ( )
if err := s . saveBoxRecord ( box ) ; err != nil {
return UploadResult { } , err
}
}
if err := s . writeBoxMetadata ( box ) ; err != nil {
s . logger . Warn ( "box metadata write failed after resumable processing" , "source" , "storage" , "severity" , "warn" , "code" , 4020 , "box_id" , box . ID , "error" , err . Error ( ) )
}
if err := os . RemoveAll ( s . resumableSessionDirFor ( session ) ) ; err != nil {
return UploadResult { } , err
}
session . Status = ResumableStatusCompleted
session . UpdatedAt = time . Now ( ) . UTC ( )
if err := s . saveResumableSession ( session ) ; err != nil {
return UploadResult { } , err
}
return s . resultForBox ( box , "" ) , nil
}
2026-06-08 11:53:37 +03:00
// markProcessingBoxFailed flags box as troubled because of cause, persists
// the box record and rewrites its metadata.
//
// The reason is cause's message, or "upload processing failed" when cause is
// nil or blank. Every file that is still processing, or that has no
// processing error yet, is taken out of processing and given the reason as
// its error. Save and metadata failures are logged and returned.
func (s *UploadService) markProcessingBoxFailed(box Box, cause error) error {
	reason := "upload processing failed"
	if cause != nil {
		if text := cause.Error(); strings.TrimSpace(text) != "" {
			reason = text
		}
	}
	s.logger.Warn("resumable upload box marked failed", "source", "user-upload", "severity", "warn", "code", 4021, "box_id", box.ID, "backend_id", s.BoxStorageBackendID(box), "files", len(box.Files), "error", reason)

	failedAt := time.Now().UTC()
	box.Trouble = true
	box.TroubleReason = reason
	for i := range box.Files {
		entry := &box.Files[i]
		// Files that already carry an error and are not processing keep it.
		if !entry.Processing && entry.ProcessingError != "" {
			continue
		}
		entry.Processing = false
		entry.ProcessingError = reason
		if entry.UploadedAt.IsZero() {
			entry.UploadedAt = failedAt
		}
	}

	saveErr := s.saveBoxRecord(box)
	if saveErr != nil {
		s.logger.Warn("failed to save failed upload box state", "source", "user-upload", "severity", "warn", "code", 4022, "box_id", box.ID, "backend_id", s.BoxStorageBackendID(box), "error", saveErr.Error())
		return saveErr
	}
	metaErr := s.writeBoxMetadata(box)
	if metaErr != nil {
		s.logger.Warn("failed to write failed upload box metadata", "source", "user-upload", "severity", "warn", "code", 4023, "box_id", box.ID, "backend_id", s.BoxStorageBackendID(box), "error", metaErr.Error())
		return metaErr
	}
	return nil
}
2026-06-02 22:13:54 +03:00
// CompleteUploadedResumableSession creates a box from only those files of a
// writable session whose chunks have all been uploaded, discarding the rest.
//
// It fails with "no fully uploaded files to finish" when no file is
// complete. On success the session's chunk directory is removed and the
// session record is deleted; the returned session reflects the completed
// state with only the finished files.
func (s *UploadService) CompleteUploadedResumableSession(ctx context.Context, sessionID string) (UploadResult, ResumableSession, error) {
	session, err := s.GetResumableSession(sessionID)
	if err != nil {
		return UploadResult{}, ResumableSession{}, err
	}
	if err := resumableSessionWritable(session); err != nil {
		return UploadResult{}, ResumableSession{}, err
	}

	finished := make([]ResumableFile, 0, len(session.Files))
	for i := range session.Files {
		if candidate := session.Files[i]; resumableFileComplete(candidate) {
			finished = append(finished, candidate)
		}
	}
	if len(finished) == 0 {
		return UploadResult{}, ResumableSession{}, fmt.Errorf("no fully uploaded files to finish")
	}

	// Stage from a copy of the session that only lists the finished files.
	finishedOnly := session
	finishedOnly.Files = finished
	staged, err := s.resumableIncomingFiles(finishedOnly)
	if err != nil {
		return UploadResult{}, ResumableSession{}, err
	}
	result, err := s.CreateBoxFromIncomingContext(ctx, staged, session.Options)
	if err != nil {
		return UploadResult{}, ResumableSession{}, err
	}
	if err := os.RemoveAll(s.resumableSessionDirFor(session)); err != nil {
		return UploadResult{}, ResumableSession{}, err
	}

	session.Status = ResumableStatusCompleted
	session.BoxID = result.BoxID
	session.Files = finished
	session.UpdatedAt = time.Now().UTC()
	if err := s.deleteResumableSession(session.ID); err != nil {
		return UploadResult{}, ResumableSession{}, err
	}
	return result, session, nil
}
func ( s * UploadService ) CancelResumableSession ( sessionID string ) error {
session , err := s . GetResumableSession ( sessionID )
if err != nil {
return err
}
if err := os . RemoveAll ( s . resumableSessionDirFor ( session ) ) ; err != nil {
2026-06-02 17:41:41 +03:00
return err
}
2026-06-02 22:13:54 +03:00
return s . deleteResumableSession ( session . ID )
2026-06-02 17:41:41 +03:00
}
func ( s * UploadService ) CleanupExpiredResumableSessions ( now time . Time ) ( int , error ) {
candidates := make ( [ ] ResumableSession , 0 )
err := s . db . View ( func ( tx * bbolt . Tx ) error {
bucket := tx . Bucket ( resumableUploadsBucket )
if bucket == nil {
return nil
}
return bucket . ForEach ( func ( _ , value [ ] byte ) error {
var session ResumableSession
if err := json . Unmarshal ( value , & session ) ; err != nil {
return err
}
2026-06-02 22:13:54 +03:00
if session . Status == ResumableStatusCompleted ||
session . Status == ResumableStatusCancelled ||
( session . Status == ResumableStatusUploading && ! session . ExpiresAt . After ( now ) ) {
2026-06-02 17:41:41 +03:00
candidates = append ( candidates , session )
}
return nil
} )
} )
if err != nil {
return 0 , err
}
for _ , session := range candidates {
2026-06-02 22:13:54 +03:00
if err := os . RemoveAll ( s . resumableSessionDirFor ( session ) ) ; err != nil {
2026-06-02 17:41:41 +03:00
return 0 , err
}
}
err = s . db . Update ( func ( tx * bbolt . Tx ) error {
bucket := tx . Bucket ( resumableUploadsBucket )
if bucket == nil {
return nil
}
for _ , session := range candidates {
if err := bucket . Delete ( [ ] byte ( session . ID ) ) ; err != nil {
return err
}
}
return nil
} )
return len ( candidates ) , err
}
2026-06-02 22:13:54 +03:00
// deleteResumableSession deletes the stored record of sessionID. A missing
// bucket is not an error.
func (s *UploadService) deleteResumableSession(sessionID string) error {
	remove := func(tx *bbolt.Tx) error {
		if bucket := tx.Bucket(resumableUploadsBucket); bucket != nil {
			return bucket.Delete([]byte(sessionID))
		}
		return nil
	}
	return s.db.Update(remove)
}
2026-06-02 17:41:41 +03:00
// saveResumableSession stores session as JSON under its ID, creating the
// resumable-uploads bucket on first use.
//
// The bucket creation and the write happen in one write transaction, so each
// save costs a single commit. The session is encoded before the transaction
// starts so the database's write lock is not held while marshalling.
func (s *UploadService) saveResumableSession(session ResumableSession) error {
	data, err := json.Marshal(session)
	if err != nil {
		return err
	}
	return s.db.Update(func(tx *bbolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(resumableUploadsBucket)
		if err != nil {
			return err
		}
		return bucket.Put([]byte(session.ID), data)
	})
}
func ( s * UploadService ) resumableFilesFromInput ( files [ ] ResumableFileInput , opts UploadOptions , chunkSize int64 , existing map [ string ] bool ) ( [ ] ResumableFile , error ) {
sessionFiles := make ( [ ] ResumableFile , 0 , len ( files ) )
for _ , file := range files {
2026-06-10 18:14:29 +03:00
file . Name = cleanUploadDisplayName ( file . Name )
2026-06-02 17:41:41 +03:00
if file . Name == "." || file . Name == "" {
return nil , fmt . Errorf ( "file name is required" )
}
if file . Size < 0 {
return nil , fmt . Errorf ( "file size is invalid" )
}
fingerprint := strings . TrimSpace ( file . Fingerprint )
key := resumableFileKey ( file . Name , file . Size , fingerprint )
if existing != nil && existing [ key ] {
continue
}
if ! opts . SkipSizeLimit {
if err := s . ValidateSize ( file . Size ) ; err != nil {
return nil , err
}
}
chunks := int ( ( file . Size + chunkSize - 1 ) / chunkSize )
if chunks == 0 {
chunks = 1
}
sessionFiles = append ( sessionFiles , ResumableFile {
ID : randomID ( 8 ) ,
Name : file . Name ,
Size : file . Size ,
ContentType : strings . TrimSpace ( file . ContentType ) ,
Fingerprint : fingerprint ,
ChunkCount : chunks ,
} )
if existing != nil {
existing [ key ] = true
}
}
return sessionFiles , nil
}
func resumableFileKey ( name string , size int64 , fingerprint string ) string {
2026-06-10 18:14:29 +03:00
return strings . TrimSpace ( fingerprint ) + "|" + cleanUploadDisplayName ( name ) + "|" + fmt . Sprintf ( "%d" , size )
2026-06-02 17:41:41 +03:00
}
2026-06-02 22:13:54 +03:00
// resumableIncomingFile presents one fully uploaded session file through the
// Name/Size/ContentType/Open accessors, reading its content from the chunk
// files on disk.
type resumableIncomingFile struct {
	service *UploadService   // used to resolve chunk paths
	session ResumableSession // session the file belongs to
	file    ResumableFile    // the file being exposed
}

// Name returns the file's name.
func (f resumableIncomingFile) Name() string { return f.file.Name }

// Size returns the file's announced size in bytes.
func (f resumableIncomingFile) Size() int64 { return f.file.Size }

// ContentType returns the file's announced content type.
func (f resumableIncomingFile) ContentType() string { return f.file.ContentType }

// Open returns a reader that yields the file's chunks in index order. No
// chunk is opened until the first Read, so Open itself never fails.
func (f resumableIncomingFile) Open() (io.ReadCloser, error) {
	reader := &resumableChunkReader{
		service: f.service,
		session: f.session,
		file:    f.file,
	}
	return reader, nil
}
// resumableChunkReader reads the chunk files of one session file back to
// back as a single stream.
type resumableChunkReader struct {
	service *UploadService   // used to resolve chunk paths
	session ResumableSession // session the file belongs to
	file    ResumableFile    // file whose chunks are read
	index   int              // index of the chunk to open or currently being read
	current *os.File         // open chunk file, nil between chunks
}
func ( r * resumableChunkReader ) Read ( p [ ] byte ) ( int , error ) {
for {
if r . current == nil {
if r . index >= r . file . ChunkCount {
return 0 , io . EOF
}
chunk , err := os . Open ( r . service . resumableChunkPathFor ( r . session , r . file . ID , r . index ) )
if err != nil {
return 0 , err
}
r . current = chunk
}
n , err := r . current . Read ( p )
if err == io . EOF {
if closeErr := r . current . Close ( ) ; closeErr != nil {
r . current = nil
return n , closeErr
}
r . current = nil
r . index ++
if n > 0 {
return n , nil
}
continue
}
return n , err
2026-06-02 17:41:41 +03:00
}
2026-06-02 22:13:54 +03:00
}
func ( r * resumableChunkReader ) Close ( ) error {
if r . current == nil {
return nil
2026-06-02 17:41:41 +03:00
}
2026-06-02 22:13:54 +03:00
err := r . current . Close ( )
r . current = nil
return err
}
func ( s * UploadService ) resumableIncomingFiles ( session ResumableSession ) ( [ ] IncomingFile , error ) {
2026-06-02 17:41:41 +03:00
staged := make ( [ ] IncomingFile , 0 , len ( session . Files ) )
for _ , file := range session . Files {
if len ( file . UploadedChunks ) != file . ChunkCount {
2026-06-02 22:13:54 +03:00
return nil , fmt . Errorf ( "file %s is missing chunks" , file . Name )
2026-06-02 17:41:41 +03:00
}
var written int64
for i := 0 ; i < file . ChunkCount ; i ++ {
2026-06-02 22:13:54 +03:00
info , err := os . Stat ( s . resumableChunkPathFor ( session , file . ID , i ) )
2026-06-02 17:41:41 +03:00
if err != nil {
2026-06-02 22:13:54 +03:00
return nil , fmt . Errorf ( "file %s is missing chunks" , file . Name )
2026-06-02 17:41:41 +03:00
}
2026-06-02 22:13:54 +03:00
written += info . Size ( )
2026-06-02 17:41:41 +03:00
}
if written != file . Size {
2026-06-02 22:13:54 +03:00
return nil , fmt . Errorf ( "chunk size total mismatch" )
2026-06-02 17:41:41 +03:00
}
2026-06-02 22:13:54 +03:00
staged = append ( staged , resumableIncomingFile {
service : s ,
session : session ,
file : file ,
2026-06-02 17:41:41 +03:00
} )
}
2026-06-02 22:13:54 +03:00
return staged , nil
2026-06-02 17:41:41 +03:00
}
// resumableSessionWritable returns an error unless session is in the
// uploading state and its expiry lies in the future.
func resumableSessionWritable(session ResumableSession) error {
	switch {
	case session.Status != ResumableStatusUploading:
		return fmt.Errorf("upload session is not active")
	case !session.ExpiresAt.After(time.Now().UTC()):
		return fmt.Errorf("upload session expired")
	default:
		return nil
	}
}
2026-06-02 22:13:54 +03:00
// resumableFileComplete reports whether file has at least one chunk and the
// number of recorded uploaded chunks equals its chunk count.
func resumableFileComplete(file ResumableFile) bool {
	if file.ChunkCount <= 0 {
		return false
	}
	return len(file.UploadedChunks) == file.ChunkCount
}
2026-06-02 17:41:41 +03:00
// expectedChunkSize returns how many bytes chunk index of a file must
// contain: chunkSize for every full chunk, the remainder for the last one,
// and 0 when the chunk starts at or beyond the end of the file.
func expectedChunkSize(fileSize, chunkSize int64, index int) int64 {
	left := fileSize - int64(index)*chunkSize
	switch {
	case left < 0:
		return 0
	case left > chunkSize:
		return chunkSize
	default:
		return left
	}
}
// addChunkIndex returns chunks with index included. If index is already
// present the slice is returned unchanged; otherwise index is appended and
// the result is sorted ascending.
func addChunkIndex(chunks []int, index int) []int {
	for i := range chunks {
		if chunks[i] == index {
			return chunks
		}
	}
	merged := append(chunks, index)
	sort.Ints(merged)
	return merged
}
2026-06-02 22:13:54 +03:00
// resumableTokenHash returns the lowercase hex SHA-256 digest of
// "warpbox-resumable:<sessionID>:<token>".
func resumableTokenHash(sessionID, token string) string {
	hasher := sha256.New()
	hasher.Write([]byte("warpbox-resumable:"))
	hasher.Write([]byte(sessionID))
	hasher.Write([]byte(":"))
	hasher.Write([]byte(token))
	return hex.EncodeToString(hasher.Sum(nil))
}
2026-06-02 17:41:41 +03:00
// resumableSessionDir returns the default chunk directory of a session:
// <dataDir>/tmp/uploads/<sessionID>.
func (s *UploadService) resumableSessionDir(sessionID string) string {
	parts := []string{s.dataDir, "tmp", "uploads", sessionID}
	return filepath.Join(parts...)
}
2026-06-02 22:13:54 +03:00
// resumableSessionDirFor returns the chunk directory of session: inside the
// session's ChunkRoot when one is set, otherwise the default location.
func (s *UploadService) resumableSessionDirFor(session ResumableSession) string {
	if strings.TrimSpace(session.ChunkRoot) == "" {
		return s.resumableSessionDir(session.ID)
	}
	return filepath.Join(session.ChunkRoot, session.ID)
}
2026-06-02 17:41:41 +03:00
// resumableFileDir returns the directory holding the chunks of fileID below
// the default session directory of sessionID.
func (s *UploadService) resumableFileDir(sessionID, fileID string) string {
	sessionDir := s.resumableSessionDir(sessionID)
	return filepath.Join(sessionDir, fileID)
}
2026-06-02 22:13:54 +03:00
// resumableFileDirFor returns the directory holding the chunks of fileID
// below the chunk directory resolved for session.
func (s *UploadService) resumableFileDirFor(session ResumableSession, fileID string) string {
	sessionDir := s.resumableSessionDirFor(session)
	return filepath.Join(sessionDir, fileID)
}
2026-06-02 17:41:41 +03:00
// resumableChunkPath returns the path of chunk index of fileID below the
// default session directory; chunk files are named "<index, 6 digits>.part".
func (s *UploadService) resumableChunkPath(sessionID, fileID string, index int) string {
	chunkName := fmt.Sprintf("%06d.part", index)
	return filepath.Join(s.resumableFileDir(sessionID, fileID), chunkName)
}
2026-06-02 22:13:54 +03:00
// resumableChunkPathFor returns the path of chunk index of fileID below the
// chunk directory resolved for session; chunk files are named
// "<index, 6 digits>.part".
func (s *UploadService) resumableChunkPathFor(session ResumableSession, fileID string, index int) string {
	chunkName := fmt.Sprintf("%06d.part", index)
	return filepath.Join(s.resumableFileDirFor(session, fileID), chunkName)
}