Files
outward-scrapper/internal/scraper/checkpoint.go

201 lines
4.3 KiB
Go
Raw Normal View History

package scraper
import (
"encoding/json"
"errors"
"os"
"path/filepath"
"sort"
"strings"
"time"
"scrappr/internal/logx"
"scrappr/internal/model"
)
// checkpointVersion is the schema version stamped into every checkpoint that
// snapshotCheckpoint produces. loadCheckpoint ignores any file whose stored
// version differs from this value.
const checkpointVersion = 2
// checkpoint is the JSON document written by SaveCheckpoint and read back by
// loadCheckpoint.
type checkpoint struct {
// Version is compared against checkpointVersion when loading.
Version int `json:"version"`
// SavedAt is the time the snapshot was taken (time.Now() in snapshotCheckpoint).
SavedAt time.Time `json:"saved_at"`
// Reason is the caller-supplied string passed to SaveCheckpoint.
Reason string `json:"reason"`
// Dataset holds the items and effects known to the scraper at snapshot time.
Dataset model.Dataset `json:"dataset"`
// CompletedURLs is the sorted list of keys from the scraper's completedURLs set.
CompletedURLs []string `json:"completed_urls"`
// Stats carries the scraper's counters at snapshot time.
Stats checkpointStats `json:"stats"`
}
// checkpointStats is the persisted copy of the scraper's completed, failed and
// retried counters. The exact meaning of each counter is defined where the
// scraper updates them, not in this file.
type checkpointStats struct {
Completed int `json:"completed"`
Failed int `json:"failed"`
Retried int `json:"retried"`
}
// SaveCheckpoint persists the scraper's current state to s.cfg.CheckpointPath.
//
// The snapshot is encoded as indented JSON into "<CheckpointPath>.tmp", synced
// to stable storage, closed, and only then renamed over the real checkpoint,
// so an existing checkpoint is replaced only by a completely written one.
// reason is stored in the file and echoed in the log line.
//
// If any step after creating the temporary file fails, the temporary file is
// removed (best effort) and the underlying error is returned.
func (s *Scraper) SaveCheckpoint(reason string) error {
	state := s.snapshotCheckpoint(reason)

	// NOTE(review): this creates CacheDir, presumably the directory that holds
	// CheckpointPath — confirm against the configuration code.
	if err := os.MkdirAll(s.cfg.CacheDir, 0o755); err != nil {
		return err
	}

	tempPath := s.cfg.CheckpointPath + ".tmp"
	file, err := os.Create(tempPath)
	if err != nil {
		return err
	}

	encoder := json.NewEncoder(file)
	encoder.SetIndent("", " ")
	if err := encoder.Encode(state); err != nil {
		// The encode error is the one worth reporting; close and remove are
		// best-effort cleanup.
		_ = file.Close()
		_ = os.Remove(tempPath)
		return err
	}

	// Flush the data to disk before the rename. Without this, a crash shortly
	// after the rename can leave a truncated or empty file at CheckpointPath
	// in place of the previous good checkpoint.
	if err := file.Sync(); err != nil {
		_ = file.Close()
		_ = os.Remove(tempPath)
		return err
	}

	if err := file.Close(); err != nil {
		_ = os.Remove(tempPath)
		return err
	}

	if err := os.Rename(tempPath, s.cfg.CheckpointPath); err != nil {
		_ = os.Remove(tempPath)
		return err
	}

	logx.Eventf(
		"cache",
		"saved checkpoint (%s) items=%d effects=%d completed=%d -> %s",
		reason,
		len(state.Dataset.Items),
		len(state.Dataset.Effects),
		state.Stats.Completed,
		s.cfg.CheckpointPath,
	)
	return nil
}
// ClearCheckpoint deletes the checkpoint file at s.cfg.CheckpointPath.
//
// A file that does not exist is treated as already cleared: no error is
// returned and the "cleared" event is still logged. Any other removal error
// is returned as-is.
func (s *Scraper) ClearCheckpoint() error {
	target := s.cfg.CheckpointPath

	if rmErr := os.Remove(target); rmErr != nil {
		if !errors.Is(rmErr, os.ErrNotExist) {
			return rmErr
		}
	}

	logx.Eventf("cache", "cleared checkpoint %s", target)
	return nil
}
// loadCheckpoint restores scraper state from the file at s.cfg.CheckpointPath.
//
// Outcomes:
//   - file missing: nothing is loaded, nil is returned;
//   - file unreadable or not valid JSON: the error is returned;
//   - stored version differs from checkpointVersion: the file is ignored with
//     a log event, nil is returned;
//   - otherwise: items, effects, completed URLs and counters are replaced
//     under s.mu, the queued set is reset, and lastEvent is set to now.
func (s *Scraper) loadCheckpoint() error {
	path := s.cfg.CheckpointPath

	fh, openErr := os.Open(path)
	switch {
	case errors.Is(openErr, os.ErrNotExist):
		// No checkpoint yet; start from a clean state.
		return nil
	case openErr != nil:
		return openErr
	}
	defer fh.Close()

	var saved checkpoint
	if decodeErr := json.NewDecoder(fh).Decode(&saved); decodeErr != nil {
		return decodeErr
	}

	if saved.Version != checkpointVersion {
		logx.Eventf(
			"cache",
			"ignoring checkpoint %s with version=%d expected=%d",
			path,
			saved.Version,
			checkpointVersion,
		)
		return nil
	}

	savedItems := saved.Dataset.Items
	savedEffects := saved.Dataset.Effects

	s.mu.Lock()
	defer s.mu.Unlock()

	// Each entry is copied into its own variable before taking its address so
	// every map value points at a distinct element.
	itemsByURL := make(map[string]*model.Item, len(savedItems))
	for idx := range savedItems {
		entry := savedItems[idx]
		itemsByURL[entry.URL] = &entry
	}
	s.items = itemsByURL

	effectsByURL := make(map[string]*model.Effect, len(savedEffects))
	for idx := range savedEffects {
		entry := savedEffects[idx]
		effectsByURL[entry.URL] = &entry
	}
	s.effects = effectsByURL

	done := make(map[string]bool, len(saved.CompletedURLs))
	for _, u := range saved.CompletedURLs {
		if u == "" {
			// Empty URLs are dropped rather than recorded as completed.
			continue
		}
		done[u] = true
	}
	s.completedURLs = done

	s.completed = saved.Stats.Completed
	s.failed = saved.Stats.Failed
	s.retried = saved.Stats.Retried
	s.queued = make(map[string]bool)
	s.lastEvent = time.Now()

	logx.Eventf(
		"cache",
		"loaded checkpoint from %s saved=%s items=%d effects=%d completed=%d",
		path,
		saved.SavedAt.Format(time.RFC3339),
		len(savedItems),
		len(savedEffects),
		saved.Stats.Completed,
	)
	return nil
}
// snapshotCheckpoint builds a checkpoint value from the scraper's current
// state while holding s.mu.
//
// Items and effects are copied by value out of their maps and sorted
// case-insensitively by Name; completed URLs are sorted lexically. Because
// the inputs come from map iteration (random order) and sort.Slice is not
// stable, entries whose names compare equal are further ordered by exact Name
// and then URL, so repeated snapshots of the same state have the same order.
//
// NOTE(review): the copies are shallow (`*item`). If model.Item or
// model.Effect contain slices, maps or pointers, those remain shared with the
// live state after the lock is released — confirm against the model package.
func (s *Scraper) snapshotCheckpoint(reason string) checkpoint {
	s.mu.Lock()
	defer s.mu.Unlock()

	items := make([]model.Item, 0, len(s.items))
	for _, item := range s.items {
		items = append(items, *item)
	}

	effects := make([]model.Effect, 0, len(s.effects))
	for _, effect := range s.effects {
		effects = append(effects, *effect)
	}

	completedURLs := make([]string, 0, len(s.completedURLs))
	for rawURL := range s.completedURLs {
		completedURLs = append(completedURLs, rawURL)
	}

	// byNameThenURL orders primarily by lower-cased name (the original sort
	// key) and falls back to exact name, then URL, to break ties.
	byNameThenURL := func(nameA, urlA, nameB, urlB string) bool {
		lowerA, lowerB := strings.ToLower(nameA), strings.ToLower(nameB)
		if lowerA != lowerB {
			return lowerA < lowerB
		}
		if nameA != nameB {
			return nameA < nameB
		}
		return urlA < urlB
	}

	sort.Slice(items, func(i, j int) bool {
		return byNameThenURL(items[i].Name, items[i].URL, items[j].Name, items[j].URL)
	})
	sort.Slice(effects, func(i, j int) bool {
		return byNameThenURL(effects[i].Name, effects[i].URL, effects[j].Name, effects[j].URL)
	})
	sort.Strings(completedURLs)

	return checkpoint{
		Version: checkpointVersion,
		SavedAt: time.Now(),
		Reason:  reason,
		Dataset: model.Dataset{
			Items:   items,
			Effects: effects,
		},
		CompletedURLs: completedURLs,
		Stats: checkpointStats{
			Completed: s.completed,
			Failed:    s.failed,
			Retried:   s.retried,
		},
	}
}
// CheckpointPath returns the configured checkpoint file path after
// normalising it with filepath.Clean.
func (s *Scraper) CheckpointPath() string {
	cleaned := filepath.Clean(s.cfg.CheckpointPath)
	return cleaned
}