Skip to content

Instantly share code, notes, and snippets.

/PubSub.pm Secret

Created February 14, 2015 00:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/de3f35464265cc2363fb to your computer and use it in GitHub Desktop.
Save anonymous/de3f35464265cc2363fb to your computer and use it in GitHub Desktop.
package Mojo::Pg::PubSub;
use Mojo::Base -base;
use Scalar::Util 'weaken';
has 'pg';
sub emit {
my ($self, $name, $payload) = @_;
$self->_db->notify($name, $payload);
return $self;
}
sub has_subscribers { !!@{shift->{events}{shift()} || []} }
sub on {
my ($self, $name, $cb) = @_;
push @{$self->{events}{$name}}, $cb;
$self->_db->listen($name) unless @{$self->{events}{$name}} > 1;
return $cb;
}
sub subscribers { shift->{events}{shift()} || [] }
sub unsubscribe {
my ($self, $name, $cb) = @_;
# One
if ($cb) {
$self->{events}{$name} = [grep { $cb ne $_ } @{$self->{events}{$name}}];
delete $self->{events}{$name} unless @{$self->{events}{$name}};
}
# All
else { delete $self->{events}{$name} }
$self->_db->unlisten($name) unless @{$self->{events}{$name}};
return $self;
}
sub _db {
my $self = shift;
return $self->{db} if $self->{db};
my $db = $self->{db} = $self->pg->db;
weaken $self;
$db->on(
notification => sub {
my ($db, $name, $pid, $payload) = @_;
for my $cb (@{$self->{events}{$name}}) { $self->$cb($payload) }
}
);
$db->once(
close => sub {
delete $self->{db};
my $db = $self->_db;
$db->listen($_) for keys %{$self->{events}};
}
);
return $db;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment