Cleanup Job
This commit is contained in:
@@ -3,14 +3,18 @@ package state
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
maxActivityLogEntries = 400
|
||||
adminLogBroadcastLimit = 200
|
||||
staleRoomCleanupInterval = 5 * time.Minute
|
||||
staleRoomTTL = 30 * time.Minute
|
||||
)
|
||||
|
||||
type Manager struct {
|
||||
@@ -33,10 +37,70 @@ func NewManager(dataPath string) (*Manager, error) {
|
||||
if loadErr := manager.loadFromDisk(); loadErr != nil {
|
||||
return nil, loadErr
|
||||
}
|
||||
manager.startCleanupLoop()
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
|
||||
func (m *Manager) startCleanupLoop() {
|
||||
ticker := time.NewTicker(staleRoomCleanupInterval)
|
||||
|
||||
go func() {
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
m.cleanupStaleRooms(nowUTC())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (m *Manager) cleanupStaleRooms(now time.Time) {
|
||||
m.mu.RLock()
|
||||
rooms := make([]*Room, 0, len(m.rooms))
|
||||
for _, room := range m.rooms {
|
||||
rooms = append(rooms, room)
|
||||
}
|
||||
m.mu.RUnlock()
|
||||
|
||||
for _, room := range rooms {
|
||||
room.mu.Lock()
|
||||
roomID := room.ID
|
||||
hasConnected := hasConnectedParticipantsLocked(room)
|
||||
recentlyActive := now.Sub(room.UpdatedAt) < staleRoomTTL
|
||||
hasSubscribers := len(room.subscribers) > 0
|
||||
room.mu.Unlock()
|
||||
|
||||
if hasConnected || recentlyActive || hasSubscribers {
|
||||
continue
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
current, ok := m.rooms[roomID]
|
||||
if !ok || current != room {
|
||||
m.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
current.mu.Lock()
|
||||
if hasConnectedParticipantsLocked(current) || now.Sub(current.UpdatedAt) < staleRoomTTL || len(current.subscribers) > 0 {
|
||||
current.mu.Unlock()
|
||||
m.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
delete(m.rooms, roomID)
|
||||
current.mu.Unlock()
|
||||
m.mu.Unlock()
|
||||
|
||||
if err := m.store.Delete(roomID); err != nil {
|
||||
log.Printf("failed to delete stale room %s: %v", roomID, err)
|
||||
m.mu.Lock()
|
||||
if _, exists := m.rooms[roomID]; !exists {
|
||||
m.rooms[roomID] = room
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) CreateRoom(input CreateRoomInput) (CreateRoomResult, error) {
|
||||
roomName := normalizeName(input.RoomName, 80)
|
||||
creatorUsername := normalizeName(input.CreatorUsername, 32)
|
||||
@@ -744,3 +808,13 @@ func (m *Manager) disconnectParticipantLocked(room *Room, participant *Participa
|
||||
participant.UpdatedAt = nowUTC()
|
||||
room.UpdatedAt = nowUTC()
|
||||
}
|
||||
|
||||
func hasConnectedParticipantsLocked(room *Room) bool {
|
||||
for _, participant := range room.Participants {
|
||||
if participant.Connected {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -75,3 +75,20 @@ func (ds *DiskStore) LoadAll() ([]persistedRoom, error) {
|
||||
|
||||
return rooms, nil
|
||||
}
|
||||
|
||||
func (ds *DiskStore) Delete(roomID string) error {
|
||||
ds.mu.Lock()
|
||||
defer ds.mu.Unlock()
|
||||
|
||||
finalPath := filepath.Join(ds.dataPath, roomID+".json")
|
||||
if err := os.Remove(finalPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
|
||||
tmpPath := finalPath + ".tmp"
|
||||
if err := os.Remove(tmpPath); err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user