perf(backend): optimize ban lookups and prune upload group map
- Optimize the ban matching middleware by using a read-only transaction (`db.View`) for the initial scan, avoiding the single bbolt write lock on every request when no ban matches. - Implement periodic pruning of stale entries in the upload grouper map to prevent unbounded memory growth over time. - Avoid redundant parsing of the `max_days` form value in the upload handler.
This commit is contained in:
@@ -80,7 +80,8 @@ func (a *App) Upload(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Unlimited expiry: admins, or users whose effective MaxDays is negative.
|
// Unlimited expiry: admins, or users whose effective MaxDays is negative.
|
||||||
unlimitedExpiry := isAdminUpload || effectivePolicy.MaxDays < 0
|
unlimitedExpiry := isAdminUpload || effectivePolicy.MaxDays < 0
|
||||||
|
|
||||||
maxDays := parseInt(r.FormValue("max_days"))
|
rawMaxDays := parseInt(r.FormValue("max_days"))
|
||||||
|
maxDays := rawMaxDays
|
||||||
if maxDays <= 0 {
|
if maxDays <= 0 {
|
||||||
maxDays = 7
|
maxDays = 7
|
||||||
if effectivePolicy.MaxDays > 0 && effectivePolicy.MaxDays < maxDays {
|
if effectivePolicy.MaxDays > 0 && effectivePolicy.MaxDays < maxDays {
|
||||||
@@ -96,7 +97,7 @@ func (a *App) Upload(w http.ResponseWriter, r *http.Request) {
|
|||||||
expiresMinutes := parseInt(r.FormValue("expires_minutes"))
|
expiresMinutes := parseInt(r.FormValue("expires_minutes"))
|
||||||
// A negative expires_minutes (or max_days) is the "never expires" request.
|
// A negative expires_minutes (or max_days) is the "never expires" request.
|
||||||
// Only honour it for unlimited uploaders; otherwise it's an invalid value.
|
// Only honour it for unlimited uploaders; otherwise it's an invalid value.
|
||||||
if expiresMinutes < 0 || parseInt(r.FormValue("max_days")) < 0 {
|
if expiresMinutes < 0 || rawMaxDays < 0 {
|
||||||
if !unlimitedExpiry {
|
if !unlimitedExpiry {
|
||||||
a.logger.Warn("upload rejected unlimited expiration", "source", "user-upload", "severity", "warn", "code", 4133, "ip", uploadClientIP(r), "user_id", user.ID)
|
a.logger.Warn("upload rejected unlimited expiration", "source", "user-upload", "severity", "warn", "code", 4133, "ip", uploadClientIP(r), "user_id", user.ID)
|
||||||
helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
|
helpers.WriteJSONError(w, http.StatusRequestEntityTooLarge, fmt.Sprintf("expiration cannot exceed %d days", effectivePolicy.MaxDays))
|
||||||
|
|||||||
@@ -16,6 +16,10 @@ const uploadGroupWindow = 20 * time.Second
|
|||||||
// value (per account/IP) within uploadGroupWindow are grouped into one box.
|
// value (per account/IP) within uploadGroupWindow are grouped into one box.
|
||||||
const uploadBatchHeader = "X-Warpbox-Batch"
|
const uploadBatchHeader = "X-Warpbox-Batch"
|
||||||
|
|
||||||
|
// uploadGroupPruneInterval is how often entryFor drops stale entries so the map
|
||||||
|
// can't grow without bound (one key per account/IP + batch value otherwise).
|
||||||
|
const uploadGroupPruneInterval = 5 * time.Minute
|
||||||
|
|
||||||
// uploadGrouper tracks the most recent box per batch key so opt-in batched
|
// uploadGrouper tracks the most recent box per batch key so opt-in batched
|
||||||
// uploads land in a single box. Each key has its own lock, which also serialises
|
// uploads land in a single box. Each key has its own lock, which also serialises
|
||||||
// that key's concurrent uploads so they append to the same box instead of racing
|
// that key's concurrent uploads so they append to the same box instead of racing
|
||||||
@@ -23,6 +27,7 @@ const uploadBatchHeader = "X-Warpbox-Batch"
|
|||||||
type uploadGrouper struct {
|
type uploadGrouper struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
entries map[string]*uploadGroupEntry
|
entries map[string]*uploadGroupEntry
|
||||||
|
lastPrune time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type uploadGroupEntry struct {
|
type uploadGroupEntry struct {
|
||||||
@@ -40,6 +45,7 @@ func newUploadGrouper() *uploadGrouper {
|
|||||||
func (g *uploadGrouper) entryFor(key string) *uploadGroupEntry {
|
func (g *uploadGrouper) entryFor(key string) *uploadGroupEntry {
|
||||||
g.mu.Lock()
|
g.mu.Lock()
|
||||||
defer g.mu.Unlock()
|
defer g.mu.Unlock()
|
||||||
|
g.pruneLocked(time.Now())
|
||||||
entry, ok := g.entries[key]
|
entry, ok := g.entries[key]
|
||||||
if !ok {
|
if !ok {
|
||||||
entry = &uploadGroupEntry{}
|
entry = &uploadGroupEntry{}
|
||||||
@@ -47,3 +53,24 @@ func (g *uploadGrouper) entryFor(key string) *uploadGroupEntry {
|
|||||||
}
|
}
|
||||||
return entry
|
return entry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pruneLocked drops entries whose last use is well past the grouping window so
|
||||||
|
// the map stays bounded to recently-active keys. Callers must hold g.mu. Entries
|
||||||
|
// currently in use, or freshly created but not yet used (zero timestamp), are
|
||||||
|
// kept to avoid removing one a request is about to populate.
|
||||||
|
func (g *uploadGrouper) pruneLocked(now time.Time) {
|
||||||
|
if now.Sub(g.lastPrune) < uploadGroupPruneInterval {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
g.lastPrune = now
|
||||||
|
for key, entry := range g.entries {
|
||||||
|
if !entry.mu.TryLock() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
stale := !entry.at.IsZero() && now.Sub(entry.at) > 2*uploadGroupWindow
|
||||||
|
entry.mu.Unlock()
|
||||||
|
if stale {
|
||||||
|
delete(g.entries, key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -312,7 +312,11 @@ func (s *BanService) Match(ip string, now time.Time) (MatchedBan, bool, error) {
|
|||||||
}
|
}
|
||||||
now = now.UTC()
|
now = now.UTC()
|
||||||
var matched BanRecord
|
var matched BanRecord
|
||||||
err := s.db.Update(func(tx *bbolt.Tx) error {
|
var matchedKey []byte
|
||||||
|
// Read-only scan first: the common case (no match) only takes a concurrent
|
||||||
|
// read transaction, instead of grabbing the single bbolt write lock on every
|
||||||
|
// request that flows through the ban middleware.
|
||||||
|
err := s.db.View(func(tx *bbolt.Tx) error {
|
||||||
bucket := tx.Bucket(bansBucket)
|
bucket := tx.Bucket(bansBucket)
|
||||||
return bucket.ForEach(func(key, value []byte) error {
|
return bucket.ForEach(func(key, value []byte) error {
|
||||||
if matched.ID != "" {
|
if matched.ID != "" {
|
||||||
@@ -325,20 +329,37 @@ func (s *BanService) Match(ip string, now time.Time) (MatchedBan, bool, error) {
|
|||||||
if !record.Active(now) || !banTargetMatches(record.Normalized, parsed) {
|
if !record.Active(now) || !banTargetMatches(record.Normalized, parsed) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
matched = record
|
||||||
|
matchedKey = append([]byte(nil), key...) // key bytes are only valid within the txn
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
})
|
||||||
|
if err != nil || matched.ID == "" {
|
||||||
|
return MatchedBan{Ban: matched, IP: ip}, matched.ID != "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// On a hit, record the match time in a short write transaction.
|
||||||
|
matched.LastMatchedAt = &now
|
||||||
|
matched.UpdatedAt = now
|
||||||
|
_ = s.db.Update(func(tx *bbolt.Tx) error {
|
||||||
|
bucket := tx.Bucket(bansBucket)
|
||||||
|
data := bucket.Get(matchedKey)
|
||||||
|
if data == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var record BanRecord
|
||||||
|
if err := json.Unmarshal(data, &record); err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
record.LastMatchedAt = &now
|
record.LastMatchedAt = &now
|
||||||
record.UpdatedAt = now
|
record.UpdatedAt = now
|
||||||
next, err := json.Marshal(record)
|
next, err := json.Marshal(record)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := bucket.Put(key, next); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
matched = record
|
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
|
return bucket.Put(matchedKey, next)
|
||||||
})
|
})
|
||||||
})
|
return MatchedBan{Ban: matched, IP: ip}, true, nil
|
||||||
return MatchedBan{Ban: matched, IP: ip}, matched.ID != "", err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r BanRecord) Active(now time.Time) bool {
|
func (r BanRecord) Active(now time.Time) bool {
|
||||||
|
|||||||
Reference in New Issue
Block a user