Skip to content

Instantly share code, notes, and snippets.

@ggl
Created February 18, 2022 09:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ggl/5e024e58a2385e51d09593140a08f968 to your computer and use it in GitHub Desktop.
Save ggl/5e024e58a2385e51d09593140a08f968 to your computer and use it in GitHub Desktop.
#!/usr/bin/env perl
# ABSTRACT: A simple message broker using Mojolicious WebSockets
# USAGE: ./pubsub.pl daemon
#
# This is my remix of Doug Bell's event broker. It publishes numbers
# on a topic, subscribes to that topic and appends the numbers to the
# document body. The original source can be found here:
#
# https://gist.github.com/preaction/2078d33d87b126621e45
#
# Copyright 2015 Doug Bell (<preaction@cpan.org>)
#
# This is free software; you can redistribute it and/or modify it under
# the same terms as the Perl 5 programming language system itself.
use Mojolicious::Lite;
use Data::Dumper;
use Scalar::Util qw(refaddr);
my %topics;
# Helper: register the calling controller as a subscriber of $topic.
#
# Subscribers are stored in %topics, keyed first by topic and then by the
# controller's reference address, so the same connection can be located
# again when it has to be removed.
#
# Returns nothing.
helper add_topic_subscriber => sub {
    my ($self, $topic) = @_;

    my $subscriber_id = refaddr $self;
    $topics{$topic}{$subscriber_id} = $self;

    return;
};
# Helper: remove the calling controller from the subscribers of $topic.
#
# Does nothing when the topic is unknown. When the last subscriber of a
# topic is removed, the topic entry itself is dropped so that %topics does
# not accumulate empty hashes for every topic that was ever subscribed to.
#
# Returns nothing.
helper remove_topic_subscriber => sub {
    my ($self, $topic) = @_;

    # Plain lookup: unlike a nested delete, this does not autovivify
    # $topics{$topic} for a topic nobody ever subscribed to.
    my $subscribers = $topics{$topic} or return;

    delete $subscribers->{ refaddr $self };

    # Forget the topic entirely once nobody is listening any more.
    delete $topics{$topic} unless %$subscribers;

    return;
};
# Helper: send $msg to every controller currently subscribed to $topic.
#
# Publishing to a topic without subscribers is a no-op. The topic is looked
# up with a plain hash access first, because dereferencing
# %{ $topics{$topic} } inside values() would autovivify an empty entry for
# every topic name a publisher cares to invent.
#
# Returns nothing.
helper publish_topic_message => sub {
    my ($self, $topic, $msg) = @_;

    my $subscribers = $topics{$topic} or return;

    for my $subscriber (values %$subscribers) {
        $subscriber->send($msg);
    }

    return;
};
# Front page: a route named "index" without an action of its own, so the
# "index" template from the DATA section (browser-side code) is rendered.
get('/')->name('index');
# WebSocket echo service: every incoming message is sent straight back to
# the same client, prefixed with "echo: ".
websocket '/echo' => sub {
    my $c = shift;

    # The connection has been opened
    $c->app->log->debug('WebSocket opened');

    # Set the inactivity timeout for this connection to 120 seconds
    $c->inactivity_timeout(120);

    # Reply to each incoming message with an echo of its content
    my $echo_back = sub {
        my ($ws, $text) = @_;
        $ws->send("echo: $text");
    };

    # Log the status code the connection was closed with
    my $log_close = sub {
        my ($ws, $status) = @_;
        $ws->app->log->debug("WebSocket closed with status $status");
    };

    $c->on(message => $echo_back);
    $c->on(finish  => $log_close);
};
# Publisher endpoint: every message received on /pub/<topic> is forwarded
# to all current subscribers of <topic>.
websocket '/pub/*topic' => sub {
    my $c = shift;

    $c->inactivity_timeout(120);

    # The wildcard placeholder captures the topic name from the path
    my $topic = $c->stash('topic');
    my $peer  = $c->tx->remote_address;
    $c->app->log->debug($peer . ' publish to ' . $topic);

    # Fan each incoming message out to the topic's subscribers
    $c->on(
        message => sub {
            my ($ws, $payload) = @_;
            $ws->publish_topic_message($topic, $payload);
        }
    );
};
# Subscriber endpoint: a client connected to /sub/<topic> receives every
# message published on <topic> until it disconnects.
websocket '/sub/*topic' => sub {
    my $c = shift;

    $c->inactivity_timeout(120);

    # The wildcard placeholder captures the topic name from the path
    my $topic = $c->stash('topic');
    $c->add_topic_subscriber($topic);

    my $peer = $c->tx->remote_address;
    $c->app->log->debug($peer . ' subscribed to ' . $topic);

    # Unregister the connection again as soon as it is closed
    $c->on(finish => sub { shift->remove_topic_subscriber($topic) });
};
# Generate a throwaway 21-character alphanumeric secret on every start.
# NOTE(review): rand() is not cryptographically secure; acceptable only as
# long as nothing security-relevant is signed with this secret.
my $startup_secret = do {
    my @alphabet = ('a' .. 'z', 'A' .. 'Z', '0' .. '9');
    join '', map { $alphabet[ rand @alphabet ] } 1 .. 21;
};
app->secrets([$startup_secret]);

# Start the application; this has to stay the last statement of the script.
app->start;
__DATA__
@@ index.html.ep
%# Demo page: opens a publisher and a subscriber WebSocket on the "blah"
%# topic, publishes an incrementing counter once per second and appends
%# every message received from the topic to the document body.
%#
%# The WebSocket scheme follows the page's own scheme: browsers refuse
%# plain ws:// connections from a page that was loaded over HTTPS.
% my $ws_scheme = $self->req->is_secure ? 'wss' : 'ws';
<!DOCTYPE html>
<html>
<head><title>Echo</title></head>
<body>
<script>
var count = 1;
var pub = new WebSocket('<%= $self->url_for('pub/blah')->to_abs->scheme($ws_scheme) %>');
var sub = new WebSocket('<%= $self->url_for('sub/blah')->to_abs->scheme($ws_scheme) %>');
// Append every message received on the topic to the page
sub.onmessage = function (event) {
document.body.appendChild(document.createTextNode(event.data+' '));
};
// Start publishing the counter once the publisher socket is open
pub.onopen = function (event) {
window.setInterval(function () { pub.send(count++) }, 1000);
};
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment