Fragment Description:



Synchronization, study 10:
cyclic barrier.
A 'barrier' is a synchronization aid that allows a set of 'goroutines' to all wait for each other to reach a common barrier point.
CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other.
The barrier is called cyclic because it can be re-used after the waiting threads are released.
The barrier is reset automatically as soon as it is triggered.
Example of use:
when each worker (goroutine) processes a row of the matrix then waits at the barrier until all rows have been processed.
Each use of the barrier is represented as a generation instance.
The generation changes whenever the barrier is triggered (or is reset).
There can be many generations associated with threads using the barrier - due to the non-deterministic way the lock may be allocated to waiting threads - but only one of these can be active at a time (the one to which count applies) and all the rest are either broken or triggered.
There need not be an active generation if there has been a break but no subsequent reset.
This example doesn't deal with:
timeOut, interruptions, broken cycles (this is for another study).
To be noted:
about 'Condition Variables':
Condition Variables are also based on Dijkstra's semaphore semantics, with the exception that no stored value is associated with the operation.
This means condition variables do not contain the actual condition to test; a 'shared data state' (count in our example) is used instead to maintain the condition for threads.
A thread waits or wakes up other cooperative threads until a condition is satisfied.
Condition variables provide another way for threads to synchronize (besides 'semaphores' and 'locks').
While mutexes implement synchronization by controlling thread access to data, condition variables allow threads to synchronize based upon the actual value of data.
Without condition variables, the programmer would need to have threads continually polling (possibly in a critical section), to check if the condition is met (In our example we check the value of 'count').
This can be very resource consuming since the thread would be continuously busy in this activity.
A condition variable is a way to achieve the same goal without polling.
To operate on shared data, a condition variable Cv, uses a Locker L (a go interface).
Three basic 'atomic' operations are performed on Cv:
Wait(), Signal() and Broadcast().
Go Wait():
Wait atomically unlocks c.L and suspends execution of the calling goroutine.
After later resuming execution, Wait locks c.L before returning.
Unlike in other systems, Wait cannot return unless awoken by Broadcast or Signal.
Because c.L is not locked when Wait first resumes, the caller typically cannot assume that the condition is true when Wait returns.
Instead, the caller should Wait in a loop.
Go Signal():
Signal wakes one goroutine waiting on c, if there is any.
It is allowed but not required for the caller to hold c.L during the call.
Go Broadcast():
Broadcast wakes all goroutines waiting on c.
It is allowed but not required for the caller to hold c.L during the call.


cyBarrierWithSyncCondAtomic

Go Playground

Last update, on 2015, Thu 26 Nov, 23:08:10

/* ... <== see fragment description ... */

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "sync/atomic"
    "time"
)

// The cyclic barrier
type CyclicBarrier struct {
    generation int
    count      int32
    parties    int32
    trigger    *sync.Cond
}

func NewCyclicBarrier(parties int32) *CyclicBarrier {
    b := CyclicBarrier{}
    b.count = parties
    b.parties = parties
    b.trigger = sync.NewCond(&sync.Mutex{})
    return &b
}
func (b *CyclicBarrier) waitUntil() int {
    b.trigger.L.Lock()
    generation := b.generation
    c := atomic.AddInt32(&b.count, int32(-1))
    // all concurrents are at the barrier, the wait ends, the barrier is reset
    // this is the event tested to open the barrier
    if c == 0 {
        fmt.Printf("\n=-=-=-=\nall concurrents on Wait() at Barrier,"+
            " Start race[%d]\n=-=\n\n= finish line order =\n", generation+1)
        b.trigger.Broadcast()
        b.count = b.parties
        b.generation++
        b.trigger.L.Unlock()
        return b.generation
    }
    // waiting until the previous test is satisfied and the broadcast received
    for generation == b.generation {
        b.trigger.Wait()
    }
    b.trigger.L.Unlock()
    return b.generation
}
func (b *CyclicBarrier) getCount() int {
    return int(atomic.LoadInt32(&b.count))
}
func (b *CyclicBarrier) checkCount(c int) bool {
    if int(atomic.LoadInt32(&b.count)) == c {
        return true
    }
    return false
}

// end of the cyclic barrier definition
// usage example (..an attempt).
func init() {
    rand.Seed(time.Now().UTC().UnixNano())
}
func random(min, max int) int {
    // rand.Seed(time.Now().UTC().UnixNano()) //done in init() once for all
    if max-min <= 0 {
        return 1
    }
    return rand.Intn(max-min) + min
}
func prepAndLaunch(wgControl *sync.WaitGroup, f func()) {
    wgControl.Add(1)
    go func() {
        // execute the parameter f
        f()
        wgControl.Done()
    }()
}
func concurrent(cyBarrier *CyclicBarrier, races int) {
    // each concurrent receives a number
    concurrentId := cyBarrier.getCount()
    // each concurrent participates to n races
    for i := 0; i < races; i++ {
        // each race starts when condition tested by waitUntil() is satisfied
        race := cyBarrier.waitUntil()
        t := time.Now()
        // each concurrent runs at a specific speed
        time.Sleep(9e7 + time.Duration(rand.Int63n(1e8)))
        dur := time.Now().Sub(t).Seconds() * 100
        fmt.Printf("concurrent[%d] is arrived in\t%.6fs, race[%d]\n",
            concurrentId, dur, race)
        if cyBarrier.checkCount(0) {
            fmt.Printf("\nnext race[%d] is ready "+
                "(i.e. cyBarrier is reset):\n\n", race+1)
        }
    }
}
func main() {
    // to control the goroutines created by prepAndLaunch()
    wgControl := &sync.WaitGroup{}
    defer wgControl.Wait()
    // a meeting with concurrents and races
    races := random(1, 4)
    concurrents := random(3, 7)
    // a barrier will hold/release all concurrents at the same time
    cyBarrier := NewCyclicBarrier(int32(concurrents))
    // bring each concurrent to the barrier
    for i := 0; i < concurrents; i++ {
        // a goroutine per concurrent
        prepAndLaunch(wgControl, func() { concurrent(cyBarrier, races) })
    }
}

/* Expected Output (numbers of concurrents and races are randomized:
=-=-=-=
all concurrents on Wait() at Barrier, Start race[1]
=-=
= finish line order =
concurrent[2] is arrived in    11.500650s, race[1]
concurrent[4] is arrived in    12.600720s, race[1]
concurrent[1] is arrived in    18.501050s, race[1]
concurrent[3] is arrived in    18.501050s, race[1]
=-=-=-=
all concurrents on Wait() at Barrier, Start race[2]
=-=
= finish line order =
concurrent[4] is arrived in    10.600610s, race[2]
concurrent[1] is arrived in    11.300650s, race[2]
concurrent[3] is arrived in    15.100870s, race[2]
concurrent[2] is arrived in    16.100920s, race[2]
*/



Comments