Created
November 8, 2014 03:06
-
-
Save brianmed/f9225853acf6189b253a to your computer and use it in GitHub Desktop.
Beginnings of a Minion::Backend::DBDPg (DBD::Pg backend for Minion)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package DBX;

# Small convenience wrapper around a DBIx::Connector-managed DBI handle.
use Mojo::Base -base;
use DBI;
use Carp;
use DBIx::Connector;

# DSN handed to DBIx::Connector->new by _build_dbix.
has dbdsn => "dbi:Pg:dbname=jobs";

# Default builders: each caches whatever it pops off its argument list
# (presumably the invocant) in a state variable. new() assigns both
# attributes right after construction, so these defaults look like
# placeholders only -- TODO confirm they are never relied upon.
has dbh  => sub { state $dbh  = pop };
has dbix => sub { state $dbix = pop };

# Plain attributes without a default; no method in this package reads them.
has [qw(id username password email route)] => undef;
# Constructor. Delegates attribute handling to the parent class, then eagerly
# creates the DBIx::Connector and stores the database handle it yields.
sub new {
    my ($class, @args) = @_;

    my $self = $class->SUPER::new(@args);

    # Order matters: _build_dbh reads the connector stored on the next line.
    my $connector = $self->_build_dbix;
    $self->dbix($connector);
    $self->dbh($self->_build_dbh);

    return $self;
}
# Returns the database handle of the connector stored in the dbix attribute.
sub _build_dbh {
    my ($self) = @_;
    return $self->dbix->dbh;
}
# Creates the DBIx::Connector for the configured DSN.
#
# The handle raises exceptions on error, does not print errors, and runs with
# AutoCommit off, so callers have to commit explicitly.
sub _build_dbix {
    my ($self) = @_;

    my %dbi_attrs = (
        RaiseError => 1,
        PrintError => 0,
        AutoCommit => 0,
    );

    my $connector = DBIx::Connector->new($self->dbdsn, "username", "password", \%dbi_attrs);

    # Leave the connection open when the connector object is destroyed.
    $connector->disconnect_on_destroy(0);

    return $connector;
}
# Executes a non-SELECT statement and returns whatever DBI's do() returns
# (the affected-row count, "0E0" for zero rows).
#
#   $sql    - statement text, may contain "?" placeholders
#   $attrs  - DBI attribute hashref or undef
#   @vars   - bind values
#
# When called on an object, database errors are re-thrown via croak so they
# are reported from the caller's perspective. When called as a class method
# the handle is obtained from _build_dbh instead of the cached attribute.
sub do {
    my ($self, $sql, $attrs, @vars) = @_;

    return $self->_build_dbh()->do($sql, $attrs, @vars) unless ref $self;

    # A "return" inside eval {} only leaves the eval block, so the result is
    # captured in a lexical and returned from the sub itself.
    my $rows;
    my $ok = eval {
        $rows = $self->dbh->do($sql, $attrs, @vars);
        1;
    };
    croak("$@") unless $ok;

    return $rows;
}
# Runs a statement and reports whether it affected any rows.
# Returns 1 when do() yields a true, numerically non-zero value, else 0.
sub success {
    my ($self, $sql, $attrs, @vars) = @_;

    my $rows = $self->dbh->do($sql, $attrs, @vars);

    # "0E0" (zero rows) is a true string but compares equal to 0.
    return ($rows && $rows != 0) ? 1 : 0;
}
# Proxy for the handle's last_insert_id().
#
# When $attrs is true, the catalog/schema/table/field arguments are ignored
# and only $attrs is forwarded; otherwise the four identifiers are forwarded
# and the attribute slot is undef.
sub last_insert_id {
    my ($self, $catalog, $schema, $table, $field, $attrs) = @_;

    my @lookup = $attrs
        ? (undef, undef, undef, undef, $attrs)
        : ($catalog, $schema, $table, $field, undef);

    return $self->dbh->last_insert_id(@lookup);
}
# Returns the first value of the selected column, or undef when the query
# yields no rows or the first value is undefined. Database errors are
# re-thrown with croak.
sub col {
    my ($self, $sql, $attrs, @vars) = @_;

    my $first = eval {
        my $values = $self->dbh->selectcol_arrayref($sql, $attrs, @vars);
        ($values && defined $values->[0]) ? $values->[0] : undef;
    };
    croak("$@") if $@;

    return $first;
}
# Returns the first result row as a hashref (column name => value), or undef
# when there is none. The $attrs argument is accepted but not forwarded; the
# query always runs with { Slice => {} }.
sub row {
    my ($self, $sql, $attrs, @vars) = @_;

    my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @vars);

    return ($rows && $rows->[0]) ? $rows->[0] : undef;
}
# Builds a comma-separated list of $nbr "?" placeholders, e.g. "?, ?, ?".
sub question {
    my ($self, $nbr) = @_;

    my @placeholders = map { "?" } 1 .. $nbr;

    return join ", ", @placeholders;
}
# Returns all result rows as an arrayref of hashrefs, or undef when the
# handle returns a false value. The $attrs argument is accepted but not
# forwarded; the query always runs with { Slice => {} }.
sub array {
    my ($self, $sql, $attrs, @vars) = @_;

    my $rows = $self->dbh->selectall_arrayref($sql, { Slice => {} }, @vars);

    return $rows ? $rows : undef;
}
package Minion::Backend::DBDPg; | |
use Mojo::Base 'Minion::Backend'; | |
use List::Util 'first'; | |
use Mojo::Util qw(md5_sum); | |
use Mojo::JSON qw(encode_json decode_json); | |
use Sys::Hostname 'hostname'; | |
use Time::HiRes qw(time usleep); | |
# use DBX; | |
has 'db'; | |
# Tries to claim a job for worker $id. If none is available right away it
# sleeps for $timeout seconds and tries exactly once more.
# Returns the job hashref or undef.
sub dequeue {
    my ($self, $id, $timeout) = @_;

    my $job = $self->_try($id);
    return $job if $job;

    # usleep takes microseconds.
    usleep($timeout * 1000000);

    return $self->_try($id);
}
# Stores a new job for $task and returns its id.
#
#   $task    - task name
#   $args    - arrayref of task arguments (default [])
#   $options - hashref; "delay" (seconds from now) and "priority" are read
#
# The row is first created by _job_id with placeholder values and then
# filled in here; both statements are committed together.
sub enqueue {
    my ($self, $task) = (shift, shift);
    my $args    = shift // [];
    my $options = shift // {};

    my $db  = $self->db;
    my $job = {
        args     => encode_json($args),
        created  => time,
        delayed  => $options->{delay} ? (time + $options->{delay}) : 1,
        id       => $self->_job_id,
        priority => $options->{priority} // 0,
        retries  => 0,
        state    => 'inactive',
        task     => $task
    };

    # No in-memory bookkeeping here: _jobs rebuilds its hash from a fresh
    # SELECT on every call, so storing $job in it would be discarded and
    # would only cost a full table scan.
    $db->do(
        "UPDATE job SET args = ?, created = ?, delayed = ?, priority = ?, retries = ?, state = ?, task = ? WHERE id = ?", undef,
        $job->{args}, $job->{created}, $job->{delayed}, $job->{priority}, $job->{retries}, $job->{state}, $job->{task}, $job->{id}
    );
    $db->dbh->commit;

    return $job->{id};
}
# Looks up job $id and returns it only if its state is one of the states
# passed after the id; returns undef otherwise (or when the job is unknown).
sub _job {
    my ($self, $id, @states) = @_;

    my $job = $self->_jobs->{$id};
    return undef unless $job;

    my $matches = grep { $job->{state} eq $_ } @states;

    return $matches ? $job : undef;
}
# Reserves a job id by inserting a row of placeholder values and returning
# the id reported by last_insert_id for the "job" table. The caller is
# expected to overwrite the placeholders and commit.
sub _job_id {
    my ($self) = @_;

    my $db           = $self->db;
    my $placeholders = $db->question(7);

    $db->do(
        "INSERT INTO job (args, created, delayed, priority, retries, state, task) VALUES ($placeholders)",
        undef,
        '{"_stub":"_stub"}', "_stub", "_stub", "_stub", "_stub", "_stub", "_stub"
    );

    return $db->last_insert_id(undef, undef, "job");
}
# Constructor: always wires in a freshly created DBX object as "db".
# Arguments passed by the caller are not forwarded to the parent class.
sub new {
    my $class = shift;
    return $class->SUPER::new(db => DBX->new);
}
# Registers the current process as a worker and returns the worker id.
#
# The row is created by _worker_id with placeholder values; this method
# fills in the host name, the process id and the start time, then commits.
sub register_worker {
    my $self = shift;

    my $db     = $self->db;
    my $worker = {host => hostname, id => $self->_worker_id, pid => $$, started => time};

    $db->do(
        "UPDATE worker SET host = ?, pid = ?, started = ? WHERE id = ?", undef,
        $worker->{host}, $worker->{pid}, $worker->{started}, $worker->{id}
    );
    $db->dbh->commit;

    return $worker->{id};
}
# Reserves a worker id by inserting a row of placeholder values and
# returning the id reported by last_insert_id for the "worker" table.
sub _worker_id {
    my ($self) = @_;

    my $db           = $self->db;
    my $placeholders = $db->question(3);

    $db->do(
        "INSERT INTO worker (host, pid, started) VALUES ($placeholders)",
        undef,
        "_stub", "_stub", "_stub"
    );

    return $db->last_insert_id(undef, undef, "worker");
}
# Maintenance hook; currently a deliberate no-op that returns nothing.
#
# TODO: implement against the database. The intended steps are:
#   1. remove workers registered for this host whose process no longer
#      exists (kill 0, $pid);
#   2. mark 'active' jobs whose worker is gone as 'failed' with the error
#      'Worker went away';
#   3. delete 'finished' jobs older than $self->minion->remove_after seconds.
# An earlier in-memory draft of these steps relied on lock_exclusive, unlock
# and _workers, none of which are defined in this file.
sub repair {
    my $self = shift;
    return;
}
# Attempts to claim one runnable job for worker $id.
#
# A job is runnable when it is 'inactive', its delay has passed and its task
# is registered with this Minion instance. Candidates are tried by
# descending priority, oldest first within the same priority.
#
# Returns the job hashref as loaded before the claim (its in-memory state is
# still 'inactive'), or undef when nothing could be claimed.
sub _try {
    my ($self, $id) = @_;

    my $db    = $self->db;
    my $tasks = $self->minion->tasks;
    my $now   = time;

    # One comparator instead of two sort passes, so the ordering does not
    # depend on sort stability.
    my @ready =
        sort { $b->{priority} <=> $a->{priority} || $a->{created} <=> $b->{created} }
        grep { $_->{state} eq 'inactive' && $_->{delayed} < $now && $tasks->{$_->{task}} }
        values %{$self->_jobs};

    for my $job (@ready) {

        # The state guard makes the claim atomic: if another worker already
        # activated this row, no row is updated and we move on.
        my $claimed = $db->dbh->do(
            "UPDATE job SET started = ?, state = ?, worker = ? WHERE id = ? AND state = ?", undef,
            time, 'active', $id, $job->{id}, 'inactive'
        );
        $db->dbh->commit;

        # DBI reports zero affected rows as the true string "0E0".
        return $job if $claimed && $claimed != 0;
    }

    return undef;
}
# Loads every row of the job table and returns a hashref keyed by job id.
# Each job's "args" column is decoded from JSON into a Perl structure.
# A fresh hash is built on every call; changes made to it are not persisted.
sub _jobs {
    my ($self) = @_;

    # array() returns undef when the handle yields no result set.
    my $rows = $self->db->array("SELECT * FROM job ORDER BY id") // [];

    my %job_for;
    for my $job (@$rows) {
        $job->{args} = decode_json($job->{args});
        $job_for{ $job->{id} } = $job;
    }

    return \%job_for;
}
# Moves an 'active' job to its final state and commits the change.
#
#   $fail - true: new state is 'failed'; false: 'finished'
#   $id   - job id
#   $err  - optional error message, stored in the job's "error" column
#
# Returns true when the job existed and was active, false otherwise.
sub _update {
    my ($self, $fail, $id, $err) = @_;

    my $job = $self->_job($id, 'active');
    if ($job) {
        $job->{finished} = time;    # in-memory only: the job table has no such column
        $job->{state}    = $fail ? 'failed' : 'finished';
        $job->{error}    = $err if $err;

        # The schema shipped with this file declares error as VARCHAR(64),
        # so longer messages are truncated rather than rejected.
        my $error = $err ? substr("$err", 0, 64) : undef;

        # Only the columns that actually change are written. COALESCE keeps
        # an existing error text when no new message is supplied.
        my $db = $self->db;
        $db->do(
            "UPDATE job SET state = ?, error = COALESCE(?, error) WHERE id = ?", undef,
            $job->{state}, $error, $job->{id}
        );
        $db->dbh->commit;
    }

    return !!$job;
}
# Marks a job as failed; remaining arguments (id, error) go to _update.
sub fail_job {
    my ($self, @args) = @_;
    return $self->_update(1, @args);
}
# Marks a job as finished; remaining arguments (id) go to _update.
sub finish_job {
    my ($self, @args) = @_;
    return $self->_update(0, @args);
}
# Deletes the worker row with the given id and commits.
# Returns whatever the handle's commit() returns.
sub unregister_worker {
    my ($self, $id) = @_;

    my $db = $self->db;
    $db->do("DELETE FROM worker WHERE id = ?", undef, $id);

    return $db->dbh->commit;
}
# Deletes job $id if it is 'failed', 'finished' or 'inactive' and commits.
# Active or unknown jobs are left alone.
# Returns true when the job was removed, false otherwise.
sub remove_job {
    my ($self, $id) = @_;

    my $removed = !!$self->_job($id, 'failed', 'finished', 'inactive');
    return $removed unless $removed;

    my $db = $self->db;
    $db->do(
        "DELETE FROM job WHERE id = ?", undef,
        $id
    );
    $db->dbh->commit;

    return $removed;
}
1; | |
__END__ | |
BEGIN; | |
CREATE TABLE job( | |
id serial not null PRIMARY KEY, | |
args json NOT NULL, | |
started VARCHAR(64), | |
error VARCHAR(64), | |
worker VARCHAR(64), | |
created VARCHAR(64) NOT NULL, | |
delayed VARCHAR(64) NOT NULL, | |
priority VARCHAR(64) NOT NULL, | |
retries VARCHAR(64) NOT NULL, | |
state VARCHAR(64) NOT NULL, | |
task VARCHAR(64) NOT NULL, | |
updated timestamp default CURRENT_TIMESTAMP, | |
inserted timestamp default CURRENT_TIMESTAMP | |
); | |
CREATE TRIGGER user_timestamp BEFORE INSERT OR UPDATE ON job | |
FOR EACH ROW EXECUTE PROCEDURE update_timestamp(); | |
GRANT SELECT ON TABLE job TO username; | |
GRANT INSERT ON TABLE job TO username; | |
GRANT UPDATE ON TABLE job TO username; | |
GRANT DELETE ON TABLE job TO username; | |
GRANT USAGE, SELECT ON SEQUENCE job_id_seq TO username; | |
COMMIT; | |
BEGIN; | |
CREATE TABLE worker( | |
id serial not null PRIMARY KEY, | |
host VARCHAR(64) NOT NULL, | |
pid VARCHAR(64) NOT NULL, | |
started VARCHAR(64) NOT NULL, | |
updated timestamp default CURRENT_TIMESTAMP, | |
inserted timestamp default CURRENT_TIMESTAMP | |
); | |
CREATE TRIGGER user_timestamp BEFORE INSERT OR UPDATE ON worker | |
FOR EACH ROW EXECUTE PROCEDURE update_timestamp(); | |
GRANT SELECT ON TABLE worker TO username; | |
GRANT INSERT ON TABLE worker TO username; | |
GRANT UPDATE ON TABLE worker TO username; | |
GRANT DELETE ON TABLE worker TO username; | |
GRANT USAGE, SELECT ON SEQUENCE worker_id_seq TO username; | |
COMMIT; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment