Lazy Pipelines with JavaScript Generators

Generators, introduced in ES2015, are one of the more powerful features in JavaScript. In essence, they are functions that can be paused and resumed.
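
As a minimal sketch of that pause-and-resume behaviour (the countTo function and the variable names are made up for illustration), each call to next() runs the generator body up to the next yield and then suspends it:

```javascript
// A generator function: the body pauses at each `yield`
// and only continues when the caller asks for another value.
function* countTo(limit) {
  for (let n = 1; n <= limit; n++) {
    yield n
  }
}

const counter = countTo(2)
const first = counter.next()  // { value: 1, done: false }
const second = counter.next() // { value: 2, done: false }
const third = counter.next()  // { value: undefined, done: true }
```

Nothing inside countTo runs until the first call to next(), which is the property the rest of this article relies on.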

With this power to jump in and out of contexts, we can design interesting pipeline APIs that allow us to skip forward and back along a processing chain, only passing along the values that are needed.

Let’s consider an example where we have a collection of randomly generated books with star ratings and we want to pick the first three books with 5 stars.

const books = [
  { title: "Anger Of Silver", rating: 5 },
  { title: "Rat Of Freedom", rating: 3 },
  { title: "Savior Of Dread", rating: 5 },
  { title: "Thieves Of Heaven", rating: 4 },
  { title: "Agents Of Hope", rating: 3 },
  { title: "Edge Of Insanity", rating: 5 },
  { title: "Blacksmiths And Friends", rating: 2 },
  { title: "Vultures And Warriors", rating: 4 },
  { title: "Intention Of Freedom", rating: 3 },
  { title: "Searching At The City", rating: 5 },
  { title: "Separated In The Mountains", rating: 2 },
  { title: "Family Of The Banished", rating: 5 },
  { title: "Blinded In My Destiny", rating: 4 },
  { title: "Memory Of The Depths", rating: 3 }
]

Using a pipeline API, we want to chain each transformation in a left-to-right sequence with an API like the following.

pipe(books).where((b) => b.rating === 5).take(3).all()

If we implement this as normal imperative code with strict sequential evaluation, each step will run synchronously to completion before returning its result to the next step in the chain.

When we call where on pipe(books), it will iterate through each item in the collection, returning all items that match the predicate, then pass this new filtered collection on to take, where the first three items are selected.

A very basic implementation of a strict evaluation pipeline might look like this.

class Pipe {
  constructor(sequence) {
    this.sequence = sequence
  }

  where(predicate) {
    return new Pipe(this.sequence.filter(predicate))
  }

  take(count) {
    return new Pipe(this.sequence.slice(0, count))
  }

  all() {
    return this.sequence
  }
}

const pipe = (sequence) => new Pipe(sequence)

With this approach, even though we only want the first three items, we always end up filtering all the items in the initial collection. When we’re dealing with very large collections or situations where filtering is expensive, we end up doing a lot of extra work.
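
To make the extra work visible, here is a rough sketch that counts how often the predicate runs in a strict pipeline. The StrictPipe name, the ratings array and the strictCalls counter are all illustrative stand-ins rather than part of the code above.

```javascript
// Same shape as the strict Pipe above, renamed to keep the example standalone.
class StrictPipe {
  constructor(sequence) {
    this.sequence = sequence
  }

  where(predicate) {
    return new StrictPipe(this.sequence.filter(predicate))
  }

  take(count) {
    return new StrictPipe(this.sequence.slice(0, count))
  }

  all() {
    return this.sequence
  }
}

const ratings = [5, 3, 5, 4, 5, 2, 5, 1]
let strictCalls = 0 // incremented every time the predicate is invoked

const strictResult = new StrictPipe(ratings)
  .where((r) => { strictCalls++; return r === 5 })
  .take(3)
  .all()

// strictResult holds three items, yet strictCalls equals ratings.length:
// filter visited every entry before take saw any of them.
```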

In some cases, what we really want from a pipeline API is to push items one at a time through each step in the chain. This gives us a correct result while avoiding wasted processing on entries in earlier steps that get discarded in later steps.

A related benefit of lazy evaluation is that we can avoid allocating a temporary array at every step of the chain. This reduces memory usage and can noticeably improve performance when processing larger collections.

Libraries like Lodash, Lazy.js and others already support lazy chaining in this way. Using generators, we can implement this pattern ourselves in just a few lines of code.

Starting with our existing API scaffolding, we can change the Pipe into an iterable object that implements the Symbol.iterator protocol, exposing an iterator over the sequence it wraps.

class Pipe {
  constructor(sequence) {
    // Store an iterator over whatever iterable we were given
    this.sequence = sequence[Symbol.iterator]()
  }

  [Symbol.iterator]() {
    return this.sequence
  }
}

To test that this is a valid iterable, we can drop it into a for-of loop and dump out each item in a given sequence.

for (const book of pipe(books)) {
  console.log(book)
}

To pass values along the pipeline using yield, we can pass a generator function wrapped in an iterable object to the Pipe constructor in place of a raw collection. Within instances of Pipe, this.sequence will always be some kind of iterator, so a custom generator we’ve provided will be supported in exactly the same way as any built-in iterator.

Since we’re going to need to use this same pattern for each chained method, we can wrap the construction in a factory function.

const step = (generator) => new Pipe({[Symbol.iterator]: generator})
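
As a quick check of that claim, the sketch below wraps both an array and a hand-written generator and reads them back the same way. It assumes the Pipe constructor receives the sequence as its argument; the fromArray and fromGenerator names are only for illustration.

```javascript
class Pipe {
  constructor(sequence) {
    // Whatever we are given, keep only its iterator
    this.sequence = sequence[Symbol.iterator]()
  }

  [Symbol.iterator]() {
    return this.sequence
  }
}

// Wrap a generator function in a throwaway iterable so that the
// constructor can treat it like any other sequence.
const step = (generator) => new Pipe({ [Symbol.iterator]: generator })

const fromArray = new Pipe(["a", "b"])
const fromGenerator = step(function* () {
  yield "a"
  yield "b"
})

// Both pipes can be consumed with the same iteration syntax.
const seenFromArray = [...fromArray]
const seenFromGenerator = [...fromGenerator]
```

One consequence of storing the iterator rather than the collection is that each Pipe can only be consumed once.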

Now we can implement a generator for the where filter. This needs to iterate through each item in the sequence and test it against the given predicate function. When the predicate returns true, we yield the item. Returning false means we skip the item.

class Pipe {
  // ...

  where(predicate) {
    const sequence = this.sequence
    return step(function*() {
      for (const item of sequence) {
        if (predicate(item)) yield item
      }
    })
  }
}

Implementing take is similar. Unlike our strict evaluation version, we can’t conveniently call Array.prototype.slice, because this step only sees one value at a time, each time the previous step in the chain yields to it. The most straightforward way around this is to use a traditional imperative loop with a counter which gets incremented each time the generator resumes. We also need to stop early if the previous step runs out of items before the count is reached.

class Pipe {
  // ...

  take(count) {
    const sequence = this.sequence
    return step(function*() {
      for (let i = 0; i < count; i++) {
        const { value, done } = sequence.next()
        // Stop early if the previous step has no more items
        if (done) return
        yield value
      }
    })
  }
}

The final change is to ensure we can convert the sequence being wrapped by the pipeline back into an array we can use at the end.

class Pipe {
  // ...

  all() {
    return Array.from(this.sequence)
  }
}

Now we can run our original pipeline, but only push values through the chain as needed.

pipe(books).where((b) => b.rating === 5).take(3).all()

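Now where yields each matching item to take one at a time, and only when take asks for it. Once take has yielded the given count, it stops pulling from where and the pipeline completes, so the remaining books are never tested. In our example, the third 5-star book is the sixth entry, so the predicate runs 6 times rather than 14.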

The other big win here is deferred execution. If we don’t force the values out of the pipeline by calling all to convert the iterator to an array, then nothing is processed at all. The pipeline is truly lazy. It only yields values when they’re asked for. This makes it easy to prepare complex queries in advance and execute them later when needed.
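
The sketch below puts the pieces together to show deferred execution in action. It is one possible assembly of the steps described so far, with a take that stops when its source is exhausted; the ratings array and the lazyCalls counter are illustrative.

```javascript
class Pipe {
  constructor(sequence) {
    this.sequence = sequence[Symbol.iterator]()
  }

  [Symbol.iterator]() {
    return this.sequence
  }

  where(predicate) {
    const sequence = this.sequence
    return step(function* () {
      for (const item of sequence) {
        if (predicate(item)) yield item
      }
    })
  }

  take(count) {
    const sequence = this.sequence
    return step(function* () {
      for (let i = 0; i < count; i++) {
        const { value, done } = sequence.next()
        if (done) return // source ran out before we reached count
        yield value
      }
    })
  }

  all() {
    return Array.from(this.sequence)
  }
}

const step = (generator) => new Pipe({ [Symbol.iterator]: generator })
const pipe = (sequence) => new Pipe(sequence)

const ratings = [5, 3, 5, 4, 5, 2, 5, 1]
let lazyCalls = 0 // incremented every time the predicate is invoked

// Building the query runs no predicate calls at all.
const query = pipe(ratings)
  .where((r) => { lazyCalls++; return r === 5 })
  .take(3)
const callsBeforeAll = lazyCalls

// Only all() pulls values through, and only as far as the third match.
const lazyResult = query.all()
const callsAfterAll = lazyCalls
```

Here the third match is the fifth entry, so the predicate stops after five calls instead of visiting all eight items.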

To explore this code running live, I’ve set up the strict evaluation example and the lazy evaluation example on JSFiddle with logging of each predicate filter call so you can compare how many times the predicate function runs.

The code shown here isn’t much more than a toy at this stage, but you should be able to see how it could be used as the foundation for more specialised pipeline APIs.

As a final improvement, we can add more structure to the step chaining API to provide a general purpose representation that can be used as the foundation for building more complex behaviour.

Following standard patterns in JavaScript with some influence from the Gremlin API, we can identify a small set of higher-order functions which nearly all other steps are specialisations of.

  • Map: map the input item to a transformed output item and pass it on to the next step
  • Filter: evaluate to either true or false for the input item and only pass it on to the next step when true
  • Each: process the input item with side-effects that don’t influence the output to the next step

map requires a transform function that transforms the input item to a new output value.

class Pipe {
  // ...

  map(transform) {
    const sequence = this.sequence
    return step(function*() {
      for (const item of sequence) {
        yield transform(item)
      }
    })
  }
}

filter requires a predicate function that returns true or false for any given item.

class Pipe {
  // ...

  filter(predicate) {
    const sequence = this.sequence
    return step(function*() {
      for (const item of sequence) {
        if (predicate(item)) yield item
      }
    })
  }
}

each requires a function that may cause side-effects and doesn’t alter the output value.

class Pipe {
  // ...

  each(operation) {
    const sequence = this.sequence
    return step(function*() {
      for (const item of sequence) {
        operation(item)
        yield item
      }
    })
  }
}

In most pipeline APIs, steps are going to be specialisations of one of these types. This offers opportunities for composition and reuse. For example, a reject step that performs the opposite of a standard filter can be constructed by negating the passed-in predicate, rather than reimplementing the original filter function.

class Pipe {
  // ...

  reject(predicate) {
    return this.filter((val) => !predicate(val))
  }
}
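
For a sense of how these steps combine, here is a self-contained sketch that chains reject, each and map in one query. The sample data, the visited array and the upper-casing transform are invented for the example.

```javascript
class Pipe {
  constructor(sequence) {
    this.sequence = sequence[Symbol.iterator]()
  }

  [Symbol.iterator]() {
    return this.sequence
  }

  map(transform) {
    const sequence = this.sequence
    return step(function* () {
      for (const item of sequence) yield transform(item)
    })
  }

  filter(predicate) {
    const sequence = this.sequence
    return step(function* () {
      for (const item of sequence) {
        if (predicate(item)) yield item
      }
    })
  }

  each(operation) {
    const sequence = this.sequence
    return step(function* () {
      for (const item of sequence) {
        operation(item)
        yield item
      }
    })
  }

  // Built on filter rather than reimplemented
  reject(predicate) {
    return this.filter((val) => !predicate(val))
  }

  all() {
    return Array.from(this.sequence)
  }
}

const step = (generator) => new Pipe({ [Symbol.iterator]: generator })
const pipe = (sequence) => new Pipe(sequence)

const sample = [
  { title: "Anger Of Silver", rating: 5 },
  { title: "Rat Of Freedom", rating: 3 },
  { title: "Thieves Of Heaven", rating: 4 }
]

const visited = [] // filled by the side-effecting each step
const titles = pipe(sample)
  .reject((b) => b.rating === 5)
  .each((b) => visited.push(b.title))
  .map((b) => b.title.toUpperCase())
  .all()
```

Because each passes the original item along unchanged, the visited array records the titles as they were before map transformed them.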

Whether or not to compose and reuse is going to depend on the specificity of the API, the range of different behaviours needed and the complexity of wrapping vs reimplementing.

This is still experimental and a work in progress. As far as I know, cooperative scheduling isn’t really the standard way to implement lazy pipelines in JavaScript (it’s more commonly done with carefully designed function composition), but this does seem to run as expected and the code is lightweight and easy to maintain.

If you’re interested in applying this technique in Ruby, I have a more developed example of the same lazy pipeline approach in a Gremlin inspired graph processing API, which uses fibers rather than generators (fibers are Ruby’s standard abstraction for cooperative scheduling).

I plan to port this in-memory graph API to JavaScript, as I have a lot of things I want to do with graphs in the browser. Making graph queries and traversals as straightforward and intuitive as possible is of great value in my creative work.