public
Created

  • Download Gist
producer_consumer.pl
Perl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148
#!/usr/bin/perl
 
use strict;
use warnings FATAL => 'all';
 
use DBI;
use English qw(-no_match_vars);
use Getopt::Long;
use List::Util qw(max);
 
our $VERSION = '@VERSION@';
our $DISTRIB = '@DISTRIB@';
our $SVN_REV = sprintf("%d", q$Revision: 765 $ =~ m/(\d+)/g);
 
# ############################################################################
# Get configuration information.
# ############################################################################
 
# Define cmdline args; each is GetOpt::Long spec, whether required,
# human-readable description. Add more hash entries as needed.
my @opt_spec = (
{ s => 'database|D=s', d => 'Database to use' },
{ s => 'defaults-file|F=s', d => 'Only read default options from the given file' },
{ s => 'host|h=s', d => 'Connect to host' },
{ s => 'help', d => 'Show this help message' },
{ s => 'password|p=s', d => 'Password to use when connecting' },
{ s => 'port|P=i', d => 'Port number to use for connection' },
{ s => 'socket|S=s', d => 'Socket file to use for connection' },
{ s => 'user|u=s', d => 'User for login if not current user' },
{ s => 'version', d => 'Output version information and exit' },
{ s => 'mode=s', d => 'Mode: producer or consumer (p/c)' },
);
# This is the container for the command-line options' values to be stored in
# after processing. Initial values are defaults.
my %opts;
# Post-process...
my %opt_seen;
foreach my $spec ( @opt_spec ) {
my ( $long, $short ) = $spec->{s} =~ m/^([\w-]+)(?:\|([^!+=]*))?/;
$spec->{k} = $short || $long;
$spec->{l} = $long;
$spec->{t} = $short;
$spec->{n} = $spec->{s} =~ m/!/;
$opts{$spec->{k}} = undef unless defined $opts{$spec->{k}};
die "Duplicate option $spec->{k}" if $opt_seen{$spec->{k}}++;
}
 
Getopt::Long::Configure('no_ignore_case', 'bundling');
GetOptions( map { $_->{s} => \$opts{$_->{k}} } @opt_spec) or $opts{help} = 1;
 
if ( $opts{version} ) {
print "$PROGRAM_NAME Ver $VERSION Distrib $DISTRIB Changeset $SVN_REV\n";
exit(0);
}
 
$opts{help} = 1 unless $opts{mode};
 
# If a filename or other argument(s) is required after the other arguments,
# add "|| !@ARGV" inside the parens on the next line.
if ( $opts{help} ) {
print "Usage: $PROGRAM_NAME <options> --mode <mode>\n\n";
my $maxw = max(map { length($_->{l}) + ($_->{n} ? 4 : 0)} @opt_spec);
foreach my $spec ( sort { $a->{l} cmp $b->{l} } @opt_spec ) {
my $long = $spec->{n} ? "[no]$spec->{l}" : $spec->{l};
my $short = $spec->{t} ? "-$spec->{t}" : '';
printf(" --%-${maxw}s %-4s %s\n", $long, $short, $spec->{d});
}
print <<USAGE;
 
$PROGRAM_NAME produces messages and puts them into test.messages table.
 
If possible, database options are read from your .my.cnf file.
For more details, please read the documentation:
 
perldoc $PROGRAM_NAME
 
USAGE
exit(1);
}
 
# ############################################################################
# Get ready to do the main work.
# ############################################################################
my %conn = (
F => 'mysql_read_default_file',
h => 'host',
P => 'port',
S => 'mysql_socket'
);
 
# Connect to the database
my $dsn = 'DBI:mysql:' . ( $opts{D} || '' ) . ';'
. join(';', map { "$conn{$_}=$opts{$_}" } grep { defined $opts{$_} } qw(F h P S))
. ';mysql_read_default_group=mysql';
my $dbh = DBI->connect($dsn, @opts{qw(u p)}, { AutoCommit => 1, RaiseError => 1, PrintError => 0 } );
 
my $ins = $dbh->prepare('INSERT INTO test.messages(message) values (?)');
my $sel = $dbh->prepare('SELECT * FROM test.messages WHERE id > ?');
my $get = 'SELECT GET_LOCK("test.messages", ?)';
my $rel = 'SELECT RELEASE_LOCK("test.messages")';
 
 
# ############################################################################
# PRODUCER CODE
# ############################################################################
my $lock_time = 1_000_000;
if ( $opts{mode} eq 'p' ) {
print "Starting up in producer mode\n";
my $msg;
do {
my $lck = $dbh->selectall_arrayref($get, {}, $lock_time)->[0]->[0];
print "Enter a one-line message:\n";
chomp($msg = <STDIN>);
if ( $msg ) {
$ins->execute($msg);
}
$dbh->do($rel);
} while ($msg);
}
 
# ############################################################################
# CONSUMER CODE
# ############################################################################
else {
print "Starting up in consumer mode\n";
my $last_row = 0;
my $got_lock = 1;
while ( 1 ) {
$got_lock = $dbh->selectall_arrayref($get, {}, $lock_time)->[0]->[0];
if ( $got_lock ) {
# I got the lock, can go ahead and consume
$sel->execute($last_row);
my @rows = @{$sel->fetchall_arrayref({})};
foreach my $row ( @rows ) {
print "Message: $row->{message}\n";
$last_row = $row->{id};
}
}
$dbh->do($rel);
# $got_lock could be undefined (I already had the lock, there's no
# producer) or 0 (I timed out).
if ( !defined $got_lock ) {
# Since there's no producer, I need to sleep a bit and allow the
# producer a chance to get a lock
sleep(10);
}
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.