Initial Commit
This commit is contained in:
411
internal/scraper/scraper.go
Normal file
411
internal/scraper/scraper.go
Normal file
@@ -0,0 +1,411 @@
|
||||
package scraper
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/PuerkitoBio/goquery"
|
||||
"github.com/gocolly/colly/v2"
|
||||
|
||||
"scrappr/internal/logx"
|
||||
"scrappr/internal/model"
|
||||
)
|
||||
|
||||
// Scraper crawls a wiki with a colly collector, accumulating parsed items
// and effects while reporting progress through logx.
type Scraper struct {
	cfg       Config
	collector *colly.Collector

	// mu guards all fields below; they are shared between colly callbacks
	// and the status-loop goroutine started in startStatusLoop.
	mu           sync.Mutex
	items        map[string]*model.Item   // parsed item pages, keyed by page URL
	effects      map[string]*model.Effect // parsed effect pages, keyed by page URL
	queued       map[string]bool          // every URL ever queued (dedupe + budget)
	completed    int                      // pages fully scraped (OnScraped fired)
	failed       int                      // pages whose request errored
	retried      int                      // retry attempts issued
	requestSeq   int                      // monotonically increasing request id
	spinnerIndex int                      // current frame index for the status spinner
	activeURL    string                   // URL currently in flight, "" when idle
	activeSince  time.Time                // when the active request started
	lastEvent    time.Time                // time of the most recent scraper event
}
|
||||
|
||||
func New(cfg Config) *Scraper {
|
||||
return &Scraper{
|
||||
cfg: cfg,
|
||||
items: map[string]*model.Item{},
|
||||
effects: map[string]*model.Effect{},
|
||||
queued: map[string]bool{},
|
||||
lastEvent: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scraper) Run() (model.Dataset, error) {
|
||||
s.collector = colly.NewCollector(
|
||||
colly.AllowedDomains(s.cfg.AllowedDomain),
|
||||
colly.MaxDepth(s.cfg.MaxDepth),
|
||||
colly.Async(true),
|
||||
)
|
||||
|
||||
s.collector.SetRequestTimeout(s.cfg.RequestTimeout)
|
||||
s.collector.ParseHTTPErrorResponse = true
|
||||
|
||||
if err := s.collector.Limit(&colly.LimitRule{
|
||||
DomainGlob: "*" + s.cfg.AllowedDomain + "*",
|
||||
Parallelism: 1,
|
||||
Delay: s.cfg.RequestDelay,
|
||||
RandomDelay: s.cfg.RequestJitter,
|
||||
}); err != nil {
|
||||
return model.Dataset{}, err
|
||||
}
|
||||
|
||||
s.registerHandlers()
|
||||
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
s.startStatusLoop(done)
|
||||
|
||||
for _, seed := range append(append([]string{}, s.cfg.ItemSeeds...), s.cfg.CraftingSeeds...) {
|
||||
s.queueVisit("seed", seed)
|
||||
}
|
||||
|
||||
s.collector.Wait()
|
||||
|
||||
return model.Dataset{
|
||||
Items: s.flattenItems(),
|
||||
Effects: s.flattenEffects(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// registerHandlers wires the colly lifecycle callbacks: request stamping
// and logging, error accounting with retries, response parsing, completion
// accounting, and link discovery inside content tables.
func (s *Scraper) registerHandlers() {
	// OnRequest: stamp each outgoing request with an id and a start time so
	// later callbacks can correlate and measure it, and mark it active.
	s.collector.OnRequest(func(r *colly.Request) {
		s.applyBrowserHeaders(r)

		startedAt := time.Now()
		fromURL := r.Ctx.Get("from_url")
		if fromURL == "" {
			fromURL = "seed"
		}

		s.mu.Lock()
		s.requestSeq++
		reqID := s.requestSeq
		s.activeURL = r.URL.String()
		s.activeSince = startedAt
		s.lastEvent = startedAt
		s.mu.Unlock()

		r.Ctx.Put("request_id", strconv.Itoa(reqID))
		r.Ctx.Put("started_at_unix_nano", strconv.FormatInt(startedAt.UnixNano(), 10))

		logx.Eventf(
			"visit",
			"#%d depth=%d attempt=%d from=%s to=%s",
			reqID,
			r.Depth,
			s.retryAttempt(r.Ctx)+1,
			s.debugURLName(fromURL),
			r.URL.String(),
		)
	})

	// OnError: count the failure, clear the active slot, and retry when the
	// status looks transient (see shouldRetry).
	s.collector.OnError(func(r *colly.Response, err error) {
		// A nil response/request means the failure happened before anything
		// went out; there is nothing to account it against.
		if r == nil || r.Request == nil {
			logx.Eventf("error", "request failed before response: %v", err)
			return
		}

		duration := s.requestDuration(r.Request)

		s.mu.Lock()
		s.failed++
		s.activeURL = ""
		s.activeSince = time.Time{}
		s.lastEvent = time.Now()
		s.mu.Unlock()

		logx.Eventf(
			"error",
			"#%s status=%d after=%s url=%s: %v",
			r.Request.Ctx.Get("request_id"),
			r.StatusCode,
			s.durationString(duration),
			r.Request.URL.String(),
			err,
		)

		if s.shouldRetry(r.StatusCode) {
			s.mu.Lock()
			s.retried++
			s.mu.Unlock()
			// NOTE: retryRequest sleeps inline before re-issuing the request.
			s.retryRequest(r, err)
		}
	})

	// OnResponse: parse the fetched page and route it to the effect or item
	// parser based on its path, title, and categories.
	s.collector.OnResponse(func(r *colly.Response) {
		// ParseHTTPErrorResponse is enabled, so error pages arrive here too;
		// skip them explicitly.
		if r.StatusCode >= 400 {
			logx.Eventf("skip", "#%s status=%d url=%s", r.Request.Ctx.Get("request_id"), r.StatusCode, r.Request.URL.String())
			return
		}

		doc, err := goquery.NewDocumentFromReader(bytes.NewReader(r.Body))
		if err != nil {
			logx.Eventf("warn", "parse error for %s: %v", r.Request.URL.String(), err)
			return
		}

		// Prefer the wiki page-header title; fall back to the first <h1>.
		title := s.clean(doc.Find("h1.page-header__title").First().Text())
		if title == "" {
			title = s.clean(doc.Find("h1").First().Text())
		}
		if title == "" {
			logx.Eventf("warn", "empty title for %s", r.Request.URL.String())
			return
		}

		pageURL := r.Request.URL.String()
		pagePath := r.Request.URL.Path
		categories := s.parseCategories(doc)
		lcCats := s.lowerSlice(categories)
		pageKind := s.pageKindForPath(pagePath)

		switch {
		case s.isEffectPage(pagePath, title, lcCats):
			effect := s.parseEffectPage(doc, title, pageURL, categories)
			// An empty Name means the parser rejected the page; keep the
			// path-derived pageKind in that case.
			if effect.Name != "" {
				s.mu.Lock()
				s.effects[effect.URL] = &effect
				s.mu.Unlock()

				pageKind = "effect"
				logx.Eventf("parsed", "#%s effect=%q", r.Request.Ctx.Get("request_id"), effect.Name)
			}
		case s.isLikelyItemPage(pagePath, title, lcCats, doc):
			item := s.parseItemPage(doc, title, pageURL, categories)

			s.mu.Lock()
			s.items[item.URL] = &item
			s.mu.Unlock()

			pageKind = "item"
			logx.Eventf(
				"parsed",
				"#%s item=%q effects=%d recipes=%d",
				r.Request.Ctx.Get("request_id"),
				item.Name,
				len(item.Effects),
				len(item.Recipes),
			)

			// Follow the item's effect links so those pages get parsed too.
			for _, effectLink := range item.EffectLinks {
				link := s.absoluteWikiURL(effectLink)
				if link == "" {
					continue
				}
				s.queueVisit(pageURL, link)
			}
		default:
			logx.Eventf("skip", "#%s page=%q kind=%s", r.Request.Ctx.Get("request_id"), title, pageKind)
		}

		logx.Eventf(
			"recv",
			"#%s status=%d bytes=%d kind=%s title=%q after=%s",
			r.Request.Ctx.Get("request_id"),
			r.StatusCode,
			len(r.Body),
			pageKind,
			title,
			s.durationString(s.requestDuration(r.Request)),
		)
	})

	// OnScraped: final per-page accounting once all handlers have run.
	s.collector.OnScraped(func(r *colly.Response) {
		s.mu.Lock()
		s.completed++
		s.activeURL = ""
		s.activeSince = time.Time{}
		s.lastEvent = time.Now()
		doneCount := s.completed
		queueLen := len(s.queued)
		s.mu.Unlock()

		logx.Eventf("done", "#%s total=%d queued=%d url=%s", r.Request.Ctx.Get("request_id"), doneCount, queueLen, r.Request.URL.String())
	})

	// OnHTML: discover links inside content tables and queue the ones that
	// pass the visit filters.
	s.collector.OnHTML(".mw-parser-output table a[href]", func(e *colly.HTMLElement) {
		href := e.Attr("href")
		link := e.Request.AbsoluteURL(href)
		if !s.shouldVisit(link) {
			return
		}

		if s.shouldQueueFromPage(e.Request.URL.Path, link) && s.shouldQueueTableLink(e) {
			s.queueVisit(e.Request.URL.String(), link)
			return
		}

		// Even when table-link queuing declines, still follow links that
		// look like effect pages.
		if s.looksLikeEffectLink(href, e.Text) {
			s.queueVisit(e.Request.URL.String(), link)
		}
	})
}
|
||||
|
||||
func (s *Scraper) startStatusLoop(done <-chan struct{}) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(s.cfg.ProgressEvery)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
s.mu.Lock()
|
||||
queueLen := len(s.queued)
|
||||
itemLen := len(s.items)
|
||||
effectLen := len(s.effects)
|
||||
currentURL := s.activeURL
|
||||
currentName := "-"
|
||||
currentFor := s.durationString(time.Since(s.activeSince))
|
||||
idleFor := s.durationString(time.Since(s.lastEvent))
|
||||
completedCount := s.completed
|
||||
failedCount := s.failed
|
||||
retriedCount := s.retried
|
||||
frame := s.spinnerFrame()
|
||||
s.mu.Unlock()
|
||||
|
||||
if currentURL == "" {
|
||||
currentFor = "-"
|
||||
} else {
|
||||
currentName = s.debugURLName(currentURL)
|
||||
}
|
||||
|
||||
logx.Statusf(
|
||||
frame,
|
||||
"queued=%d completed=%d failed=%d retries=%d items=%d effects=%d active=%s active_for=%s idle=%s",
|
||||
queueLen,
|
||||
completedCount,
|
||||
failedCount,
|
||||
retriedCount,
|
||||
itemLen,
|
||||
effectLen,
|
||||
currentName,
|
||||
currentFor,
|
||||
idleFor,
|
||||
)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Scraper) queueVisit(fromURL, toURL string) {
|
||||
if toURL == "" {
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
switch {
|
||||
case s.queued[toURL]:
|
||||
s.mu.Unlock()
|
||||
return
|
||||
case len(s.queued) >= s.cfg.MaxQueuedPages:
|
||||
s.mu.Unlock()
|
||||
logx.Eventf("skip", "queue budget reached from=%s to=%s", s.debugURLName(fromURL), toURL)
|
||||
return
|
||||
default:
|
||||
s.queued[toURL] = true
|
||||
queueLen := len(s.queued)
|
||||
s.mu.Unlock()
|
||||
|
||||
ctx := colly.NewContext()
|
||||
ctx.Put("from_url", fromURL)
|
||||
|
||||
logx.Eventf("queue", "%d from=%s to=%s", queueLen, s.debugURLName(fromURL), toURL)
|
||||
if err := s.collector.Request("GET", toURL, nil, ctx, nil); err != nil {
|
||||
logx.Eventf("warn", "queue failed from=%s to=%s: %v", s.debugURLName(fromURL), toURL, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scraper) spinnerFrame() string {
|
||||
if len(s.cfg.SpinnerFrames) == 0 {
|
||||
return "🌀"
|
||||
}
|
||||
|
||||
frame := s.cfg.SpinnerFrames[s.spinnerIndex%len(s.cfg.SpinnerFrames)]
|
||||
s.spinnerIndex++
|
||||
return frame
|
||||
}
|
||||
|
||||
func (s *Scraper) shouldRetry(statusCode int) bool {
|
||||
return statusCode == 0 || statusCode == 408 || statusCode == 425 || statusCode == 429 || statusCode >= 500
|
||||
}
|
||||
|
||||
func (s *Scraper) retryRequest(r *colly.Response, err error) {
|
||||
attempt := s.retryAttempt(r.Request.Ctx)
|
||||
|
||||
if attempt >= s.cfg.MaxRetries {
|
||||
logx.Eventf("giveup", "url=%s attempts=%d: %v", r.Request.URL.String(), attempt, err)
|
||||
return
|
||||
}
|
||||
|
||||
attempt++
|
||||
r.Request.Ctx.Put("retry_count", strconv.Itoa(attempt))
|
||||
|
||||
wait := s.retryDelay(attempt)
|
||||
logx.Eventf("retry", "%d/%d %s after %s", attempt, s.cfg.MaxRetries, r.Request.URL.String(), wait)
|
||||
time.Sleep(wait)
|
||||
|
||||
if retryErr := r.Request.Retry(); retryErr != nil {
|
||||
logx.Eventf("error", "retry failed for %s: %v (original error: %v)", r.Request.URL.String(), retryErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scraper) retryAttempt(ctx *colly.Context) int {
|
||||
attempt := 0
|
||||
if raw := ctx.Get("retry_count"); raw != "" {
|
||||
parsed, err := strconv.Atoi(raw)
|
||||
if err == nil {
|
||||
attempt = parsed
|
||||
}
|
||||
}
|
||||
return attempt
|
||||
}
|
||||
|
||||
func (s *Scraper) retryDelay(attempt int) time.Duration {
|
||||
backoff := s.cfg.RetryBaseDelay * time.Duration(1<<(attempt-1))
|
||||
return backoff + s.jitter(500*time.Millisecond)
|
||||
}
|
||||
|
||||
func (s *Scraper) jitter(max time.Duration) time.Duration {
|
||||
if max <= 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
return time.Duration(rand.Int63n(int64(max)))
|
||||
}
|
||||
|
||||
func (s *Scraper) requestDuration(r *colly.Request) time.Duration {
|
||||
raw := r.Ctx.Get("started_at_unix_nano")
|
||||
if raw == "" {
|
||||
return 0
|
||||
}
|
||||
|
||||
startedAtUnixNano, err := strconv.ParseInt(raw, 10, 64)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
|
||||
return time.Since(time.Unix(0, startedAtUnixNano))
|
||||
}
|
||||
|
||||
func (s *Scraper) durationString(d time.Duration) string {
|
||||
if d <= 0 {
|
||||
return "0s"
|
||||
}
|
||||
|
||||
return d.Round(100 * time.Millisecond).String()
|
||||
}
|
||||
Reference in New Issue
Block a user