Fragment Description:



Article by Sameer Ajmani; see http://blog.golang.org/pipelines — Go's concurrency primitives make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs.
This article presents examples of such pipelines, highlights subtleties that arise when operations fail, and introduces techniques for dealing with failures cleanly.
A must-read, along with the article about Context: http://blog.golang.org/context


streamingBounded

Go Playground

Last update, on 2015, Fri 9 Oct, 16:15:30

/* ... <== see fragment description ... */

package main

import (
    "crypto/md5"
    "errors"
    "fmt"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "runtime"
    "sort"
    "strconv"
    "sync"
    "time"
)

// walkFiles starts a goroutine that walks the directory tree rooted at root
// and sends the path of every regular file on the returned string channel.
// Directories and other non-regular entries (symbolic links, named pipes,
// sockets, devices) are skipped: reading them as files would either fail or
// block, and filepath.Walk does not follow symbolic links anyway.
//
// The string channel is closed once the walk has finished.  The outcome of
// the walk (nil on success) is delivered as a single value on the returned
// error channel.  If chanDone is closed while a path is waiting to be sent,
// the walk is abandoned and the error "walk canceled" is reported.
func walkFiles(chanDone <-chan struct{}, root string) (<-chan string, <-chan error) {
    chanPaths := make(chan string)
    // Buffered with capacity 1 so the goroutine can always deliver the walk
    // result and exit, even if the caller never reads it.
    chanErrc := make(chan error, 1)
    go func() {
        // Closing chanPaths after Walk returns tells the consumers that no
        // more paths will arrive.
        defer close(chanPaths)
        chanErrc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                // info may be nil here; propagate the error and stop walking.
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case chanPaths <- path:
            case <-chanDone:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return chanPaths, chanErrc
}

// A result is the product of reading one file and summing its contents with
// MD5.  One value is produced per path, whether or not the read succeeded.
type result struct {
    // path is the file path the digest was computed for.
    path string
    // sum is the MD5 digest (md5.Size bytes) of the data that was read.
    sum  [md5.Size]byte
    // err is the error returned by reading the file, or nil on success.
    err  error
}

// digester is a pipeline worker.  It receives file paths from chanPaths,
// reads each file, and sends a result holding the path, the MD5 sum of the
// bytes read, and any read error on chanDisgesterResult.  It returns when
// chanPaths is closed and drained, or when chanDone is closed while a result
// is waiting to be sent.
func digester(chanDone <-chan struct{}, chanPaths <-chan string, chanDisgesterResult chan<- result) {
    for {
        name, open := <-chanPaths
        if !open {
            // No more work will arrive.
            return
        }

        contents, readErr := ioutil.ReadFile(name)
        digest := result{
            path: name,
            sum:  md5.Sum(contents),
            err:  readErr,
        }

        // Either hand the digest over or give up if the pipeline was
        // canceled; never block forever on an abandoned consumer.
        select {
        case <-chanDone:
            return
        case chanDisgesterResult <- digest:
        }
    }
}

// MD5All computes the MD5 sum of every file that walkFiles reports for the
// tree rooted at root and returns them as a map keyed by file path.  Files
// are read by a fixed pool of digester goroutines.  If reading any file
// fails, or if the directory walk itself fails, MD5All returns a nil map and
// that error; in the read-failure case it does not wait for reads that are
// still in flight.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    // cancel is closed on every return path.  That releases the walker and
    // any digester still blocked on a send when MD5All returns early, before
    // all results or the walk error have been received.
    cancel := make(chan struct{})
    defer close(cancel)

    pending, walkErr := walkFiles(cancel, root)

    // Fan out: a bounded number of workers all read from the same path
    // channel and write to the same result channel.
    const numDigesters = 20
    results := make(chan result)
    var workers sync.WaitGroup
    workers.Add(numDigesters)
    for n := numDigesters; n > 0; n-- {
        go func() {
            defer workers.Done()
            digester(cancel, pending, results)
        }()
    }

    // Close results only after every worker has finished, so the receive
    // loop below terminates.
    go func() {
        workers.Wait()
        close(results)
    }()

    sums := make(map[string][md5.Size]byte)
    for {
        r, open := <-results
        if !open {
            break
        }
        if r.err != nil {
            return nil, r.err
        }
        sums[r.path] = r.sum
    }

    // All digests arrived; now check whether the walk itself failed.
    if err := <-walkErr; err != nil {
        return nil, err
    }
    return sums, nil
}
// main computes the MD5 sum of every file under the current directory and
// prints one line per file, ordered by path, with the digest, the quoted
// path and the quoted file size.  The elapsed time is printed when main
// returns; a failing os.Stat ends the program through log.Fatal instead.
func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    runtime.Gosched()
    defer timeTrack(time.Now(), "task duration:")

    // The root directory is fixed here; MD5All(os.Args[1]) would take it
    // from the command line instead.
    sums, err := MD5All("./")
    if err != nil {
        fmt.Println(err)
        return
    }

    // Map iteration order is random, so collect and sort the keys first.
    names := make([]string, 0, len(sums))
    for name := range sums {
        names = append(names, name)
    }
    sort.Strings(names)

    for i := range names {
        name := names[i]
        info, statErr := os.Stat(name)
        if statErr != nil {
            log.Fatal(statErr)
        }
        size := strconv.FormatInt(info.Size(), 10)
        fmt.Printf("%x\t%q\t--> Size= %q bytes.\n", sums[name], name, size)
    }
}

// /////////utility functions /////

// timeTrack prints how much time has passed since start, labelled with name.
// It is meant to be deferred with time.Now() as its first argument, so that
// the start time is captured when the defer statement executes and the
// report is printed when the surrounding function returns.
func timeTrack(start time.Time, name string) {
    fmt.Printf("function %s took %v\n", name, time.Since(start))
}

/*  Expected Output:
0da9f96d15df41e7a2bda68032f4a459   "bounded.exe" --> Size= "2445312" bytes.
cb4a6efc9688d7956676b141ce5abacc   "bounded.go"  --> Size= "3762" bytes.
function task duration: took 8.0005ms
*/



Comments