Skip to content

Instantly share code, notes, and snippets.

@brianmed
Created November 8, 2014 03:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brianmed/f9225853acf6189b253a to your computer and use it in GitHub Desktop.
Save brianmed/f9225853acf6189b253a to your computer and use it in GitHub Desktop.
Beginnings of a Minion::Backend::DBDPg (DBD::Pg backend for Minion)
package DBX;
use Mojo::Base -base;
use DBI;
use Carp;
use DBIx::Connector
has dbdsn => "dbi:Pg:dbname=jobs";
has dbh => sub { state $dbh = pop };
has dbix => sub { state $dbix = pop };
has [qw(id username password email route)] => undef;
sub new {
my $class = shift;
my $self = $class->SUPER::new(@_);
$self->dbix($self->_build_dbix);
$self->dbh($self->_build_dbh);
return($self);
}
sub _build_dbh {
my $self = shift;
my $dbix = $self->dbix();
return($dbix->dbh());
}
sub _build_dbix {
my $self = shift;
my $conn = DBIx::Connector->new($self->dbdsn, "username", "password", {
RaiseError => 1,
PrintError => 0,
AutoCommit => 0,
});
$conn->disconnect_on_destroy(0);
return($conn);
}
sub do {
my $self = shift;
my $sql = shift;
my $attrs = shift;
my @vars = @_;
if (ref($self)) {
eval {
return($self->dbh()->do($sql, $attrs, @vars));
};
if ($@) {
croak("$@");
}
}
else {
my $dbh = $self->_build_dbh();
return($dbh->do($sql, $attrs, @vars));
}
}
sub success {
my $self = shift;
my $sql = shift;
my $attrs = shift;
my @vars = @_;
my $ret = $self->dbh()->do($sql, $attrs, @vars);
if ($ret && 0 != $ret) { # 0E0
return(1);
}
return(0);
}
sub last_insert_id
{
my $self = shift;
my $catalog = shift;
my $schema = shift;
my $table = shift;
my $field = shift;
my $attrs = shift;
if ($attrs) {
return($self->dbh()->last_insert_id(undef,undef,undef,undef,$attrs));
}
else {
return($self->dbh()->last_insert_id($catalog, $schema, $table, $field, undef));
}
}
sub col {
my $self = shift;
my $sql = shift;
my $attrs = shift;
my @vars = @_;
my $ret = undef;
eval {
my $col = $self->dbh()->selectcol_arrayref($sql, $attrs, @vars);
if ($col && defined $$col[0]) {
$ret = $$col[0];
}
};
if ($@) {
croak("$@");
}
return($ret);
}
sub row {
my $self = shift;
my $sql = shift;
my $attrs = shift;
my @vars = @_;
my $ret = $self->dbh()->selectall_arrayref($sql, { Slice => {} }, @vars);
if ($ret && $$ret[0]) {
return($$ret[0]);
}
return(undef);
}
sub question
{
my $self = shift;
my $nbr = shift;
return(join(", ", map({"?"} (1 .. $nbr))));
}
sub array {
my $self = shift;
my $sql = shift;
my $attrs = shift;
my @vars = @_;
my $ret = $self->dbh()->selectall_arrayref($sql, { Slice => {} }, @vars);
if ($ret) {
return($ret);
}
return(undef);
}
package Minion::Backend::DBDPg;
use Mojo::Base 'Minion::Backend';
use List::Util 'first';
use Mojo::Util qw(md5_sum);
use Mojo::JSON qw(encode_json decode_json);
use Sys::Hostname 'hostname';
use Time::HiRes qw(time usleep);
# use DBX;
has 'db';
sub dequeue {
my ($self, $id, $timeout) = @_;
usleep $timeout * 1000000 unless my $job = $self->_try($id);
return $job || $self->_try($id);
}
sub enqueue {
my ($self, $task) = (shift, shift);
my $args = shift // [];
my $options = shift // {};
my $db = $self->db;
my $job = {
args => encode_json($args),
created => time,
delayed => $options->{delay} ? (time + $options->{delay}) : 1,
id => $self->_job_id,
priority => $options->{priority} // 0,
retries => 0,
state => 'inactive',
task => $task
};
$self->_jobs->{$job->{id}} = $job;
$db->do(
"UPDATE job SET args = ?, created = ?, delayed = ?, priority = ?, retries = ?, state = ?, task = ? WHERE id = ?", undef,
$job->{args}, $job->{created}, $job->{delayed}, $job->{priority}, $job->{retries}, $job->{state}, $job->{task}, $job->{id}
);
$db->dbh->commit;
return $job->{id};
}
sub _job {
my ($self, $id) = (shift, shift);
return undef unless my $job = $self->_jobs->{$id};
return grep({ $job->{state} eq $_ } @_) ? $job : undef;
}
sub _job_id {
my $self = shift;
my $db = $self->db;
my $q = $db->question(7);
$db->do(
"INSERT INTO job (args, created, delayed, priority, retries, state, task) VALUES ($q)", undef,
'{"_stub":"_stub"}', "_stub", "_stub", "_stub", "_stub", "_stub", "_stub"
);
my $id = $db->last_insert_id(undef, undef, "job");
return $id;
}
sub new { shift->SUPER::new(db => DBX->new) }
sub register_worker {
my $self = shift;
$DB::single = 1;
my $db = $self->db;
my $worker = {host => hostname, id => $self->_worker_id, pid => $$, started => time};
my $q = $db->question(4);
$db->do(
"UPDATE worker SET host = ?, pid = ?, started = ? WHERE id = ?", undef,
$worker->{host}, $worker->{pid}, $worker->{started}, $worker->{id}
);
$db->dbh->commit;
# $self->_workers->{$worker->{id}} = $worker;
return $worker->{id};
}
sub _worker_id {
my $self = shift;
my $db = $self->db;
my $q = $db->question(3);
$db->do(
"INSERT INTO worker (host, pid, started) VALUES ($q)", undef,
"_stub", "_stub", "_stub"
);
my $id = $db->last_insert_id(undef, undef, "worker");
return $id;
}
sub repair {
my $self = shift;
return;
### One day this will work
# Check workers on this host (all should be owned by the same user)
my $db = $self->db;
$db->lock_exclusive;
my $workers = $self->_workers;
my $host = hostname;
delete $workers->{$_->{id}}
for grep { $_->{host} eq $host && !kill 0, $_->{pid} } values %$workers;
# Abandoned jobs
my $jobs = $self->_jobs;
for my $job (values %$jobs) {
next if $job->{state} ne 'active' || $workers->{$job->{worker}};
@$job{qw(error state)} = ('Worker went away', 'failed');
}
# Old jobs
my $after = time - $self->minion->remove_after;
delete $jobs->{$_->{id}}
for grep { $_->{state} eq 'finished' && $_->{finished} < $after }
values %$jobs;
$db->unlock;
}
sub _try {
my ($self, $id) = @_;
my $db = $self->db;
$DB::single = 1;
my @ready = grep { $_->{state} eq 'inactive' } values %{$self->_jobs};
my $now = time;
@ready = grep { $_->{delayed} < $now } @ready;
@ready = sort { $a->{created} <=> $b->{created} } @ready;
@ready = sort { $b->{priority} <=> $a->{priority} } @ready;
my $job = first { $self->minion->tasks->{$_->{task}} } @ready;
# @$job{qw(started state worker)} = (time, 'active', $id) if $job;
if ($job) {
$db->do(
"UPDATE job SET started = ?, state = ?, worker = ? WHERE id = ?", undef,
time, 'active', $id, $job->{id}
);
$db->dbh->commit;
}
return $job ? $job : undef; # export?
}
sub _jobs {
my ($self, $id) = @_;
$DB::single = 1;
my $jobs = $self->db->array("SELECT * FROM job ORDER BY id");
my %jobs;
foreach my $job (@{ $jobs }) {
$job->{args} = decode_json($job->{args});
$jobs{$$job{id}} = $job;
}
return(\%jobs);
}
sub _update {
my ($self, $fail, $id, $err) = @_;
$DB::single = 1;
my $db = $self->db;
my $job = $self->_job($id, 'active');
if ($job) {
$job->{finished} = time;
$job->{state} = $fail ? 'failed' : 'finished';
$job->{error} = $err if $err;
$db->do(
"UPDATE job SET args = ?, created = ?, delayed = ?, priority = ?, retries = ?, state = ?, task = ? WHERE id = ?", undef,
encode_json($job->{args}), $job->{created}, $job->{delayed}, $job->{priority}, $job->{retries}, $job->{state}, $job->{task}, $job->{id}
);
$db->dbh->commit;
}
return !!$job;
}
sub fail_job { shift->_update(1, @_) }
sub finish_job { shift->_update(0, @_) }
sub unregister_worker {
my $self = shift;
my $id = shift;
my $db = $self->db;
$db->do(
"DELETE FROM worker WHERE id = ?", undef,
$id
);
$db->dbh->commit;
}
sub remove_job {
my ($self, $id) = @_;
my $db = $self->db;
$DB::single = 1;
my $removed = !!$self->_job($id, 'failed', 'finished', 'inactive');
if ($removed) {
$db->do(
"DELETE FROM job WHERE id = ?", undef,
$id
);
$db->dbh->commit;
}
return $removed;
}
1;
__END__
BEGIN;
CREATE TABLE job(
id serial not null PRIMARY KEY,
args json NOT NULL,
started VARCHAR(64),
error VARCHAR(64),
worker VARCHAR(64),
created VARCHAR(64) NOT NULL,
delayed VARCHAR(64) NOT NULL,
priority VARCHAR(64) NOT NULL,
retries VARCHAR(64) NOT NULL,
state VARCHAR(64) NOT NULL,
task VARCHAR(64) NOT NULL,
updated timestamp default CURRENT_TIMESTAMP,
inserted timestamp default CURRENT_TIMESTAMP
);
CREATE TRIGGER user_timestamp BEFORE INSERT OR UPDATE ON job
FOR EACH ROW EXECUTE PROCEDURE update_timestamp();
GRANT SELECT ON TABLE job TO username;
GRANT INSERT ON TABLE job TO username;
GRANT UPDATE ON TABLE job TO username;
GRANT DELETE ON TABLE job TO username;
GRANT USAGE, SELECT ON SEQUENCE job_id_seq TO username;
COMMIT;
BEGIN;
CREATE TABLE worker(
id serial not null PRIMARY KEY,
host VARCHAR(64) NOT NULL,
pid VARCHAR(64) NOT NULL,
started VARCHAR(64) NOT NULL,
updated timestamp default CURRENT_TIMESTAMP,
inserted timestamp default CURRENT_TIMESTAMP
);
CREATE TRIGGER user_timestamp BEFORE INSERT OR UPDATE ON worker
FOR EACH ROW EXECUTE PROCEDURE update_timestamp();
GRANT SELECT ON TABLE worker TO username;
GRANT INSERT ON TABLE worker TO username;
GRANT UPDATE ON TABLE worker TO username;
GRANT DELETE ON TABLE worker TO username;
GRANT USAGE, SELECT ON SEQUENCE worker_id_seq TO username;
COMMIT;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment