Fragment Description:



Using goroutines, channels and sync.WaitGroup to build a 'jobs dispatcher' and a 'workers pool' that execute the work concurrently.


jobsDispatcherToWorkersPool

Go Playground

Last update, on 2015, Fri 9 Oct, 16:15:29

/* ... <== see fragment description ... */

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// NewWorkersPool starts n worker goroutines and returns the channel used to
// submit jobs together with the WaitGroup that tracks the workers.
//
// Each job is a func(int) that is called with the id of the worker running
// it; ids count down from n to 1. The channel is buffered with capacity n.
// To shut the pool down, close the channel and then call Wait on the returned
// WaitGroup, which blocks until every worker has drained the channel and
// exited.
//
// n is expected to be positive: with n == 0 no worker is started, so a send
// on the returned channel blocks forever, and a negative n panics.
func NewWorkersPool(n int) (chan<- func(int), *sync.WaitGroup) {
    work := make(chan func(int), n)
    var wait sync.WaitGroup
    wait.Add(n)
    for ; n > 0; n-- {
        // The id is passed as an argument so each worker gets its own copy
        // instead of sharing the loop variable.
        go func(id int) {
            // Deferred so the worker is still accounted for when a job ends
            // the goroutine early with runtime.Goexit; otherwise Wait would
            // block forever.
            defer wait.Done()
            for job := range work {
                job(id)
            }
        }(n)
    }
    return work, &wait
}

// main demonstrates the pool: 4 workers process 10 jobs, each job sleeping
// for a random time below 500ms before reporting which worker ran it.
func main() {
    rand.Seed(time.Now().UnixNano())

    const (
        workers = 4
        jobs    = 10
    )
    queue, done := NewWorkersPool(workers)

    // newJob binds the job number by value, so every closure reports its own
    // number rather than the final value of the loop counter.
    newJob := func(job int) func(int) {
        return func(worker int) {
            pause := time.Duration(rand.Intn(500)) * time.Millisecond
            time.Sleep(pause)
            fmt.Println("worker:", worker, "\tdoing job:", job)
        }
    }

    // Dispatch the jobs; a send blocks while the channel buffer is full.
    for job := 0; job < jobs; job++ {
        queue <- newJob(job)
    }

    // Closing the channel returned by NewWorkersPool tells the workers that
    // no more jobs will arrive.
    close(queue)
    // Block until every worker has finished its remaining jobs.
    done.Wait()
}

/* Example output (job order and worker assignment vary between runs):
worker: 2  doing job: 2
worker: 3  doing job: 1
worker: 2  doing job: 4
worker: 1  doing job: 3
worker: 4  doing job: 0
worker: 2  doing job: 6
worker: 2  doing job: 9
worker: 3  doing job: 5
worker: 4  doing job: 8
worker: 1  doing job: 7
*/



Comments