All checks were successful
Build and Publish Docker Image / deploy (push) Successful in 1m42s
Update `createOrAppendBox` to accept the upload policy and admin status, allowing policy enforcement to be handled during the box creation/append decision process. This ensures that appending files to an existing batch does not incorrectly trigger daily or active box creation limits, as no new box is being created. Also, add unit tests to verify that batched uploads successfully bypass both daily and active box creation caps.
409 lines
16 KiB
Go
409 lines
16 KiB
Go
package handlers
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"mime/multipart"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"warpbox.dev/backend/libs/helpers"
|
|
"warpbox.dev/backend/libs/jobs"
|
|
"warpbox.dev/backend/libs/services"
|
|
)
|
|
|
|
func (a *App) Upload(w http.ResponseWriter, r *http.Request) {
|
|
user, loggedIn, authErr := a.currentUserWithAuthError(r)
|
|
if authErr != nil {
|
|
a.logger.Warn("upload rejected invalid bearer token", "source", "user-upload", "severity", "warn", "code", 4010, "ip", uploadClientIP(r), "user_agent", r.UserAgent())
|
|
helpers.WriteJSONError(w, http.StatusUnauthorized, "invalid bearer token")
|
|
return
|
|
}
|
|
isAdminUpload := loggedIn && user.Role == services.UserRoleAdmin
|
|
settings, err := a.settingsService.UploadPolicy()
|
|
if err != nil {
|
|
a.logger.Error("failed to load upload policy", "source", "settings", "severity", "error", "code", 5005, "error", err.Error())
|
|
helpers.WriteJSONError(w, http.StatusInternalServerError, "upload policy could not be loaded")
|
|
return
|
|
}
|
|
if !loggedIn && !settings.AnonymousUploadsEnabled {
|
|
a.logger.Warn("anonymous upload rejected disabled", "source", "user-upload", "severity", "warn", "code", 4012, "ip", uploadClientIP(r))
|
|
helpers.WriteJSONError(w, http.StatusForbidden, "anonymous uploads are disabled")
|
|
return
|
|
}
|
|
effectivePolicy := a.effectiveUploadPolicy(settings, user, loggedIn)
|
|
rateKey := uploadRateKey(r, user, loggedIn)
|
|
if !isAdminUpload && effectivePolicy.ShortRequests > 0 && !a.rateLimiter.Allow("upload:"+rateKey, effectivePolicy.ShortRequests, effectivePolicy.ShortWindow, time.Now().UTC()) {
|
|
a.logger.Warn("upload rate limited", "source", "user-upload", "severity", "warn", "code", 4290, "ip", uploadClientIP(r), "user_id", user.ID)
|
|
helpers.WriteJSONError(w, http.StatusTooManyRequests, "too many upload requests, please slow down")
|
|
return
|
|
}
|
|
|
|
parseLimit := uploadParseLimit(effectivePolicy, loggedIn, a.uploadService.MaxUploadSize())
|
|
if !isAdminUpload && parseLimit > 0 {
|
|
r.Body = http.MaxBytesReader(w, r.Body, parseLimit)
|
|
}
|
|
if isAdminUpload {
|
|
parseLimit = 32 << 20
|
|
} else if parseLimit <= 0 {
|
|
parseLimit = 32 << 20
|
|
}
|
|
if err := r.ParseMultipartForm(parseLimit); err != nil {
|
|
a.logger.Warn("upload form parse failed", "source", "user-upload", "severity", "warn", "code", 4000, "ip", uploadClientIP(r), "user_id", user.ID, "error", err.Error())
|
|
helpers.WriteJSONError(w, http.StatusBadRequest, "upload form could not be read")
|
|
return
|
|
}
|
|
|
|
files := uploadFiles(r)
|
|
totalBytes := totalUploadBytes(files)
|
|
var ownerID string
|
|
var collectionID string
|
|
if loggedIn {
|
|
ownerID = user.ID
|
|
collectionID = r.FormValue("collection_id")
|
|
if !a.authService.CollectionOwnedBy(collectionID, user.ID) {
|
|
a.logger.Warn("upload rejected invalid collection", "source", "user-upload", "severity", "warn", "code", 4030, "user_id", user.ID, "collection_id", collectionID)
|
|
helpers.WriteJSONError(w, http.StatusForbidden, "collection not found")
|
|
return
|
|
}
|
|
}
|
|
if !isAdminUpload {
|
|
if status, message := a.checkUploadPolicy(r, user, loggedIn, settings, effectivePolicy, files, totalBytes); message != "" {
|
|
a.logger.Warn("upload rejected by policy", "source", "quota", "severity", "warn", "code", status, "ip", uploadClientIP(r), "user_id", user.ID, "message", message, "bytes", totalBytes, "files", len(files))
|
|
helpers.WriteJSONError(w, status, message)
|
|
return
|
|
}
|
|
}
|
|
// Unlimited expiry: admins, or users whose effective MaxDays is negative.
|
|
unlimitedExpiry := isAdminUpload || effectivePolicy.MaxDays < 0
|
|
|
|
rawMaxDays := parseInt(r.FormValue("max_days"))
|
|
maxDays := rawMaxDays
|
|
if maxDays <= 0 {
|
|
maxDays = 7
|
|
if effectivePolicy.MaxDays > 0 && effectivePolicy.MaxDays < maxDays {
|
|
maxDays = effectivePolicy.MaxDays
|
|
}
|
|
}
|
|
if !unlimitedExpiry && maxDays > effectivePolicy.MaxDays {
|
|
a.logger.Warn("upload rejected expiration days", "source", "user-upload", "severity", "warn", "code", 4131, "ip", uploadClientIP(r), "user_id", user.ID, "requested_days", maxDays, "max_days", effectivePolicy.MaxDays)
|
|
helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
|
|
return
|
|
}
|
|
|
|
expiresMinutes := parseInt(r.FormValue("expires_minutes"))
|
|
// A negative expires_minutes (or max_days) is the "never expires" request.
|
|
// Only honour it for unlimited uploaders; otherwise it's an invalid value.
|
|
if expiresMinutes < 0 || rawMaxDays < 0 {
|
|
if !unlimitedExpiry {
|
|
a.logger.Warn("upload rejected unlimited expiration", "source", "user-upload", "severity", "warn", "code", 4133, "ip", uploadClientIP(r), "user_id", user.ID)
|
|
helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
|
|
return
|
|
}
|
|
expiresMinutes = -1
|
|
} else if expiresMinutes > 0 && !unlimitedExpiry && expiresMinutes > effectivePolicy.MaxDays*24*60 {
|
|
a.logger.Warn("upload rejected expiration minutes", "source", "user-upload", "severity", "warn", "code", 4132, "ip", uploadClientIP(r), "user_id", user.ID, "requested_minutes", expiresMinutes, "max_days", effectivePolicy.MaxDays)
|
|
helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
|
|
return
|
|
}
|
|
opts := services.UploadOptions{
|
|
MaxDays: maxDays,
|
|
ExpiresInMinutes: expiresMinutes,
|
|
MaxDownloads: parseInt(r.FormValue("max_downloads")),
|
|
Password: r.FormValue("password"),
|
|
ObfuscateMetadata: r.FormValue("obfuscate_metadata") == "on",
|
|
OwnerID: ownerID,
|
|
CollectionID: collectionID,
|
|
SkipSizeLimit: isAdminUpload || effectivePolicy.MaxUploadMB < 0,
|
|
CreatorIP: uploadClientIP(r),
|
|
StorageBackendID: effectivePolicy.StorageBackendID,
|
|
}
|
|
result, boxesAdded, status, policyMessage, err := a.createOrAppendBox(r, user, loggedIn, effectivePolicy, files, opts, !isAdminUpload)
|
|
if policyMessage != "" {
|
|
a.logger.Warn("upload rejected by policy", "source", "quota", "severity", "warn", "code", status, "ip", uploadClientIP(r), "user_id", user.ID, "message", policyMessage, "bytes", totalBytes, "files", len(files))
|
|
helpers.WriteJSONError(w, status, policyMessage)
|
|
return
|
|
}
|
|
if err != nil {
|
|
a.logger.Warn("upload failed", "source", "user-upload", "severity", "warn", "code", 4001, "ip", uploadClientIP(r), "user_id", user.ID, "error", err.Error())
|
|
helpers.WriteJSONError(w, http.StatusBadRequest, err.Error())
|
|
return
|
|
}
|
|
if !isAdminUpload {
|
|
if err := a.recordUploadUsage(r, user, loggedIn, totalBytes, boxesAdded); err != nil {
|
|
a.logger.Warn("failed to record upload usage", "source", "quota", "severity", "warn", "code", 4402, "error", err.Error())
|
|
}
|
|
if err := a.settingsService.CleanupUsage(time.Now().UTC(), settings.UsageRetentionDays); err != nil {
|
|
a.logger.Warn("failed to cleanup upload usage", "source", "quota", "severity", "warn", "code", 4403, "error", err.Error())
|
|
}
|
|
}
|
|
jobs.GenerateThumbnailsForBoxAsync(a.uploadService, a.logger, result.BoxID)
|
|
a.logger.Info("upload response sent", "source", "user-upload", "severity", "user_activity", "code", 2001, "ip", uploadClientIP(r), "user_id", user.ID, "box_id", result.BoxID, "files", len(files), "bytes", totalBytes, "admin", isAdminUpload)
|
|
|
|
if wantsJSON(r) {
|
|
helpers.WriteJSON(w, http.StatusCreated, result)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = fmt.Fprintln(w, result.BoxURL)
|
|
}
|
|
|
|
// createOrAppendBox creates a new box. It only ever appends to an existing box
|
|
// when the request opts in via the X-Warpbox-Batch header: requests sharing the
|
|
// same batch value (per account, or per IP for anonymous) within
|
|
// uploadGroupWindow are folded into one box. Without the header the behaviour is
|
|
// identical to creating a fresh box every time. Returns the result and how many
|
|
// boxes were created (1 for a new box, 0 for an append) for usage accounting.
|
|
func (a *App) createOrAppendBox(r *http.Request, user services.User, loggedIn bool, policy services.EffectiveUploadPolicy, files []*multipart.FileHeader, opts services.UploadOptions, enforceBoxLimits bool) (services.UploadResult, int, int, string, error) {
|
|
batch := strings.TrimSpace(r.Header.Get(uploadBatchHeader))
|
|
if batch == "" {
|
|
if enforceBoxLimits {
|
|
if status, message := a.checkBoxCreationPolicy(r, user, loggedIn, policy); message != "" {
|
|
return services.UploadResult{}, 0, status, message, nil
|
|
}
|
|
}
|
|
result, err := a.uploadService.CreateBox(files, opts)
|
|
if err != nil {
|
|
return services.UploadResult{}, 0, 0, "", err
|
|
}
|
|
return result, 1, 0, "", nil
|
|
}
|
|
|
|
// Group key is scoped to the uploader so batches never cross accounts/IPs.
|
|
identity := "ip:" + uploadClientIP(r)
|
|
if loggedIn {
|
|
identity = "user:" + user.ID
|
|
}
|
|
entry := a.uploadGroups.entryFor(identity + "|" + batch)
|
|
|
|
// Hold the per-key lock across the whole create/append so concurrent batched
|
|
// uploads serialise into the same box instead of racing.
|
|
entry.mu.Lock()
|
|
defer entry.mu.Unlock()
|
|
|
|
if entry.boxID != "" && time.Since(entry.at) < uploadGroupWindow {
|
|
if box, err := a.uploadService.GetBox(entry.boxID); err == nil && a.batchBoxMatches(box, user, loggedIn, r) && a.uploadService.CanDownload(box) == nil {
|
|
if result, err := a.uploadService.AppendFiles(entry.boxID, files, opts); err == nil {
|
|
// Re-attach the manage/delete URLs from the box's creation so every
|
|
// upload in the batch returns a working deletion URL.
|
|
result.ManageURL = entry.manageURL
|
|
result.DeleteURL = entry.deleteURL
|
|
entry.at = time.Now()
|
|
return result, 0, 0, "", nil
|
|
}
|
|
}
|
|
}
|
|
|
|
if enforceBoxLimits {
|
|
if status, message := a.checkBoxCreationPolicy(r, user, loggedIn, policy); message != "" {
|
|
return services.UploadResult{}, 0, status, message, nil
|
|
}
|
|
}
|
|
result, err := a.uploadService.CreateBox(files, opts)
|
|
if err != nil {
|
|
return services.UploadResult{}, 0, 0, "", err
|
|
}
|
|
entry.boxID = result.BoxID
|
|
entry.manageURL = result.ManageURL
|
|
entry.deleteURL = result.DeleteURL
|
|
entry.at = time.Now()
|
|
return result, 1, 0, "", nil
|
|
}
|
|
|
|
// batchBoxMatches guards that a batched append only ever touches a box owned by
|
|
// the same uploader (account for logged-in users, creator IP for anonymous).
|
|
func (a *App) batchBoxMatches(box services.Box, user services.User, loggedIn bool, r *http.Request) bool {
|
|
if loggedIn {
|
|
return box.OwnerID == user.ID
|
|
}
|
|
return box.OwnerID == "" && box.CreatorIP == uploadClientIP(r)
|
|
}
|
|
|
|
func (a *App) checkUploadPolicy(r *http.Request, user services.User, loggedIn bool, settings services.UploadPolicySettings, policy services.EffectiveUploadPolicy, files []*multipart.FileHeader, totalBytes int64) (int, string) {
|
|
if len(files) == 0 {
|
|
return 0, ""
|
|
}
|
|
now := time.Now().UTC()
|
|
if policy.MaxUploadMB > 0 {
|
|
maxBytes := services.MegabytesToBytes(policy.MaxUploadMB)
|
|
for _, file := range files {
|
|
if file.Size > maxBytes {
|
|
return http.StatusRequestEntityTooLarge, "file exceeds upload size limit"
|
|
}
|
|
}
|
|
}
|
|
if !loggedIn {
|
|
usage, err := a.settingsService.UsageForIP(uploadClientIP(r), now)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "upload usage could not be checked"
|
|
}
|
|
if policy.DailyUploadMB > 0 && usage.UploadedBytes+totalBytes > services.MegabytesToBytes(policy.DailyUploadMB) {
|
|
return http.StatusTooManyRequests, "anonymous daily upload limit reached"
|
|
}
|
|
if status, message := a.checkStorageBackendCapacity(policy.StorageBackendID, settings, totalBytes); message != "" {
|
|
return status, message
|
|
}
|
|
return 0, ""
|
|
}
|
|
|
|
usage, err := a.settingsService.UsageForUser(user.ID, now)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "upload usage could not be checked"
|
|
}
|
|
if policy.DailyUploadMB > 0 && usage.UploadedBytes+totalBytes > services.MegabytesToBytes(policy.DailyUploadMB) {
|
|
return http.StatusTooManyRequests, "daily upload limit reached"
|
|
}
|
|
activeStorage, err := a.uploadService.UserActiveStorageUsed(user.ID)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "storage quota could not be checked"
|
|
}
|
|
if policy.StorageQuotaSet && activeStorage+totalBytes > services.MegabytesToBytes(policy.StorageQuotaMB) {
|
|
return http.StatusRequestEntityTooLarge, "storage quota reached"
|
|
}
|
|
if status, message := a.checkStorageBackendCapacity(policy.StorageBackendID, settings, totalBytes); message != "" {
|
|
return status, message
|
|
}
|
|
return 0, ""
|
|
}
|
|
|
|
func (a *App) checkBoxCreationPolicy(r *http.Request, user services.User, loggedIn bool, policy services.EffectiveUploadPolicy) (int, string) {
|
|
now := time.Now().UTC()
|
|
if !loggedIn {
|
|
usage, err := a.settingsService.UsageForIP(uploadClientIP(r), now)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "upload usage could not be checked"
|
|
}
|
|
if policy.DailyBoxes > 0 && usage.UploadedBoxes+1 > policy.DailyBoxes {
|
|
return http.StatusTooManyRequests, "anonymous daily box limit reached"
|
|
}
|
|
activeBoxes, err := a.uploadService.ActiveBoxCountForIP(uploadClientIP(r))
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "active box limit could not be checked"
|
|
}
|
|
if policy.ActiveBoxes > 0 && activeBoxes+1 > policy.ActiveBoxes {
|
|
return http.StatusTooManyRequests, "anonymous active box limit reached"
|
|
}
|
|
return 0, ""
|
|
}
|
|
usage, err := a.settingsService.UsageForUser(user.ID, now)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "upload usage could not be checked"
|
|
}
|
|
if policy.DailyBoxes > 0 && usage.UploadedBoxes+1 > policy.DailyBoxes {
|
|
return http.StatusTooManyRequests, "daily box limit reached"
|
|
}
|
|
activeBoxes, err := a.uploadService.ActiveBoxCountForUser(user.ID)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "active box limit could not be checked"
|
|
}
|
|
if policy.ActiveBoxes > 0 && activeBoxes+1 > policy.ActiveBoxes {
|
|
return http.StatusTooManyRequests, "active box limit reached"
|
|
}
|
|
return 0, ""
|
|
}
|
|
|
|
func (a *App) recordUploadUsage(r *http.Request, user services.User, loggedIn bool, totalBytes int64, boxes int) error {
|
|
now := time.Now().UTC()
|
|
if loggedIn {
|
|
return a.settingsService.AddUploadUsage("user", user.ID, totalBytes, boxes, now)
|
|
}
|
|
return a.settingsService.AddUploadUsage("ip", uploadClientIP(r), totalBytes, boxes, now)
|
|
}
|
|
|
|
func (a *App) effectiveUploadPolicy(settings services.UploadPolicySettings, user services.User, loggedIn bool) services.EffectiveUploadPolicy {
|
|
if loggedIn {
|
|
return a.settingsService.EffectivePolicyForUser(settings, user)
|
|
}
|
|
return a.settingsService.EffectivePolicyForAnonymous(settings)
|
|
}
|
|
|
|
func (a *App) checkStorageBackendCapacity(backendID string, settings services.UploadPolicySettings, totalBytes int64) (int, string) {
|
|
if backendID != services.StorageBackendLocal {
|
|
return 0, ""
|
|
}
|
|
backend, err := a.uploadService.Storage().Backend(services.StorageBackendLocal)
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "storage backend could not be checked"
|
|
}
|
|
used, err := backend.Usage(context.Background())
|
|
if err != nil {
|
|
return http.StatusInternalServerError, "storage backend usage could not be checked"
|
|
}
|
|
if used+totalBytes > services.GigabytesToBytes(settings.LocalStorageMaxGB) {
|
|
return http.StatusRequestEntityTooLarge, "local storage limit reached"
|
|
}
|
|
return 0, ""
|
|
}
|
|
|
|
func uploadParseLimit(policy services.EffectiveUploadPolicy, loggedIn bool, fallback int64) int64 {
|
|
if policy.MaxUploadMB < 0 {
|
|
return -1
|
|
}
|
|
if loggedIn && policy.MaxUploadMB <= 0 {
|
|
return fallback * 8
|
|
}
|
|
if policy.MaxUploadMB > 0 {
|
|
return services.MegabytesToBytes(policy.MaxUploadMB) * 8
|
|
}
|
|
return fallback * 8
|
|
}
|
|
|
|
func uploadClientIP(r *http.Request) string {
|
|
if ip, ok := services.ClientIPFromContext(r); ok {
|
|
return ip
|
|
}
|
|
return services.ClientIP(r.RemoteAddr, r.Header.Get("X-Forwarded-For"), r.Header.Get("X-Real-IP"), nil)
|
|
}
|
|
|
|
func uploadRateKey(r *http.Request, user services.User, loggedIn bool) string {
|
|
if loggedIn {
|
|
return "user:" + user.ID
|
|
}
|
|
return "ip:" + uploadClientIP(r)
|
|
}
|
|
|
|
// totalUploadBytes sums the Size of every file header in files. A nil or empty
// slice yields 0.
func totalUploadBytes(files []*multipart.FileHeader) int64 {
	sum := int64(0)
	for i := 0; i < len(files); i++ {
		sum += files[i].Size
	}
	return sum
}
|
|
|
|
// parseInt converts a decimal form value to an int. Empty, malformed or
// out-of-range input yields 0 rather than an error.
func parseInt(value string) int {
	if n, err := strconv.Atoi(value); err == nil {
		return n
	}
	return 0
}
|
|
|
|
// statusForDownloadError maps a download error to an HTTP status: 404 when the
// error is (or wraps) http.ErrMissingFile, 403 for everything else.
func statusForDownloadError(err error) int {
	switch {
	case errors.Is(err, http.ErrMissingFile):
		return http.StatusNotFound
	default:
		return http.StatusForbidden
	}
}
|
|
|
|
// uploadFiles collects the uploaded file headers from the parsed multipart
// form: first every part named "file", then every part named "sharex". It
// returns nil when the request has no parsed multipart form, and a non-nil
// (possibly empty) slice otherwise.
func uploadFiles(r *http.Request) []*multipart.FileHeader {
	form := r.MultipartForm
	if form == nil {
		return nil
	}
	primary, sharex := form.File["file"], form.File["sharex"]
	collected := make([]*multipart.FileHeader, 0, len(primary)+len(sharex))
	collected = append(collected, primary...)
	return append(collected, sharex...)
}
|
|
|
|
// wantsJSON reports whether the request's Accept header mentions
// application/json, compared case-insensitively.
func wantsJSON(r *http.Request) bool {
	accept := strings.ToLower(r.Header.Get("Accept"))
	return strings.Contains(accept, "application/json")
}
|