Creating pipelines in Go, or: How I took a runaway idea about channels too, too far.

The Idea

Ideas are hard to come by, or at least, interesting ideas are. I think we’ve all found ourselves having moments of perceived brilliance, only to realize the lack of originality, or, for that matter, brilliance. Every now and then, however, we’re struck by something that has some promise. It’s hard to hold on to these fleeting moments of understanding, and people have different ways of dealing with them (carrying a notepad everywhere, including the shower… yeah I know, I think it’s weird too). This time, I was lucky to be in a meeting room drawing up a design for an interesting project at zvelo with my friend and co-worker Michael Robinson. We were discussing a distributed system that had to consume several inputs, de-duplicate, buffer, and massage them in order to update the Source Of Truth, AKA the backup for all of our datasets. You know, no biggie.

As we were diagramming things on the board, I blurted out, “Well, what if we could hook up different channels together?” to which Michael replied “Yeah, like channel middleware.”

Channel middleware

We kept diagramming and figuring out which components we’d need, but the pattern wouldn’t leave my head. I wrote up a quick proof of concept to run past the rest of the team (all of whom I’m constantly in awe of), to see if it was, in fact, the worst idea I’d ever had.

package main

import "fmt"

func main() {

	type Mapper func(int) (int, bool)

	var applyMapper = func(in chan int, mappers ...Mapper) chan int {
		var out chan int
		for _, m := range mappers {
			out = make(chan int)
			go func(m Mapper, in chan int, out chan int) {
				for n := range in {
					if val, ok := m(n); ok {
						out <- val
					}
				}
				close(out)
			}(m, in, out)
			in = out
		}
		return out
	}

	var doubler = func(n int) (int, bool) {
		return n * 2, true
	}

	var ifOdd = func(n int) (int, bool) {
		return n, n%2 == 1
	}

	in := make(chan int)

	out := applyMapper(in, ifOdd, doubler)

	go func() {
		for i := 0; i < 10; i++ {
			in <- i
		}
		close(in)
	}()

	for n := range out {
		fmt.Println(n)
	}
}

Try it out in the Go Playground.

We have type Mapper func(int) (int, bool), which takes an int and returns an int plus a bool indicating whether the result should be passed on.

Keep in mind, this was only a POC, to see how the chaining would look in practice. Defining functions as vars and such is certainly not the most idiomatic approach, but it was an easy way to get my thoughts into the playground.

out := applyMapper(in, ifOdd, doubler) isn’t ideal, but it gets the idea across.

Given some input, we keep only the odd numbers and double them. For a first pass at putting our idea down on paper, it actually made me more optimistic about it. That being said, I wanted to clean up the design a bit. And by a bit I obviously mean a lot.

The above code works, but it’s not very clean to have to return both a value and an indicator of whether it should be passed on. If only there were some type of data structure that we could stream values onto…

Round Two

package main

import (
	"fmt"
	"sync"
)

func main() {

	type Operator func(chan int, chan int)

	var fanIn = func(ins ...chan int) chan int {
		out := make(chan int)
		wg := &sync.WaitGroup{}
		wg.Add(len(ins))
		for _, in := range ins {
			go func(in chan int) {
				for n := range in {
					out <- n
				}
				wg.Done()
			}(in)
		}
		go func() {
			wg.Wait()
			close(out)
		}()
		return out
	}

	var apply = func(in chan int, operators ...Operator) chan int {
		var out chan int
		for _, m := range operators {
			out = make(chan int)
			go func(m Operator, in chan int, out chan int) {
				m(in, out)
				close(out)
			}(m, in, out)
			in = out
		}
		return out
	}

	var multiplier = func(x float64) Operator {
		return Operator(func(in chan int, out chan int) {
			for n := range in {
				out <- int(float64(n) * x)
			}
		})
	}

	var ifOdd = Operator(func(in chan int, out chan int) {
		for n := range in {
			if n%2 == 1 {
				out <- n
			}
		}

	})

	var summer = Operator(func(in chan int, out chan int) {
		total := 0
		for n := range in {
			total += n
		}
		out <- total
	})

	in1 := make(chan int)
	in2 := make(chan int)

	out := apply(fanIn(
		apply(in1, ifOdd, multiplier(4.3), summer),
		apply(in2, ifOdd, multiplier(4.3), summer)),
		summer)

	go func() {
		for i := 0; i < 10; i++ {
			in1 <- i
			in2 <- i
		}
		close(in1)
		close(in2)
	}()

	for n := range out {
		fmt.Println(n)
	}
}

Try it in the Playground.

So now the design is a bit more elegant. We’ve replaced Mapper with Operator, which takes one channel for input and another for output. apply gets simpler and more legible with the channel approach, and we added fanIn as a kind of convenience function. Since we’re operating on channels, it seemed logical to want to pool data together. Also, I want to point out something that may be obvious but is crucial to the sequence of operations in this code: apply closes the output channel once each operator is done running. The channel close, combined with the use of for range in the operators, gives us a cascading close once the input channel itself is closed. It’s a different approach from using a select with a stop channel or something of the sort, but I think it’s cleaner in this particular instance.

So, all of this trial and error ultimately led to go-pipeline, which is introduced in this post!

A Uruguayan lost in Colorado climbing, snowboarding, and coding to pass the time

Denver, CO https://bogacz.io