feat(scraper): add checkpointing and richer page extraction
Add resumable checkpoint support so long scrapes can recover from interruptions instead of restarting from scratch. - introduce autosave/load/clear checkpoint flow in `.cache/scrape-state.json`, including SIGINT/SIGTERM save-on-exit handling - expand parsing/model output to capture legacy and portable infobox fields, primary image URLs, effects, recipes, raw tables, and improved category extraction - skip infobox tables during recipe parsing to avoid false recipe matches - add cache log event type, ignore cache/output artifacts, and document new autosave tuning options in README
This commit is contained in:
@@ -18,31 +18,37 @@ type Scraper struct {
|
||||
cfg Config
|
||||
collector *colly.Collector
|
||||
|
||||
mu sync.Mutex
|
||||
items map[string]*model.Item
|
||||
effects map[string]*model.Effect
|
||||
queued map[string]bool
|
||||
completed int
|
||||
failed int
|
||||
retried int
|
||||
requestSeq int
|
||||
spinnerIndex int
|
||||
activeURL string
|
||||
activeSince time.Time
|
||||
lastEvent time.Time
|
||||
mu sync.Mutex
|
||||
items map[string]*model.Item
|
||||
effects map[string]*model.Effect
|
||||
queued map[string]bool
|
||||
completedURLs map[string]bool
|
||||
completed int
|
||||
failed int
|
||||
retried int
|
||||
requestSeq int
|
||||
spinnerIndex int
|
||||
activeURL string
|
||||
activeSince time.Time
|
||||
lastEvent time.Time
|
||||
}
|
||||
|
||||
func New(cfg Config) *Scraper {
|
||||
return &Scraper{
|
||||
cfg: cfg,
|
||||
items: map[string]*model.Item{},
|
||||
effects: map[string]*model.Effect{},
|
||||
queued: map[string]bool{},
|
||||
lastEvent: time.Now(),
|
||||
cfg: cfg,
|
||||
items: map[string]*model.Item{},
|
||||
effects: map[string]*model.Effect{},
|
||||
queued: map[string]bool{},
|
||||
completedURLs: map[string]bool{},
|
||||
lastEvent: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scraper) Run() (model.Dataset, error) {
|
||||
if err := s.loadCheckpoint(); err != nil {
|
||||
return model.Dataset{}, err
|
||||
}
|
||||
|
||||
s.collector = colly.NewCollector(
|
||||
colly.AllowedDomains(s.cfg.AllowedDomain),
|
||||
colly.MaxDepth(s.cfg.MaxDepth),
|
||||
@@ -66,13 +72,16 @@ func (s *Scraper) Run() (model.Dataset, error) {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
s.startStatusLoop(done)
|
||||
s.startAutosaveLoop(done)
|
||||
|
||||
for _, seed := range append(append([]string{}, s.cfg.ItemSeeds...), s.cfg.CraftingSeeds...) {
|
||||
s.queueVisit("seed", seed)
|
||||
}
|
||||
s.resumeQueue()
|
||||
|
||||
s.collector.Wait()
|
||||
|
||||
if err := s.SaveCheckpoint("final"); err != nil {
|
||||
return model.Dataset{}, err
|
||||
}
|
||||
|
||||
return model.Dataset{
|
||||
Items: s.flattenItems(),
|
||||
Effects: s.flattenEffects(),
|
||||
@@ -225,6 +234,9 @@ func (s *Scraper) registerHandlers() {
|
||||
s.collector.OnScraped(func(r *colly.Response) {
|
||||
s.mu.Lock()
|
||||
s.completed++
|
||||
if r != nil && r.Request != nil && r.StatusCode < 400 {
|
||||
s.completedURLs[r.Request.URL.String()] = true
|
||||
}
|
||||
s.activeURL = ""
|
||||
s.activeSince = time.Time{}
|
||||
s.lastEvent = time.Now()
|
||||
@@ -233,6 +245,12 @@ func (s *Scraper) registerHandlers() {
|
||||
s.mu.Unlock()
|
||||
|
||||
logx.Eventf("done", "#%s total=%d queued=%d url=%s", r.Request.Ctx.Get("request_id"), doneCount, queueLen, r.Request.URL.String())
|
||||
|
||||
if s.cfg.AutosavePages > 0 && doneCount%s.cfg.AutosavePages == 0 {
|
||||
if err := s.SaveCheckpoint("progress"); err != nil {
|
||||
logx.Eventf("warn", "autosave failed after %d pages: %v", doneCount, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
s.collector.OnHTML(".mw-parser-output table a[href]", func(e *colly.HTMLElement) {
|
||||
@@ -301,6 +319,28 @@ func (s *Scraper) startStatusLoop(done <-chan struct{}) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Scraper) startAutosaveLoop(done <-chan struct{}) {
|
||||
if s.cfg.AutosaveEvery <= 0 {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
ticker := time.NewTicker(s.cfg.AutosaveEvery)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
if err := s.SaveCheckpoint("timer"); err != nil {
|
||||
logx.Eventf("warn", "autosave failed: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Scraper) queueVisit(fromURL, toURL string) {
|
||||
if toURL == "" {
|
||||
return
|
||||
@@ -311,6 +351,9 @@ func (s *Scraper) queueVisit(fromURL, toURL string) {
|
||||
case s.queued[toURL]:
|
||||
s.mu.Unlock()
|
||||
return
|
||||
case s.completedURLs[toURL]:
|
||||
s.mu.Unlock()
|
||||
return
|
||||
case len(s.queued) >= s.cfg.MaxQueuedPages:
|
||||
s.mu.Unlock()
|
||||
logx.Eventf("skip", "queue budget reached from=%s to=%s", s.debugURLName(fromURL), toURL)
|
||||
@@ -340,6 +383,52 @@ func (s *Scraper) spinnerFrame() string {
|
||||
return frame
|
||||
}
|
||||
|
||||
func (s *Scraper) resumeQueue() {
|
||||
for _, seed := range append(append([]string{}, s.cfg.ItemSeeds...), s.cfg.CraftingSeeds...) {
|
||||
s.queueSeed(seed)
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
items := make([]model.Item, 0, len(s.items))
|
||||
for _, item := range s.items {
|
||||
items = append(items, *item)
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
for _, item := range items {
|
||||
for _, effectLink := range item.EffectLinks {
|
||||
link := s.absoluteWikiURL(effectLink)
|
||||
if link == "" {
|
||||
continue
|
||||
}
|
||||
s.queueVisit(item.URL, link)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scraper) queueSeed(toURL string) {
|
||||
if toURL == "" {
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
if s.queued[toURL] {
|
||||
s.mu.Unlock()
|
||||
return
|
||||
}
|
||||
s.queued[toURL] = true
|
||||
queueLen := len(s.queued)
|
||||
s.mu.Unlock()
|
||||
|
||||
ctx := colly.NewContext()
|
||||
ctx.Put("from_url", "seed")
|
||||
|
||||
logx.Eventf("queue", "%d from=%s to=%s", queueLen, "seed", toURL)
|
||||
if err := s.collector.Request("GET", toURL, nil, ctx, nil); err != nil {
|
||||
logx.Eventf("warn", "queue failed from=%s to=%s: %v", "seed", toURL, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scraper) shouldRetry(statusCode int) bool {
|
||||
return statusCode == 0 || statusCode == 408 || statusCode == 425 || statusCode == 429 || statusCode >= 500
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user