Fragment Description:



Synchronization, study 8:
cyclic barrier.
A 'barrier' is a synchronization aid that allows a set of 'goroutines' to all wait for each other to reach a common barrier point.
CyclicBarriers are useful in programs involving a fixed-size party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be re-used after the waiting threads are released.
The barrier is reset automatically as soon as it is triggered.
Example of use:
each worker (goroutine) processes a row of a matrix and then waits at the barrier until all rows have been processed, as sketched just below.
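As an illustration only (not part of the original program), here is a minimal sketch of such a worker. It assumes the CyclicBarrier type and its Awaiting() method defined further down in this fragment (plus the program's "sync" import); the matrix, the doubling 'work' and the processRows name are hypothetical, and the barrier is assumed to be created with NewCyclicBarrier(len(matrix)):

func processRows(matrix [][]float64, barrier *CyclicBarrier) {
    var wg sync.WaitGroup
    for r := range matrix {
        wg.Add(1)
        go func(row []float64) {
            defer wg.Done()
            for c := range row {
                row[c] *= 2 // hypothetical per-row work
            }
            // wait at the barrier until every worker has processed its row
            barrier.Awaiting()
        }(matrix[r])
    }
    wg.Wait()
}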
Each use of the barrier is represented as a generation instance.
The generation changes whenever the barrier is triggered (or is reset).
There can be many generations associated with the threads using the barrier, because the lock may be handed to waiting threads in a non-deterministic order. Only one generation can be active at a time (the one to which 'count' applies); all the others are either broken or already triggered.
There need not be an active generation if there has been a break but no subsequent reset.
This example doesn't deal with:
timeouts, interruptions, or broken cycles (these are left for another study).
To be noted:
about 'Condition Variables':
Condition Variables are also based on Dijkstra's semaphore semantics, with the exception that no stored value is associated with the operation.
This means condition variables do not contain the actual condition to test; a 'shared data state' (count in our example) is used instead to maintain the condition for threads.
A thread waits until a condition is satisfied, or wakes up other cooperating threads when the condition becomes satisfied.
Condition variables provide another way for threads to synchronize (besides 'semaphores' and 'locks').
While mutexes implement synchronization by controlling thread access to data, condition variables allow threads to synchronize based upon the actual value of data.
Without condition variables, the programmer would need to have threads continually polling (possibly inside a critical section) to check whether the condition is met (in our example, the value of 'count').
This can be very resource-consuming, since the thread would be continuously busy with this check.
A condition variable is a way to achieve the same goal without polling.
To operate on shared data, a condition variable Cv uses a Locker L (a Go interface).
Three basic 'atomic' operations are performed on Cv:
Wait(), Signal() and Broadcast().
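As a minimal, self-contained sketch (separate from the program below, and using illustrative names only), this shows a sync.Cond guarding a shared 'count', with one goroutine waiting in a loop and another signalling after each update:

package main

import (
    "fmt"
    "sync"
)

func main() {
    cv := sync.NewCond(&sync.Mutex{}) // the Cv with its Locker L
    count := 0                        // the 'shared data state' holding the condition

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        cv.L.Lock()
        for count < 3 { // the condition lives in shared data, not in the Cv
            cv.Wait() // atomically unlocks L, suspends, then re-locks before returning
        }
        fmt.Println("condition reached, count =", count)
        cv.L.Unlock()
    }()

    for i := 0; i < 3; i++ {
        cv.L.Lock()
        count++
        cv.L.Unlock()
        cv.Signal() // wake one waiter; Broadcast() would wake them all
    }
    wg.Wait()
}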
Go Wait():
Wait atomically unlocks c.L and suspends execution of the calling goroutine.
After later resuming execution, Wait locks c.L before returning.
Unlike in other systems, Wait cannot return unless awoken by Broadcast or Signal.
Because c.L is not locked when Wait first resumes, the caller typically cannot assume that the condition is true when Wait returns.
Instead, the caller should Wait in a loop.
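The pattern given in the sync.Cond documentation looks like this, where condition() stands for any check of the shared state ('count' in this fragment):

    c.L.Lock()
    for !condition() {
        c.Wait()
    }
    // ... make use of condition ...
    c.L.Unlock()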
Go Signal():
Signal wakes one goroutine waiting on c, if there is any.
It is allowed but not required for the caller to hold c.L during the call.
Go Broadcast():
Broadcast wakes all goroutines waiting on c.
It is allowed but not required for the caller to hold c.L during the call.


cyclicBarrierWithSyncCond


Last update, on 2015, Tue 17 Nov, 18:25:00

/* ... <== see fragment description ... */

package main

import (
    "fmt"
    "sync"
)

// ///////////// definition of the cyclic barrier ///////////
type CyclicBarrier struct {
    generation int
    count      int //  the condition that will trigger runs
    parties    int
    trigger    *sync.Cond
}

func NewCyclicBarrier(numberOfParties int) *CyclicBarrier {
    b := CyclicBarrier{}
    b.count = numberOfParties
    b.parties = numberOfParties
    // Initialization of the associated mutex
    b.trigger = sync.NewCond(&sync.Mutex{})
    return &b
}
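// Awaiting is called by a party when it reaches the barrier; it blocks until
// all parties of the current generation have arrived, then returns the number
// of the new generation.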
func (b *CyclicBarrier) Awaiting() int {
    b.trigger.L.Lock()

    generation := b.generation
    // drain the barrier: 'count' is our condition, i.e. the number of threads
    // still expected in this generation. When it reaches zero the barrier
    // 'releases' the waiting threads and is automatically reset for the next generation.
    b.count--
    if b.count == 0 {
        b.nextGeneration()
        b.trigger.L.Unlock() //not using defer for performance
        return b.generation
    }
    for generation == b.generation {
        // wait for current generation to complete
        b.trigger.Wait()
    }
    b.trigger.L.Unlock() //not using defer for performance
    return b.generation
}
func (b *CyclicBarrier) nextGeneration() {
    // signal completion of last generation: a collective release.
    b.trigger.Broadcast()
    b.count = b.parties
    // set up next generation
    b.generation++
}

// ///////// end of definition of the cyclic barrier ////////
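// dispatch registers f with the WaitGroup and runs it in its own goroutine,
// marking it Done when f returns.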
func dispatch(wgControl *sync.WaitGroup, f func()) {
    wgControl.Add(1)
    go func() {
        f()
        wgControl.Done()
    }()
}
func main() {
    wgControl := &sync.WaitGroup{}
    defer wgControl.Wait()

    numberOfRuns := 3
    parties := 5 // in each run
    cyBarrier := NewCyclicBarrier(parties)
    fmt.Printf("generation[%d] is ready:\n\n", 1)
    // let's process each party in the set but release them
    // only when all parties in that set are done (i.e. have arrived at the cyBarrier)
    for i := 0; i < parties; i++ {
        name := string(rune('A' + i))
        dispatch(wgControl, func() { process(name, numberOfRuns, cyBarrier) })
    }
}

// each process will wait at the barrier until all parties in a 'generation'
// have arrived there
func process(name string, numberOfRuns int, cyBarrier *CyclicBarrier) {
    // the following read of 'count' is done 'safely' (under the lock);
    // otherwise a race condition would exist between
    // this read and the writes done inside cyBarrier.Awaiting().
    //
    // Comment out the Lock() and Unlock() lines, re-build with '-race' and observe
    cyBarrier.trigger.L.Lock()
    taskID := cyBarrier.count
    cyBarrier.trigger.L.Unlock()

    for i := 0; i < numberOfRuns; i++ {

        // Awaiting() checks the status of the condition ('count') and,
        // when the condition is satisfied, releases all the waiting tasks
        // by signalling them with Broadcast().
        // All goroutines then go through the cyBarrier at the same time.
        generation := cyBarrier.Awaiting()

        fmt.Printf("task[%d] named:%q in run[%d] is done, generation[%d]\n",
            taskID, name, i, generation)

        cyBarrier.trigger.L.Lock()
        if cyBarrier.count == 1 {
            fmt.Printf("\n------\ngeneration[%d] is ready "+
                "(i.e. cyBarrier is reset):\n\n", generation+1)
        }
        cyBarrier.trigger.L.Unlock()
    }

}


