Files
warpbox-dev/backend/libs/handlers/upload.go

361 lines
14 KiB
Go
Raw Normal View History

package handlers
import (
"context"
"errors"
"fmt"
"mime/multipart"
"net/http"
"strconv"
"strings"
"time"
"warpbox.dev/backend/libs/helpers"
"warpbox.dev/backend/libs/jobs"
"warpbox.dev/backend/libs/services"
)
// Upload handles a multipart upload request and stores the files in a box.
//
// The request goes through these stages, each of which may end the request
// with a JSON error:
//  1. resolve the caller (anonymous, user, or admin) from the bearer token;
//  2. load the upload policy and reject anonymous callers when disabled;
//  3. apply the short-window rate limit and cap the request body size;
//  4. parse the multipart form and collect the "file"/"sharex" parts;
//  5. verify collection ownership, quota policy, and expiration limits;
//  6. create (or append to) the box and record usage.
//
// Admin uploads skip the rate limit, the body cap, the policy check, the
// expiration limits, and usage accounting.
//
// On success the response is 201: the JSON result when the client accepts
// application/json, otherwise the box URL as plain text.
func (a *App) Upload(w http.ResponseWriter, r *http.Request) {
	// Threshold of file data ParseMultipartForm keeps in memory; the rest is
	// spooled to temporary files. This is deliberately independent of the
	// request-body cap so a large allowed upload is never buffered fully in RAM.
	const multipartMemoryLimit = 32 << 20

	user, loggedIn, authErr := a.currentUserWithAuthError(r)
	if authErr != nil {
		a.logger.Warn("upload rejected invalid bearer token", "source", "user-upload", "severity", "warn", "code", 4010, "ip", uploadClientIP(r), "user_agent", r.UserAgent())
		helpers.WriteJSONError(w, http.StatusUnauthorized, "invalid bearer token")
		return
	}
	isAdminUpload := loggedIn && user.Role == services.UserRoleAdmin

	settings, err := a.settingsService.UploadPolicy()
	if err != nil {
		a.logger.Error("failed to load upload policy", "source", "settings", "severity", "error", "code", 5005, "error", err.Error())
		helpers.WriteJSONError(w, http.StatusInternalServerError, "upload policy could not be loaded")
		return
	}
	if !loggedIn && !settings.AnonymousUploadsEnabled {
		a.logger.Warn("anonymous upload rejected disabled", "source", "user-upload", "severity", "warn", "code", 4012, "ip", uploadClientIP(r))
		helpers.WriteJSONError(w, http.StatusForbidden, "anonymous uploads are disabled")
		return
	}

	effectivePolicy := a.effectiveUploadPolicy(settings, user, loggedIn)
	rateKey := uploadRateKey(r, user, loggedIn)
	if !isAdminUpload && !a.rateLimiter.Allow("upload:"+rateKey, effectivePolicy.ShortRequests, effectivePolicy.ShortWindow, time.Now().UTC()) {
		a.logger.Warn("upload rate limited", "source", "user-upload", "severity", "warn", "code", 4290, "ip", uploadClientIP(r), "user_id", user.ID)
		helpers.WriteJSONError(w, http.StatusTooManyRequests, "too many upload requests, please slow down")
		return
	}

	// Cap the total request body for non-admin callers; a non-positive limit
	// means no cap is applied.
	parseLimit := uploadParseLimit(effectivePolicy, loggedIn, a.uploadService.MaxUploadSize())
	if !isAdminUpload && parseLimit > 0 {
		r.Body = http.MaxBytesReader(w, r.Body, parseLimit)
	}
	if err := r.ParseMultipartForm(multipartMemoryLimit); err != nil {
		status, message := http.StatusBadRequest, "upload form could not be read"
		// A body that ran into the MaxBytesReader cap is a size problem, not a
		// malformed form; report it as such.
		var tooLarge *http.MaxBytesError
		if errors.As(err, &tooLarge) {
			status, message = http.StatusRequestEntityTooLarge, "upload exceeds size limit"
		}
		a.logger.Warn("upload form parse failed", "source", "user-upload", "severity", "warn", "code", 4000, "ip", uploadClientIP(r), "user_id", user.ID, "error", err.Error())
		helpers.WriteJSONError(w, status, message)
		return
	}

	files := uploadFiles(r)
	totalBytes := totalUploadBytes(files)

	// Only logged-in uploads carry an owner and may target a collection.
	var ownerID string
	var collectionID string
	if loggedIn {
		ownerID = user.ID
		collectionID = r.FormValue("collection_id")
		if !a.authService.CollectionOwnedBy(collectionID, user.ID) {
			a.logger.Warn("upload rejected invalid collection", "source", "user-upload", "severity", "warn", "code", 4030, "user_id", user.ID, "collection_id", collectionID)
			helpers.WriteJSONError(w, http.StatusForbidden, "collection not found")
			return
		}
	}

	if !isAdminUpload {
		if status, message := a.checkUploadPolicy(r, user, loggedIn, settings, effectivePolicy, files, totalBytes); message != "" {
			a.logger.Warn("upload rejected by policy", "source", "quota", "severity", "warn", "code", status, "ip", uploadClientIP(r), "user_id", user.ID, "message", message, "bytes", totalBytes, "files", len(files))
			helpers.WriteJSONError(w, status, message)
			return
		}
	}

	// A missing or non-positive max_days falls back to 7 days, bounded by the
	// policy maximum.
	maxDays := parseInt(r.FormValue("max_days"))
	if maxDays <= 0 {
		maxDays = min(7, effectivePolicy.MaxDays)
	}
	if !isAdminUpload && maxDays > effectivePolicy.MaxDays {
		a.logger.Warn("upload rejected expiration days", "source", "user-upload", "severity", "warn", "code", 4131, "ip", uploadClientIP(r), "user_id", user.ID, "requested_days", maxDays, "max_days", effectivePolicy.MaxDays)
		helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
		return
	}
	expiresMinutes := parseInt(r.FormValue("expires_minutes"))
	if expiresMinutes > 0 && !isAdminUpload && expiresMinutes > effectivePolicy.MaxDays*24*60 {
		a.logger.Warn("upload rejected expiration minutes", "source", "user-upload", "severity", "warn", "code", 4132, "ip", uploadClientIP(r), "user_id", user.ID, "requested_minutes", expiresMinutes, "max_days", effectivePolicy.MaxDays)
		helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
		return
	}

	opts := services.UploadOptions{
		MaxDays:           maxDays,
		ExpiresInMinutes:  expiresMinutes,
		MaxDownloads:      parseInt(r.FormValue("max_downloads")),
		Password:          r.FormValue("password"),
		ObfuscateMetadata: r.FormValue("obfuscate_metadata") == "on",
		OwnerID:           ownerID,
		CollectionID:      collectionID,
		SkipSizeLimit:     isAdminUpload || effectivePolicy.MaxUploadMB < 0,
		CreatorIP:         uploadClientIP(r),
		StorageBackendID:  effectivePolicy.StorageBackendID,
	}
	result, boxesAdded, err := a.createOrAppendBox(r, user, loggedIn, files, opts)
	if err != nil {
		a.logger.Warn("upload failed", "source", "user-upload", "severity", "warn", "code", 4001, "ip", uploadClientIP(r), "user_id", user.ID, "error", err.Error())
		helpers.WriteJSONError(w, http.StatusBadRequest, err.Error())
		return
	}

	// Usage bookkeeping is best-effort: failures are logged but do not fail an
	// upload that has already been stored.
	if !isAdminUpload {
		if err := a.recordUploadUsage(r, user, loggedIn, totalBytes, boxesAdded); err != nil {
			a.logger.Warn("failed to record upload usage", "source", "quota", "severity", "warn", "code", 4402, "error", err.Error())
		}
		if err := a.settingsService.CleanupUsage(time.Now().UTC(), settings.UsageRetentionDays); err != nil {
			a.logger.Warn("failed to cleanup upload usage", "source", "quota", "severity", "warn", "code", 4403, "error", err.Error())
		}
	}

	jobs.GenerateThumbnailsForBoxAsync(a.uploadService, a.logger, result.BoxID)
	a.logger.Info("upload response sent", "source", "user-upload", "severity", "user_activity", "code", 2001, "ip", uploadClientIP(r), "user_id", user.ID, "box_id", result.BoxID, "files", len(files), "bytes", totalBytes, "admin", isAdminUpload)

	if wantsJSON(r) {
		helpers.WriteJSON(w, http.StatusCreated, result)
		return
	}
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	w.WriteHeader(http.StatusCreated)
	_, _ = fmt.Fprintln(w, result.BoxURL) // nothing useful can be done if the client went away
}
// createOrAppendBox stores files in a box and reports how many boxes were
// created (1 for a new box, 0 for an append) so the caller can account usage.
//
// Appending is strictly opt-in: only requests carrying a non-blank
// uploadBatchHeader value are grouped. Requests that share the same batch
// value and the same uploader (account when logged in, client IP otherwise)
// within uploadGroupWindow are folded into one box. Without the header every
// request gets a fresh box.
func (a *App) createOrAppendBox(r *http.Request, user services.User, loggedIn bool, files []*multipart.FileHeader, opts services.UploadOptions) (services.UploadResult, int, error) {
	batchID := strings.TrimSpace(r.Header.Get(uploadBatchHeader))
	if batchID == "" {
		created, err := a.uploadService.CreateBox(files, opts)
		if err != nil {
			return services.UploadResult{}, 0, err
		}
		return created, 1, nil
	}

	// Scope the group key to the uploader so batches never cross accounts/IPs.
	var scope string
	if loggedIn {
		scope = "user:" + user.ID
	} else {
		scope = "ip:" + uploadClientIP(r)
	}
	group := a.uploadGroups.entryFor(scope + "|" + batchID)

	// The per-key lock is held for the whole create/append so concurrent
	// uploads of one batch serialise into the same box instead of racing.
	group.mu.Lock()
	defer group.mu.Unlock()

	withinWindow := group.boxID != "" && time.Since(group.at) < uploadGroupWindow
	if withinWindow {
		box, lookupErr := a.uploadService.GetBox(group.boxID)
		if lookupErr == nil && a.batchBoxMatches(box, user, loggedIn, r) && a.uploadService.CanDownload(box) == nil {
			appended, appendErr := a.uploadService.AppendFiles(group.boxID, files, opts)
			if appendErr == nil {
				// Hand back the manage/delete URLs captured at box creation so
				// every upload in the batch returns a working deletion URL.
				appended.ManageURL = group.manageURL
				appended.DeleteURL = group.deleteURL
				group.at = time.Now()
				return appended, 0, nil
			}
		}
	}

	// No usable box for this batch (none yet, window elapsed, box unavailable,
	// or the append failed): start a new one and remember it for the batch.
	created, err := a.uploadService.CreateBox(files, opts)
	if err != nil {
		return services.UploadResult{}, 0, err
	}
	group.boxID = created.BoxID
	group.manageURL = created.ManageURL
	group.deleteURL = created.DeleteURL
	group.at = time.Now()
	return created, 1, nil
}
// batchBoxMatches reports whether box belongs to the current uploader, so a
// batched append can only touch the uploader's own box: the owning account for
// logged-in users, or an ownerless box created from the same client IP for
// anonymous ones.
func (a *App) batchBoxMatches(box services.Box, user services.User, loggedIn bool, r *http.Request) bool {
	if !loggedIn {
		return box.OwnerID == "" && box.CreatorIP == uploadClientIP(r)
	}
	return box.OwnerID == user.ID
}
// checkUploadPolicy validates an upload against the effective policy and
// returns an HTTP status plus a user-facing message when it must be rejected.
// A zero status with an empty message means the upload is allowed.
//
// Requests without files are always allowed. Otherwise the per-file size limit
// is checked first, followed by the daily byte and box limits, the active box
// limit, the storage quota (accounts only), and finally the storage backend
// capacity. Anonymous callers are tracked by client IP, accounts by user ID.
func (a *App) checkUploadPolicy(r *http.Request, user services.User, loggedIn bool, settings services.UploadPolicySettings, policy services.EffectiveUploadPolicy, files []*multipart.FileHeader, totalBytes int64) (int, string) {
	if len(files) == 0 {
		return 0, ""
	}
	checkedAt := time.Now().UTC()

	// Per-file size limit; only enforced when the policy sets a positive value.
	if policy.MaxUploadMB > 0 {
		perFileLimit := services.MegabytesToBytes(policy.MaxUploadMB)
		for i := range files {
			if files[i].Size > perFileLimit {
				return http.StatusRequestEntityTooLarge, "file exceeds upload size limit"
			}
		}
	}

	if loggedIn {
		userUsage, err := a.settingsService.UsageForUser(user.ID, checkedAt)
		if err != nil {
			return http.StatusInternalServerError, "upload usage could not be checked"
		}
		if policy.DailyUploadMB > 0 && userUsage.UploadedBytes+totalBytes > services.MegabytesToBytes(policy.DailyUploadMB) {
			return http.StatusTooManyRequests, "daily upload limit reached"
		}
		if userUsage.UploadedBoxes+1 > policy.DailyBoxes {
			return http.StatusTooManyRequests, "daily box limit reached"
		}

		userBoxes, err := a.uploadService.ActiveBoxCountForUser(user.ID)
		if err != nil {
			return http.StatusInternalServerError, "active box limit could not be checked"
		}
		if userBoxes+1 > policy.ActiveBoxes {
			return http.StatusTooManyRequests, "active box limit reached"
		}

		storedBytes, err := a.uploadService.UserActiveStorageUsed(user.ID)
		if err != nil {
			return http.StatusInternalServerError, "storage quota could not be checked"
		}
		if policy.StorageQuotaSet && storedBytes+totalBytes > services.MegabytesToBytes(policy.StorageQuotaMB) {
			return http.StatusRequestEntityTooLarge, "storage quota reached"
		}

		if code, reason := a.checkStorageBackendCapacity(policy.StorageBackendID, settings, totalBytes); reason != "" {
			return code, reason
		}
		return 0, ""
	}

	// Anonymous uploads: limits are tracked per client IP.
	ipUsage, err := a.settingsService.UsageForIP(uploadClientIP(r), checkedAt)
	if err != nil {
		return http.StatusInternalServerError, "upload usage could not be checked"
	}
	if policy.DailyUploadMB > 0 && ipUsage.UploadedBytes+totalBytes > services.MegabytesToBytes(policy.DailyUploadMB) {
		return http.StatusTooManyRequests, "anonymous daily upload limit reached"
	}
	if ipUsage.UploadedBoxes+1 > policy.DailyBoxes {
		return http.StatusTooManyRequests, "anonymous daily box limit reached"
	}

	ipBoxes, err := a.uploadService.ActiveBoxCountForIP(uploadClientIP(r))
	if err != nil {
		return http.StatusInternalServerError, "active box limit could not be checked"
	}
	if ipBoxes+1 > policy.ActiveBoxes {
		return http.StatusTooManyRequests, "anonymous active box limit reached"
	}

	if code, reason := a.checkStorageBackendCapacity(policy.StorageBackendID, settings, totalBytes); reason != "" {
		return code, reason
	}
	return 0, ""
}
// recordUploadUsage adds the uploaded bytes and created boxes to the usage
// counters, keyed by user ID for logged-in callers and by client IP otherwise.
// The usage is stamped with the current UTC time.
func (a *App) recordUploadUsage(r *http.Request, user services.User, loggedIn bool, totalBytes int64, boxes int) error {
	recordedAt := time.Now().UTC()
	if !loggedIn {
		return a.settingsService.AddUploadUsage("ip", uploadClientIP(r), totalBytes, boxes, recordedAt)
	}
	return a.settingsService.AddUploadUsage("user", user.ID, totalBytes, boxes, recordedAt)
}
// effectiveUploadPolicy resolves the policy that applies to this caller: the
// per-user policy when logged in, the anonymous policy otherwise.
func (a *App) effectiveUploadPolicy(settings services.UploadPolicySettings, user services.User, loggedIn bool) services.EffectiveUploadPolicy {
	if !loggedIn {
		return a.settingsService.EffectivePolicyForAnonymous(settings)
	}
	return a.settingsService.EffectivePolicyForUser(settings, user)
}
// checkStorageBackendCapacity verifies that adding totalBytes keeps the local
// storage backend within settings.LocalStorageMaxGB. It returns an HTTP status
// and message on rejection or lookup failure, and (0, "") when the upload fits.
// Backends other than the local one are not checked here.
func (a *App) checkStorageBackendCapacity(backendID string, settings services.UploadPolicySettings, totalBytes int64) (int, string) {
	if backendID != services.StorageBackendLocal {
		return 0, ""
	}

	local, err := a.uploadService.Storage().Backend(services.StorageBackendLocal)
	if err != nil {
		return http.StatusInternalServerError, "storage backend could not be checked"
	}

	// The signature carries no request context, so the usage query is not
	// tied to the request's lifetime.
	usedBytes, err := local.Usage(context.Background())
	if err != nil {
		return http.StatusInternalServerError, "storage backend usage could not be checked"
	}

	if usedBytes+totalBytes <= services.GigabytesToBytes(settings.LocalStorageMaxGB) {
		return 0, ""
	}
	return http.StatusRequestEntityTooLarge, "local storage limit reached"
}
// uploadParseLimit derives the request-body byte limit from the policy:
//   - a negative MaxUploadMB yields -1;
//   - a positive MaxUploadMB yields eight times that size in bytes;
//   - otherwise (no policy limit) eight times fallback is used.
//
// loggedIn is accepted for signature compatibility; anonymous and logged-in
// callers get the same result.
func uploadParseLimit(policy services.EffectiveUploadPolicy, loggedIn bool, fallback int64) int64 {
	switch {
	case policy.MaxUploadMB < 0:
		return -1
	case policy.MaxUploadMB > 0:
		return services.MegabytesToBytes(policy.MaxUploadMB) * 8
	default:
		return fallback * 8
	}
}
// uploadClientIP returns the client IP for the request. A value stored on the
// request context takes precedence; otherwise it is derived from RemoteAddr
// and the X-Forwarded-For / X-Real-IP headers via services.ClientIP.
func uploadClientIP(r *http.Request) string {
	ip, found := services.ClientIPFromContext(r)
	if !found {
		ip = services.ClientIP(r.RemoteAddr, r.Header.Get("X-Forwarded-For"), r.Header.Get("X-Real-IP"), nil)
	}
	return ip
}
// uploadRateKey builds the rate-limiter key for the caller: "user:<id>" for
// logged-in users and "ip:<client ip>" for anonymous requests.
func uploadRateKey(r *http.Request, user services.User, loggedIn bool) string {
	if !loggedIn {
		return "ip:" + uploadClientIP(r)
	}
	return "user:" + user.ID
}
// totalUploadBytes sums the declared Size of every file header. A nil or empty
// slice yields 0.
func totalUploadBytes(files []*multipart.FileHeader) int64 {
	sum := int64(0)
	for i := 0; i < len(files); i++ {
		sum += files[i].Size
	}
	return sum
}
// parseInt converts a decimal string to an int. Empty, malformed, or
// out-of-range input yields 0 rather than an error, so callers can treat 0 as
// "not provided".
func parseInt(value string) int {
	if n, err := strconv.Atoi(value); err == nil {
		return n
	}
	return 0
}
// statusForDownloadError maps a download error to an HTTP status: 404 when the
// error is (or wraps) http.ErrMissingFile, 403 for everything else.
func statusForDownloadError(err error) int {
	switch {
	case errors.Is(err, http.ErrMissingFile):
		return http.StatusNotFound
	default:
		return http.StatusForbidden
	}
}
// uploadFiles collects the uploaded file headers from the parsed multipart
// form: first the parts named "file", then the parts named "sharex". It
// returns nil when the form has not been parsed, and an empty non-nil slice
// when the form holds no matching parts.
func uploadFiles(r *http.Request) []*multipart.FileHeader {
	form := r.MultipartForm
	if form == nil {
		return nil
	}
	primary, sharex := form.File["file"], form.File["sharex"]
	collected := make([]*multipart.FileHeader, 0, len(primary)+len(sharex))
	collected = append(collected, primary...)
	return append(collected, sharex...)
}
// wantsJSON reports whether the request's Accept header mentions
// application/json, compared case-insensitively.
func wantsJSON(r *http.Request) bool {
	accept := strings.ToLower(r.Header.Get("Accept"))
	return strings.Contains(accept, "application/json")
}