From c5a99e0a4e990a97c14405b8dd7dbe18fa75e141 Mon Sep 17 00:00:00 2001 From: "joy.zhou" Date: Mon, 15 Jul 2019 10:37:49 +0800 Subject: [PATCH] fixed --- pool/fixpool.go | 98 ++++++++-------- pool/pool.go | 296 ++++++++++++++++++++++++------------------------ 2 files changed, 197 insertions(+), 197 deletions(-) diff --git a/pool/fixpool.go b/pool/fixpool.go index 43f97ae..6dc39fa 100644 --- a/pool/fixpool.go +++ b/pool/fixpool.go @@ -1,58 +1,58 @@ package pool -import ( - "github.com/segmentio/fasthash/fnv1a" -) +// import ( +// "github.com/segmentio/fasthash/fnv1a" +// ) -type WorkerPool struct { - maxWorkers int - taskQueue []chan func() - stoppedChan chan struct{} -} +// type WorkerPool struct { +// maxWorkers int +// taskQueue []chan func() +// stoppedChan chan struct{} +// } -func New(maxWorkers int) *WorkerPool { - // There must be at least one worker. - if maxWorkers < 1 { - maxWorkers = 1 - } +// func New(maxWorkers int) *WorkerPool { +// // There must be at least one worker. +// if maxWorkers < 1 { +// maxWorkers = 1 +// } - // taskQueue is unbuffered since items are always removed immediately. - pool := &WorkerPool{ - taskQueue: make([]chan func(), maxWorkers), - maxWorkers: maxWorkers, - stoppedChan: make(chan struct{}), - } - // Start the task dispatcher. - pool.dispatch() +// // taskQueue is unbuffered since items are always removed immediately. +// pool := &WorkerPool{ +// taskQueue: make([]chan func(), maxWorkers), +// maxWorkers: maxWorkers, +// stoppedChan: make(chan struct{}), +// } +// // Start the task dispatcher. +// pool.dispatch() - return pool -} +// return pool +// } -func (p *WorkerPool) Submit(uid string, task func()) { - idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers) - if task != nil { - p.taskQueue[idx] <- task - } -} +// func (p *WorkerPool) Submit(uid string, task func()) { +// idx := fnv1a.HashString64(uid) % uint64(p.maxWorkers) +// if task != nil { +// p.taskQueue[idx] <- task +// } +// } -func (p *WorkerPool) dispatch() { - for i := 0; i < p.maxWorkers; i++ { - p.taskQueue[i] = make(chan func()) - go startWorker(p.taskQueue[i]) - } -} +// func (p *WorkerPool) dispatch() { +// for i := 0; i < p.maxWorkers; i++ { +// p.taskQueue[i] = make(chan func()) +// go startWorker(p.taskQueue[i]) +// } +// } -func startWorker(taskChan chan func()) { - go func() { - var task func() - var ok bool - for { - task, ok = <-taskChan - if !ok { - break - } - // Execute the task. - task() - } - }() -} +// func startWorker(taskChan chan func()) { +// go func() { +// var task func() +// var ok bool +// for { +// task, ok = <-taskChan +// if !ok { +// break +// } +// // Execute the task. +// task() +// } +// }() +// } diff --git a/pool/pool.go b/pool/pool.go index 8f0ac13..f7ca486 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1,166 +1,166 @@ package pool -// import "time" +import "time" -// const ( -// // This value is the size of the queue that workers register their -// // availability to the dispatcher. There may be hundreds of workers, but -// // only a small channel is needed to register some of the workers. -// readyQueueSize = 16 +const ( + // This value is the size of the queue that workers register their + // availability to the dispatcher. There may be hundreds of workers, but + // only a small channel is needed to register some of the workers. + readyQueueSize = 64 -// // If worker pool receives no new work for this period of time, then stop -// // a worker goroutine. -// idleTimeoutSec = 5 -// ) + // If worker pool receives no new work for this period of time, then stop + // a worker goroutine. + idleTimeoutSec = 5 +) -// type WorkerPool struct { -// maxWorkers int -// timeout time.Duration -// taskQueue chan func() -// readyWorkers chan chan func() -// stoppedChan chan struct{} -// } +type WorkerPool struct { + maxWorkers int + timeout time.Duration + taskQueue chan func() + readyWorkers chan chan func() + stoppedChan chan struct{} +} -// func New(maxWorkers int) *WorkerPool { -// // There must be at least one worker. -// if maxWorkers < 1 { -// maxWorkers = 1 -// } +func New(maxWorkers int) *WorkerPool { + // There must be at least one worker. + if maxWorkers < 1 { + maxWorkers = 1 + } -// // taskQueue is unbuffered since items are always removed immediately. -// pool := &WorkerPool{ -// taskQueue: make(chan func()), -// maxWorkers: maxWorkers, -// readyWorkers: make(chan chan func(), readyQueueSize), -// timeout: time.Second * idleTimeoutSec, -// stoppedChan: make(chan struct{}), -// } + // taskQueue is unbuffered since items are always removed immediately. + pool := &WorkerPool{ + taskQueue: make(chan func()), + maxWorkers: maxWorkers, + readyWorkers: make(chan chan func(), readyQueueSize), + timeout: time.Second * idleTimeoutSec, + stoppedChan: make(chan struct{}), + } -// // Start the task dispatcher. -// go pool.dispatch() + // Start the task dispatcher. + go pool.dispatch() -// return pool -// } + return pool +} -// func (p *WorkerPool) Stop() { -// if p.Stopped() { -// return -// } -// close(p.taskQueue) -// <-p.stoppedChan -// } +func (p *WorkerPool) Stop() { + if p.Stopped() { + return + } + close(p.taskQueue) + <-p.stoppedChan +} -// func (p *WorkerPool) Stopped() bool { -// select { -// case <-p.stoppedChan: -// return true -// default: -// } -// return false -// } +func (p *WorkerPool) Stopped() bool { + select { + case <-p.stoppedChan: + return true + default: + } + return false +} -// func (p *WorkerPool) Submit(task func()) { -// if task != nil { -// p.taskQueue <- task -// } -// } +func (p *WorkerPool) Submit(task func()) { + if task != nil { + p.taskQueue <- task + } +} -// func (p *WorkerPool) SubmitWait(task func()) { -// if task == nil { -// return -// } -// doneChan := make(chan struct{}) -// p.taskQueue <- func() { -// task() -// close(doneChan) -// } -// <-doneChan -// } +func (p *WorkerPool) SubmitWait(task func()) { + if task == nil { + return + } + doneChan := make(chan struct{}) + p.taskQueue <- func() { + task() + close(doneChan) + } + <-doneChan +} -// func (p *WorkerPool) dispatch() { -// defer close(p.stoppedChan) -// timeout := time.NewTimer(p.timeout) -// var workerCount int -// var task func() -// var ok bool -// var workerTaskChan chan func() -// startReady := make(chan chan func()) -// Loop: -// for { -// timeout.Reset(p.timeout) -// select { -// case task, ok = <-p.taskQueue: -// if !ok { -// break Loop -// } -// // Got a task to do. -// select { -// case workerTaskChan = <-p.readyWorkers: -// // A worker is ready, so give task to worker. -// workerTaskChan <- task -// default: -// // No workers ready. -// // Create a new worker, if not at max. -// if workerCount < p.maxWorkers { -// workerCount++ -// go func(t func()) { -// startWorker(startReady, p.readyWorkers) -// // Submit the task when the new worker. -// taskChan := <-startReady -// taskChan <- t -// }(task) -// } else { -// // Start a goroutine to submit the task when an existing -// // worker is ready. -// go func(t func()) { -// taskChan := <-p.readyWorkers -// taskChan <- t -// }(task) -// } -// } -// case <-timeout.C: -// // Timed out waiting for work to arrive. Kill a ready worker. -// if workerCount > 0 { -// select { -// case workerTaskChan = <-p.readyWorkers: -// // A worker is ready, so kill. -// close(workerTaskChan) -// workerCount-- -// default: -// // No work, but no ready workers. All workers are busy. -// } -// } -// } -// } +func (p *WorkerPool) dispatch() { + defer close(p.stoppedChan) + timeout := time.NewTimer(p.timeout) + var workerCount int + var task func() + var ok bool + var workerTaskChan chan func() + startReady := make(chan chan func()) +Loop: + for { + timeout.Reset(p.timeout) + select { + case task, ok = <-p.taskQueue: + if !ok { + break Loop + } + // Got a task to do. + select { + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so give task to worker. + workerTaskChan <- task + default: + // No workers ready. + // Create a new worker, if not at max. + if workerCount < p.maxWorkers { + workerCount++ + go func(t func()) { + startWorker(startReady, p.readyWorkers) + // Submit the task when the new worker. + taskChan := <-startReady + taskChan <- t + }(task) + } else { + // Start a goroutine to submit the task when an existing + // worker is ready. + go func(t func()) { + taskChan := <-p.readyWorkers + taskChan <- t + }(task) + } + } + case <-timeout.C: + // Timed out waiting for work to arrive. Kill a ready worker. + if workerCount > 0 { + select { + case workerTaskChan = <-p.readyWorkers: + // A worker is ready, so kill. + close(workerTaskChan) + workerCount-- + default: + // No work, but no ready workers. All workers are busy. + } + } + } + } -// // Stop all remaining workers as they become ready. -// for workerCount > 0 { -// workerTaskChan = <-p.readyWorkers -// close(workerTaskChan) -// workerCount-- -// } -// } + // Stop all remaining workers as they become ready. + for workerCount > 0 { + workerTaskChan = <-p.readyWorkers + close(workerTaskChan) + workerCount-- + } +} -// func startWorker(startReady, readyWorkers chan chan func()) { -// go func() { -// taskChan := make(chan func()) -// var task func() -// var ok bool -// // Register availability on starReady channel. -// startReady <- taskChan -// for { -// // Read task from dispatcher. -// task, ok = <-taskChan -// if !ok { -// // Dispatcher has told worker to stop. -// break -// } +func startWorker(startReady, readyWorkers chan chan func()) { + go func() { + taskChan := make(chan func()) + var task func() + var ok bool + // Register availability on starReady channel. + startReady <- taskChan + for { + // Read task from dispatcher. + task, ok = <-taskChan + if !ok { + // Dispatcher has told worker to stop. + break + } -// // Execute the task. -// task() + // Execute the task. + task() -// // Register availability on readyWorkers channel. -// readyWorkers <- taskChan -// } -// }() -// } + // Register availability on readyWorkers channel. + readyWorkers <- taskChan + } + }() +}