package pool // import "time" // const ( // // This value is the size of the queue that workers register their // // availability to the dispatcher. There may be hundreds of workers, but // // only a small channel is needed to register some of the workers. // readyQueueSize = 16 // // If worker pool receives no new work for this period of time, then stop // // a worker goroutine. // idleTimeoutSec = 5 // ) // type WorkerPool struct { // maxWorkers int // timeout time.Duration // taskQueue chan func() // readyWorkers chan chan func() // stoppedChan chan struct{} // } // func New(maxWorkers int) *WorkerPool { // // There must be at least one worker. // if maxWorkers < 1 { // maxWorkers = 1 // } // // taskQueue is unbuffered since items are always removed immediately. // pool := &WorkerPool{ // taskQueue: make(chan func()), // maxWorkers: maxWorkers, // readyWorkers: make(chan chan func(), readyQueueSize), // timeout: time.Second * idleTimeoutSec, // stoppedChan: make(chan struct{}), // } // // Start the task dispatcher. // go pool.dispatch() // return pool // } // func (p *WorkerPool) Stop() { // if p.Stopped() { // return // } // close(p.taskQueue) // <-p.stoppedChan // } // func (p *WorkerPool) Stopped() bool { // select { // case <-p.stoppedChan: // return true // default: // } // return false // } // func (p *WorkerPool) Submit(task func()) { // if task != nil { // p.taskQueue <- task // } // } // func (p *WorkerPool) SubmitWait(task func()) { // if task == nil { // return // } // doneChan := make(chan struct{}) // p.taskQueue <- func() { // task() // close(doneChan) // } // <-doneChan // } // func (p *WorkerPool) dispatch() { // defer close(p.stoppedChan) // timeout := time.NewTimer(p.timeout) // var workerCount int // var task func() // var ok bool // var workerTaskChan chan func() // startReady := make(chan chan func()) // Loop: // for { // timeout.Reset(p.timeout) // select { // case task, ok = <-p.taskQueue: // if !ok { // break Loop // } // // Got a task to do. // select { // case workerTaskChan = <-p.readyWorkers: // // A worker is ready, so give task to worker. // workerTaskChan <- task // default: // // No workers ready. // // Create a new worker, if not at max. // if workerCount < p.maxWorkers { // workerCount++ // go func(t func()) { // startWorker(startReady, p.readyWorkers) // // Submit the task when the new worker. // taskChan := <-startReady // taskChan <- t // }(task) // } else { // // Start a goroutine to submit the task when an existing // // worker is ready. // go func(t func()) { // taskChan := <-p.readyWorkers // taskChan <- t // }(task) // } // } // case <-timeout.C: // // Timed out waiting for work to arrive. Kill a ready worker. // if workerCount > 0 { // select { // case workerTaskChan = <-p.readyWorkers: // // A worker is ready, so kill. // close(workerTaskChan) // workerCount-- // default: // // No work, but no ready workers. All workers are busy. // } // } // } // } // // Stop all remaining workers as they become ready. // for workerCount > 0 { // workerTaskChan = <-p.readyWorkers // close(workerTaskChan) // workerCount-- // } // } // func startWorker(startReady, readyWorkers chan chan func()) { // go func() { // taskChan := make(chan func()) // var task func() // var ok bool // // Register availability on starReady channel. // startReady <- taskChan // for { // // Read task from dispatcher. // task, ok = <-taskChan // if !ok { // // Dispatcher has told worker to stop. // break // } // // Execute the task. // task() // // Register availability on readyWorkers channel. // readyWorkers <- taskChan // } // }() // }