Fragment Description:



This fragment sets up a 'Dispatcher' with a fixed number of 'Worker' goroutines.
The goal is to throttle working on an unknown number of tasks, and then time out cleanly.
For the purpose of limiting concurrency to N concurrent operations, the simplest solution is a channel of size N.
Semaphore using channels:
You do:
sem := make(chan int, N).
to 'enter' (acquire) the semaphore:
sem <- 1.
to 'leave' (release) the semaphore:
<-sem.
An 'enter' blocks while N holders are already inside the semaphore, i.e. while the channel's buffer is full.
The example is based on a discussion between Robryk and Vyukov; there is a lot to mull over in it.
Building this type of concurrent program with the option '-race' is mandatory.
The race detector ('-race') initially reported a possible data race;
copying the loop variable into the var 'wi' eliminates it (see below).


workersManager

Go Playground

Last update, on 2016, Thu 28 Jan, 23:31:50

/* ... <== see fragment description ... */

package main

import (
    "fmt"
    "time"
)

// Dispatcher fans tasks out to a fixed pool of worker goroutines that stop
// when a shared deadline expires.
//
// Callers submit work by sending func() values on Input and then call Leave
// to wait for the pool to shut down. Callers must not close Input themselves:
// Leave closes it, and closing a channel twice panics.
type Dispatcher struct {
    // Input carries the tasks to run; every worker receives from it.
    Input chan func()
    // done receives exactly one value per worker when that worker exits:
    // nil if Input was closed, or a timeout error if the deadline expired.
    done chan error
    // timeout is closed once the deadline passes, which wakes every worker.
    timeout chan struct{}
}

// NewDispatcher starts the given number of worker goroutines and returns the
// Dispatcher that feeds them.
//
// Each worker runs tasks received from Input until either Input is closed or
// timeout has elapsed since this call; it then reports once on done and exits.
// Tasks still buffered in Input when the deadline expires may be left unrun.
func NewDispatcher(workers int, timeout time.Duration) *Dispatcher {
    d := &Dispatcher{
        // Input is buffered so that producers only block once 8 tasks per
        // worker are queued.
        Input: make(chan func(), 8*workers),
        // done has one slot per worker, so a worker's final report never blocks.
        done:    make(chan error, workers),
        timeout: make(chan struct{}),
    }
    // Closing the channel (rather than sending on it) releases all workers at once.
    time.AfterFunc(timeout, func() {
        close(d.timeout)
    })
    for w := 0; w < workers; w++ {
        go d.work(timeout)
    }
    return d
}

// work is the body of one worker goroutine: it runs tasks from Input until
// Input is closed or the deadline passes, reports once on done, and returns.
// The timeout argument is only used to build the error message.
func (d *Dispatcher) work(timeout time.Duration) {
    for {
        select {
        case f, ok := <-d.Input:
            if !ok {
                // Input was closed: report a clean exit.
                d.done <- nil
                return
            }
            f()
        case <-d.timeout:
            d.done <- fmt.Errorf("timed out as planned after %v", timeout)
            // Exit here: a closed channel stays readable, so looping again
            // would report repeatedly and finally block forever on done.
            return
        }
    }
}

// Leave blocks until every worker has reported its exit, then closes Input and
// returns the last non-nil error reported by a worker (nil if there was none).
//
// Input is only closed after all reports are in, so unless the caller closed
// Input earlier, Leave returns once the dispatcher's timeout has expired and
// yields the timeout error. Call Leave at most once.
func (d *Dispatcher) Leave() error {
    var err error
    // cap(d.done) equals the number of workers; each one reports exactly once.
    for w := 0; w < cap(d.done); w++ {
        if werr := <-d.done; werr != nil {
            err = werr
        }
    }
    close(d.Input)
    return err
}
// main demonstrates the dispatcher: it queues one print task per word on a
// pool of 6 workers created with a 200ms timeout, then waits for the
// dispatcher to finish and prints any error it returns.
func main() {
    const (
        poolSize = 6
        lifetime = 200 * time.Millisecond
        pause    = 2 * time.Microsecond
    )

    // The start time is captured once, so every task prints the same timestamp.
    started := time.Now()

    d := NewDispatcher(poolSize, lifetime)

    // Building each task through a function call gives the closure its own
    // copies of the index and the word, so a worker never reads the loop
    // variables while the loop is still writing them (a race that -race
    // reports when the loop variables are captured directly).
    newTask := func(index int, word string) func() {
        return func() {
            fmt.Printf("working on task [%v] ---> on %v WORD IS: %s\n",
                index, started.Format("Jan 2006, Mon _2, 15h04m05s"), word)
        }
    }

    for index, word := range []string{"Here", "is", "Go", "and", "for", "a", "while"} {
        d.Input <- newTask(index, word)

        // a short nap between jobs
        time.Sleep(pause)
    }

    err := d.Leave()
    if err != nil {
        fmt.Printf("Wait error: %v\n", err)
    }
}

/* Expected output (the task lines may appear in a different order, because the workers run concurrently):
working on task [0] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: Here
working on task [1] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: is
working on task [2] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: Go
working on task [3] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: and
working on task [4] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: for
working on task [5] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: a
working on task [6] ---> on Jan 2016, Thu 28, 22h40m34s WORD IS: while
Wait error: timed out as planned after 200ms
*/



Comments