PAWS Almost
In my last post I think I said this was going to be a very short series. Well, I think I was wrong on that count.
When I last posted on the Kinesis 'SubscribeToShard' action, I discovered that it returns 'application/vnd.amazon.eventstream', and that led me down a very deep rabbit hole that got me well sidetracked.
Well, to start out I had to figure out what AWS was returning when it sent 'vnd.amazon.eventstream'. I eventually found the answer in the AWS 'Event Stream Encoding' documentation.
OK, time to take the way-back machine to my first play-dates with computers: assembling GIS data from an Amdahl mainframe that was spooling a 9-inch tape directly to my Unitron 2000 over a 330 baud modem, then taking the various bits and putting them back together so I could draw pretty maps on this:
Though mine was the budget 880.
Anyway, scratching my head a little, I figured that whatever solution I came up with, I was not going to treat the handling of this stream as an integrated part of Paws. It will have to be a separate CPAN module, the same as 'Net::Amazon::Signature::V4': a generic module that can be used by anyone who may need it.
Now the only question is whether to call it 'AWS::EventStream::VND', 'Net::Amazon::EventStream::VND' or 'Net::AWS::EventStream::VND'.
At this point that doesn't matter; I really just want to get it working. I can sort that out later.
Well, from past experience, the first rule of working with any bit-stream is:
Always start with a data file, never a stream.
So I just dumped the content that 'SubscribeToShard' returned when it timed out after five minutes, and it looked like this:
^@^@^@r^@^@^@`«<82>^M<9e>^K:event-type^G^@^Pinitial-response^M:content-type^G^@^Zapplication/x-amz-json-1.1^M:message-type^G^@^Eevent{}¬®k}^@^@^@ò^@^@^@ej^NI<83>^K:event-t ….
Not very easy on the eyes. Thankfully, the AWS documentation provides a nice diagram of the message layout.
So let's tackle the prelude: that is the first 8 bytes (two 4-byte lengths), followed by 4 more for a CRC.
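To make that layout concrete, here is a minimal sketch of the reverse direction: a hypothetical 'encode_message' helper that builds one message from string headers (value type 7) and a payload. It assumes plain byte/ASCII strings and uses the core 'Compress::Zlib' 'crc32' function as a stand-in for Digest::Crc32. It is not part of Paws or my decoder, just a way to see every field in one place.

```perl
use strict;
use warnings;
use Compress::Zlib qw(crc32);   # core module; crc32() computes the standard CRC-32

# Hypothetical helper: build one event stream message from an ordered list of
# string headers (value type 7) and a payload. Layout, per the spec:
#   total length (4) | headers length (4) | prelude CRC (4) | headers | payload | message CRC (4)
# Assumes the names, values and payload are plain byte (ASCII) strings.
sub encode_message {
    my ($headers, $payload) = @_;    # $headers: array ref of name => value pairs
    my $header_bytes = '';
    my @pairs = @$headers;
    while (my ($name, $value) = splice @pairs, 0, 2) {
        # 1-byte name length, name, 1-byte type (7 = string), 2-byte value length, value
        $header_bytes .= pack 'C/a* C n/a*', $name, 7, $value;
    }
    my $total   = 12 + length($header_bytes) + length($payload) + 4;
    my $prelude = pack 'N N', $total, length($header_bytes);
    my $message = $prelude . pack('N', crc32($prelude)) . $header_bytes . $payload;
    return $message . pack('N', crc32($message));    # message CRC covers everything before it
}

# Rebuild the first message from the dump above
my $msg = encode_message(
    [ ':event-type'   => 'initial-response',
      ':content-type' => 'application/x-amz-json-1.1',
      ':message-type' => 'event' ],
    '{}',
);
my ($total_length, $header_length) = unpack 'N N', $msg;
print "total_length=$total_length,header_length=$header_length\n";   # total_length=114,header_length=96
```

Writing $msg out with a ':raw' filehandle gives a small, known-good data file to test the prelude code in this post against, which fits the "start with a data file" rule.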
my $filename = 'shards';
open my $fh, '<:raw', $filename or die "Cannot open $filename: $!";
my $bytes_read = read $fh, my $prelude, 8;
die "Short read on prelude\n" unless $bytes_read == 8;
Now to get the binary into something we humans can read:
my ($total_length, $header_length) = unpack 'N*', $prelude;
print "total_length=$total_length,header_length=$header_length\n";
and that will give me:
total_length=114,header_length=96
Ahh, good old Perl: no need for anything fancy, just one extra parameter on a read to get 8 bytes out of a file, and unpack built right in. It did take a little while to figure out which template to use; I had to reach way back in my brain to my 'C' days. I guess most of that data is now lost, as all I remember is that it uses some sort of template. I'm sort of ashamed to admit I had to look up which one to use ('N', an unsigned 32-bit integer in big-endian "network" order).
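To see why 'N' is the right template, here is a small sketch that unpacks the first eight bytes of the dump above (00 00 00 72 00 00 00 60) two ways: 'N' reads them big-endian, while 'V' reads the same bytes little-endian and produces nonsense lengths. On Perl 5.10 and later, 'L>' is an equivalent spelling of 'N'.

```perl
use strict;
use warnings;

# The first eight bytes of the dump above: 00 00 00 72 | 00 00 00 60
my $prelude = "\x00\x00\x00\x72\x00\x00\x00\x60";

# 'N': unsigned 32-bit, big-endian ("network" order), which is what the prelude uses
my ($total_length, $header_length) = unpack 'N2', $prelude;
print "N: $total_length, $header_length\n";    # N: 114, 96

# 'V': unsigned 32-bit, little-endian; same bytes, wrong numbers
my ($wrong_total, $wrong_header) = unpack 'V2', $prelude;
print "V: $wrong_total, $wrong_header\n";      # V: 1912602624, 1610612736
```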
The next four bytes are a CRC32 checksum of the first eight (the two lengths), used to confirm that you have read them correctly. But how to check it? Well, CPAN comes to the rescue with 'Digest::Crc32'.
use Digest::Crc32;
# The prelude CRC is a big-endian 32-bit value, just like the two lengths
$bytes_read = read $fh, my $prelude_checksum, 4;
my ($checksum) = unpack 'N', $prelude_checksum;
my $crc = Digest::Crc32->new();
if ($crc->strcrc32($prelude) != $checksum) {
    die "Prelude checksum fails!";
}
print "Prelude checksum Pass\n";
and when I run it I get:
total_length=114,header_length=96
Prelude checksum Pass
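The prelude CRC is only half of it: per the Event Stream Encoding documentation, the last four bytes of every message are a CRC32 of everything before them (prelude, prelude CRC, headers and payload). A minimal sketch of checking both on one complete message held in a string, assuming the core 'Compress::Zlib' 'crc32' (the standard CRC-32 that zlib computes) in place of Digest::Crc32; 'check_crcs' is a hypothetical name.

```perl
use strict;
use warnings;
use Compress::Zlib qw(crc32);   # core; standing in for Digest::Crc32 here

# Hypothetical helper: verify both checksums of one complete message held in a
# byte string. Returns '' when both pass, otherwise a short reason.
sub check_crcs {
    my ($message) = @_;
    return 'short message' if length($message) < 16;    # 12-byte prelude + 4-byte message CRC
    my ($total, undef, $prelude_crc) = unpack 'N3', $message;
    return 'length mismatch' if $total != length($message);

    # The prelude CRC covers only the first 8 bytes (the two lengths)
    return 'prelude CRC fails' if crc32(substr($message, 0, 8)) != $prelude_crc;

    # The message CRC (last 4 bytes) covers everything before it, prelude CRC included
    my ($message_crc) = unpack 'N', substr($message, -4);
    return 'message CRC fails' if crc32(substr($message, 0, -4)) != $message_crc;
    return '';
}

# Tiny test message: no headers, payload '{}'
my $prelude = pack 'N N', 18, 0;
my $message = $prelude . pack('N', crc32($prelude)) . '{}';
$message   .= pack 'N', crc32($message);
print check_crcs($message) eq '' ? "both CRCs pass\n" : "CRC check failed\n";
```

Against the data file, reading total_length bytes from the start of a message (prelude included) gives exactly the string this helper expects.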
So that is part one done. Really not much else to it. I did find one module very useful: 'IO::Scalar'.
The problem is that you can't just read the full record from the stream and play with the bits. The structure forces you to jump around a bit (pardon the pun) in the stream and then find your way back to where you left off.
On my first iteration I think I had to make six position changes and resets. Thanks to IO::Scalar I managed to get that down to just one when I refactored the spaghetti into a little module.
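The headers are where most of that jumping around happens. Here is a rough sketch, not the AWS::EventStream::VND code itself: once the headers block (header_length bytes, right after the prelude CRC) has been read into a string, it can be walked with substr offsets instead of file positions. The value-type codes 0 to 9 are the ones listed in the AWS Event Stream Encoding documentation; 'parse_headers' is a hypothetical name, and treating the integer types as signed is an assumption.

```perl
use strict;
use warnings;

# Hypothetical helper: turn an event stream headers block (a byte string) into a
# hash ref. Each header is: 1-byte name length, name, 1-byte value type, value.
# Integer types are treated as signed (an assumption); 'q>' needs a 64-bit Perl.
sub parse_headers {
    my ($bytes) = @_;
    my %headers;
    my $pos = 0;
    while ($pos < length($bytes)) {
        my $name_len = unpack 'C', substr($bytes, $pos, 1);
        my $name     = substr($bytes, $pos + 1, $name_len);
        my $type     = unpack 'C', substr($bytes, $pos + 1 + $name_len, 1);
        $pos += 2 + $name_len;                                   # now at the value
        my $value;
        if    ($type == 0) { $value = 1 }                        # boolean true, no value bytes
        elsif ($type == 1) { $value = 0 }                        # boolean false, no value bytes
        elsif ($type == 2) { $value = unpack 'c',  substr($bytes, $pos, 1); $pos += 1 }
        elsif ($type == 3) { $value = unpack 's>', substr($bytes, $pos, 2); $pos += 2 }
        elsif ($type == 4) { $value = unpack 'l>', substr($bytes, $pos, 4); $pos += 4 }
        elsif ($type == 5 || $type == 8) {                       # long, or timestamp in ms
            $value = unpack 'q>', substr($bytes, $pos, 8); $pos += 8;
        }
        elsif ($type == 6 || $type == 7) {                       # byte array, or UTF-8 string
            my $len = unpack 'n', substr($bytes, $pos, 2);
            $value = substr($bytes, $pos + 2, $len);
            $pos += 2 + $len;
        }
        elsif ($type == 9) {                                     # UUID, returned as hex
            $value = unpack 'H32', substr($bytes, $pos, 16); $pos += 16;
        }
        else { die "Unknown header value type $type for '$name'\n" }
        $headers{$name} = $value;
    }
    return \%headers;
}

# The headers block of the first message in the dump, rebuilt with pack
my $block = pack('C/a* C n/a*', ':event-type',   7, 'initial-response')
          . pack('C/a* C n/a*', ':content-type', 7, 'application/x-amz-json-1.1')
          . pack('C/a* C n/a*', ':message-type', 7, 'event');
my $h = parse_headers($block);
print "$_ => $h->{$_}\n" for sort keys %$h;
```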
Eventually I got the decoding working, and what I was getting from the stream looked like this:
headers={
':message-type' => 'event',
':event-type' => 'initial-response',
':content-type' => 'application/x-amz-json-1.1'
};
message={}
That was the first message; most of the rest looked like this:
headers={
':message-type' => 'event',
':event-type' => 'SubscribeToShardEvent',
':content-type' => 'application/x-amz-json-1.1'
};
message={"ContinuationSequenceNumber":"49604106570538379893614088729479714815975373587922026498","MillisBehindLatest":0,"Records":[]}
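The ':content-type' header says the payload is JSON, so turning it into a Perl structure is one call. A small sketch using the core JSON::PP module on the payload above; ContinuationSequenceNumber arrives as a JSON string, which is just as well, since it is far too large for a native integer.

```perl
use strict;
use warnings;
use JSON::PP qw(decode_json);   # core module since Perl 5.14

# The SubscribeToShardEvent payload printed above
my $payload = '{"ContinuationSequenceNumber":"49604106570538379893614088729479714815975373587922026498",'
            . '"MillisBehindLatest":0,"Records":[]}';

my $event = decode_json($payload);                   # expects UTF-8 encoded bytes
my $next  = $event->{ContinuationSequenceNumber};   # a JSON string, so it stays exact
my $count = scalar @{ $event->{Records} };
print "behind=$event->{MillisBehindLatest}ms records=$count next=$next\n";
```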
Now that that is done, time to see if I can read a real stream.
It took me quite some time to actually get this working after a fashion, so I will give you the quick review. Luckily I have played with event streams and HTTP before, mostly with Mojo but the odd time with LWP, so I at least knew where to start. I also found a few test cases in the /t directory that helped out.
First I needed direct access to the 'User Agent' that was handling the call to AWS, so I made an instance of my 'FullTestMakerLWPCaller' mod like this:
my $caller = FullTestMakerLWPCaller->new();
Which, if you recall, is just a mucked-up version of 'LWP' so I can easily get to the 'User Agent'. What I want to do is add a handler for the 'response_data' event, like this:
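use IO::Scalar;
$caller->ua->add_handler(
    'response_data',
    sub {
        my ($response, $ua, $h, $data) = @_;
        my $es = AWS::EventStream::VND->new();
        # Wrap the content received so far in a filehandle for the decoder
        my $content = $response->content;
        my $ios = IO::Scalar->new(\$content);
        my $output = $es->decode($ios);
        print $output . "\n";
        # A true return value keeps LWP calling this handler for later chunks
        return 1;
    },
);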
my $aws = Paws->service(
'Kinesis',
region => 'us-east-1',
debug => 1,
caller => $caller,
);
my $Output = $aws->SubscribeToShard(
ConsumerARN => 'arn:aws:kinesis:us-east-1:32938372322:stream/TestSteam5Shard/consumer/TestKinesisApp:1581111187',
ShardId => 'shardId-000000000000',
StartingPosition => {
Type => 'LATEST',
}
);
In the above I create an instance of my 'AWS::EventStream::VND' decoder, then get the content from the response, wrap it in an IO::Scalar and pass that to my decode sub, which returns the decoded content that I then print. The next two statements just set the first call in motion.
The really important thing in the above is to include that 'return 1;' in the handler sub. LWP only calls a 'response_data' handler again for later chunks if it returns a true value, so without it you will only ever decode the first part of the stream content rather than everything that is coming down the pipe.
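As written, the handler gives the decoder '$response->content' on every call, which, assuming LWP keeps accumulating the body as usual, is everything received so far. One alternative, sketched here under that assumption and not what the code above does, is to append each '$data' chunk to a buffer and peel off only complete messages using the total length from the prelude. Network chunks are unlikely to line up with message boundaries, so a partial message simply waits in the buffer for the next chunk. 'extract_messages' is a hypothetical helper, and the demo fakes the network with three hand-cut chunks and dummy CRCs.

```perl
use strict;
use warnings;

# Hypothetical helper: remove and return every complete message at the front of
# the buffer. A trailing partial message is left in place for the next chunk.
sub extract_messages {
    my ($buffer_ref) = @_;
    my @messages;
    while (length($$buffer_ref) >= 12) {                     # need at least a full prelude
        my ($total) = unpack 'N', $$buffer_ref;
        die "Bad total length $total\n" if $total < 16;       # guards against looping forever
        last if length($$buffer_ref) < $total;                # wait for the rest of it
        push @messages, substr($$buffer_ref, 0, $total, '');  # cut it off the front
    }
    return @messages;
}

# Two 18-byte stand-in messages (CRCs are dummy zeros; only the length matters here)
my $fake   = pack('N N N', 18, 0, 0) . '{}' . pack('N', 0);
my $stream = $fake x 2;
my @chunks = (substr($stream, 0, 10), substr($stream, 10, 20), substr($stream, 30));

my $buffer = '';
my $found  = 0;
for my $chunk (@chunks) {
    $buffer .= $chunk;                  # what a response_data handler would do with $data
    my @got = extract_messages(\$buffer);
    $found += @got;
}
print "messages=$found leftover=", length($buffer), "\n";
```

In a real 'response_data' handler the buffer would be declared outside the sub, each message from extract_messages(\$buffer) would go to the decoder, and the handler would still 'return 1'.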
I ran the above and did get streaming content printing, though it is not much use yet, as the above is rather hacked-up code.
Paws does have a way to handle the above and that is its Pagination system. But that is another post.