Asynchronous Task Distribution with AnyEvent and ZeroMQ

Some months ago I wrote about how to (ab)use your database as a message queue in order to distribute tasks among worker processes.

"From your code, it also looks like having more than one demon will put you at risk of processing the same jobs more than once." (Jerome Eteve)

"'Abuse' is the right word. The moment you hit more than three concurrent jobs, you will see a nasty slowdown ..." (rob.kinyon)

The comments made it pretty clear that this was not a good idea, and I promised to clean up. (Note: it worked pretty well, but only as long as you didn't scale the system up.)

Here we go!

I decided to go with ZeroMQ as the message queue, using ZMQx::Class and AnyEvent.

In my project, tasks take a while to process (from 5 seconds up to 20 minutes) and have varying priorities, so I cannot let a low-priority task delay incoming high-priority tasks.

What I need is a broker that collects the tasks and hands them out to worker processes. A very simple implementation would use ZeroMQ's default distribution algorithm, round robin. That only works well for tasks that take roughly the same time. If their durations differ a lot, you end up with idle workers even while tasks are waiting in the queue.
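To see why round robin hurts when durations vary, here is a small pure-Perl simulation (no ZeroMQ involved) that compares fixed round-robin assignment with handing each task to whichever worker becomes ready first. The task durations and the worker count are made-up numbers for illustration, and round_robin/ready_queue are hypothetical helper names.

```perl
use strict;
use warnings;
use List::Util qw(max);

# Hypothetical task durations in seconds, dispatched in this order.
my @durations = ( 10, 1, 10, 1, 1, 1 );
my $n_workers = 2;

# Round robin: task i is assigned to worker (i mod n) up front,
# no matter how busy that worker already is.
sub round_robin {
    my ( $n, @d ) = @_;
    my @busy_until = (0) x $n;
    $busy_until[ $_ % $n ] += $d[$_] for 0 .. $#d;
    return max(@busy_until);    # time when the last task finishes
}

# Ready queue: each task goes to the worker that becomes free first.
sub ready_queue {
    my ( $n, @d ) = @_;
    my @busy_until = (0) x $n;
    for my $dur (@d) {
        my ($next) = sort { $busy_until[$a] <=> $busy_until[$b] } 0 .. $n - 1;
        $busy_until[$next] += $dur;
    }
    return max(@busy_until);
}

printf "round robin: all tasks done after %d s\n", round_robin( $n_workers, @durations );
printf "ready queue: all tasks done after %d s\n", ready_queue( $n_workers, @durations );
```

With these numbers, round robin finishes after 21 seconds while the second worker is idle for 18 of them; the ready-queue variant finishes after 12 seconds.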

My solution: a broker that keeps a queue of outstanding tasks as well as a queue of workers that are ready to do some work.

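The broker binds to an address. Workers connect and send an "I'm ready" message to the broker at startup and after each finished task. That message carries the worker's priority, and the broker keeps a separate queue of ready workers per priority, so a task is only handed to a worker of matching priority.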
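The bookkeeping itself can be sketched without any sockets. This is a minimal in-memory Perl sketch, assuming one FIFO of pending tasks and one FIFO of ready workers per priority; add_task, worker_ready and dispatch are hypothetical names, and a real broker would send each task over its ROUTER socket instead of recording it in @sent.

```perl
use strict;
use warnings;

my %tasks;      # priority => [ pending tasks, oldest first ]
my %workers;    # priority => [ ready worker ids, oldest first ]
my @sent;       # [ worker_id, task ] pairs; stands in for the actual send

# Match waiting tasks with ready workers of the same priority.
sub dispatch {
    for my $prio ( keys %tasks ) {
        my $ready = $workers{$prio} ||= [];
        while ( @{ $tasks{$prio} } && @$ready ) {
            push @sent, [ shift(@$ready), shift( @{ $tasks{$prio} } ) ];
        }
    }
}

sub add_task {
    my ( $prio, $task ) = @_;
    push @{ $tasks{$prio} }, $task;    # task waits until a worker is free
    dispatch();
}

sub worker_ready {
    my ( $prio, $id ) = @_;
    push @{ $workers{$prio} }, $id;    # worker waits until a task arrives
    dispatch();
}

add_task( high => 'resize-images' );   # no worker yet: stays queued
worker_ready( low  => 'w1' );          # no low-priority task: w1 waits
worker_ready( high => 'w2' );          # w2 picks up 'resize-images'
add_task( low => 'send-report' );      # goes straight to w1

print "$_->[0] <- $_->[1]\n" for @sent;
```

Neither side is ever dropped: a task without a matching worker and a worker without a matching task both simply wait in their queue until the other side shows up.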


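# --- broker ---
use strict;
use warnings;
use feature 'say';
use AnyEvent;
use ZMQx::Class;

my $address = 'tcp://127.0.0.1:5555';    # example endpoint
my $workers = {};                        # priority => [ ready worker ids ]

my $broker = ZMQx::Class->socket( 'ROUTER', bind => $address );

# a worker is ready: add it to the queue for its priority
my $w1 = $broker->anyevent_watcher(
    sub {
        while ( my $msg = $broker->receive ) {
            # a REQ peer arrives as [ identity, empty delimiter, payload ]
            my ( $worker_id, $null, $worker_priority ) = @$msg;
            say "worker $worker_id ($worker_priority) is ready.";
            unshift( @{ $workers->{$worker_priority} }, $worker_id );
        }
    }
);

# every 10 seconds (first run after 1 second) check for new tasks
my $w2 = AnyEvent->timer(
    after    => 1,
    interval => 10,
    cb       => sub {
        # get_tasks() is application code (e.g. a DB query), not shown here
        my $tasks = get_tasks();
        for my $task (@$tasks) {
            my $priority = $task->{priority};

            # oldest ready worker of this priority (unshift + pop = FIFO)
            my $worker = pop( @{ $workers->{$priority} } );

            # no ready worker for this priority: skip the task in this round
            next unless defined $worker;

            say "Send task. Priority $priority. Sleep $task->{sleep}. Worker $worker.";
            $broker->send( [ $worker, '', $task->{sleep} ] );
        }
    },
);
AnyEvent->condvar->recv;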

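# --- worker ---
use strict;
use warnings;
use feature 'say';
use AnyEvent;
use ZMQx::Class;

my $address  = 'tcp://127.0.0.1:5555';    # must match the broker's endpoint
my $priority = shift(@ARGV) // die "usage: $0 PRIORITY\n";

my $worker = ZMQx::Class->socket( 'REQ', connect => $address );

# event handler for incoming tasks
my $w = $worker->anyevent_watcher(
    sub {
        while ( my $msg = $worker->receive ) {
            say "got task: " . join( '/', @$msg );
            process_task($msg);    # application code, not shown here
            send_ready();          # a REQ socket must send before it can receive again
        }
    }
);

send_ready();
AnyEvent->condvar->recv;

# tell the broker this worker is idle and which priority it serves
sub send_ready {
    say "worker sends ready msg. (PID $$)";
    $worker->send($priority);
}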
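The broker relies on ZeroMQ's envelope for REQ peers: the REQ socket adds an empty delimiter frame, and the ROUTER socket prepends the peer's identity on receive and uses the first frame to route on send. That is why the broker discards a null frame when reading and sends [ $worker, '', ... ] back. The framing can be shown in plain Perl with no sockets; unwrap_request and wrap_reply are hypothetical helpers, and 'worker-A' is a made-up identity (real default identities are binary).

```perl
use strict;
use warnings;

# Split a message as the ROUTER sees it from a REQ worker
# into the peer identity and the payload frames.
sub unwrap_request {
    my ( $identity, $delimiter, @payload ) = @_;
    die "missing empty delimiter frame\n"
        unless defined $delimiter && $delimiter eq '';
    return ( $identity, @payload );
}

# Build a reply the ROUTER can route back: identity first, then the
# empty delimiter the REQ socket expects, then the payload.
sub wrap_reply {
    my ( $identity, @payload ) = @_;
    return ( $identity, '', @payload );
}

my ( $id, $prio ) = unwrap_request( 'worker-A', '', 'high' );
my @reply = wrap_reply( $id, 42 );    # e.g. "sleep 42 seconds"
print join( '|', @reply ), "\n";      # prints worker-A||42
```

On the worker side the REQ socket strips identity and delimiter again, so process_task only ever sees the payload.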

For the sake of readability, the code snippets here are not complete (get_tasks() and process_task() are left out). A full working example can be found in the example section of the ZMQx::Class GitHub repository as soon as my pull request has been merged.

4 Comments

So in this example you still keep your tasks in the DB, only your workers don't talk to the DB queue? Instead you have one instance of a 'task server', which talks to the DB, selects tasks and sends them to one-or-many workers?

No, on the contrary, I like it. Right now I have a similar setup, but the workers themselves look into the DB (locking the table while doing so) for jobs to execute. I guess it won't scale very well... but your one-server-many-workers approach is very interesting, and I'll probably implement it as soon as we hit performance issues.


About davewood

I like Toast.