In the previous post, I developed a class called PipelineElement. This made it relatively easy to create elements that act as producers and filters in a programmatic pipeline. Using it, we could write Ruby 1.9 code like:
10.times do
  puts (evens | multiples_of_three | multiples_of_seven).resume
end
The construct in the loop is a pipeline containing three chunks of code: a generator of even numbers, a filter that only passes multiples of three, and another filter that passes multiples of seven. Numbers are passed from the producer to the first filter, and then from that filter to the next, until finally popping out and being made available to puts.
However, creating these pipeline elements is still something of a pain. It turns out that we can simplify things when it comes to creating filters. In the implementation I'll show here, we'll only handle the case of simple transforming filters—filters that take an input, do something to it, and write the result to the filter chain.
Let's revisit the PipelineElement class:
class PipelineElement
  attr_accessor :source
  def initialize
    @fiber_delegate = Fiber.new do
      process
    end
  end
  def |(other)
    other.source = self
    other
  end
  def resume
    @fiber_delegate.resume
  end
  def process
    while value = input
      handle_value(value)
    end
  end
  def handle_value(value)
    output(value)
  end
  def input
    source.resume
  end
  def output(value)
    Fiber.yield(value)
  end
end
The process method is the driving loop. It reads the next input from the pipeline, then calls handle_value to deal with it. In the base class, handle_value simply echoes the input to the output; real filters subclass PipelineElement and override this method.
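As a rough sketch of what such a subclass might look like, here is a filter that overrides handle_value so that only multiples of a given number reach the output. The MultiplesOf class and its factor parameter are my own illustration, not code from the previous post, and the base class is repeated so the example runs on its own.

```ruby
# Base class as listed above, repeated so this sketch is self-contained.
class PipelineElement
  attr_accessor :source

  def initialize
    @fiber_delegate = Fiber.new { process }
  end

  def |(other)
    other.source = self
    other
  end

  def resume
    @fiber_delegate.resume
  end

  def process
    while value = input
      handle_value(value)
    end
  end

  def handle_value(value)
    output(value)
  end

  def input
    source.resume
  end

  def output(value)
    Fiber.yield(value)
  end
end

# Producer: overrides process because it has no upstream source to read.
class Evens < PipelineElement
  def process
    value = 0
    loop do
      output(value)
      value += 2
    end
  end
end

# Hypothetical filter: overrides handle_value and forwards only multiples
# of the factor it was constructed with.
class MultiplesOf < PipelineElement
  def initialize(factor)
    @factor = factor
    super()
  end

  def handle_value(value)
    output(value) if value % @factor == 0
  end
end

pipeline = Evens.new | MultiplesOf.new(3) | MultiplesOf.new(7)
3.times { puts pipeline.resume }   # prints 0, 42, 84
```

Note that a producer overrides process while a filter overrides handle_value, which is why filters are the easier of the two to simplify.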
Let's make a small change to the handle_value method.
def handle_value(value)
  output(transform(value))
end
def transform(value)
  value
end
By doing this, we've split the transformation of the incoming value into a separate method, and the work done by this method no longer uses any of the state in the PipelineElement class. That means the transformation could just as well be supplied from outside the class. To allow this, we'll have the constructor take an optional block, and we'll use that block in preference to the transform method. Here's another listing, showing just the changed methods.
class PipelineElement
  def initialize(&block)
    @transformer = block || method(:transform)
    @fiber_delegate = Fiber.new do
      process
    end
  end
  # ...
  def handle_value(value)
    output(@transformer.call(value))
  end
end
This illustrates a cool (and underused) feature of Ruby. Method objects (created with the method(...) call) are duck-typed with proc objects: we can use .call(params) on both. This is a great way of letting users of a class change its behavior either by subclassing and overriding a method, or by simply passing in a block.
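To see the duck typing in isolation, here is a small sketch away from the pipeline. The Greeter class is hypothetical and exists only for this illustration. A bound Method object carries its receiver with it, so calling it behaves much like calling a closure over that object.

```ruby
# Hypothetical class, used only to obtain a bound Method object.
class Greeter
  def initialize(name)
    @name = name
  end

  def greet(greeting)
    "#{greeting}, #{@name}"
  end
end

bound = Greeter.new("Dave").method(:greet)               # a Method object
block = proc { |greeting| "#{greeting}, from a block" }  # a Proc object

# Both respond to call, so the caller need not care which one it holds.
[bound, block].each do |callable|
  puts callable.call("Hello")
end

# The Method object remembers the instance it was taken from.
puts bound.receiver.class   # prints Greeter
```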
With this change in place, we can now write transforming filters using blocks. This is a lot more compact than the previous subclassing approach.
class Evens < PipelineElement
  def process
    value = 0
    loop do
      output(value)
      value += 2
    end
  end
end
evens = Evens.new
tripler = PipelineElement.new {|val| val * 3}
incrementer = PipelineElement.new {|val| val + 1}
5.times do
  puts (evens | tripler | incrementer ).resume
end
This outputs 1, 7, 13, 19, and 25.
Different Kinds of Filter
This approach works well if all we want is transforming filters. But what if we would also like to simplify filters that either pass or don't pass values based on some criteria? A block would seem like a great way of specifying the condition, but we've already used our one block parameter up. Subclassing to the rescue. We can create two subclasses, Transformer and Filter. One sets the @transformer instance variable to any block it is passed. The other sets @filter. Here's the relevant code:
class PipelineElement
  attr_accessor :source
  def initialize(&block)
    @transformer ||= method(:transform)
    @filter ||= method(:filter)
    @fiber_delegate = Fiber.new do
      process
    end
  end
  # ...
  def handle_value(value)
    output(@transformer.call(value)) if @filter.call(value)
  end
  def transform(value)
    value
  end
  def filter(value)
    true
  end
end
class Transformer < PipelineElement
  def initialize(&block)
    @transformer = block
    super
  end
end
class Filter < PipelineElement
  def initialize(&block)
    @filter = block
    super
  end
end
Thus equipped, we can write:
tripler = Transformer.new {|val| val * 3}
incrementer = Transformer.new {|val| val + 1}
multiple_of_five = Filter.new {|val| val % 5 == 0}
5.times do
  puts (evens | tripler | incrementer | multiple_of_five ).resume
end
Moving The Blocks Inline
Our final hack lets us move the blocks directly into the pipeline.
Let's look at the actual pipeline code:
puts (evens | tripler | incrementer | multiple_of_five ).resume
Those pipe characters are simply calls to the | method in class PipelineElement. And methods can take block arguments, right? So what stops us writing
puts (evens | {|v| v*3} | {|v| v+1} | multiple_of_five ).resume
It turns out that Ruby stops us. The brace characters are taken to be hash parameters, not blocks, so Ruby gets its knickers in a twist. Fortunately, that's easily fixed by making the method calls explicit.
puts (evens .| {|v| v*3} .| {|v| v+1} .| multiple_of_five ).resume
Now we just need to make the | method accept an optional block. If the block is present, we use it to create a new transformer.
def |(other=nil, &block)
  other = Transformer.new(&block) if block
  other.source = self
  other
end
Ruby 1.9 lets you chain method calls across lines, so we can tidy up our pipeline visually.
5.times do
  puts (evens
    .| {|v| v*3}
    .| {|v| v+1}
    .| multiple_of_five
  ).resume
end
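Since the listings so far have only shown the changed methods, it may help to see the pieces assembled into one file. What follows is my own consolidation of the fragments above, not a separate listing from the book, and it assumes a fresh Evens producer. I have put parentheses around the final non-block argument purely as a matter of taste.

```ruby
class PipelineElement
  attr_accessor :source

  def initialize(&block)
    # Subclasses may have set these already; otherwise fall back to methods.
    @transformer ||= method(:transform)
    @filter      ||= method(:filter)
    @fiber_delegate = Fiber.new { process }
  end

  # Accepts either another element or a block (wrapped in a Transformer).
  def |(other = nil, &block)
    other = Transformer.new(&block) if block
    other.source = self
    other
  end

  def resume
    @fiber_delegate.resume
  end

  def process
    while value = input
      handle_value(value)
    end
  end

  def handle_value(value)
    output(@transformer.call(value)) if @filter.call(value)
  end

  def transform(value)
    value
  end

  def filter(value)
    true
  end

  def input
    source.resume
  end

  def output(value)
    Fiber.yield(value)
  end
end

class Transformer < PipelineElement
  def initialize(&block)
    @transformer = block
    super
  end
end

class Filter < PipelineElement
  def initialize(&block)
    @filter = block
    super
  end
end

class Evens < PipelineElement
  def process
    value = 0
    loop do
      output(value)
      value += 2
    end
  end
end

multiple_of_five = Filter.new { |v| v % 5 == 0 }
pipeline = Evens.new
  .| { |v| v * 3 }
  .| { |v| v + 1 }
  .|(multiple_of_five)

5.times { puts pipeline.resume }   # prints 25, 55, 85, 115, 145
```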
A Palindrome Finder
Let's finish with another trivial example. We'll create a generic producer class that takes a collection and passes it, one element at a time, into the pipeline.
class Pump < PipelineElement
  def initialize(source)
    @source = source
    super()
  end
  def process
    @source.each {|item| Fiber.yield item}
    nil
  end
end
Now we can write a simple palindrome finder (a palindrome is a word which is the same when spelled backwards).
words = Pump.new %w{Madam, the civic radar rotator is not level.}
is_palindrome = Filter.new {|word| word == word.reverse}
pipeline = words .| {|word| word.downcase.tr("^a-z", '') } .| is_palindrome
while word = pipeline.resume
  puts word
end
This outputs: madam, civic, radar, rotator, level.
But what if we instead want to show each word in the input stream, and flag it if it is a palindrome? That's easily done, but we won't do it the easy way. Instead, let's show a more convoluted method, because it might be useful in the general case.
There's no law to say that a transformer that receives a string as input has to write a string as output. It could, if it wanted to, write an array. Or a structure. So we could write:
WordInfo = Struct.new(:original, :forwards, :backwords)
words = Pump.new %w{Madam, the civic radar rotator is not level.}
normalize = Transformer.new {|word| [word, word.downcase.tr("^a-z", '')] }
to_word_info = Transformer.new do |word, normalized|
  reversed = normalized.reverse
  WordInfo.new(word, normalized, reversed)
end
formatter = Transformer.new do |word_info|
  if word_info.forwards == word_info.backwords
    "'#{word_info.original}' is a palindrome"
  else
    "'#{word_info.original}' is not a palindrome"
  end
end
pipeline = words | normalize | to_word_info | formatter
while word = pipeline.resume
  puts word
end
This outputs
'Madam,' is a palindrome
'the' is not a palindrome
'civic' is a palindrome
'radar' is a palindrome
'rotator' is a palindrome
'is' is not a palindrome
'not' is not a palindrome
'level.' is a palindrome
So, What's the Point?
Is this a great way of writing a palindrome finder? Not really. But...
What we've done here is turned the way a program works on its head. We've written chunks of isolated code, each of which either filters or transforms an input. We've then independently knitted these chunks together. That's a high degree of decoupling. We can also leave it until runtime to determine what gets put into the pipeline (and the order that it appears in the pipeline), which means we can move more power into the hands of our users.
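One way to picture that runtime assembly: keep the available stages in a lookup table, and let a list of names (which might come from a config file or the command line) decide what goes into the pipeline and in what order. STAGES, build_pipeline, and the stage names are all hypothetical, and the classes are condensed copies of the ones developed above, so treat this as a sketch rather than a finished design.

```ruby
# Condensed copies of the classes developed in this post.
class PipelineElement
  attr_accessor :source
  def initialize(&block)
    @transformer ||= method(:transform)
    @filter      ||= method(:filter)
    @fiber_delegate = Fiber.new { process }
  end
  def |(other)
    other.source = self
    other
  end
  def resume
    @fiber_delegate.resume
  end
  def process
    while value = input
      handle_value(value)
    end
  end
  def handle_value(value)
    output(@transformer.call(value)) if @filter.call(value)
  end
  def transform(value)
    value
  end
  def filter(value)
    true
  end
  def input
    source.resume
  end
  def output(value)
    Fiber.yield(value)
  end
end

class Transformer < PipelineElement
  def initialize(&block)
    @transformer = block
    super
  end
end

class Filter < PipelineElement
  def initialize(&block)
    @filter = block
    super
  end
end

class Evens < PipelineElement
  def process
    value = 0
    loop do
      output(value)
      value += 2
    end
  end
end

# Hypothetical registry: each entry builds a fresh stage on demand,
# because an element holds its own fiber and cannot be shared.
STAGES = {
  "triple"    => -> { Transformer.new { |v| v * 3 } },
  "increment" => -> { Transformer.new { |v| v + 1 } },
  "fives"     => -> { Filter.new { |v| v % 5 == 0 } }
}

# Chains the named stages onto the producer, in the order given.
def build_pipeline(producer, names)
  names.inject(producer) { |pipeline, name| pipeline | STAGES.fetch(name).call }
end

pipeline = build_pipeline(Evens.new, %w[triple increment fives])
3.times { puts pipeline.resume }    # prints 25, 55, 85

reordered = build_pipeline(Evens.new, %w[increment triple])
3.times { puts reordered.resume }   # prints 3, 9, 15
```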
Could we have done all this without Fibers? Of course. Could we do it without Ruby 1.9? Absolutely. But sometimes factors come together which lead us to experiment with new ways of thinking about our code.
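For comparison, here is one possible fiber-free rendering of the evens, tripler, incrementer, and multiple-of-five pipeline. It leans on lazy enumerators, which arrived in Ruby 2.0 and so were not available when this was written for 1.9. The lazy_pipeline method name is mine, and this is a sketch of an alternative rather than a drop-in replacement for PipelineElement.

```ruby
# Builds the pipeline as a chain of lazy enumerators. Nothing is computed
# until a consumer asks for values.
def lazy_pipeline
  evens = Enumerator.new do |yielder|
    value = 0
    loop do
      yielder << value
      value += 2
    end
  end

  evens.lazy
       .map    { |v| v * 3 }
       .map    { |v| v + 1 }
       .select { |v| v % 5 == 0 }
end

p lazy_pipeline.first(5)   # prints [25, 55, 85, 115, 145]
```

The stages here are pulled on demand much as the fiber version pulls them, but they are fixed at the point the chain is written unless you build the chain up programmatically.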
This pipeline stuff is not revolutionary, and it isn't generally applicable. But it's fun to play with. And, for me, that's the main thing.
A Wee Postscript
All this content is stuff that I decided not to include in the third edition of the PickAxe. It didn't work in the section on fibers, because it uses programming techniques not yet covered. It didn't work later because, as an example of various programming techniques, it is just too long.




Welcome to functional programming. Try a better language.
Posted by: Gabe | January 03, 2008 at 12:30 AM
Regarding your comment: "Method objects (created with the method(...) call) are duck-typed with proc objects: we can use .call(params) on both." What is the self object in method objects called like this? Is "method" automatically creating a closure for the current receiver?
Posted by: Paolo Bonzini | January 03, 2008 at 06:24 AM
Gabe:
I try lots of different languages. In fact, Haskell was my recommended "language of the year" several years ago.
Paolo:
A bound method, which this is, acts as a closure.
Dave
Posted by: Dave Thomas | January 03, 2008 at 09:08 AM
Thanks for the rest of the story, Dave.
(Suggested edit: s/let's us/let us/
Posted by: Leif | January 03, 2008 at 10:05 AM
This post, like the previous one, has a [/PipelineElement] instead of a [/code], breaking formatting in Safari.
Posted by: Ahruman | January 04, 2008 at 10:04 AM
It's a little disappointing this Fiber stuff won't be included in the new Pickaxe.
Posted by: Tobin | January 18, 2008 at 08:10 AM
Tobin:
It's already in there (and I'm adding symmetric fibers and coroutines before I finish) ... it's just that this particular escapade was too long, and didn't really fit in any of the flows. When you're bumping 900 pages, you always have to be thinking about the contribution stuff makes.
Posted by: Dave Thomas | January 18, 2008 at 09:11 AM
Great pair of posts. One more nit:
s/it's head/its head/
Posted by: Art Vandelay | January 19, 2008 at 04:17 AM
I like seeing the functional style in an object oriented language. Gabe's hit-and-run comment seems a little juvenile, since we choose our tools based on our project. The nice thing seems to be that every time I pick up a tool, both me and my code get better.
I've "borrowed" this code wholesale for a while. I want to play around with it and make it my own. I thought I'd refactor a KMeans implementation with this kind of style. I hope that doesn't walk on your toes, Dave.
Thanks for your constant contribution.
Posted by: David Richards | March 05, 2009 at 08:49 PM
Aside from cool factor, what's the advantage to using Fibers to build the pipeline? I'm about to implement something a lot like this, but I'm not sure there's a lot of payoff to doing it this way as opposed to the obvious way.
Posted by: Mediocretes | January 10, 2012 at 08:45 AM