Transform and handle your data, line by line.
```sh
go get go.bobheadxi.dev/streamline
```
`streamline` offers a variety of primitives to make working with data line by line a breeze:

- `streamline.Stream` offers the ability to add hooks that handle an `io.Reader` line-by-line with `(*Stream).Stream`, `(*Stream).StreamBytes`, and other utilities - see the minimal sketch after this list.
- `pipeline.Pipeline` offers a way to build pipelines that transform the data in a `streamline.Stream`, such as cleaning, filtering, mapping, or sampling data.
  - `jq.Pipeline` can be used to map every line to the output of a JQ query, for example.
  - `streamline.Stream` implements standard `io` interfaces like `io.Reader`, so `pipeline.Pipeline` can be used for general-purpose data manipulation as well.
- `pipe.NewStream` offers a way to create a buffered pipe between a writer and a `Stream`.
  - `streamexec.Start` uses this to attach a `Stream` to an `exec.Cmd` to work with command output.
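As a quick taste of how these pieces fit together, here is a minimal sketch that streams a few in-memory lines (the input is hypothetical - any `io.Reader` can be wrapped the same way):

```go
package main

import (
    "strings"

    "go.bobheadxi.dev/streamline"
)

func main() {
    // Hypothetical input - any io.Reader can back a Stream.
    data := strings.NewReader("line 1\nline 2\nline 3")

    // Stream invokes the handler once per line.
    _ = streamline.New(data).Stream(func(line string) {
        println(line)
    })
}
```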
When working with data streams in Go, you typically get an `io.Reader`, which is great for arbitrary data - but in many cases, especially when scripting, it's common to end up with data and outputs that are structured line by line, or to want to handle data line by line, for example to send to a structured logging library. You can set up a `bufio.Reader` or `bufio.Scanner` to do this, but for cases like `exec.Cmd` you will also need boilerplate to configure the command and set up pipes, and for additional functionality like transforming, filtering, or sampling output you will need to write your own handlers. `streamline` aims to provide succinct ways to do all of the above and more.
<table>
<tr><th><code>bufio.Scanner</code></th><th><code>streamline/streamexec</code></th></tr>
<tr>
<td>

```go
func PrefixOutput(cmd *exec.Cmd) error {
    reader, writer := io.Pipe()
    cmd.Stdout = writer
    cmd.Stderr = writer
    if err := cmd.Start(); err != nil {
        return err
    }
    errC := make(chan error)
    go func() {
        err := cmd.Wait()
        writer.Close()
        errC <- err
    }()
    s := bufio.NewScanner(reader)
    for s.Scan() {
        println("PREFIX: ", s.Text())
    }
    if err := s.Err(); err != nil {
        return err
    }
    return <-errC
}
```

</td>
<td>

```go
func PrefixOutput(cmd *exec.Cmd) error {
    stream, err := streamexec.Start(cmd)
    if err != nil {
        return err
    }
    return stream.Stream(func(line string) {
        println("PREFIX: ", line)
    })
}
```

</td>
</tr>
</table>
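Since `streamexec.Start` returns an ordinary `streamline.Stream`, the pipelines shown in the examples below should compose with command output as well. A hedged sketch reusing only APIs demonstrated elsewhere in this README (the command itself is hypothetical):

```go
func PrefixBuildOutput() error {
    cmd := exec.Command("go", "build", "./...") // hypothetical command
    stream, err := streamexec.Start(cmd)
    if err != nil {
        return err
    }
    // Transform each line of the combined output before handling it,
    // using the pipeline.Map transform shown in a later example.
    return stream.
        WithPipeline(pipeline.Map(func(line []byte) []byte {
            return append([]byte("build: "), line...)
        })).
        Stream(func(line string) {
            println(line)
        })
}
```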
<table>
<tr><th><code>bufio.Scanner</code></th><th><code>streamline</code></th></tr>
<tr>
<td>

```go
func GetMessages(r io.Reader) error {
    s := bufio.NewScanner(r)
    for s.Scan() {
        var result bytes.Buffer
        cmd := exec.Command("jq", ".msg")
        cmd.Stdin = bytes.NewReader(s.Bytes())
        cmd.Stdout = &result
        if err := cmd.Run(); err != nil {
            return err
        }
        print(result.String())
    }
    return s.Err()
}
```

</td>
<td>

```go
func GetMessages(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(jq.Pipeline(".msg")).
        Stream(func(line string) {
            println(line)
        })
}
```

</td>
</tr>
</table>
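For instance, feeding `GetMessages` a couple of JSON log lines (hypothetical input) prints the JQ result for each line:

```go
func main() {
    // Hypothetical input: newline-delimited JSON log entries.
    logs := strings.NewReader(`{"level":"info","msg":"hello"}
{"level":"info","msg":"world"}`)

    // Prints the JSON-encoded .msg values: "hello" then "world".
    if err := GetMessages(logs); err != nil {
        panic(err)
    }
}
```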
<table>
<tr><th><code>bufio.Scanner</code></th><th><code>streamline</code></th></tr>
<tr>
<td>

```go
func PrintEvery10th(r io.Reader) error {
    s := bufio.NewScanner(r)
    var count int
    for s.Scan() {
        count++
        if count%10 != 0 {
            continue
        }
        println(s.Text())
    }
    return s.Err()
}
```

</td>
<td>

```go
func PrintEvery10th(r io.Reader) error {
    return streamline.New(r).
        WithPipeline(pipeline.Sample(10)).
        Stream(func(line string) {
            println(line)
        })
}
```

</td>
</tr>
</table>
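A usage sketch, assuming `pipeline.Sample(10)` keeps every 10th line just as the scanner version on the left does:

```go
func main() {
    // Generate 100 numbered lines as hypothetical input.
    var sb strings.Builder
    for i := 1; i <= 100; i++ {
        fmt.Fprintf(&sb, "line %d\n", i)
    }

    // Prints "line 10", "line 20", ..., "line 100".
    _ = PrintEvery10th(strings.NewReader(sb.String()))
}
```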
This particular example is a somewhat realistic one - GCP Cloud SQL cannot import `pg_dump` output that contains certain `EXTENSION`-related statements, so to dump a PostgreSQL database and upload the dump to a bucket for import into Cloud SQL, one must pre-process the dump to remove the offending statements.
<table>
<tr><th><code>bufio.Scanner</code></th><th><code>streamline</code></th></tr>
<tr>
<td>

```go
var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
    s := bufio.NewScanner(pgdump)
    for s.Scan() {
        line := s.Bytes()
        if bytes.Contains(line, unwanted) {
            // comment out this line
            line = append([]byte("-- "), line...)
        }
        // Scanner strips newlines, so add them back.
        if _, err := dst.Write(append(line, '\n')); err != nil {
            return err
        }
    }
    return s.Err()
}
```

</td>
<td>

```go
var unwanted = []byte("COMMENT ON EXTENSION")

func Upload(pgdump *os.File, dst io.Writer) error {
    _, err := streamline.New(pgdump).
        WithPipeline(pipeline.Map(func(line []byte) []byte {
            if bytes.Contains(line, unwanted) {
                // comment out this line
                return append([]byte("-- "), line...)
            }
            return line
        })).
        WriteTo(dst)
    return err
}
```

</td>
</tr>
</table>
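Because `streamline.Stream` implements standard `io` interfaces (as noted above), the same transform can also be driven by any reader-based API. A sketch of a hypothetical `io.Copy` variant, reusing the `unwanted` marker from the example above:

```go
func UploadWithCopy(pgdump *os.File, dst io.Writer) error {
    stream := streamline.New(pgdump).
        WithPipeline(pipeline.Map(func(line []byte) []byte {
            if bytes.Contains(line, unwanted) {
                // comment out this line
                return append([]byte("-- "), line...)
            }
            return line
        }))

    // Stream implements io.Reader, so it plugs into io.Copy directly.
    _, err := io.Copy(dst, stream)
    return err
}
```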
Some of the ideas in this package started in `sourcegraph/run`, a project that set out to build utilities that make it easier to write bash-esque scripts in Go - namely being able to do things you would often do in scripts, such as grepping and iterating over lines. `streamline` generalizes the ideas used in `sourcegraph/run` for working with command output to work on arbitrary inputs, and `sourcegraph/run` now uses `streamline` internally.