Flux: new streaming data processing framework

Flux is the framework I've been meaning to release for a very long time [1].

What's it good for? Message queues; organizing your data processing scripts in a scalable way; de-coupling your processing pipeline elements, making them reusable and testable; seeing your system as a collection of lego-like blocks which can be combined and replaced as you like. With Flux, your code is a series of tubes.

Flux is a rewrite of the Stream framework which we wrote and used at Yandex for many years. The Stream:: namespace on CPAN is already taken, though, which gave me a reason to do a cleanup before uploading it, as well as a chance to rewrite everything with Moo/Moose.

I'm planning to release Flux in small chunks, explaining them along the way in separate blog posts, as time allows. Today, I'll explain the main ideas behind it, some core classes, and how all its parts work together.

Flux has input streams and output streams; storages, which are kind of like tanks storing the data as it flows through the system; pumpers to connect storages; and mappers which modify the data.

Let's start with input streams.

Flux::In is the role for all Flux input streams (it's a Moo role, as are all other core Flux interfaces). Here's how input streams work:

my $item = $in->read;
my $other_item = $in->read;
$in->commit; # save the reading position

my $arrayref_with_10_items = $in->read_chunk(10);

There are two important things to note about this simple example:

  1. Input streams can be committed. Commit saves the current reading position, so that you can come back later and continue reading from where you left off.
  2. read_chunk is a part of the core interface. Why? Because real stream objects often include several layers of delegation, and method invocations in Perl are relatively expensive. I'll come back to this point later; there's a small sketch of chunked reading right below this list.
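
A minimal sketch of a chunked reading loop with periodic commits (process_items() is a hypothetical application function, and committing after every chunk is just one possible policy):

# Read the stream in chunks of 100 items, committing after each chunk.
while (my $chunk = $in->read_chunk(100)) {
    last unless @$chunk; # an empty chunk means the stream is exhausted
    process_items($chunk); # hypothetical processing code
    $in->commit; # remember how far we've got
}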

How do you construct an input stream? Most objects in Flux are polymorphic; there are many different implementations. Here's the most basic one:

use Flux::Simple qw(array_in);

my $in = array_in([ "foo", "bar" ]);
say $in->read; # foo

array_in doesn't support commit(), i.e. committing it does nothing.
But the commit() method is still present. Some other features of Flux streams are optional, and high-level code which deals with input and output stream objects has to check for those features, but commit and read_chunk are fundamental and omnipresent.
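
Here's a sketch of what such a check could look like; the Flux::In::Role::Lag role name and the lag() method are just assumptions for the sake of illustration, not a documented part of the interface:

# Hypothetical optional capability check; the role and method names are assumed.
if ($in->does("Flux::In::Role::Lag")) {
    warn "lag: " . $in->lag . " bytes";
}
$in->commit; # no check needed: commit() is always there, even when it's a no-op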

array_in is useful for unit testing: since all input streams have the same basic interface, you can replace your real input stream (which would read data from a file, DB, or network) with it.
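
For example, here's a minimal test sketch which feeds a hypothetical process_stream() function with an array_in instead of a real data source:

use Test::More tests => 1;
use Flux::Simple qw(array_in);

# process_stream() is the hypothetical function under test;
# it reads items from any input stream and returns how many it processed.
my $in = array_in([ "foo", "bar", "baz" ]);
is(process_stream($in), 3, "all items from the fake stream get processed");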

Now let's take a look at output streams, described by the Flux::Out role/interface.

It mirrors the Flux::In interface:

$out->write("foo");
$out->write("bar");
$out->commit;

$out->write_chunk(["xxx", "yyy", "zzz"]);

When you commit an output stream, you force the data down the pipe. Depending on the implementation, this could mean flushing memory buffers into an HTTP POST request, writing or fsyncing data to disk, or committing an SQL transaction.

The simplest Flux::Out implementation is this:

use Flux::Simple qw(array_out);
my @data;
my $out = array_out(\@data);
$out->write("foo");
$out->write("bar");
$out->commit;
say for @data; # foo; bar

But all this doesn't sound very useful so far, does it?

Here's a more serious bit of code:

use Flux::Log;

my $storage = Flux::Log->new("/opt/events.log");
$storage->write(qq[{"type": "email", "title": "Hello"}\n]);
$storage->write(qq[{"type": "email", "title": "Goodbye"}\n]);
$storage->commit;

# in another script:
my $in = $storage->in("sendmail");
while (my $item = $in->read) {
    do_sendmail($item);
}
$in->commit;

This code introduces storages. Storages implement the Flux::Storage interface; storages are Outs which can generate input streams. Flux::Log is a storage which writes to a log file, and you can later read that log from another process, processing your data asynchronously.

In other words, it's a file-based message queue.

"sendmail" string in the code above is called client name, and $in object can be referred to as a client, because it's a client reading our storage. You can read one storage with multiple clients at the same time; each of these will get its own copy of data.

Flux::Log is not a trivial piece of code, by the way. It handles safe writing to the log file (which is *not* easy [2]), and its clients implement transparent reading, so you don't have to worry about logrotate rotating these logs (that part is handled by Log::Unrotate).

Finally, I want to introduce the concept of Mappers. Here's an example:

use Flux::Simple qw(mapper);
use JSON;

my $raw_in = $storage->in("sendmail");
my $in = $raw_in | mapper { decode_json(shift) };

my $item = $in->read; # decoded hashref

Mappers rewrite your data. You can chain them into pipelines using shell-like "|" syntax sugar.
Mappers can be attached both to input streams (on the right side: $in | $mapper) and to output streams (on the left side: $mapper | $out).
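
Here's a sketch of the output-side variant: a mapper which serializes hashrefs into JSON lines right before they reach the storage (encode_json comes from the JSON module, as above):

use Flux::Simple qw(mapper);
use JSON;

my $out = mapper { encode_json(shift) . "\n" } | $storage;
$out->write({ type => "email", title => "Hello" });
$out->commit;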

Mappers don't have to map data in a one-to-one fashion. You can use them for filtering:

$in | mapper { my $item = shift; $item->{type} eq 'email' ? $item : () }

Or for turning one item into several items:

my $double = mapper { my $item = shift; ($item) x 2 }
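
Putting the pieces together, here's a sketch of the filtering mapper in front of a plain array_in (assuming, as the filtering example implies, that the combined stream keeps pulling from the underlying input until the mapper yields an item):

use Flux::Simple qw(array_in mapper);

my $in = array_in([
    { type => "email", title => "Hello" },
    { type => "sms",   title => "Ping" },
    { type => "email", title => "Goodbye" },
]) | mapper { my $item = shift; $item->{type} eq 'email' ? $item : () };

say $in->read->{title}; # Hello
say $in->read->{title}; # Goodbye - the "sms" item was filtered out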

If you're wondering how mappers are different from plain functions, remember when I said that read_chunk (and write_chunk) are parts of the core interfaces? The low-level Mapper interface is similar to the Flux::Out interface, with write_chunk and commit.
For example, you may write a mapper which gets items like {id => 123} and fills them in with data referenced by id from an SQL database. In that case, it might make sense to store several items in the mapper's inner memory buffer, and then do one bulk SELECT before returning them from the mapper's other end.
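
Conceptually, such a buffering mapper could be shaped roughly like the sketch below; the method names mirror the write_chunk/commit interface described above, fetch_rows_by_id() is a hypothetical helper, and the details of how such a class plugs into Flux are omitted:

package My::IdEnricher;
use Moo;

has _ids => (is => "ro", default => sub { [] });

sub write_chunk {
    my ($self, $chunk) = @_;
    push @{ $self->_ids }, map { $_->{id} } @$chunk; # buffer the ids only
    return []; # emit nothing yet
}

sub commit {
    my $self = shift;
    my $rows = fetch_rows_by_id($self->_ids); # hypothetical: one bulk SELECT
    @{ $self->_ids } = ();
    return $rows; # emit the enriched items now
}

1;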

So, those were the basic building blocks for programs using Flux. Inputs and outputs define your data sources and sinks; storages generally turn outputs of one program into inputs for another; and mappers let you abstract processing code into reusable objects.
Next time, I'll explain pumpers, which are the scripts that actually *do* stuff with data, and formats, which are two-way lenses for encoding and decoding data.

You can like this post on Questhub (formerly Play Perl), which, coincidentally, uses Flux for sending notification emails.

Footnotes:
[1] https://blogs.perl.org/users/vyacheslav_matjukhin/2010/12/roadmaps.html - really, a very long time...
[2] See "Surprisingly hard task of writing logs". Flux::Log solves this issue by locking the file while writing and doing look-behinds whenever necessary to check that the file's last line ends with "\n".

3 Comments

This looks like something I had planned for Beam, which is a rewrite of a bunch of modules I use at Bank of America. Excellent work! I might have to see how well these can be described in Beam::Wire.

This seems to be similar to Node.js streams. I'm looking forward to playing with this.
