PAWS Almost

I think in my last post I said this was going to be a very short series. Well, I think I was wrong on that count.

When I last posted, I discovered that the Kinesis 'SubscribeToShard' action returns 'application/vnd.amazon.eventstream' content, and that led me down a very deep rabbit hole that got me well sidetracked.

Well, to start out I had to figure out what AWS was returning when it sent 'vnd.amazon.eventstream'. I eventually found that here: Event Stream Encoding.

Ok, time to take the way-back machine to my first play-dates with computers: assembling GIS data from an Amdahl mainframe that was spooling a 9-inch tape directly to my Unitron 2000

[Image: Unitron 2000]

over a 330 baud modem, then taking the various bits and putting them back together so I could draw pretty maps on this:

[Image: the device I drew the maps on]

Though mine was the budget 880.

Anyway, after scratching my head a little, I figured that whatever solution I came up with, I was not going to treat the handling of this stream as an integrated part of Paws. It will have to be a separate CPAN mod, the same as 'Net::Amazon::Signature::V4': a generic module that can be used by anyone who may need it.

Now the only question is whether to call it 'AWS::EventStream::VND', 'Net::Amazon::EventStream::VND' or 'Net::AWS::EventStream::VND'.

At this point that doesn't matter. I really just want to get it working; I can sort that out later.

Well, from past experience, the first rule of working with any bit-stream is:

Always start with a data file never a stream.

So in my case I just dumped the content that 'SubscribeToShard' returned when it timed out after five minutes, and it looked like this:

^@^@^@r^@^@^@`«<82>^M<9e>^K:event-type^G^@^Pinitial-response^M:content-type^G^@^Zapplication/x-amz-json-1.1^M:message-
type^G^@^Eevent{}¬®k}^@^@^@ò^@^@^@ej^NI<83>^K:event-t
….

Not very easy on the eyes. AWS does provide a nice pattern diagram to look at:

[Image: AWS event stream frame overview diagram]

So let's tackle the Prelude: that is the first 8 bytes, followed by 4 more for a CRC.


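my $filename = 'shards';
open my $fh, '<:raw', $filename or die "Cannot open $filename: $!";
# The prelude is the first 8 bytes: total length, then header length
my $bytes_read = read $fh, my $prelude, 8;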

Now to get the binary into something we humans can read:


my ($total_length, $header_length) = unpack 'N2', $prelude;

print "total_length=$total_length,header_length=$header_length\n";

and that will give me:

total_length=114,header_length=96

Ahh, good old Perl: no need for anything fancy, just one extra param on a read to get 8 bytes out of a file, and unpack built right in. It did take a little while to figure out which template to use; I had to reach way back in my brain to my 'C' days. I guess most of that data is now lost, as all I remembered is that it uses some sort of template. Sort of ashamed to admit I had to look up which one to use: 'N', an unsigned 32-bit integer in big-endian (network) order.
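To see where those two numbers come from, here is the same unpack run on the first eight bytes of the dump above (^@ is a zero byte, 'r' is 0x72 and the backtick is 0x60). The payload-length line is a small sketch that assumes the layout in the AWS frame diagram: an 8-byte prelude, a 4-byte prelude CRC, the headers, the payload, then a 4-byte message CRC, all counted in total_length.

```perl
use strict;
use warnings;

# First 8 bytes of the dump: ^@^@^@r and ^@^@^@` (r = 0x72, ` = 0x60)
my $prelude = "\x00\x00\x00\x72\x00\x00\x00\x60";

# 'N' = unsigned 32-bit integer, big-endian ("network" order)
my ($total_length, $header_length) = unpack 'N2', $prelude;

# total = 8 (prelude) + 4 (prelude CRC) + headers + payload + 4 (message CRC)
my $payload_length = $total_length - $header_length - 16;

print "total=$total_length headers=$header_length payload=$payload_length\n";
# prints: total=114 headers=96 payload=2
```

Those 2 payload bytes are the `{}` you can see at the end of the first message in the dump.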

The next four bytes are a CRC checksum, used to confirm that the first eight bytes (the two lengths) arrived intact before you trust them. It is a CRC32 digest of those 8 bytes, but how to check it? Well, CPAN comes to the rescue with 'Digest::Crc32'.


use Digest::Crc32;

# The 4 bytes after the prelude hold its CRC32 (big-endian)
$bytes_read = read $fh, my $prelude_checksum, 4;

my ($checksum) = unpack 'N', $prelude_checksum;

my $crc = Digest::Crc32->new();

# The CRC32 of the 8 prelude bytes must match the stored value
if ($crc->strcrc32($prelude) != $checksum) {
    die "Prelude checksum fails!";
}

print "Prelude checksum Pass\n";

and when I run it I get

total_length=114,header_length=96
Prelude checksum Pass
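If you would rather not pull in a CPAN module for this, the same check can be done in plain Perl. The sketch below swaps in a dependency-free bitwise CRC-32 (the reflected 0xEDB88320 polynomial zlib uses); that this matches what Digest::Crc32's strcrc32 computes is an assumption, supported only by the check above passing. check_prelude is a hypothetical helper name, not part of any module.

```perl
use strict;
use warnings;

# Bitwise CRC-32 (reflected polynomial 0xEDB88320, as used by zlib)
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte (unpack 'C*', $data) {
        $crc ^= $byte;
        for (1 .. 8) {
            $crc = ($crc & 1) ? (($crc >> 1) ^ 0xEDB88320) : ($crc >> 1);
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical helper: validate a 12-byte prelude (8 bytes + CRC)
# and return (total_length, header_length), or die on a bad CRC
sub check_prelude {
    my ($bytes) = @_;
    die "Prelude must be 12 bytes\n" unless length($bytes) == 12;
    my ($total, $headers, $stored_crc) = unpack 'N3', $bytes;
    die "Prelude checksum fails!\n"
        unless crc32(substr($bytes, 0, 8)) == $stored_crc;
    return ($total, $headers);
}

# Standard CRC-32 check value for "123456789"
printf "%08X\n", crc32('123456789');    # prints CBF43926

my $prelude = pack 'N2', 114, 96;
my ($total, $headers) = check_prelude($prelude . pack('N', crc32($prelude)));
print "total_length=$total,header_length=$headers\n";
```

The bit-at-a-time loop is slow next to a table-driven or XS version, but at 8 bytes per prelude that hardly matters.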

So that is part one done; really not much else to it. I did find one module very useful here: 'IO::Scalar'.

The problem is that you can't just read a full record from the stream and play with the bits. The structure forces you to jump around a bit (pardon the pun) in the stream and then find your way back to where you left off.

On my first iteration I think I had to make six position changes and resets. Thanks to IO::Scalar I managed to get that down to just one when I refactored the spaghetti into a little module.
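For comparison, here is a sketch of one way to keep the jumping around to a minimum: once the prelude gives you total_length, read the rest of the message in one go and slice it up in memory. It uses Perl's core in-memory filehandles (open on a scalar reference) in place of IO::Scalar and includes a small bitwise CRC-32 so it runs on its own. read_message is a hypothetical name, not the interface of the module described in this post.

```perl
use strict;
use warnings;

# Bitwise CRC-32 (reflected polynomial 0xEDB88320, as used by zlib)
sub crc32 {
    my ($data) = @_;
    my $crc = 0xFFFFFFFF;
    for my $byte (unpack 'C*', $data) {
        $crc ^= $byte;
        for (1 .. 8) {
            $crc = ($crc & 1) ? (($crc >> 1) ^ 0xEDB88320) : ($crc >> 1);
        }
    }
    return $crc ^ 0xFFFFFFFF;
}

# Hypothetical helper: read one whole message from $fh and return
# (header_bytes, payload). Returns an empty list at end of stream.
sub read_message {
    my ($fh) = @_;
    my $got = read $fh, my $prelude, 12;
    return unless $got;
    die "Truncated prelude\n" unless $got == 12;

    my ($total, $header_len, $prelude_crc) = unpack 'N3', $prelude;
    die "Prelude checksum fails!\n"
        unless crc32(substr($prelude, 0, 8)) == $prelude_crc;
    die "Bad lengths\n" if $total < 16 + $header_len;

    # One read for everything else: headers, payload and message CRC
    my $want     = $total - 12;
    my $got_rest = read $fh, my $rest, $want;
    die "Truncated message\n" unless defined $got_rest && $got_rest == $want;

    my $body = substr($rest, 0, $want - 4);
    my ($message_crc) = unpack 'N', substr($rest, -4);
    # The message CRC covers every byte before it, prelude included
    die "Message checksum fails!\n"
        unless crc32($prelude . $body) == $message_crc;

    return (substr($body, 0, $header_len), substr($body, $header_len));
}

# Build a test frame shaped like the first message: one string
# header (type 7) and the two-byte payload '{}'
my $headers = pack 'C/a* C n/a*', ':event-type', 7, 'initial-response';
my $payload = '{}';
my $prelude = pack 'N2', 12 + length($headers) + length($payload) + 4, length($headers);
$prelude .= pack 'N', crc32($prelude);
my $frame = $prelude . $headers . $payload;
$frame .= pack 'N', crc32($frame);

# Core Perl can open a filehandle on a scalar, much like IO::Scalar
open my $in, '<', \$frame or die "Cannot open in-memory handle: $!";
my ($h, $p) = read_message($in);
print length($h), " header bytes, payload $p\n";    # prints 31 header bytes, payload {}
```

Calling read_message in a loop until it returns an empty list walks a whole dump file one message at a time.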

Eventually I got the decoding working, and what I was getting from the stream looked like this:


headers={
          ':message-type' => 'event',
          ':event-type' => 'initial-response',
          ':content-type' => 'application/x-amz-json-1.1'
        };
message={}

which was the first message, and most of the rest looked like this:

headers={
          ':message-type' => 'event',
          ':event-type' => 'SubscribeToShardEvent',
          ':content-type' => 'application/x-amz-json-1.1'
        };
message={"ContinuationSequenceNumber":"49604106570538379893614088729479714815975373587922026498","MillisBehindLatest":0,"Records":[]}
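Getting from raw header bytes to hashes like those is the fiddly part. Going by the AWS encoding page and the dump above, each header is a 1-byte name length, the name, a 1-byte value type, then the value; type 7 (the ^G in the dump) is a string with a 2-byte big-endian length. Below is a minimal sketch that handles only that string type (anything else dies), then hands a payload to the core JSON::PP module. parse_headers is a hypothetical name. As a cross-check, the three headers of the initial-response message pack back to exactly 96 bytes, matching header_length from the prelude.

```perl
use strict;
use warnings;
use JSON::PP;

# Hypothetical helper: turn raw header bytes into a hash ref.
# Layout per header: name length (1 byte), name, value type (1 byte),
# value. Only type 7 (string, 2-byte big-endian length) is supported.
sub parse_headers {
    my ($bytes) = @_;
    my %headers;
    my $pos = 0;
    while ($pos < length($bytes)) {
        my $name_len = unpack 'C', substr($bytes, $pos, 1);
        my $name     = substr($bytes, $pos + 1, $name_len);
        $pos += 1 + $name_len;

        my $type = unpack 'C', substr($bytes, $pos, 1);
        $pos += 1;
        die "Unsupported header type $type for '$name'\n" unless $type == 7;

        my $value_len = unpack 'n', substr($bytes, $pos, 2);
        $headers{$name} = substr($bytes, $pos + 2, $value_len);
        $pos += 2 + $value_len;
    }
    return \%headers;
}

# Rebuild the header block of the initial-response message from the dump
my $raw = pack 'C/a* C n/a* ' x 3,
    ':event-type',   7, 'initial-response',
    ':content-type', 7, 'application/x-amz-json-1.1',
    ':message-type', 7, 'event';

my $headers = parse_headers($raw);
print length($raw), " header bytes\n";    # prints 96 header bytes
print "$_ => $headers->{$_}\n" for sort keys %$headers;

# The payload of a SubscribeToShardEvent is plain JSON
my $message = JSON::PP->new->decode(
    '{"ContinuationSequenceNumber":"49604106570538379893614088729479714815975373587922026498",'
  . '"MillisBehindLatest":0,"Records":[]}'
);
print "records=", scalar @{ $message->{Records} },
      " next=$message->{ContinuationSequenceNumber}\n";
```

The sequence number arrives as a JSON string, so there is no risk of it being squashed into a floating-point number.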

Now that that is done, time to see if I can get a real stream to read.

It did take me quite some time to actually get this working after a fashion, so I will give you the quick review. Luckily I have played with event streams and HTTP before, mostly with Mojo but the odd time with LWP, so at least I knew where to start. I also found a few test cases in /t that helped out.

First I needed direct access to the 'User Agent' that was handling the call to AWS, so I made an instance of my 'FullTestMakerLWPCaller' mod like this:



my $caller = FullTestMakerLWPCaller->new();

Which, if you recall, is just a mucked-up version of 'LWP' so I can easily get to the 'User Agent'. What I want to do is add a 'handler' for the 'response_data' event, like this:



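use IO::Scalar;

$caller->ua->add_handler(
    'response_data',
    sub {
        my ($response, $ua, $h, $data) = @_;
        # $data holds only the newest chunk; this hack re-decodes
        # $response->content instead
        my $es      = AWS::EventStream::VND->new();
        my $content = $response->content;
        my $ios     = IO::Scalar->new(\$content);
        my $output  = $es->decode($ios);
        print $output . "\n";
        # A true return value keeps LWP calling this handler
        return 1;
    },
);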

my $aws = Paws->service(
    'Kinesis',
    region => 'us-east-1',
    debug  => 1,
    caller => $caller,
);

my $Output = $aws->SubscribeToShard(
    ConsumerARN      => 'arn:aws:kinesis:us-east-1:32938372322:stream/TestSteam5Shard/consumer/TestKinesisApp:1581111187',
    ShardId          => 'shardId-000000000000',
    StartingPosition => {
        Type => 'LATEST',
    }
);

In the above I create an instance of my 'AWS::EventStream::VND' decoder, then get the content from the response, wrap it in an IO::Scalar, and pass that to my decode sub, which returns the decoded content that I then print. The next two statements just set the first call in motion.

The really important thing in the above is to include that 'return 1;' in the handler sub. LWP only keeps calling a 'response_data' handler for later chunks while it returns a true value, so without it you will only ever decode the first part of the stream content rather than everything that is coming down the pipe.

I ran the above and did get streaming content printing, though it is not of much use, as it is rather hacked-up code.
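One way to make the handler less hacked up is to stop re-decoding $response->content on every call and instead buffer the $data chunk LWP passes in, handing off only complete messages. The sketch below does that using nothing but the 4-byte total_length at the front of each message. make_chunk_handler is a hypothetical name, and the callback you give it is assumed to take one whole message's bytes (for example, something that runs them through a decoder). It still returns 1 so LWP keeps calling it.

```perl
use strict;
use warnings;

# Hypothetical helper: build a 'response_data' handler that buffers
# chunks and calls $on_message once per complete message
sub make_chunk_handler {
    my ($on_message) = @_;
    my $buffer = '';
    return sub {
        my ($response, $ua, $h, $data) = @_;
        $buffer .= $data;
        # Each message starts with its 4-byte big-endian total length
        while (length($buffer) >= 4) {
            my $total = unpack 'N', $buffer;
            die "Bad message length $total\n" if $total < 16;
            last if length($buffer) < $total;    # wait for more data
            # 4-arg substr removes the message from the front of the buffer
            my $message = substr($buffer, 0, $total, '');
            $on_message->($message);
        }
        return 1;    # true: keep LWP calling this handler for later chunks
    };
}

# Simulate LWP delivering two 20-byte messages split across three chunks
my @sizes;
my $handler = make_chunk_handler(sub { push @sizes, length $_[0] });
my $one     = pack('N', 20) . ('x' x 16);
my $stream  = $one x 2;
$handler->(undef, undef, undef, $_)
    for substr($stream, 0, 7), substr($stream, 7, 20), substr($stream, 27);
print scalar(@sizes), " messages: @sizes\n";    # prints 2 messages: 20 20
```

Plugging it in would then be a matter of passing make_chunk_handler(...) as the 'response_data' handler in place of the anonymous sub, since the closure keeps its own buffer between chunks.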

Paws does have a way to handle the above, and that is its Pagination system. But that is another post.




About byterock

Long time Perl guy, a few CPAN mods, a lot of work on DBD::Oracle and a few YAPC presentations