-
-
Save anonymous/7ce6fb198a377aba6804 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/Minion/Backend/Pg.pm b/lib/Minion/Backend/Pg.pm | |
index 6795a56..0a35d9b 100644 | |
--- a/lib/Minion/Backend/Pg.pm | |
+++ b/lib/Minion/Backend/Pg.pm | |
@@ -1,17 +1,26 @@ | |
package Minion::Backend::Pg; | |
use Mojo::Base 'Minion::Backend'; | |
+use Mojo::IOLoop; | |
use Mojo::JSON qw(decode_json encode_json); | |
use Mojo::Pg; | |
use Sys::Hostname 'hostname'; | |
-use Time::HiRes 'usleep'; | |
has 'pg'; | |
sub dequeue { | |
my ($self, $id, $timeout) = @_; | |
- usleep $timeout * 1000000 unless my $job = $self->_try($id); | |
- return $job || $self->_try($id); | |
+ | |
+ if (my $job = $self->_try($id)) { return $job } | |
+ | |
+ my $db = $self->pg->db; | |
+ my $timer = Mojo::IOLoop->timer($timeout => sub { $db->unlisten('*') }); | |
+ $db->on(notification => | |
+ sub { shift->unlisten('*') and Mojo::IOLoop->remove($timer) }); | |
+ $db->listen('minion.job'); | |
+ Mojo::IOLoop->start; | |
+ | |
+ return $self->_try($id); | |
} | |
sub enqueue { | |
@@ -19,7 +28,8 @@ sub enqueue { | |
my $args = shift // []; | |
my $options = shift // {}; | |
- return $self->pg->db->query( | |
+ my $db = $self->pg->db; | |
+ return $db->query( | |
"insert into minion_jobs | |
(args, created, delayed, priority, retries, state, task) | |
values | |
@@ -427,6 +437,17 @@ create table if not exists minion_workers ( | |
pid int not null, | |
started timestamp with time zone not null | |
); | |
+create or replace function notify_minion_jobs_insert() returns trigger as $$ | |
+begin | |
+ perform pg_notify('minion.job', 'trigger'); | |
+ return null; | |
+end; | |
+$$ language plpgsql; | |
+drop trigger if exists minion_jobs_insert_trigger on minion_jobs; | |
+create trigger minion_jobs_insert_trigger after insert on minion_jobs | |
+ for each row execute procedure notify_minion_jobs_insert(); | |
+ | |
-- 1 down | |
drop table if exists minion_jobs; | |
+drop function if exists notify_minion_jobs_insert(); | |
drop table if exists minion_workers; | |
diff --git a/t/pg.t b/t/pg.t | |
index 404b368..ca77770 100644 | |
--- a/t/pg.t | |
+++ b/t/pg.t | |
@@ -11,6 +11,7 @@ use Time::HiRes 'time'; | |
# Clean up before start | |
my $minion = Minion->new(Pg => $ENV{TEST_ONLINE}); | |
+$minion->backend->pg->migrations->migrate(0)->migrate; | |
$minion->reset; | |
# Nothing to repair |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment