Fragment Description:



Synchonization study 3:
Counting Semaphore.
Binary Semaphore (Edsger Dijkstra) and Counting Semaphore (Carel S.
Scholten) are 2 variants of the Semaphore scheme.
A good and short (critical) presentation here:
(http://bit.ly/1OCBY9t).
The example below, is a study of the 'effective go' example of a channel used as a semaphore (see https://golang.org/doc/effective_go.html#channels):
##### func Serve(queue chan *Request) { for req := range queue { sem <- 1 // Wait for active queue to drain the buffer of chan sem.
go func(req *Request) { process(req) <-sem // Done; enable next request to run.
}(req) } } ##### An alternative solution, was proposed by D.Vyukov:
It initiates a fixed number of Signals (sem <- 1), that limit the number of started goroutines by construction.
func Serve(queue chan *Request) { sem := make(chan int, MaxOutstanding) for i := 0; i < MaxOutstanding; i++ { sem <- 1 } for req := range queue { // acquisition of the semaphore must be on a channel receive, not a send.
// <-sem go func() { process(req) sem <- 1 } } } ##### In our example, this second approach is implemented using a Counting Semaphore.
See in the code below:
semaphore(n), Acquire(1) and Release(1).
To be note:
we are using the robust Semaphore implementation by R.Obryk, available here:
github.com/robryk/semaphore.


countingSemaphore

Go Playground

Last update, on 2015, Tue 17 Nov, 18:24:59

/* ... <== see fragment description ... */

package main

import (
    "fmt"
    "github.com/semaphore"
    "sync"
    "time"
)

var (
    // max number of goroutines launched concurrently
    MaxOutstanding int = 3
    // a queue to process
    queue = []int{}
)

func main() {
    t := time.Now()
    for i := 1; i <= 15; i++ {
        queue = append(queue, i)
    }
    Serve(queue)
    fmt.Printf("-----\nduration = %v\n", time.Now().Sub(t).Seconds())
}

// Observe the difference with the 1st Effective Go example (in comment
// above).
// To prevent overloading, a 'Wait' is organized limiting the number of
// concurrent goroutines: a throttle.
//
// Comment the 4 lines defining the Semaphore synchronization
// and observe the result.
func Serve(queue []int) {
    var wg sync.WaitGroup
    wg.Add(len(queue))

    // Wait for goroutines to drain the 'counting semaphore',
    // when drained: blocks until a 'next release'!
    s := semaphore.NewSemaphore(MaxOutstanding)
    msg := make(chan string)
    // start n goroutines until the s.Acquire() has drained s
    for i, item := range queue {
        go func(i, item int) {
            // will not block as long s.value > 0,
            // then will be unblocked when a goroutine terminates
            // and signals a release()
            s.Acquire(1) // decrease 's.value' by 1 starting from 'MaxOustanding'

            // any processing here
            text := fmt.Sprintf("from goroutine[%d]= %d\n", i, item)
            msg <- text

            // just to have a nice output
            time.Sleep(1 * time.Second)

            // When done; signal another goroutine to run, increases s.value by 1
            s.Release(1) // increment
            wg.Done()
        }(i, item)
    }
    for x := 0; x < len(queue); x++ {
        // just to have a nice output
        if x%MaxOutstanding == 0 {
            fmt.Println("-----")
        }
        fmt.Printf(<-msg)
    }
    go func() {
        wg.Wait()
        close(msg)
    }()

}

/* Expected Output:
-----
from goroutine[7]= 8
from goroutine[1]= 2
from goroutine[0]= 1
-----
from goroutine[3]= 4
from goroutine[2]= 3
from goroutine[5]= 6
-----
from goroutine[6]= 7
from goroutine[12]= 13
from goroutine[10]= 11
-----
from goroutine[13]= 14
from goroutine[4]= 5
from goroutine[9]= 10
-----
from goroutine[11]= 12
from goroutine[8]= 9
from goroutine[14]= 15
-----
duration = 4.003229
*/



Comments