Supercharge Event Loops with Thread::Subs

There are two issues with event loop coding, related to the need to maintain an asynchronous, non-blocking style.

  • It's harder to write and maintain than linear, blocking code.
  • Despite all the asynchronous behaviour, it's still single threaded.

You can break out of the async/non-blocking mode by forking, of course, but it's not a lightweight operation and creates the risk of orphaned processes even if most of the IPC work is hidden by a good library.

Wouldn't it be nice if you could simply write subs in the plain old linear, blocking style and then call them asynchronously, letting them run in parallel to your main thread until they're ready, no forking required? After all, you're probably already using some kind of async result mechanism like callbacks, or promises, or AnyEvent condition variables, or Future objects to manage existing async behaviour. Wouldn't it be nice if you could just call a sub and deal with it using one of those mechanisms instead of the usual synchronous behaviour?

Enter Thread::Subs.

Thread::Subs gives you this ability to execute subs in parallel using Perl's not-much-loved "iThreads" mechanism, with almost all of the sharp edges that make parallel programming hard carefully hidden behind a simple API. So long as your subs pass basic data types in and out (within the limits of threads::shared), you can execute them in parallel. The only visible difference is that the call returns a "result" object, similar to an AnyEvent condition variable, which is easily converted to a real AnyEvent condition variable, a Future, a Mojo::Promise, or a callback.

This functionality facilitates several frequently-encountered use cases.

  • Generic worker pools to execute CPU-intensive or otherwise slow operations
  • Resource-limited pools, such as DB workers, where concurrency must be capped
  • Operations like log-writing which require serial execution but can operate concurrently with other activities

Worker threads are spawned early in the process lifecycle and persist after that point, meaning that you don't have anywhere near as much per-call overhead as forking or spawning additional threads. It also means you don't have to manage workers once they start up: it's set and forget.
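For a sense of what a persistent pool involves when rolled by hand, here is a minimal sketch using only the core threads and Thread::Queue modules. It is not how Thread::Subs is implemented, merely the general pattern of long-lived workers fed from a queue; the squaring job and the pool size of two are arbitrary choices for illustration.

```perl
use strict;
use warnings;
use threads;
use Thread::Queue;

my $requests = Thread::Queue->new;    # work waiting for a free worker
my $replies  = Thread::Queue->new;    # finished results

# Spawn the workers once, up front; they persist until the queue is ended.
my @workers = map {
    threads->create(sub {
        # dequeue blocks until work arrives, and returns undef once the
        # queue has been ended and drained.
        while (defined(my $n = $requests->dequeue)) {
            $replies->enqueue($n * $n);
        }
    });
} 1 .. 2;

$requests->enqueue($_) for 1 .. 5;    # hand out five jobs
$requests->end;                       # no more work is coming
$_->join for @workers;                # wait for the workers to finish

# Completion order is not guaranteed across workers, so sort before printing.
my @squares = sort { $a <=> $b } map { $replies->dequeue } 1 .. 5;
print "@squares\n";
```

Even this toy version has to manage two queues, worker shutdown, and result ordering by hand, which is the kind of bookkeeping a set-and-forget pool spares you.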

Thread::Subs uses Perl's attribute mechanism to mark specific subs as "Thread", which lets it replace each marked sub with a shim that calls the original asynchronously in a worker thread. As such, a generic blocking function like the following ...

sub munge {
    my ($foo, $bar) = @_;
    # perform heavyweight munge operation (placeholder result shown here)
    my $munged = "$foo:$bar";
    return $munged;
}

my $munged = munge('fred', 123);

... can be converted into an asynchronous, parallel operation like so.

use threads;
use Thread::Subs;

sub munge :Thread {
    my ($foo, $bar) = @_;
    # perform heavyweight munge operation (placeholder result shown here)
    my $munged = "$foo:$bar";
    return $munged;
}

Thread::Subs::startup(); # using default pool sizes
my $result = munge('fred', 123);
# do something else while munging
my $munged = $result->recv;
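The same pattern extends to several calls in flight at once. The following is a minimal sketch that relies only on the calls shown above (the Thread attribute, Thread::Subs::startup(), and recv); slow_square is a hypothetical stand-in for real work, and how much the calls actually overlap depends on how many workers the default pool provides.

```perl
use strict;
use warnings;
use threads;
use Thread::Subs;

# Hypothetical stand-in for a slow, blocking operation.
sub slow_square :Thread {
    my ($n) = @_;
    return $n * $n;
}

Thread::Subs::startup();    # using default pool sizes

# Each call returns a result object straight away instead of blocking.
my @results;
for my $n (1 .. 4) {
    my $result = slow_square($n);
    push @results, $result;
}

# Collect the values; recv waits until the matching call has finished.
my @squares;
for my $result (@results) {
    my $square = $result->recv;
    push @squares, $square;
}
print "@squares\n";
```

Because every request is queued before the first recv, the main thread only waits at the point where it actually needs each value.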

This is the kind of low-boilerplate code that's hard to pull off in anything but Perl. Java has had something very similar in the form of ExecutorService for a long time, but you have to construct the object yourself and feed it specific functions. Furthermore, the Thread attribute accepts three parameters, "qlim", "clim", and "pool", which allow fine-grained control over parallelism of a kind that most other languages offer only on a roll-your-own basis. In particular, requests for such sub calls go into a FIFO queue prior to execution by workers, and you can limit that queue independently of the concurrency.

  • pool - worker pool name - subs can be tied to named pools or even given a private pool of workers; you can set the number of workers per pool
  • qlim - queue limit - the maximum number of requests for a sub which can be in the queue before requests start to block; default no limit
  • clim - concurrency limit - the maximum number of workers which can execute this sub simultaneously; default all workers in the pool

These parameters can be specified as part of the Thread attribute, as in sub foo :Thread(clim=1), or they can be set via a function, Thread::Subs::define(). The specific combination of a concurrency limit of one and a private pool of one worker, Thread(clim=1, pool=SUB), effectively offers the sub its own private Perl interpreter. It has free rein to maintain any kind of state it needs in this environment.
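As a minimal sketch of that private-interpreter idea, the hypothetical next_ticket sub below keeps a counter in an ordinary lexical. It uses only the attribute form quoted above; the sub name and the counter are illustrative assumptions, not part of the module.

```perl
use strict;
use warnings;
use threads;
use Thread::Subs;

# An ordinary, unshared lexical. Under Perl threads each thread gets its
# own copy, so the worker's counter is independent of the main thread's.
my $calls = 0;

# Hypothetical sub with a concurrency limit of one and a private pool.
sub next_ticket :Thread(clim=1, pool=SUB) {
    return ++$calls;
}

Thread::Subs::startup();

# Queue three requests, then collect their results in order.
my @pending;
for (1 .. 3) {
    my $result = next_ticket();
    push @pending, $result;
}

my @tickets;
for my $result (@pending) {
    my $ticket = $result->recv;
    push @tickets, $ticket;
}

print "tickets issued: @tickets\n";
print "main thread's own counter: $calls\n";    # the worker's copy is separate
```

The state lives entirely in the worker, so no locking or threads::shared declarations are needed for the counter itself; only the return values cross the thread boundary.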

Thread::Subs is designed to make basic parallelism trivial to implement with no further dependencies (nothing outside core Perl as of v5.22), but it's also designed with event loops in mind. It has native support for AnyEvent, Mojolicious, and anything which works with Future objects. For example, if foo() is a sub with the Thread attribute, then foo()->future returns a Future object (so long as you use Future; first).

Give it a try and put some of those underutilised CPU cores to work.

About TFBW

The Famous Brett Watson