Skip to content

Instantly share code, notes, and snippets.

@xaprb
Created January 18, 2014 18:58
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save xaprb/8494656 to your computer and use it in GitHub Desktop.
#!/usr/bin/perl
use strict;
use warnings FATAL => 'all';
use DBI;
use English qw(-no_match_vars);
use Getopt::Long;
use List::Util qw(max);
# Release metadata printed by --version.
# NOTE(review): '@VERSION@' and '@DISTRIB@' look like placeholders meant to be
# substituted by a packaging step -- confirm; as written they are literal text.
our $VERSION = '@VERSION@';
our $DISTRIB = '@DISTRIB@';
# Pull the digits out of the "Revision: NNN" keyword string (quoted with q$...$)
# and format them as an integer.
our $SVN_REV = sprintf("%d", q$Revision: 765 $ =~ m/(\d+)/g);
# ############################################################################
# Get configuration information.
# ############################################################################
# Command-line option table. Each entry becomes a hashref with two keys:
#   s => the Getopt::Long option specification
#   d => the human-readable description shown in the help output
# To add an option, append another [ spec => description ] pair.
my @opt_spec = map { +{ s => $_->[0], d => $_->[1] } } (
    [ 'database|D=s'      => 'Database to use' ],
    [ 'defaults-file|F=s' => 'Only read default options from the given file' ],
    [ 'host|h=s'          => 'Connect to host' ],
    [ 'help'              => 'Show this help message' ],
    [ 'password|p=s'      => 'Password to use when connecting' ],
    [ 'port|P=i'          => 'Port number to use for connection' ],
    [ 'socket|S=s'        => 'Socket file to use for connection' ],
    [ 'user|u=s'          => 'User for login if not current user' ],
    [ 'version'           => 'Output version information and exit' ],
    [ 'mode=s'            => 'Mode: producer or consumer (p/c)' ],
);
# Parsed command-line values end up in %opts. An option is keyed by its short
# name when it has one (e.g. $opts{h} for --host) and by its long name
# otherwise (e.g. $opts{mode}). Every key is pre-created as undef.
my %opts;
# Annotate each spec in place with derived fields:
#   k => key used in %opts       l => long option name
#   t => short name (or undef)   n => true if the spec is negatable ('!')
my %opt_seen;
for my $spec ( @opt_spec ) {
    my ( $long, $short ) = $spec->{s} =~ m/^([\w-]+)(?:\|([^!+=]*))?/;
    my $key = $short || $long;
    @{$spec}{qw(k l t)} = ( $key, $long, $short );
    $spec->{n} = $spec->{s} =~ m/!/;
    if ( !defined $opts{$key} ) {
        $opts{$key} = undef;
    }
    die "Duplicate option $key" if $opt_seen{$key}++;
}
Getopt::Long::Configure(qw(no_ignore_case bundling));
# Hand GetOptions a flat (spec, destination-ref) list in table order.
my @getopt_args;
for my $spec ( @opt_spec ) {
    push @getopt_args, $spec->{s}, \$opts{ $spec->{k} };
}
# A parse failure falls through to the usage message.
if ( !GetOptions(@getopt_args) ) {
    $opts{help} = 1;
}
# --version wins over everything else, including --help.
if ( $opts{version} ) {
    print "$PROGRAM_NAME Ver $VERSION Distrib $DISTRIB Changeset $SVN_REV\n";
    exit(0);
}
# --mode is mandatory. Accept the spelled-out names as well as the p/c
# abbreviations and normalize to the single letter, so the later
# "$opts{mode} eq 'p'" test selects the producer for either spelling. Anything
# else (including a missing mode) shows the usage text instead of silently
# running as a consumer.
my %mode_alias = ( p => 'p', producer => 'p', c => 'c', consumer => 'c' );
if ( $opts{mode} && exists $mode_alias{ lc $opts{mode} } ) {
    $opts{mode} = $mode_alias{ lc $opts{mode} };
}
else {
    if ( $opts{mode} ) {
        print STDERR "Unknown mode '$opts{mode}'; expected p (producer) or c (consumer)\n";
    }
    $opts{help} = 1;
}
# If a filename or other argument(s) is required after the other arguments,
# add "|| !@ARGV" inside the parens on the next line.
if ( $opts{help} ) {
    print "Usage: $PROGRAM_NAME <options> --mode <mode>\n\n";
    # Width of the widest long option name; negatable options get 4 extra
    # columns for the "[no]" prefix.
    my $maxw = max(map { length($_->{l}) + ($_->{n} ? 4 : 0)} @opt_spec);
    foreach my $spec ( sort { $a->{l} cmp $b->{l} } @opt_spec ) {
        my $long = $spec->{n} ? "[no]$spec->{l}" : $spec->{l};
        my $short = $spec->{t} ? "-$spec->{t}" : '';
        printf(" --%-${maxw}s %-4s %s\n", $long, $short, $spec->{d});
    }
    print <<USAGE;
$PROGRAM_NAME produces messages into, or consumes messages from, the
test.messages table, depending on --mode.
If possible, database options are read from your .my.cnf file.
For more details, please read the documentation:
perldoc $PROGRAM_NAME
USAGE
    # Usage output always exits non-zero, whether requested or triggered by
    # bad arguments.
    exit(1);
}
# ############################################################################
# Get ready to do the main work.
# ############################################################################
# Maps an %opts key to the DSN attribute it sets.
my %conn = (
    F => 'mysql_read_default_file',
    h => 'host',
    P => 'port',
    S => 'mysql_socket'
);
# Build the DSN: database name first, then one "attr=value" pair per option
# that was actually supplied, then the my.cnf group to read defaults from.
my @dsn_settings;
for my $opt ( qw(F h P S) ) {
    next unless defined $opts{$opt};
    push @dsn_settings, "$conn{$opt}=$opts{$opt}";
}
my $dsn = join(
    ';',
    'DBI:mysql:' . ( $opts{D} || '' ),
    join( ';', @dsn_settings ),
    'mysql_read_default_group=mysql',
);
# Connect to the database. RaiseError makes every DBI failure die, so the
# calls below carry no explicit error checks.
my %dbi_attr = ( AutoCommit => 1, RaiseError => 1, PrintError => 0 );
my $dbh = DBI->connect( $dsn, $opts{u}, $opts{p}, \%dbi_attr );
# Prepared statements for writing and reading messages.
my $ins = $dbh->prepare('INSERT INTO test.messages(message) values (?)');
my $sel = $dbh->prepare('SELECT * FROM test.messages WHERE id > ?');
# Advisory-lock queries; the GET_LOCK placeholder is the timeout in seconds.
my $get = 'SELECT GET_LOCK("test.messages", ?)';
my $rel = 'SELECT RELEASE_LOCK("test.messages")';
# ############################################################################
# PRODUCER CODE
# ############################################################################
# Timeout, in seconds, passed to GET_LOCK().
my $lock_time = 1_000_000;
# Seconds the consumer pauses after a pass that delivered no messages.
my $idle_sleep = 10;
if ( $opts{mode} eq 'p' ) {
    print "Starting up in producer mode\n";
    while ( 1 ) {
        # Take the advisory lock before prompting and hold it until the message
        # has been stored; a consumer asking for the same lock waits meanwhile.
        # The result is deliberately not inspected: the producer proceeds
        # either way.
        $dbh->selectall_arrayref($get, {}, $lock_time);
        print "Enter a one-line message:\n";
        # <STDIN> returns undef at end-of-file; treat that like an empty line
        # instead of letting chomp() raise a fatal "uninitialized" warning
        # while the lock is still held.
        my $msg = <STDIN>;
        chomp $msg if defined $msg;
        # Test length rather than truth so the message "0" is still delivered.
        my $have_msg = defined $msg && length $msg;
        $ins->execute($msg) if $have_msg;
        $dbh->do($rel);
        # An empty line (or EOF) ends the producer.
        last unless $have_msg;
    }
}
# ############################################################################
# CONSUMER CODE
# ############################################################################
else {
    print "Starting up in consumer mode\n";
    # Highest message id printed so far; only rows above it are fetched.
    my $last_row = 0;
    while ( 1 ) {
        # GET_LOCK yields 1 when the lock was obtained, 0 on timeout and NULL
        # (undef) on error.
        my $got_lock = $dbh->selectall_arrayref($get, {}, $lock_time)->[0]->[0];
        my $consumed = 0;
        if ( $got_lock ) {
            # I got the lock, can go ahead and consume
            $sel->execute($last_row);
            foreach my $row ( @{ $sel->fetchall_arrayref({}) } ) {
                print "Message: $row->{message}\n";
                # The SELECT has no ORDER BY, so keep the maximum id seen
                # rather than whichever row happened to come last.
                $last_row = $row->{id} if $row->{id} > $last_row;
                $consumed++;
            }
        }
        $dbh->do($rel);
        # Nothing was delivered: either the lock was not obtained, or it was
        # free and there was nothing new (most likely no producer is holding
        # it). Pause so this loop does not spin against the server and so a
        # producer gets a chance to take the lock.
        sleep($idle_sleep) unless $consumed;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment