Fragment Description:



From a suggestion by 'tezeev' (reddit/golang) about the TeeReaderPiped scheme.
His suggestion:
'I think you can take your idea a bit further by better hiding complexity of Pipe/TeeReader abstractions by wrapping it up in your own 'mux' reader (DupReader) implementation'.
His version here:
'http://play.golang.org/p/gx_V3dgzL6'.
So here it is, slightly modified:
a variation of the TeeReaderPiped usage.
We also introduce a counter serialization, required to avoid data-race risks, since it is written within goroutines.


ioTeeReaderAbstracted

Go Playground

Last update, on 2016, Fri 29 Jan, 15:40:25

/* ... <== see fragment description ... */

package main

import (
    "bytes"
    "fmt"
    "io"
    "io/ioutil"
    "sync"
)

var (
    testString = "MiXeD CaPitALiZaTioN"
    // for the sake of the example output
    m       sync.Mutex // will serialize accesses to 'counter'
    counter int        = 2
)

func main() {
    // Duplicating readers...using io.Pipe().
    r1, R2 := DupReader(bytes.NewBufferString(testString))
    R3, R4 := DupReader(r1)
    // etc...
    var wg sync.WaitGroup
    // a Task: modifyString
    modifyString := func(r io.Reader, readItAll func([]byte) []byte) {
        wg.Add(1)

        out, err := readStream(r, readItAll)
        if err != nil {
            panic(err)
        }

        fmt.Printf("modifying [R%d] gives: %q\n", counter, string(out))

        // update of counter is multi-thread safe
        m.Lock()
        counter++
        m.Unlock()
        wg.Done()
    }
    // Display original string
    fmt.Println("Input a stream, and 'pipe it' to various processings\n===")
    fmt.Printf("original string: %q\n", testString)
    // Multiple io.Reader processing
    go modifyString(R2, bytes.ToLower)
    go modifyString(R3, bytes.ToUpper)
    modifyString(R4, func(s []byte) []byte { return bytes.Title(bytes.ToLower(s)) })
    go func() {
        wg.Wait()
    }()
}

// teeReaderPiped abstracted
func DupReader(r io.Reader) (ri io.Reader, rj io.Reader) {
    pr, pw := io.Pipe()
    tr := io.TeeReader(r, pw)
    ri = &readWriteCloser{reader: tr, writer: pw}
    rj = &readWriteCloser{reader: pr}
    return
}

// can we read the stream till the EOF?
func readStream(r io.Reader, readItAll func([]byte) []byte) (o []byte, err error) {
    // ReadAll reads from r until an error or EOF and returns the data it read
    o, err = ioutil.ReadAll(r)
    if err != nil {
        return nil, err
    }
    return readItAll(o), nil
}

type readWriteCloser struct {
    reader io.Reader
    writer io.WriteCloser
}

// readWriteClose implements io.Reader
func (mr readWriteCloser) Read(buf []byte) (n int, err error) {
    if mr.reader != nil {
        n, err = mr.reader.Read(buf)
        if err != nil && err == io.EOF {
            if mr.writer != nil {
                mr.writer.Close()
                mr.writer = nil // no longer needed
                mr.reader = nil // no longer needed
            }
        }
        return
    }
    return 0, io.EOF
}

/* Expected output:
Input a stream, and 'pipe it' to various processes
===
original string: "MiXeD CaPitALiZaTioN"
modifying [R2] gives: "MIXED CAPITALIZATION"
modifying [R3] gives: "mixed capitalization"
modifying [R4] gives: "Mixed Capitalization"
*/



Comments