bobheadxi/streamline

Transform and handle your data, line by line.

go get go.bobheadxi.dev/streamline

Overview

streamline offers a variety of primitives to make working with data line by line a breeze:

  • streamline.Stream offers the ability to add hooks that handle an io.Reader line-by-line with (*Stream).Stream, (*Stream).StreamBytes, and other utilities.
  • pipeline.Pipeline offers a way to build pipelines that transform the data in a streamline.Stream, such as cleaning, filtering, mapping, or sampling data.
  • pipe.NewStream offers a way to create a buffered pipe between a writer and a Stream.
    • streamexec.Start uses this to attach a Stream to an exec.Cmd to work with command output.

When working with data streams in Go, you typically get an io.Reader, which is great for arbitrary data - but in many cases, especially when scripting, it's common to end up with data and outputs that are structured line by line, or to want to handle data line by line, for example to send to a structured logging library. You can set up a bufio.Reader or bufio.Scanner to do this, but for cases like exec.Cmd you will also need boilerplate to configure the command and set up pipes, and for additional functionality like transforming, filtering, or sampling output you will need to write your own handlers. streamline aims to provide succinct ways to do all of the above and more.
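As a minimal sketch of the core primitive, using only the streamline.New and (*Stream).Stream calls that appear in the examples below (that the callback receives each line without its trailing newline is an assumption here, not something this README states):

func CountLines(r io.Reader) (int, error) {
    var lines int
    // Wrap any io.Reader and invoke the handler once per line.
    err := streamline.New(r).Stream(func(line string) {
        lines++
    })
    return lines, err
}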

Add prefixes to command output

With bufio.Scanner:
func PrefixOutput(cmd *exec.Cmd) error {
    reader, writer := io.Pipe()
    cmd.Stdout = writer
    cmd.Stderr = writer
    if err := cmd.Start(); err != nil {
        return err
    }
    // Buffered so the goroutine cannot leak if we return early on a scan error.
    errC := make(chan error, 1)
    go func() {
        err := cmd.Wait()
        writer.Close()
        errC <- err
    }()
    s := bufio.NewScanner(reader)
    for s.Scan() {
        println("PREFIX: ", s.Text())
    }
    if err := s.Err(); err != nil {
        return err
    }
    return <-errC
}
With streamline/streamexec:

func PrefixOutput(cmd *exec.Cmd) error {
    stream, err := streamexec.Start(cmd)
    if err != nil {
        return err
    }
    return stream.Stream(func(line string) {
        println("PREFIX: ", line)
    })
}

Process JSON on the fly

With bufio.Scanner:
func GetMessages(r io.Reader) error {
    s := bufio.NewScanner(r)
    for s.Scan() {
        var result bytes.Buffer
        cmd := exec.Command("jq", ".msg")
        cmd.Stdin = bytes.NewReader(s.Bytes())
        cmd.Stdout = &result
        if err := cmd.Run(); err != nil {
            return err
        }
        print(result.String())
    }
    return s.Err()
}
With streamline:

func GetMessages(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(jq.Pipeline(".msg")).
        Stream(func(line string) {
            println(line)
        })
}

Sample noisy output

With bufio.Scanner:
func PrintEvery10th(r io.Reader) error {
    s := bufio.NewScanner(r)
    var count int
    for s.Scan() {
        count++
        if count%10 != 0 {
            continue
        }
        println(s.Text())
    }
    return s.Err()
}
With streamline:

func PrintEvery10th(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(pipeline.Sample(10)).
        Stream(func(line string) {
            println(line)
        })
}
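Sampling can also be combined with transformations - a hedged sketch, assuming (*Stream).WithPipeline can be called repeatedly to apply pipelines in order (this chaining is an assumption, not shown elsewhere in this README):

func PrintEvery10thTagged(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(pipeline.Sample(10)).
        WithPipeline(pipeline.Map(func(line []byte) []byte {
            // Tag each sampled line; pipeline.Map is also
            // demonstrated in the next example.
            return append([]byte("SAMPLED: "), line...)
        })).
        Stream(func(line string) {
            println(line)
        })
}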

Transform specific lines

This particular example is a somewhat realistic one - GCP Cloud SQL cannot import pg_dump output that contains certain EXTENSION-related statements, so to dump a PostgreSQL database and upload the dump to a bucket for import into Cloud SQL, one must pre-process the dump to comment out or remove the offending statements.

With bufio.Scanner:
var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
    s := bufio.NewScanner(pgdump)
    for s.Scan() {
        line := s.Bytes()
        if bytes.Contains(line, unwanted) {
            // comment out this line
            line = append([]byte("-- "), line...)
        }
        // Scanner strips the trailing newline; restore it when writing.
        if _, err := dst.Write(append(line, '\n')); err != nil {
            return err
        }
    }
    return s.Err()
}
With streamline:

var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
    _, err := streamline.New(pgdump).
        WithPipeline(pipeline.Map(func(line []byte) []byte {
            if bytes.Contains(line, unwanted) {
                // comment out this line
                return append([]byte("-- "), line...)
            }
            return line
        })).
        WriteTo(dst)
    return err
}

Background

Some of the ideas in this package originated in sourcegraph/run, a project that set out to build utilities that make it easier to write bash-esque scripts in Go - namely, to do things you would often do in shell scripts, such as grepping and iterating over lines. streamline generalizes the ideas sourcegraph/run used for working with command output so that they apply to arbitrary inputs, and sourcegraph/run now uses streamline internally.