Files
spyda/internal/dispatcher.go

112 lines
2.5 KiB
Go

package internal
import (
"errors"
)
// Dispatcher maintains a pool for available workers
// and a task queue that workers will process
type Dispatcher struct {
maxWorkers int
maxQueue int
workers []*Worker
workerPool chan chan Task
taskQueue chan Task
taskMap map[string]Task
quit chan bool
active bool
}
// NewDispatcher creates a new dispatcher with the given
// number of workers and buffers the task queue based on maxQueue.
// It also initializes the channels for the worker pool and task queue
func NewDispatcher(maxWorkers int, maxQueue int) *Dispatcher {
return &Dispatcher{
maxWorkers: maxWorkers,
maxQueue: maxQueue,
}
}
// Start creates and starts workers, adding them to the worker pool.
// Then, it starts a select loop to wait for tasks to be dispatched
// to available workers
func (d *Dispatcher) Start() {
d.workers = []*Worker{}
d.workerPool = make(chan chan Task, d.maxWorkers)
d.taskQueue = make(chan Task, d.maxQueue)
d.taskMap = make(map[string]Task)
d.quit = make(chan bool)
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.workerPool)
worker.Start()
d.workers = append(d.workers, worker)
}
d.active = true
go func() {
for {
select {
case task := <-d.taskQueue:
go func(task Task) {
taskChannel := <-d.workerPool
taskChannel <- task
}(task)
case <-d.quit:
return
}
}
}()
}
// Stop ends execution for all workers and closes all channels, then removes
// all workers
func (d *Dispatcher) Stop() {
if !d.active {
return
}
d.active = false
for i := range d.workers {
d.workers[i].Stop()
}
d.workers = []*Worker{}
d.quit <- true
}
// Tasks returns all tasks
func (d *Dispatcher) Tasks() map[string]TaskResult {
tasks := make(map[string]TaskResult)
for id, task := range d.taskMap {
tasks[id] = task.Result()
}
return tasks
}
// Lookup returns the matching `Task` given its id
func (d *Dispatcher) Lookup(id string) (Task, bool) {
task, ok := d.taskMap[id]
return task, ok
}
// Dispatch pushes the given task into the task queue.
// The first available worker will perform the task
func (d *Dispatcher) Dispatch(task Task) (string, error) {
if !d.active {
return "", errors.New("dispatcher is not active")
}
d.taskQueue <- task
d.taskMap[task.ID()] = task
return task.ID(), nil
}
// DispatchFunc pushes the given func into the task queue by first wrapping
// it with a `TaskFunc` task.
func (d *Dispatcher) DispatchFunc(f func() error) (string, error) {
return d.Dispatch(NewFuncTask(f))
}