201 lines
4.3 KiB
Go
201 lines
4.3 KiB
Go
|
|
package scraper
|
||
|
|
|
||
|
|
import (
	"encoding/json"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sort"
	"strings"
	"time"

	"scrappr/internal/logx"
	"scrappr/internal/model"
)
|
||
|
|
|
||
|
|
// checkpointVersion is the schema version written into every checkpoint
// file. loadCheckpoint ignores (does not error on) files whose recorded
// version differs, so bumping this constant invalidates old checkpoints.
const checkpointVersion = 2
|
||
|
|
|
||
|
|
// checkpoint is the on-disk resume state for a scraper run, serialized
// as indented JSON at cfg.CheckpointPath.
type checkpoint struct {
	Version       int             `json:"version"`        // schema version; compared against checkpointVersion on load
	SavedAt       time.Time       `json:"saved_at"`       // wall-clock time the snapshot was taken
	Reason        string          `json:"reason"`         // caller-supplied reason string, also logged when saving
	Dataset       model.Dataset   `json:"dataset"`        // scraped items and effects collected so far
	CompletedURLs []string        `json:"completed_urls"` // URLs already processed (sorted; empty strings skipped on load)
	Stats         checkpointStats `json:"stats"`          // progress counters restored on load
}
|
||
|
|
|
||
|
|
// checkpointStats carries the scraper's progress counters so they can be
// restored alongside the dataset when resuming from a checkpoint.
type checkpointStats struct {
	Completed int `json:"completed"` // mirrors s.completed
	Failed    int `json:"failed"`    // mirrors s.failed
	Retried   int `json:"retried"`   // mirrors s.retried
}
|
||
|
|
|
||
|
|
func (s *Scraper) SaveCheckpoint(reason string) error {
|
||
|
|
state := s.snapshotCheckpoint(reason)
|
||
|
|
|
||
|
|
if err := os.MkdirAll(s.cfg.CacheDir, 0o755); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
tempPath := s.cfg.CheckpointPath + ".tmp"
|
||
|
|
file, err := os.Create(tempPath)
|
||
|
|
if err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
encoder := json.NewEncoder(file)
|
||
|
|
encoder.SetIndent("", " ")
|
||
|
|
if err := encoder.Encode(state); err != nil {
|
||
|
|
file.Close()
|
||
|
|
_ = os.Remove(tempPath)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := file.Close(); err != nil {
|
||
|
|
_ = os.Remove(tempPath)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
if err := os.Rename(tempPath, s.cfg.CheckpointPath); err != nil {
|
||
|
|
_ = os.Remove(tempPath)
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
logx.Eventf(
|
||
|
|
"cache",
|
||
|
|
"saved checkpoint (%s) items=%d effects=%d completed=%d -> %s",
|
||
|
|
reason,
|
||
|
|
len(state.Dataset.Items),
|
||
|
|
len(state.Dataset.Effects),
|
||
|
|
state.Stats.Completed,
|
||
|
|
s.cfg.CheckpointPath,
|
||
|
|
)
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Scraper) ClearCheckpoint() error {
|
||
|
|
err := os.Remove(s.cfg.CheckpointPath)
|
||
|
|
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
logx.Eventf("cache", "cleared checkpoint %s", s.cfg.CheckpointPath)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Scraper) loadCheckpoint() error {
|
||
|
|
file, err := os.Open(s.cfg.CheckpointPath)
|
||
|
|
if err != nil {
|
||
|
|
if errors.Is(err, os.ErrNotExist) {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
defer file.Close()
|
||
|
|
|
||
|
|
var state checkpoint
|
||
|
|
if err := json.NewDecoder(file).Decode(&state); err != nil {
|
||
|
|
return err
|
||
|
|
}
|
||
|
|
|
||
|
|
if state.Version != checkpointVersion {
|
||
|
|
logx.Eventf(
|
||
|
|
"cache",
|
||
|
|
"ignoring checkpoint %s with version=%d expected=%d",
|
||
|
|
s.cfg.CheckpointPath,
|
||
|
|
state.Version,
|
||
|
|
checkpointVersion,
|
||
|
|
)
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
|
||
|
|
s.items = make(map[string]*model.Item, len(state.Dataset.Items))
|
||
|
|
for i := range state.Dataset.Items {
|
||
|
|
item := state.Dataset.Items[i]
|
||
|
|
s.items[item.URL] = &item
|
||
|
|
}
|
||
|
|
|
||
|
|
s.effects = make(map[string]*model.Effect, len(state.Dataset.Effects))
|
||
|
|
for i := range state.Dataset.Effects {
|
||
|
|
effect := state.Dataset.Effects[i]
|
||
|
|
s.effects[effect.URL] = &effect
|
||
|
|
}
|
||
|
|
|
||
|
|
s.completedURLs = make(map[string]bool, len(state.CompletedURLs))
|
||
|
|
for _, rawURL := range state.CompletedURLs {
|
||
|
|
if rawURL != "" {
|
||
|
|
s.completedURLs[rawURL] = true
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
s.completed = state.Stats.Completed
|
||
|
|
s.failed = state.Stats.Failed
|
||
|
|
s.retried = state.Stats.Retried
|
||
|
|
s.queued = map[string]bool{}
|
||
|
|
s.lastEvent = time.Now()
|
||
|
|
|
||
|
|
logx.Eventf(
|
||
|
|
"cache",
|
||
|
|
"loaded checkpoint from %s saved=%s items=%d effects=%d completed=%d",
|
||
|
|
s.cfg.CheckpointPath,
|
||
|
|
state.SavedAt.Format(time.RFC3339),
|
||
|
|
len(state.Dataset.Items),
|
||
|
|
len(state.Dataset.Effects),
|
||
|
|
state.Stats.Completed,
|
||
|
|
)
|
||
|
|
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Scraper) snapshotCheckpoint(reason string) checkpoint {
|
||
|
|
s.mu.Lock()
|
||
|
|
defer s.mu.Unlock()
|
||
|
|
|
||
|
|
items := make([]model.Item, 0, len(s.items))
|
||
|
|
for _, item := range s.items {
|
||
|
|
items = append(items, *item)
|
||
|
|
}
|
||
|
|
|
||
|
|
effects := make([]model.Effect, 0, len(s.effects))
|
||
|
|
for _, effect := range s.effects {
|
||
|
|
effects = append(effects, *effect)
|
||
|
|
}
|
||
|
|
|
||
|
|
completedURLs := make([]string, 0, len(s.completedURLs))
|
||
|
|
for rawURL := range s.completedURLs {
|
||
|
|
completedURLs = append(completedURLs, rawURL)
|
||
|
|
}
|
||
|
|
|
||
|
|
sort.Slice(items, func(i, j int) bool {
|
||
|
|
return strings.ToLower(items[i].Name) < strings.ToLower(items[j].Name)
|
||
|
|
})
|
||
|
|
sort.Slice(effects, func(i, j int) bool {
|
||
|
|
return strings.ToLower(effects[i].Name) < strings.ToLower(effects[j].Name)
|
||
|
|
})
|
||
|
|
sort.Strings(completedURLs)
|
||
|
|
|
||
|
|
return checkpoint{
|
||
|
|
Version: checkpointVersion,
|
||
|
|
SavedAt: time.Now(),
|
||
|
|
Reason: reason,
|
||
|
|
Dataset: model.Dataset{
|
||
|
|
Items: items,
|
||
|
|
Effects: effects,
|
||
|
|
},
|
||
|
|
CompletedURLs: completedURLs,
|
||
|
|
Stats: checkpointStats{
|
||
|
|
Completed: s.completed,
|
||
|
|
Failed: s.failed,
|
||
|
|
Retried: s.retried,
|
||
|
|
},
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
func (s *Scraper) CheckpointPath() string {
|
||
|
|
return filepath.Clean(s.cfg.CheckpointPath)
|
||
|
|
}
|