From 3923ff91ba0e70cc7b5edab9207e16b23d54cd71 Mon Sep 17 00:00:00 2001 From: James Mills Date: Tue, 2 Feb 2021 14:58:52 +1000 Subject: [PATCH] Add a /tasks endpoing and extra metdata to crawl tasks --- internal/crawl_task.go | 27 ++++++++++++++++++- internal/dispatcher.go | 9 +++++++ internal/server.go | 1 + .../{task_handler.go => task_handlers.go} | 16 +++++++++++ 4 files changed, 52 insertions(+), 1 deletion(-) rename internal/{task_handler.go => task_handlers.go} (67%) diff --git a/internal/crawl_task.go b/internal/crawl_task.go index 732f86e..99f6b02 100644 --- a/internal/crawl_task.go +++ b/internal/crawl_task.go @@ -31,9 +31,31 @@ func NewCrawlTask(conf *Config, db Store, indexer Indexer, url string) *CrawlTas func (t *CrawlTask) String() string { return fmt.Sprintf("%T: %s", t, t.ID()) } func (t *CrawlTask) Run() error { - defer t.Done() + var ( + nLinks int + nCrawled int + nScraped int + sTime time.Time + eTime time.Time + ) + + sTime = time.Now() t.SetState(TaskStateRunning) + defer func() { + eTime = time.Now() + + t.SetData("links", fmt.Sprintf("%d", nLinks)) + t.SetData("crawled", fmt.Sprintf("%d", nCrawled)) + t.SetData("scraped", fmt.Sprintf("%d", nScraped)) + t.SetData("start_time", sTime.String()) + t.SetData("eid_time", eTime.String()) + t.SetData("duration", fmt.Sprintf("%0.2f", eTime.Sub(sTime).Seconds())) + + t.Done() + }() + + t.SetData("url", t.url) log.Infof("starting crawl task for %s", t.url) log.Debugf("crawling %s", t.url) @@ -45,6 +67,7 @@ func (t *CrawlTask) Run() error { } for link := range links { + nLinks++ hash := HashURL(link) if t.db.HasURL(hash) { @@ -54,6 +77,7 @@ func (t *CrawlTask) Run() error { log.Debugf("found %s", link) + nCrawled++ metrics.Counter("crawler", "crawled").Inc() url := NewURL(link) @@ -74,6 +98,7 @@ func (t *CrawlTask) Run() error { log.WithError(err).Warn("error recording url %s", link) } + nScraped++ metrics.Counter("crawler", "scraped").Inc() } diff --git a/internal/dispatcher.go b/internal/dispatcher.go index 51c6f98..8474254 100644 --- a/internal/dispatcher.go +++ b/internal/dispatcher.go @@ -77,6 +77,15 @@ func (d *Dispatcher) Stop() { d.quit <- true } +// Tasks returns all tasks +func (d *Dispatcher) Tasks() map[string]TaskResult { + tasks := make(map[string]TaskResult) + for id, task := range d.taskMap { + tasks[id] = task.Result() + } + return tasks +} + // Lookup returns the matching `Task` given its id func (d *Dispatcher) Lookup(id string) (Task, bool) { task, ok := d.taskMap[id] diff --git a/internal/server.go b/internal/server.go index b7e30e3..e7c8c8e 100644 --- a/internal/server.go +++ b/internal/server.go @@ -306,6 +306,7 @@ func (s *Server) initRoutes() { s.router.POST("/chpasswd", s.NewPasswordHandler()) // Task State + s.router.GET("/tasks", s.TasksHandler()) s.router.GET("/task/:uuid", s.TaskHandler()) s.router.GET("/add", s.AddHandler()) diff --git a/internal/task_handler.go b/internal/task_handlers.go similarity index 67% rename from internal/task_handler.go rename to internal/task_handlers.go index 65bf75e..fdb3d2e 100644 --- a/internal/task_handler.go +++ b/internal/task_handlers.go @@ -8,6 +8,22 @@ import ( log "github.com/sirupsen/logrus" ) +// TasksHandler ... +func (s *Server) TasksHandler() httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + tasks := s.tasks.Tasks() + + data, err := json.Marshal(tasks) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(data) + } +} + // TaskHandler ... func (s *Server) TaskHandler() httprouter.Handle { return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {