Users of the command line are familiar with the idea of building pipelines: a chain of simple commands strung together to the output of one becomes the input of the next. Using pipelines and a basic set of primitives, shell users can accomplish some sophisticated tasks. Here's a basic Unix shell pipeline that reports the ten longest .tip files in the current directory, based on the number of lines in each file:
wc -l *.tip | grep \.tip | sort -n | tail -10
Let's see how to add something similar to Ruby. By the end of this set of two articles, we'll be able to write things like
puts (even_numbers | tripler | incrementer | multiple_of_five ).resume
and a palindrome finder using blocks:
words = Pump.new %w{Madam, the civic radar rotator is not level.}
is_palindrome = Filter.new {|word| word == word.reverse}
pipeline = words .| {|word| word.downcase.tr("^a-z", '') } .| is_palindrome
while word = pipeline.resume
puts word
end
Great code? Nope. But getting there is fun. And, who knows? The techniques might well be useful in your next project.
A Daily Dose of Fiber
Ruby 1.9 adds support for Fibers. At their most basic, let you create simple generators (much as you could do previously with blocks. Here's a trivial example: a fiber that generates successive Fibonacci numbers:
fib = Fiber.new do
f1 = f2 = 1
loop do
Fiber.yield f1
f1, f2 = f2, f1 + f2
end
end
10.times { puts fib.resume }
A fiber is somewhat like a thread, except you have control over when it gets scheduled. Initially, a fiber is suspended. When you resume it, it runs the block until the block finishes, or it hits a Fiber.yield. This is similar to a regular block yield: it suspends the fiber and passes control back to the resume. Any value passed to Fiber.yield becomes the value returned by resume.
By default, a fiber can only yield back to the code that resumed it. However, if you require the "fiber" library, Fibers get extended with a transfer method that allows one fiber to transfer control to another. Fibers then become fully fledged coroutines. However, we won't be needing all that power today.
Instead, let's get back to the idea of creating pipelines of functionality in code, much as you can create pipelines in the shell.
As a starting point, let's write two fibers. One's a generator—it creates a list of even numbers. The second is a consumer. All it does it accept values from the generator and print them. We'll make the consumer stop after printing 10 numbers.
evens = Fiber.new do
value = 0
loop do
Fiber.yield value
value += 2
end
end
consumer = Fiber.new do
10.times do
next_value = evens.resume
puts next_value
end
end
consumer.resume
Note how we had to use resume to kick off the consumer. Technically, the consumer doesn't have to be a Fiber, but, as we'll see in a minute, making it one gives us some flexibility.
As a next step, notice how we've created some coupling in this code. Our consumer fiber has the name of the evens generator coded into it. Let's wrap both fibers in a method, and pass the name of the generator into the consumer method.
def evens
Fiber.new do
value = 0
loop do
Fiber.yield value
value += 2
end
end
end
def consumer(source)
Fiber.new do
10.times do
next_value = source.resume
puts next_value
end
end
end
consumer(evens).resume
OK. Let's add one more fiber to the weave. We'll create a filter that only passes on numbers that are multiples of three. Again, we'll wrap it in a method.
def evens
Fiber.new do
value = 0
loop do
Fiber.yield value
value += 2
end
end
end
def multiples_of_three(source)
Fiber.new do
loop do
next_value = source.resume
Fiber.yield next_value if next_value % 3 == 0
end
end
end
def consumer(source)
Fiber.new do
10.times do
next_value = source.resume
puts next_value
end
end
end
consumer(multiples_of_three(evens)).resume
Running this, we get the output
0
6
12
18
. . .
This is getting cool. We write little chunks of code, and then combine them to get work done. Just like a pipeline. Except...
We can do better. First, the composition looks backwards. Because we're passing methods to methods, we write
consumer(multiples_of_three(evens))
Instead, we'd like to write
evens | multiples_of_three | consumer
Also, there's a fair amount of duplication in this code. Each of our little pipeline methods has the same overall structure, and each is coupled to the implementation of fibers. Let's see if we can fix this.
Wrapping Fibers
As is usual when we're refactoring towards a solution, we're about to get really messy. Don't worry, though. It will all wash off, and we'll end up with something a lot neater.
First, let's create a class that represents something that can appear in our pipeline. At it's heart is the process method. This reads something from the input side of the pipe, then "handles" that value. The default handling is to write that value to the output side of the pipeline, passing it on to the next element in the chain.
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
source.resume
end
def output(value)
Fiber.yield(value)
end
end
When I first wrote this, I was tempted to make PipelineElement a subclass of Fiber, but that leads to coupling. In the end, the pipeline elements delegate to a separate Fiber object.
The first element of the pipeline doesn't receive any input from prior elements (because there are no prior elements), so we need to override its process method.
class Evens < PipelineElement
def process
value = 0
loop do
output(value)
value += 2
end
end
end
evens = Evens.new
Just to make things more interesting, we'll create a generic MultiplesOf filter, so we can filter based on any number, and not just 3:
class MultiplesOf < PipelineElement
def initialize(factor)
@factor = factor
super()
end
def handle_value(value)
output(value) if value % @factor == 0
end
end
multiples_of_three = MultiplesOf.new(3)
multiples_of_seven = MultiplesOf.new(7)
Then we just knit it all together into a pipeline:
multiples_of_three.source = evens
multiples_of_seven.source = multiples_of_three
10.times do
puts multiples_of_seven.resume
end
We get 0, 42, 84, 126, 168, and so on as output. (Any output stream that contains 42 must be correct, so no need for any unit tests here.)
But we're still a little way from our ideal of being able to pipe these puppies together. It's a good thing that Ruby let's us override the "|" operator. Up in class PipelineElement, define a new method:
def |(other)
other.source = self
other
end
This allows us to write:
10.times do
puts (evens | multiples_of_three | multiples_of_seven).resume
end
or even:
pipeline = evens | multiples_of_three | multiples_of_seven
10.times do
puts pipeline.resume
end
Cool, or what?
In The Next Thrilling Installment
The next post will take these basic ideas and tart them up a bit, allowing us to use blocks directly in pipelines. We'll also reveal why our PipelineElement class I just wrote is somewhat more complicated than might seem necessary. In the meantime, here's the full source of the code so far.
class PipelineElement
attr_accessor :source
def initialize
@fiber_delegate = Fiber.new do
process
end
end
def |(other)
other.source = self
other
end
def resume
@fiber_delegate.resume
end
def process
while value = input
handle_value(value)
end
end
def handle_value(value)
output(value)
end
def input
source.resume
end
def output(value)
Fiber.yield(value)
end
end
##
# The classes below are the elements in our pipeline
#
class Evens < PipelineElement
def process
value = 0
loop do
output(value)
value += 2
end
end
end
class MultiplesOf < PipelineElement
def initialize(factor)
@factor = factor
super()
end
def handle_value(value)
output(value) if value % @factor == 0
end
end
evens = Evens.new
multiples_of_three = MultiplesOf.new(3)
multiples_of_seven = MultiplesOf.new(7)
pipeline = evens | multiples_of_three | multiples_of_seven
10.times do
puts pipeline.resume
end




Nice one Dave. A truely ruby-tastic article!
Posted by: Mark | December 31, 2007 at 08:11 PM
Thank you for a delightful article. I can hardly wait for the next installment.
Posted by: Christophe Broult | January 01, 2008 at 06:03 AM
A great article. I need to read more about Ruby 1.9 new features especially Fibres and Threads. I recently read about Erlang / Haskell and wonder if Ruby could do the same distributed process / multi core / functional magic, any time in the future!? or that's totally another paradigm (I mean Functional programming) that Ruby programmers could not experience but a few of its real benifits.
Posted by: Ibrahim Ahmed | January 01, 2008 at 08:09 AM
Thanks for this post and for making me excited about 1.9, Dave. I'm looking forward to the rest of the story.
Posted by: Leif Wickland | January 01, 2008 at 10:50 AM
Excellent article as always.
Maybe I'm missing something but I think that the block in the first initialize "def initialize(&block)" is not used.
Posted by: Daniel Cadenas | January 01, 2008 at 11:14 AM
Daniel:
That's true: I've fixed it (we will use it tomorrow, though...)
Dave
Posted by: Dave Thomas | January 01, 2008 at 11:40 AM
Is this what Haskell programmers call lazy evaluation?
Posted by: yaxu | January 01, 2008 at 11:50 AM
Hmmm, it seems like fibers are (at least superficially) the equivalent of generators in python.
yaxu: Yes, using Fiber.yield is effectively lazy evaluation. It's a lot cleaner in Haskell though since it's built into the language.
Ibrahim: The thing that makes functional languages like Haskell and Erlang good at concurrent programming is the minimization of mutable state. Once you bind a value to a variable that value cannot change so it is safe for other threads to use it without having to worry about synchronization. It would be a pretty radical paradigm shift for ruby to pick up this property.
Posted by: brett | January 01, 2008 at 06:41 PM
Brett:
Fibers as shown here are basically generators. However, you can also turn them into full-blown coroutines and continuations.
The thing that's interesting to me about Ruby in this context is how much is can bend into multiple paradigms. Haskell does FP way better than Ruby. Smalltalk does OO (marginally) better. But Ruby does them all, and in a way that interoperates nicely.
Dave
Posted by: Dave Thomas | January 01, 2008 at 07:42 PM
Awesome article Dave, now I have to start using 1.9 more. I tested the difference performance-wise to 1.8.6 just using the fib example you gave (slightly modified). Some first run differences (not indicative to anything given cacheing etc...) Tested on stock ruby in 10.5.1 with security patches applied, 1.8.6 p100 I think is the 1.8 version.
$ time ruby fib.rb 40
102334155
ruby fib.rb 40 190.05s user 0.98s system 98% cpu 3:13.71 total
$ time ruby1.9 fib.rb 40
102334155
ruby1.9 fib.rb 40 49.24s user 0.30s system 98% cpu 50.526 total
$ time ruby1.9 fibfiber.rb 40
102334155
ruby1.9 fibfiber.rb 40 0.01s user 0.00s system 53% cpu 0.020 total
For completeness, those "other" languages from the benchmark folder (sans scheme, all modified just to allow first arg to be passed and to print the number):
$ time perl fib.pl 40
102334155
perl fib.pl 40 157.06s user 0.86s system 98% cpu 2:40.32 total
$ time python fib.py 40
102334155
python fib.py 40 81.56s user 0.38s system 98% cpu 1:23.05 total
And straight old c.
$ time ./fib 40
102334155
./fib 40 2.90s user 0.01s system 99% cpu 2.922 total
Posted by: Mitch Tishmack | January 02, 2008 at 03:29 AM
I disagree that Smalltalk does not do FP; Ruby does it "(marginally) better" because of the implied self, but otherwise they do it the same way---with blocks. (Likewise, I disagree that Ruby is worse than Smalltalk at OO, but in this comment I want to show off Smalltalk at OO).
Since you're basically using fibers as generators, you can simply use GNU Smalltalk's own generators (they should be portable to Squeak), which are Stream objects. The "pipe" method is just a stream
Stream extend [
| aBlock [
^Generator on: self do: aBlock
]
]
evens := [ :gen :x | x even ifTrue: [ gen yield: x ] ]
multiplesOfThree := [ :gen :x | x \\ 3 == 0 ifTrue: [ gen yield: x ] ]
data := (1 to: 20) readStream | evens | multiplesOfThree.
data next "returns 6"
data contents "returns (12 18)"
You need GNU Smalltalk 2.95 or later to run this.
My apologies for the poor formatting of this comment.
Posted by: Paolo Bonzini | January 03, 2008 at 02:07 AM
After the first occurrence of “MultiplesOf”, you’ve got [/PipelineElement] instead of [/code]. This messes up the rest of the page in Safari.
Posted by: Ahruman | January 04, 2008 at 09:59 AM
Ahruman: thanks. fixed
Paolo: I don't think Ruby or Smalltalk really do functional programming to any deep level. However, both can be used to implement particular FP constructs (such as generators).
Posted by: Dave Thomas | January 04, 2008 at 10:08 AM
Fiber can be used not only for piping, but also for spliting data streams.
They allow to transperently convert n-pass algorithm to obe-pass algorithm.
def squares
puts "squares before"
each_number do |n|
puts "#{n}**2 = #{n**2}"
end
puts "squares after"
end
def powers
puts "powers before"
each_number do |n|
puts "2**#{n} = #{2**n}"
end
puts "powers after"
end
# A really expensive generator
def each_number
puts "each number orig before"
[1,2,3,4,5].each do |n|
yield(n)
end
puts "each number orig after"
end
# Two pass algorithm
def do_it
powers
squares
end
---------------
# Generator is called twice
# But lets fiber it and make the algorithm one-pass.
alias :orig_each_number :each_number
def each_number
puts "each number before "
while true
begin
yield(@iterator.resume)
Fiber.yield
rescue Exception=>e
break
end
end
puts "each number after"
end
def do_it
@iterator = Fiber.new{ orig_each_number{|n| 2.times { Fiber.yield(n) } } }
@p = Fiber.new { powers }
@s = Fiber.new { squares }
5.times{ @p.resume; @s.resume; }
end
do_it
Posted by: Artem Voroztsov | January 13, 2009 at 09:31 AM
alias :orig_each_number :each_number
def each_number
puts "each number before "
while true
begin
yield(@iterator.resume)
Fiber.yield
rescue Exception=>e
break
end
end
puts "each number after"
end1Y0-A13 exam
Posted by: john robet | January 23, 2010 at 06:23 AM
Good article :) I read it and thought "I want to create it without Fibers". My result:
https://gist.github.com/1016493
It is based on your idea, but without Fiber.
Posted by: Nest_d | June 09, 2011 at 05:38 AM