Skip to content

Instantly share code, notes, and snippets.

Created November 16, 2014 15:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/7ce6fb198a377aba6804 to your computer and use it in GitHub Desktop.
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm
index 6795a56..0a35d9b 100644
--- a/lib/Minion/Backend/Pg.pm
+++ b/lib/Minion/Backend/Pg.pm
@@ -1,17 +1,26 @@
package Minion::Backend::Pg;
use Mojo::Base 'Minion::Backend';
+use Mojo::IOLoop;
use Mojo::JSON qw(decode_json encode_json);
use Mojo::Pg;
use Sys::Hostname 'hostname';
-use Time::HiRes 'usleep';
has 'pg';
sub dequeue {
my ($self, $id, $timeout) = @_;
- usleep $timeout * 1000000 unless my $job = $self->_try($id);
- return $job || $self->_try($id);
+
+ if (my $job = $self->_try($id)) { return $job }
+
+ my $db = $self->pg->db;
+ my $timer = Mojo::IOLoop->timer($timeout => sub { $db->unlisten('*') });
+ $db->on(notification =>
+ sub { shift->unlisten('*') and Mojo::IOLoop->remove($timer) });
+ $db->listen('minion.job');
+ Mojo::IOLoop->start;
+
+ return $self->_try($id);
}
sub enqueue {
@@ -19,7 +28,8 @@ sub enqueue {
my $args = shift // [];
my $options = shift // {};
- return $self->pg->db->query(
+ my $db = $self->pg->db;
+ return $db->query(
"insert into minion_jobs
(args, created, delayed, priority, retries, state, task)
values
@@ -427,6 +437,17 @@ create table if not exists minion_workers (
pid int not null,
started timestamp with time zone not null
);
+create or replace function notify_minion_jobs_insert() returns trigger as $$
+begin
+ perform pg_notify('minion.job', 'trigger');
+ return null;
+end;
+$$ language plpgsql;
+drop trigger if exists minion_jobs_insert_trigger on minion_jobs;
+create trigger minion_jobs_insert_trigger after insert on minion_jobs
+ for each row execute procedure notify_minion_jobs_insert();
+
-- 1 down
drop table if exists minion_jobs;
+drop function if exists notify_minion_jobs_insert();
drop table if exists minion_workers;
diff --git a/t/pg.t b/t/pg.t
index 404b368..ca77770 100644
--- a/t/pg.t
+++ b/t/pg.t
@@ -11,6 +11,7 @@ use Time::HiRes 'time';
# Clean up before start
my $minion = Minion->new(Pg => $ENV{TEST_ONLINE});
+$minion->backend->pg->migrations->migrate(0)->migrate;
$minion->reset;
# Nothing to repair
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment