Fragment Description:



Synchronization, study 9:
cyclic barrier with a Mutex and a channel.
This example was inspired by a documented comment by Tezeev (on reddit).
To be compared with the example using a sync.Cond based cyclic barrier.
Important:
All operations on a condition variable must be made while the current thread/goroutine has acquired a lock.
All accesses to a given condition variable must be protected by the same lock.


cyclicBarrierMutexChan

Go Playground

Last update, on 2015, Tue 17 Nov, 18:25:00

/* ... <== see fragment description ... */

package main

import (
    "fmt"
    "sync"
)

// /// definition of a cyclic barrier with a sync.Mutex and a channel
// ///////////
// CyclicBarrier is a reusable rendezvous point: Awaiting blocks its caller
// until `parties` goroutines have arrived, then releases all of them and
// re-arms the barrier for the next generation.
//
// The embedded sync.Mutex guards every field; code that inspects count from
// outside the methods must hold that same lock. Create instances with
// NewCyclicBarrier: the zero value has a nil signal channel and is not usable.
type CyclicBarrier struct {
	sync.Mutex
	parties int           // arrivals required to trip the barrier
	count   int           // arrivals still missing in the current generation
	signal  chan struct{} // closed to release the waiters of the current generation
}

// advanceLocked starts a new generation (fresh channel, full count) and
// returns the channel the previous generation is waiting on. The caller must
// hold the lock and is responsible for closing the returned channel.
func (b *CyclicBarrier) advanceLocked() chan struct{} {
	previous := b.signal
	b.count = b.parties
	b.signal = make(chan struct{})
	return previous
}

// reset releases every goroutine waiting on the current generation and
// re-arms the barrier for a full set of parties.
func (b *CyclicBarrier) reset() {
	b.Lock()
	defer b.Unlock()
	close(b.advanceLocked())
}

// Awaiting registers the caller's arrival at the barrier. Every caller except
// the last one of a generation blocks until that last party arrives; the last
// party prints a message, releases the waiters and returns without blocking.
func (b *CyclicBarrier) Awaiting() {
	b.Lock()
	b.count--
	if b.count > 0 {
		// Capture the channel of *this* generation before unlocking: the
		// field is replaced as soon as the barrier trips.
		signal := b.signal
		b.Unlock()
		<-signal
		return
	}

	// Last arrival. The next generation is started inside the same critical
	// section as the final decrement, so a goroutine arriving right now is
	// counted against the new generation instead of seeing count <= 0 and
	// slipping through (or triggering a second reset).
	released := b.advanceLocked()
	b.Unlock()

	fmt.Println("All parties have arrived to the barrier, lets run...")
	// No value is sent: closing the channel wakes every waiter at once.
	close(released)
}

// NewCyclicBarrier returns a barrier that trips each time numberOfParties
// goroutines have called Awaiting.
func NewCyclicBarrier(numberOfParties int) *CyclicBarrier {
	return &CyclicBarrier{
		parties: numberOfParties,
		count:   numberOfParties,
		signal:  make(chan struct{}),
	}
}

// /// end of definition of cyclic barrier ////////
// main starts one task per party, all sharing a single cyclic barrier, and
// blocks until every task has completed its runs.
func main() {
	wg := &sync.WaitGroup{}
	// Deferred so main does not return before every dispatched task is done.
	defer wg.Wait()

	numberOfRuns := 3
	parties := 5
	cyBarrier := NewCyclicBarrier(parties)
	// Start one task per party, named "A", "B", ...
	for i := 0; i < parties; i++ {
		// Convert through rune: a direct string(int) conversion is rejected
		// by go vet (stringintconv), even though it yields the same
		// one-character string here.
		name := string(rune('A' + i))
		dispatch(wg, func() { process(name, numberOfRuns, cyBarrier) })
	}
}
// dispatch runs f in a new goroutine tracked by wg: the counter is
// incremented before the goroutine starts and decremented when f is finished.
func dispatch(wg *sync.WaitGroup, f func()) {
	wg.Add(1)
	go func() {
		// Deferred so the WaitGroup is released even when f ends its
		// goroutine early (e.g. runtime.Goexit); otherwise wg.Wait would
		// block forever.
		defer wg.Done()
		f()
	}()
}
// process simulates one party: for each of numberOfRuns runs it meets the
// other parties at cyBarrier and then reports that its run is done.
//
// name is only used for display. The task id is a snapshot of the barrier's
// remaining count taken when the goroutine starts, so its value depends on
// how many parties have already arrived at that moment.
func process(name string, numberOfRuns int, cyBarrier *CyclicBarrier) {
	const readyBanner = "\n------\ngeneration[%d] is ready (i.e. cyBarrier is reset):\n\n"

	// cyBarrier.count is written by cyBarrier.Awaiting(), so every read of it
	// must happen under the barrier's lock. Dropping the Lock()/Unlock()
	// pairs and building with '-race' shows the resulting data race.
	cyBarrier.Lock()
	taskID := cyBarrier.count
	cyBarrier.Unlock()

	for run := 0; run < numberOfRuns; run++ {
		generation := run + 1

		// A remaining count of 1 means this party is the last one missing:
		// announce the generation before tripping the barrier.
		cyBarrier.Lock()
		if cyBarrier.count == 1 {
			fmt.Printf(readyBanner, generation)
		}
		cyBarrier.Unlock()

		// Every party of the generation meets here.
		cyBarrier.Awaiting()

		fmt.Printf("task[%d] named:%q in run[%d] is done, generation[%d]\n",
			taskID, name, run, generation)
	}
}

/* Expected Output:
------
generation[1] is ready (i.e. cyBarrier is reset):
All parties have arrived to the barrier, lets run...
task[1] named:"E" in run[0] is done, generation[1]
task[5] named:"A" in run[0] is done, generation[1]
task[4] named:"B" in run[0] is done, generation[1]
task[3] named:"C" in run[0] is done, generation[1]
task[2] named:"D" in run[0] is done, generation[1]
------
generation[2] is ready (i.e. cyBarrier is reset):
All parties have arrived to the barrier, lets run...
task[2] named:"D" in run[1] is done, generation[2]
task[1] named:"E" in run[1] is done, generation[2]
task[5] named:"A" in run[1] is done, generation[2]
task[4] named:"B" in run[1] is done, generation[2]
task[3] named:"C" in run[1] is done, generation[2]
------
generation[3] is ready (i.e. cyBarrier is reset):
All parties have arrived to the barrier, lets run...
task[3] named:"C" in run[2] is done, generation[3]
task[2] named:"D" in run[2] is done, generation[3]
task[1] named:"E" in run[2] is done, generation[3]
task[5] named:"A" in run[2] is done, generation[3]
task[4] named:"B" in run[2] is done, generation[3]
*/



Comments