Introducing Go-Pipeline

The title may be a bit grandiose, but after playing around with a concept of “channel-middleware”, much back and forth, and polishing of the idea, we can now introduce Go-Pipeline as an open-source library.

“Great!” I can hear you say, “but what is Go-Pipeline?” Go-Pipeline provides a few interfaces and helper methods for writing Go code that relies heavily on channels and on chaining channels together. As an approach, it certainly isn’t a one-size-fits-all solution, or even a one-size-fits-most. Its use of channels may introduce a performance hit from the underlying locking, asynchronous code can be hard to reason about, and, perhaps most importantly, the use of interface{} places the burden of type safety on the developer rather than the compiler, which is almost certainly the biggest drawback of this library.

After such rave reviews, I’m sure you’re anxious to hear more and get started, but allow me to say that the real utility of the pipeline comes in when you’re dealing with multiple streams of data that need varying degrees of preprocessing before coalescing into similar outputs. Its upside isn’t in performance, or safety, but in legibility.

The goal, in short, is to build data pipelines.

The interfaces

// Runner interface exposes functions that take in a chan interface{}
// and outputs to a chan interface{}
type Runner interface {
	Run(chan interface{}) chan interface{}
}

// Operator aliases a function that takes one input channel and one output channel
type Operator func(chan interface{}, chan interface{})
// Flow is a slice of Operators that can be applied in sequence
type Flow []Operator

The Runner interface is the “heart” of the library. It’s simply anything with a Run method that takes a channel of interface{} and returns a channel of interface{}. The Operator type is a function that takes two channels of interface{}, one for input and one for output, while Flow is a slice of Operator.
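Since Runner only asks for a Run method, any type can take part in a pipeline. Here is a minimal sketch of that idea; upperCaser and collect are hypothetical names invented for this illustration and are not part of Go-Pipeline, and the Runner interface is copied locally so the snippet compiles on its own.

```go
package main

import (
	"fmt"
	"strings"
)

// Runner mirrors the interface shown above.
type Runner interface {
	Run(chan interface{}) chan interface{}
}

// upperCaser is a hypothetical Runner (not part of Go-Pipeline) that
// upper-cases every string it receives and drops anything else.
type upperCaser struct{}

// Run starts a goroutine that reads from in and writes to the returned
// channel, closing it once in has been drained.
func (upperCaser) Run(in chan interface{}) chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			// The comma-ok assertion avoids a panic on unexpected types.
			if s, ok := v.(string); ok {
				out <- strings.ToUpper(s)
			}
		}
	}()
	return out
}

// collect feeds values through r and gathers everything it emits.
func collect(r Runner, values ...interface{}) []interface{} {
	in := make(chan interface{})
	out := r.Run(in)
	go func() {
		defer close(in)
		for _, v := range values {
			in <- v
		}
	}()
	var results []interface{}
	for v := range out {
		results = append(results, v)
	}
	return results
}

func main() {
	fmt.Println(collect(upperCaser{}, "go", 42, "pipeline")) // [GO PIPELINE]
}
```

The comma-ok type assertion is one way to carry the type-safety burden mentioned earlier: a value of the wrong type is skipped rather than crashing the stage.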

The secret sauce

The secret sauce that ties these together is really Operator’s Run method below.

// Run applies the operator to the input channel in a new goroutine and returns
// the output channel, which is closed once the operator function returns. This
// makes Operator implement the Runner interface
func (o Operator) Run(in chan interface{}) chan interface{} {
	out := make(chan interface{})
	go func() {
		o(in, out)
		close(out)
	}()
	return out
}

Pretty simple, right? Now for a look at Flow’s Run method below:

// Run takes an input channel and runs the operators in the slice in order.
// This makes Flow implement the Runner interface
func (f Flow) Run(in chan interface{}) chan interface{} {
	for _, m := range f {
		in = m.Run(in)
	}
	return in
}

Let’s walk through these, simple as they are. Flow.Run applies the first Operator in the slice to the input channel; the channel that operator returns then becomes the input of the next operator, and so on down the slice. The channel returned by the last operator is the return value of the method.

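Operator.Run creates the output channel, starts a goroutine that calls the operator function with the input and output channels, and returns the output channel right away. Once the operator function returns, the goroutine closes the output channel. This last piece is in fact crucial: as long as each operator returns when its input is closed (for example, by ranging over it), closing the original input channel shuts down every stage of the pipeline in turn.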
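To see that cascade of closes in action, here is a small self-contained sketch. Operator, Flow, and their Run methods are copied from above so it compiles without the library; double, addOne, and runAll are hypothetical helpers made up for this illustration.

```go
package main

import "fmt"

// Operator and Flow are local copies of the types shown above.
type Operator func(chan interface{}, chan interface{})
type Flow []Operator

func (o Operator) Run(in chan interface{}) chan interface{} {
	out := make(chan interface{})
	go func() {
		o(in, out)
		close(out)
	}()
	return out
}

func (f Flow) Run(in chan interface{}) chan interface{} {
	for _, m := range f {
		in = m.Run(in)
	}
	return in
}

// double and addOne are hypothetical operators used only for illustration.
// Each returns as soon as its input channel is closed and drained.
var double = Operator(func(in, out chan interface{}) {
	for v := range in {
		out <- v.(int) * 2
	}
})

var addOne = Operator(func(in, out chan interface{}) {
	for v := range in {
		out <- v.(int) + 1
	}
})

// runAll pushes nums through the flow and returns what comes out.
func runAll(f Flow, nums ...int) []int {
	in := make(chan interface{})
	out := f.Run(in)
	go func() {
		for _, n := range nums {
			in <- n
		}
		close(in) // the only explicit close; the rest cascades
	}()
	var results []int
	for v := range out { // ends once the last stage closes its output
		results = append(results, v.(int))
	}
	return results
}

func main() {
	fmt.Println(runAll(Flow{double, addOne}, 1, 2, 3)) // [3 5 7]
}
```

The range over out terminates without any extra signalling: closing in ends double's loop, which closes its output, which ends addOne's loop, which closes the final channel.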

Example

Let’s look at a simple example from the project tests to see how these constructs would work in practice.

Word Count

If you have ever played with Hadoop you should be familiar with the classic “word count”. It is basically the “hello world” of the map-reduce variety.

It takes a “document” (collection of words) and outputs each word along with how many times it occurred.

First we need to create some intermediary type for our Operator to operate on. This will hold the word along with how many times we saw it.

type tuple struct {
	token string
	count int
}

Next we create the operator that implements the “mapper” portion of word count.

// wordCount is an operator that takes in strings (words) and emits a tuple of (word, 1)
wordCount := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
	for word := range in {
		out <- tuple{word.(string), 1}
	}
})

Then we create the operator that aggregates all of these counts together (the “reducer”):

// countAggregator takes in tuples and aggregates their counts. Outputs
// the word and count output as a string.
countAggregator := pipeline.Operator(func(in chan interface{}, out chan interface{}) {
	counts := make(map[string]int)
	for t := range in {
		counts[t.(tuple).token] += t.(tuple).count
	}
	for word, count := range counts {
		out <- fmt.Sprintf("%s appears %d times", word, count)
	}
})

That’s it! Now you just have to start up the flow and feed in your document.

// Launch the word count Flow
input := make(chan interface{})
wordCountFlow := pipeline.NewFlow(wordCount, countAggregator)
output := wordCountFlow.Run(input)

// Feed in the input document
document := "the quick fox jumps over the lazy brown dog fox fox"
for _, word := range strings.Split(document, " ") {
	input <- word
}
// Signal that we are done submitting input
close(input)

// Read the output
for result := range output {
	fmt.Println(result)
}

// Output (line order may vary, because Go map iteration order is not deterministic):
// jumps appears 1 times
// over appears 1 times
// lazy appears 1 times
// brown appears 1 times
// dog appears 1 times
// the appears 2 times
// quick appears 1 times
// fox appears 3 times

You can view this full example on Godoc
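Because countAggregator ranges over a map, the order of its output lines can change from run to run. If a stable order matters (in a test, say), one option is to sort the words before emitting them. The sketch below is an assumption about how that could look, not code from the library; sortedAggregator and aggregate are hypothetical names, and the function has the same shape as an Operator's underlying function so it could be wrapped with pipeline.Operator.

```go
package main

import (
	"fmt"
	"sort"
)

type tuple struct {
	token string
	count int
}

// sortedAggregator sums the counts per word and emits one line per word
// in alphabetical order.
func sortedAggregator(in chan interface{}, out chan interface{}) {
	counts := make(map[string]int)
	for t := range in {
		tup, ok := t.(tuple)
		if !ok {
			continue // skip unexpected types instead of panicking
		}
		counts[tup.token] += tup.count
	}
	words := make([]string, 0, len(counts))
	for w := range counts {
		words = append(words, w)
	}
	sort.Strings(words)
	for _, w := range words {
		out <- fmt.Sprintf("%s appears %d times", w, counts[w])
	}
}

// aggregate runs sortedAggregator over the given tokens and collects the
// resulting lines, wiring the channels by hand instead of using a Flow.
func aggregate(tokens ...string) []string {
	in := make(chan interface{})
	out := make(chan interface{})
	go func() {
		for _, tok := range tokens {
			in <- tuple{tok, 1}
		}
		close(in)
	}()
	go func() {
		sortedAggregator(in, out)
		close(out)
	}()
	var lines []string
	for l := range out {
		lines = append(lines, l.(string))
	}
	return lines
}

func main() {
	for _, l := range aggregate("the", "fox", "the", "dog", "fox", "fox") {
		fmt.Println(l)
	}
	// Prints:
	// dog appears 1 times
	// fox appears 3 times
	// the appears 2 times
}
```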

Errors

What about errors, one of the most ubiquitous (and best) parts of Go programming? The answer is obviously to not handle them! Just kidding, errors should ALWAYS be handled. One solution that we’ve used is the classic errCh error channel, with a little extra handling in between. For example, we did something like this:

// flushError puts an error (if it is not nil) onto the error channel, and
// returns a bool indicating if the error was non-nil
func flushError(err error, errMessage string) bool {
	err = errors.Wrap(err, errMessage)
	hasErr := err != nil
	if hasErr {
		errCh <- err
	}
	return hasErr
}

This allows us to add some context to the error and lets us know if the error was non-nil. The calls would look something like this (here flushError is called as a method on a receiver s):

	for r := range in {
		// publish
		rule, ok := r.(myPbType)
		if !ok {
			_ = s.flushError(errors.Errorf("expected myPbType, got wrong type: %T", r), "error processing input")
			continue
		}
		b, err := rule.Bytes()
		if s.flushError(err, "error marshaling rule") {
			continue
		}
		// the deferred published will be handled by the handler above
		err = processor.Process(b)
		s.flushError(err, "error processing rule")
	}

This example, which is completely contrived, shows how we can use something like flushError() to add some context to these errors being reported asynchronously, while also using the boolean to help us dictate logic.

We used a separate go func() in our main() to listen on errCh and to log the errors to stdout. Of course, different code bases and sets of logic might have to handle errors differently, but this is simply a suggestion as to how one may go about it.

A Uruguayan lost in Colorado climbing, snowboarding, and coding to pass the time

Denver, CO https://bogacz.io

General problem solver. Freelance beer practitioner. Prone to coding all day.

Castle Rock, CO https://michaelrobinson.io