Fragment Description:



Article by Samir Ajmani, see here:http://blog.golang.org/pipelines Go's concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs.
This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly.
A must read along with the article about Context:http://blog.golang.org/context


streamingInParallel

Go Playground

Last update, on 2015, Fri 9 Oct, 16:15:30

/* ... <== see fragment description ... */

package main

import (
    "crypto/md5"
    "errors"
    "fmt"
    "io/ioutil"
    "os"
    "path/filepath"
    "sort"
    "sync"
    "time"
)

// A result is the product of reading and summing a file using MD5.
type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

// sumFiles starts goroutines to walk the directory tree at root and digest
// each
// regular file.  These goroutines send the results of the digests on the
// result
// channel and send the result of the walk on the error channel.  If done is
// closed, sumFiles abandons its work.
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() { // HL
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if info.IsDir() {
                return nil
            }
            wg.Add(1)
            go func() { // HL
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}: // HL
                case <-done: // HL
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done: // HL
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() { // HL
            wg.Wait()
            close(c) // HL
        }()
        // No select needed here, since errc is buffered.
        errc <- err // HL
    }()
    return c, errc
}

// MD5All reads all the files in the file tree rooted at root and returns a
// map
// from file path to the MD5 sum of the file's contents.  If the directory
// walk
// fails or any read operation fails, MD5All returns an error.  In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})     // HLdone
    defer close(done)               // HLdone
    c, errc := sumFiles(done, root) // HLdone
    m := make(map[string][md5.Size]byte)
    for r := range c { // HLrange
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
func main() {
    defer timeTrack(time.Now(), "task duration:")
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All("./")
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

// /////////utility functions /////
func timeTrack(start time.Time, name string) {
    elapsed := time.Since(start)
    fmt.Printf("function %s took %v\n", name, elapsed)
}

/*  Expected Output:
37e8ee96751fc2b3d61e323f7173723c  parallel.exe
d7846e00676d933859d629678d67c8f7  parallel.go
function task duration: took 7.0004ms
*/



Comments