Concurrency Weirdness

I'm still trying to get my head around how the concurrency stuff works. I had an idea for a simple script: sometimes I run a program that dumps a lot of output, so I'd like a script that would take that as input and just hold onto the most recent X lines of it, so that they could be read at any point through a named pipe.

So I figured I'd need one thread to read lines from STDIN ($*IN in Perl 6) and keep an array loaded with a certain number of the most recent ones, and another thread to open the fifo for writing and dump the current contents of the array to it whenever someone reads it. In Perl 5, I'd probably do it by forking off a child process for half of the work, or maybe use a single loop that does both things but has quite a few no-ops. I figure in Perl 6 I can do it with threads.

So I wrote the code below. As I understand it, start (Promise.start) runs the block it's given on a separate thread, so that's where the fifo gets opened for writing. Because of the way named pipes work, that open doesn't complete until someone opens the pipe for reading; then I write the lines in @l to it and close it. Then I sleep a second, so that a single read of the fifo doesn't get served more than once, and loop back around to open it again for the next reader.

So that's running in a separate thread, and the main code goes on to a loop that reads lines from standard input and loads the array, chopping lines from the beginning of the array when it gets longer than the max. It all works, except... it gets stuck. It gets to about line 35-40 from $*IN, and then stops. Then if I read from the fifo, the for loop runs through another 130 or so lines, then stops again.

By adding some debugging lines, I gradually figured out what's happening, if not why: as soon as the start/loop gets to the open function and stops there, the for loop stops as well -- but it always stops at the line that truncates the array. Then once I read a batch of lines from the fifo, the for loop continues and reads more lines until the start/loop gets back around to the open command, and then it stops.

I don't know why one thread waiting on a file open would make another thread stop, and I especially don't know why the main thread would always stop in that same place. Nothing on that array truncation line uses or has anything to do with the open line that seems to stop it.

If anyone has any ideas what I'm doing wrong or missing here, please leave a comment. In the meantime, if I figure out what's wrong, I'll be sure to update here. And if there's just plain a better way to do this with Perl 6's concurrency tools, I'd love to hear that too.

## Command line arguments:
#    $lines - number of most recent lines to keep saved
#    $pipefile - named pipe to write them to when it is read
##
sub MAIN( $lines, $pipefile ){
    my @l;
    my $p = run << mkfifo $pipefile >>;
    die 'Unable to create fifo' if $p.exitcode;
    start {
        loop {
            my $out = open $pipefile, :w or die $!; # culprit
            say 'got a reader!';     # debug
            $out.say: @l.join("\n");
            $out.close;
            sleep 1;      # prevent double-reads
        }
    }
    for $*IN.lines {
        @l.push: $_;
        # stops here when 'culprit' line is reached
        @l = @l[@l-$lines .. *] if @l > $lines;
        $_.say;  # debug
    }
}

6 Comments

I first tried to minimize the functionality as I understand it:

sub MAIN($lines) {
    .say for $*IN.lines.tail($lines);
}

This works fine.

Then I tried mixing in the named pipe.

sub MAIN( $lines, $pipefile ){
    my $p = run << mkfifo $pipefile >>;
    die 'Unable to create fifo' if $p.exitcode;
    say "made fifo";
    my $out = open $pipefile, :w or die $!; # culprit
    say "opened pipe";
    $out.say for $*IN.lines.tail($lines);
    say "done saying";
    $out.close;
}

This seems to hang on the open $pipefile. I'm not sure what's going on here, but it looks like the problem is not really in the concurrency? Or maybe I'm not awake enough yet :-)

The problem may be that changing the size of the array isn't thread safe, and you need some sort of lock.

See this: http://doc.perl6.org/language/concurrency#Safety_Concerns
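
For illustration, the lock idea could look something like the sketch below. This is not the original script: the names remember and snapshot are hypothetical helpers, and the thousand generated lines stand in for $*IN. Every access to the shared array goes through Lock.protect, and readers get a private copy rather than the live array.

```raku
my @recent;                 # shared between threads
my $lock = Lock.new;

# Hypothetical helper: append a line, then trim to at most $max lines.
sub remember($line, $max) {
    $lock.protect: {
        @recent.push: $line;
        @recent.shift while @recent > $max;
    }
}

# Hypothetical helper: return a private copy that is safe to use unlocked.
sub snapshot() {
    $lock.protect: { my @copy = @recent; @copy }
}

my $writer = start { remember("line $_", 3) for 1..1000 };
my $peek   = snapshot();    # may run while the writer is still pushing
await $writer;
say snapshot();             # [line 998 line 999 line 1000]
```

The copy in snapshot matters: handing out @recent itself would let the other thread read it while it is being trimmed.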

Ah, yes, arrays are not thread safe if you have one thread adding and another thread removing. Please use Channels for that (the P6 equivalent to P5's Thread::Queue).

Locks should probably never be used in any user code.
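
A minimal sketch of the Channel approach, assuming the goal is simply that only one thread ever touches the array. The sub keep-recent is a hypothetical name, and the ten generated lines stand in for $*IN.lines. It does not deal with the named pipe at all; it only shows the hand-off.

```raku
# Hypothetical helper: keep only the last $keep items received on $c.
sub keep-recent(Channel $c, Int $keep) {
    my @recent;                      # touched by this thread only
    for $c.list -> $line {           # iterates until the channel is closed
        @recent.push: $line;
        @recent.shift if @recent > $keep;
    }
    @recent
}

my $c = Channel.new;
my $consumer = start keep-recent($c, 3);
$c.send("line $_") for 1..10;        # producer: stands in for $*IN.lines
$c.close;                            # signal that there is no more input
say await $consumer;                 # [line 8 line 9 line 10]
```

The producer never sees the array, so there is nothing to lock; the Channel does the synchronizing.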

Liz: that is expected, quote from mkfifo manpage:

"Opening a FIFO for reading normally blocks until some other process opens the same FIFO for writing, and vice versa."

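The blocking described in that manpage quote can be seen without any threads. The sketch below is only a demonstration under some assumptions: the /tmp path and the two-second delay are arbitrary, and it expects mkfifo, sleep and cat to be available. A background shell process opens the fifo for reading after a pause, and the open for writing should not return until it does.

```raku
# Sketch only: the path and the two-second delay are arbitrary choices.
my $fifo = "/tmp/fifo-demo-$*PID";
run('mkfifo', $fifo).exitcode and die 'Unable to create fifo';

# Background reader in a separate process: waits, then reads the fifo.
shell "(sleep 2; cat $fifo) &";

my $t0  = now;
my $out = open $fifo, :w;     # blocks until cat opens the fifo for reading
say "open returned after {(now - $t0).Int} seconds";
$out.say: 'hello from the writer';
$out.close;
sleep 1;                      # give cat a moment to print before cleanup
unlink $fifo;
```

If the manpage behaviour holds, the reported time should be roughly the length of the reader's delay, followed by the line echoed back by cat.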

About Aaron Baugher

I'm a programmer and Unix sysadmin who uses Perl as much as possible, operating from the Midwest USA. To hire me for sysadmin or programming work, contact me at aaron.baugher @ gmail.com or as 'abaugher' on #perl6.