Websockets in Catalyst
Recently I've heard quite a few people comment about how Catalyst can't do websockets. I'm writing this article to dispel that myth, and to document how you can retro-fit websockets into an existing Catalyst application in a materially useful way, without rewriting any of your existing code. I'll also show a case where it actually makes sense to do this rather than using a framework designed for that purpose.
Backstory
I studied all the different ways to do websockets in Perl for my presentation at TPC 2019. I think the talk turned out rather well, so I encourage people to watch it and look at the GitHub repo for example code, but the TL;DR of it was that websockets in PSGI are a bit of an under-specified hack, you probably need to rewrite a bunch of your Plack code to accommodate non-blocking needs, and Mojolicious seemed like the obvious answer for implementing websockets. I concluded that anyone who needs to add event-driven features to their existing Plack-based web app should just write a new Mojo app to handle only the websockets, and then use reverse proxies to run the Mojo app under the same hostname as the Plack app. (I should also mention that since that talk, there is now PAGI, which addresses the limitations of PSGI and offers an alternative to Mojolicious.)
Since then, I have used Mojolicious a few times for event-driven hobby projects, but since I was writing it from scratch I just wrote the whole thing in Mojo. I had not had a request from any of my customers that required a websocket, so I had not actually gotten to test out my advice about making a hybrid app.
Interactive Feature Request
Last year, the opportunity finally came along. One of our customers has a web store written in Catalyst, and they wanted to add some features that would let sales representatives interact with customers while on the phone with a customer who was actively building a cart. They wanted to be able to quickly identify which cart belonged to the customer on the phone (who might be using an anonymous cart, not logged into an account) and interact with the customer by helping them edit their cart, possibly applying discounts to the cart, and maybe sending them links to pages on the site. While this could be implemented with polling, any lag between the phone conversation and what they saw in their browser was a potential source of confusion, so the polling would need to be rather frequent to give the desired user experience. While we probably have enough capacity to handle some fast polling by a few sales reps, it would just be a messy way to implement it, and could cause future problems if any of the queries got expensive. Implementing it in an event-driven manner was the clear winner. It was finally time to add websockets!
I started by following my advice from the TPC talk, but quickly ran into a snag. We have two systems of session management, one for public users and one for the admin/sales users. Due to swarms of bots hitting the site, the public user sessions are stored in Redis, while the admin-side sessions are using database tables. All of this has been nicely abstracted behind Catalyst plugins, and we have nice APIs to query users and their permissions. The first thing that a Mojolicious controller would need to do for an incoming connection is authenticate the client. I realized I was going to have to dig down into all the details of my Catalyst sessions and re-implement a bunch of that logic, and that seemed like a lot of effort. It would also mean that any future changes to session management would require updates to both the Catalyst and Mojolicious apps.
I knew I would still need a two-app approach, because there was lots and lots of blocking code in the workers of the Catalyst app, and you can't have blocking code (of more than a few milliseconds) in an event-driven app. But, what if I just ran the same app twice, with all the websocket actions diverted to the Catalyst app running under Twiggy, and the rest sent to the existing app running under Gazelle? I would also need to "professionalize" the hack that lets Catalyst handle websockets so that it's clear what is going on, unlikely to break, and my co-workers don't hate me in the future.
Controller Design
Here's what I came up with, anonymized and simplified a bit. (Also, the syntax highlighter doesn't recognize POD, so I had to prefix all the POD with '#'; pretend that isn't there.)
First, I created a new Controller to hold all the new event-driven code.
package MyApp::Controller::Event;
# =head1 DESCRIPTION
#
# This controller handles all event-driven (websocket)
# behavior of the admin interface.
#
# Actions in this controller can only be served via the
# Twiggy event-driven webserver which runs from the
# myapp-twiggy docker container. Accessing this controller
# from the normal Gazelle server gives an error message.
#
# The Twiggy server is mounted via Traefik PathPrefix rule
# at /(redacted), but the request paths are not rewritten
# so Catalyst doesn't need to add any special prefixes when
# it generates links.
#
# This controller uses the "instance per request" design,
# so Moose attributes only apply to the current user, and
# continue to only apply to the current user even after the
# event-driven callbacks have started.
#
# =cut
use Moose;
use Scalar::Util 'refaddr';
use JSON::MaybeXS;
use namespace::clean;
use v5.36;
BEGIN { extends 'MyApp::Controller'; }
# One instance per request
sub ACCEPT_CONTEXT {
    my ($self, $c)= @_;
    return $self unless ref $c;
    $c->stash->{(__PACKAGE__)} //= do {
        $self= bless { %$self }, ref $self;
        $self->context($c);
        $self;
    };
}
I should note here that this borrows the workings of Catalyst::Component::InstancePerContext, but since that module only saves four lines of code, I just paste it into each controller so that it's clear to everyone what exactly is going on to provide InstancePerContext behavior, and have one fewer CPAN dependency.
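For comparison, here is roughly what the CPAN-role version would look like, where the role supplies ACCEPT_CONTEXT and calls your build_per_context_instance method. This is a sketch based on my reading of Catalyst::Component::InstancePerContext, not code from the app:

package MyApp::Controller::Event;
use Moose;
BEGIN { extends 'MyApp::Controller'; }
with 'Catalyst::Component::InstancePerContext';

# The role caches one instance per request and calls this method
# to construct it the first time the controller is needed.
sub build_per_context_instance {
    my ($self, $c)= @_;
    my $new= bless { %$self }, ref $self;
    $new->context($c);
    return $new;
}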
Next, I chose a design where the Catalyst context object and controller object are long-lived, with references held globally and cleared by the disconnect event of the websocket.
# =attribute context
#
# A weak reference to the Catalyst context ($c).
#
# =attribute fh
#
# The file handle of the websocket.
#
# =attribute websocket
#
# The AnyEvent::WebSocket::Connection, if one has been
# created.
#
# =attribute io_session_name
#
# A convenient name to identify the websocket session in
# logs. Currently "$username-$n" where $n counts upward
# on a per-user basis.
#
# =cut
# This holds the top-level strong references to websocket
# session instances. It is keyed by refaddr($self) and
# holds values of [ $self, $c ].
# The Controller::Event instance ($self) holds references
# to the websocket, Postgres listeners, and a weak-ref
# back to the Catalyst context ($c). The context holds a
# strong reference to the stash, which has a strong
# reference to $self.
our %active_contexts;
has context => ( is => 'rw', weak_ref => 1 );
has fh => ( is => 'rw' );
has websocket => ( is => 'rw' );
has io_session_name => ( is => 'ro', lazy_build => 1,
    predicate => 'has_io_session_name' );

sub _build_io_session_name($self) {
    state %next_n_for_user;
    my $uname= $self->context->user->username;
    return $uname . '-' . ++$next_n_for_user{$uname};
}
# =method io_connect
#
# Called by AnyEvent::WebSocket::Server when the websocket
# handshake is complete. It receives a $promise that is
# either a websocket object or an exception.
#
# =method io_disconnect
#
# This is called every time we receive a disconnect event
# from a websocket client.
#
# =cut
sub io_connect($self, $promise) {
    my $sess_name= $self->io_session_name;
    unless (eval { $self->websocket($promise->recv); 1 }) {
        warn "Rejected connection '$sess_name': $@\n";
        close($self->fh);
        delete $active_contexts{refaddr $self};
        return;
    }
    Scalar::Util::weaken($self);
    $self->websocket->on(each_message => sub($conn, @args) {
        eval { $self->io_message(@args); 1 }
            or warn "Exception for $sess_name: $@";
    });
    $self->websocket->on(finish => sub {
        eval { $self->io_disconnect; 1 }
            or warn "Exception for $sess_name: $@";
    });
}
sub io_disconnect($self) {
    delete $active_contexts{refaddr $self};
}
Event Plumbing
This is perhaps a topic for another article, but I have extensions on the DBIC Postgres connection of my app that enable some event-driven features. I should get that packaged for CPAN some day...
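The gist of it is plain Postgres LISTEN/NOTIFY wired into the AnyEvent loop. Here is a minimal sketch of that mechanism using nothing but DBD::Pg and AnyEvent (the connection details are placeholders; my actual storage extension adds listener objects and reference counting on top of this):

use v5.36;
use AnyEvent;
use DBI;

# Connect and subscribe to the notification channel.
my $dbh= DBI->connect('dbi:Pg:dbname=myapp', 'myapp', 'secret',
    { RaiseError => 1, AutoCommit => 1 });
$dbh->do('LISTEN cart_activity');

# Watch the connection's socket; when it becomes readable, drain
# any pending notifications and hand them to interested code.
my $watcher= AE::io $dbh->{pg_socket}, 0, sub {
    while (my $notify= $dbh->pg_notifies) {
        my ($channel, $pg_pid, $payload)= @$notify;
        say "NOTIFY on $channel from backend $pg_pid: $payload";
    }
};

AE::cv->recv;   # run the event loop forever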
# =attribute cart_listener
#
# This is an instance of
# L<DBIx::Class::Storage::DBI::PgWithEventListeners::Listener>
# which delivers Postgres events named 'cart_activity' to method
# L</on_cart_activity>. The object is lazy-built. Note that
# DBIx::Class::Storage::DBI::PgWithEventListeners keeps track
# of whether objects exist for a Pg channel, so LISTEN
# happens when the first listener is created, and
# UNLISTEN happens after the last listener is garbage
# collected.
#
# =cut
has cart_listener => ( is => 'rw', lazy_build => 1,
    predicate => 'has_cart_listener',
    clearer => 'clear_cart_listener'
);

sub _build_cart_listener($self) {
    my $db= $self->context->model('DB');
    # Ensure we are dispatching events via event loop.
    # This only works on Twiggy. The Gazelle-served
    # instance of the app doesn't call this.
    $db->storage->dispatch_via_anyevent;
    # Each instance of Controller::Event has its own listener.
    # As long as one of these objects exists, postgres will
    # be listening to "cart_activity" events.
    return $db->storage->new_listener(
        'cart_activity', $self, 'on_cart_activity'
    );
}
Then some methods that send and receive the events. The events I'm generating from Postgres are fairly benign (just indicating which records have changed), so they can just be forwarded directly out to the websocket clients. The JavaScript client then uses the information about what has changed to decide which normal AJAX requests to execute to refresh the screen. Those AJAX requests go to the normal Gazelle-based web app instance. I'm using this Event controller only for the delivery of change notifications.
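As an aside, raising those notifications is cheap. Whether it happens in a database trigger or in the code that writes the cart rows, it boils down to a call to pg_notify(). Here's a hedged application-side sketch; the helper name and payload shape are made up for illustration and aren't the app's real event format:

use v5.36;
use JSON::MaybeXS 'encode_json';

# Hypothetical helper: announce that a cart changed so that any
# listening Event controller instances can relay it to their clients.
sub notify_cart_changed($schema, $cart_id) {
    my $payload= encode_json({ table => 'cart', id => $cart_id });
    $schema->storage->dbh_do(sub ($storage, $dbh, @) {
        $dbh->do('SELECT pg_notify(?, ?)', undef,
            'cart_activity', $payload);
    });
}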
# =method send_event
#
# $self->send_event($data);
#
# Serialize $data into JSON and send to the client over the
# websocket.
#
# =method on_cart_activity
#
# This is called by the database listener every time relevant
# cart activity has occurred. It relays the event to the
# websocket client.
#
# =cut
sub send_event($self, $data) {
    $self->websocket->send(JSON::MaybeXS->new->encode($data));
}

sub on_cart_activity($self, $channel, $pg_pid, $payload) {
    $self->send_event([ cart_activity => $payload ]);
}

# =method io_message
#
# This is called every time we receive a packet from the
# websocket client. Right now the client just requests to
# listen to a feed of events like 'cart_activity'.
# Actions the client takes in response to these events are
# sent as normal HTTP requests to other controllers.
#
# =cut
sub io_message($self, $msg, @) {
    if ($msg->is_text) {
        my $data= JSON::MaybeXS->new->decode($msg->decoded_body);
        if ($data->{listen} eq 'cart_activity') {
            $self->cart_listener; # lazy-build
        }
    }
}
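If you want to poke at this endpoint without a browser, a throwaway script using AnyEvent::WebSocket::Client along these lines will exercise io_message. The URL is a placeholder, and in real life you'd also need to send the admin session cookie with the handshake, which I'm glossing over here:

use v5.36;
use AnyEvent;
use AnyEvent::WebSocket::Client;
use JSON::MaybeXS 'encode_json';

my $done= AE::cv;
my $client= AnyEvent::WebSocket::Client->new;
$client->connect('wss://example.com/redacted/io')->cb(sub ($conn_cv) {
    my $conn= eval { $conn_cv->recv } or die "handshake failed: $@";
    # Ask the server to start relaying cart_activity events.
    $conn->send(encode_json({ listen => 'cart_activity' }));
    $conn->on(each_message => sub ($conn, $msg, @) {
        say "event: ", $msg->decoded_body;
    });
    $conn->on(finish => sub { $done->send });
});
$done->recv;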
Actions
And finally, the Websocket-handling action:
# =action /(redacted)/io
#
# This is the endpoint for making websocket connections.
# The browser must send the header 'Upgrade: websocket'
# and the user must be logged in and be permitted to use
# event features. Websocket events are then dispatched
# to the L</io_message> and L</io_disconnect> methods.
#
# =cut
sub io : Local Args(0) ($self, $c) {
    my $h= $c->req->headers;
    ($h->header('Upgrade')//'') eq 'websocket'
        or $c->detach(HTTP => 400, ['Expected websocket']);
    $c->user && $c->check_user_roles('event_listener')
        or $c->detach(HTTP => 403, ["Can't monitor events"]);
    # trigger building of io_session_name
    my $sess_name= $self->io_session_name;
    # lazy-load, so that the normal Gazelle app instance
    # doesn't need to load AnyEvent
    require AnyEvent::WebSocket::Server;
    my $env= $c->req->env;
    # Ensure that neither Catalyst nor Twiggy can make
    # further writes to the handle (or close it)
    # by dup()-ing it to a new FD number and then closing
    # the original.
    open(my $fh, '>&', $env->{'psgix.io'})
        or die "dup psgix.io: $!";
    close($env->{'psgix.io'});
    $self->fh($fh);
    # save a ref to ourselves to prevent garbage collection.
    # note that ->context is a weak-ref, so need to hold a
    # ref to that too.
    $active_contexts{refaddr $self}= [ $self, $c ];
    AnyEvent::WebSocket::Server->new
        ->establish_psgi({ %$env, 'psgix.io' => $fh })
        ->cb(sub($promise) { $self->io_connect($promise) });
    # for Catalyst logging only; the 101 response is not
    # actually sent by this code, because the handle that
    # Catalyst is holding has been closed.
    $c->res->code(101);
    $c->res->body('');
    $c->detach();
}
And there's the hack: I close the TCP handle which Twiggy exposed in 'psgix.io' so that neither Twiggy nor Catalyst can break the websocket connection. Currently this works without even showing an error. Even if future versions of Twiggy or Catalyst start throwing an exception due to attempting to write a closed file handle, at best the error will get caught and logged, and at worst it will kill a call stack that didn't have any useful work left to do anyway. Meanwhile AnyEvent will still have the undisturbed websocket. So, it seems like a fairly safe hack.
My earlier attempts at circumventing Catalyst involved reaching into internals to grab the "writer" (the object that writes the final response into the callback that Twiggy provided via PSGI) and holding onto it until after the websocket was closed. Those attempts were touching too many private attributes and seemed more fragile. Also, if you hold onto the request indefinitely it interferes with the Catalyst logging and makes it appear that the request never got a response.
Reverse Proxy
As I mentioned earlier, I have one container that is running the app under Gazelle (a pre-forking worker pool where each worker handles one request at a time) and another that runs the app under Twiggy (where one process juggles multiple event-driven requests interleaved with each other). The only differences between these containers are the command and the Traefik labels.
Docker myapp-gazelle command:
["plackup","-s","Gazelle","-p","3000","--max-reqs-per-child","10000","myapp.psgi"]
Docker myapp-twiggy command:
["plackup","-s","Twiggy","-p","3000","myapp.psgi"]
I'm a fan of the Traefik reverse proxy, mostly because of how nicely it
integrates with Docker and LetsEncrypt. These are the relevant labels from the
myapp-twiggy docker container:
- "traefik.http.services.myapp-twiggy.loadbalancer.server.port=3000"
- "traefik.http.services.myapp-twiggy.loadbalancer.server.scheme=http"
- "traefik.http.routers.myapp-twiggy.entryPoints=https"
- "traefik.http.routers.myapp-twiggy.priority=15"
- "traefik.http.routers.myapp-twiggy.rule=(Host(
redacted) && PathPrefix(/redacted) )" - "traefik.http.routers.myapp-twiggy.service=myapp-twiggy"
I have omitted some rules for middlewares and TLS. The main points are that the
priority=15 gives this router a higher priority than the router of
myapp-gazelle, and Host and PathPrefix rules match only the paths served
by my Event controller, leaving all the other requests to fall back to
myapp-gazelle.
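For contrast, the corresponding router for the myapp-gazelle container just has a Host rule at a lower priority, so it catches everything the Twiggy router doesn't. The values below are illustrative rather than copied from the real config:

- "traefik.http.services.myapp-gazelle.loadbalancer.server.port=3000"
- "traefik.http.routers.myapp-gazelle.entryPoints=https"
- "traefik.http.routers.myapp-gazelle.priority=10"
- "traefik.http.routers.myapp-gazelle.rule=Host(`redacted`)"
- "traefik.http.routers.myapp-gazelle.service=myapp-gazelle"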
Catalyst Websocket Tradeoffs
In retrospect, using Catalyst for websockets actually worked out even better than I anticipated.
- I was able to re-use the authentication and sessions, as intended.
- I was able to re-use the application's DBIC configuration instead of needing to implement the equivalent with Mojo::Pg. (passwords, on-connect settings, logging, trace/debug, etc)
- Homogeneous logging of HTTP request/response/errors
- No additional reverse proxy configuration (getting Mojolicious to trust the same reverse-proxy headers that Plack is trusting)
- Docker container configuration is nearly identical
- Avoid introducing a completely different framework into the app, which helps with maintenance.
The only downside is that the session setup code has a brief blocking behavior as it queries the database, during which Twiggy cannot also be delivering websocket events. This could theoretically make a denial-of-service attack easier, but just barely. Any attack distributed enough to dodge the connection-throttling middleware would be a problem regardless of some milliseconds lost to blocking database queries. I could always add a worker pool of Twiggy instances if I needed to.
Extras
It's important to ensure that the references to the controller and Catalyst context go out of scope when websockets disconnect. While initially writing the code above, I used the following "destructor logger" to log every time an object I cared about got destroyed. Just create an instance and then assign it to a random hash element of the object of interest.
package DestructorLogger {
    use v5.36;
    use Log::Any '$log';

    sub new($class, $msg) {
        bless \$msg, $class;
    }
    sub DESTROY($self) {
        $log->info("Destroyed: $$self");
    }
}
...
$c->{destructor_logger}= DestructorLogger->new('context');
$self->{destructor_logger}= DestructorLogger->new('controller');
I should also mention that I removed a lot of the logging from the code in this article, since most of it was rather app-specific, and cluttered the view a bit.
I also have a controller action that serves a static page that can test the websocket server and see the events it is sending:
# =head2 GET /(redacted)
#
# This is a simple status page to verify that this controller
# is running through the correct webserver and delivering the
# expected events.
#
# =cut
sub index : Path Args(0) ($self, $c) {
    $c->detach(HTTP => 200, [ <<~HTML ]);
        <!DOCTYPE html>
        <html>
          <head>
            <title>Event Server</title>
            <script src="/(redacted)/jquery-3.4.0.js"></script>
            <script>
              window.liveupdate= {
                init: function(ws_uri) {
                  var self= this;
                  this.ws_uri= ws_uri;
                  \$('.chatline').on('keypress', function(event) {
                    self.onkeypress(event.originalEvent)
                  });
                  // Connect WebSocket and initialize events
                  console.log('connecting WebSocket '+this.ws_uri);
                  this.ws= new WebSocket(this.ws_uri);
                  this.ws.onopen= function(event) {
                    console.log("onopen");
                  };
                  this.ws.onmessage= function(event) {
                    self.onmessage(event)
                  };
                  this.ws.onclose= function(event) {
                    console.log("onclose");
                  };
                },
                onmessage: function(event) {
                  \$('body').append(
                    document.createTextNode(event.data+"\\n"))
                },
                onkeypress: function(event) {
                  if (event.key == 'Enter')
                    this.onsend();
                },
                onsend: function(event) {
                  var text= \$('.chatline').val();
                  if (text) {
                    this.ws.send(text);
                    \$('.chatline').val('');
                  }
                }
              };
              \$(document).ready(function() {
                var loc= '' + window.location;
                window.liveupdate.init(
                  loc.replace(/^http/, 'ws')
                     .replace(/\\/?\$/, '/io'))
              });
            </script>
          </head>
          <body>
            Serving @{[ scalar keys %active_contexts ]} connections
            <br><input class="chatline" type="text">
          </body>
        </html>
        HTML
}