@dimitri, created April 25, 2012
PGQ queue forwarding
#! /usr/bin/env python
"""
This script forwards events from source queues to a destination
queue. Source and destination queues must share the same name. The
script handles as many source databases as needed, so that all
events of a distributed system (think plproxy nodes) can be consumed
from a single federated queue installed on the destination database.

Setup looks like:

  [movers]
  job_name = queue_mover
  src_db = dbname=postgres port=5432 user=postgres
  dst_db = host=10.10.10.10 port=6432 user=dbuser dbname=foo
  pidfile = /var/tmp/%(job_name)s.pid
  logfile = /var/tmp/%(job_name)s.log
  loop_delay = 5
  connection_lifetime = 30
  pgq_lazy_fetch = 5000
  fl_p16 = dbname=fl_p16 port=5432 user=fl
  fl_p17 = dbname=fl_p17 port=5432 user=fl
  ...

The extra connection strings point to the local databases holding the
queues you want to forward. The main src_db is mostly used to
discover those databases.
"""

import sys

import pkgloader
pkgloader.require('skytools', '3.0')
import skytools, pgq, skytools.config as config


class QueueMover(pgq.SerialConsumer):
    """Forward the events of one queue from a local database to dst_db."""

    def __init__(self, dst_db, db_name, qname, log, args):
        self.service_name = "mover %s.%s" % (db_name, qname)
        self.dbname = db_name           # keep a copy for logs output
        self.db_name = db_name
        self.dst_db = dst_db
        self.queue_name = qname
        self.batches = 0                # support for --mark-- info lines
        self.empty = 0

        self.load_config()
        pgq.SerialConsumer.__init__(self, "queue_mover", "src_db", "dst_db", args)
        self.dst_queue_name = self.queue_name

        # force non-daemon mode, whatever the command line says
        self.set_single_loop(True)
        self.go_daemon = 0
        self.log = log

    def load_config(self):
        connstr = "dbname=%s user=fl" % self.dbname
        override = {
            "pidfile": '',
            "src_db": connstr,
            "dst_db": self.dst_db,
            "queue_name": self.queue_name,
            "dst_queue_name": self.queue_name
        }
        self.cf = config.Config(self.service_name, None, override=override)

    def process_remote_batch(self, db, batch_id, ev_list, dst_db):
        self.batches += 1

        # load data
        rows = []
        for ev in ev_list:
            data = [ev.type, ev.data, ev.extra1, ev.extra2,
                    ev.extra3, ev.extra4, ev.time]
            rows.append(data)
        fields = ['type', 'data', 'extra1', 'extra2', 'extra3', 'extra4', 'time']

        self.log.info("%9s %-15s batch: %d count: %d" %
                      (self.dbname, self.queue_name, batch_id, len(rows)))

        # insert data
        curs = dst_db.cursor()
        pgq.bulk_insert_events(curs, rows, fields, self.dst_queue_name)

    def work(self):
        self.empty += 1
        self.batches = 0        # reset so the count reflects this round only
        sleep = pgq.SerialConsumer.work(self)

        if self.batches:
            self.empty = 0
        else:
            # we got called to work but didn't enter process_remote_batch
            self.log.debug("%9s %-15s saw %d consecutive empty batches" %
                           (self.dbname, self.queue_name, self.empty))
            if self.empty > 10:
                self.log.info("%9s %-15s --mark--" %
                              (self.dbname, self.queue_name))
                self.empty = 0
        return sleep


class Movers(skytools.DBScript):
    """Drive one QueueMover per queue found in each local database."""

    def __init__(self, args):
        skytools.DBScript.__init__(self, "movers", args)
        self.load_config()

        self.db_name = self.cf.get("src_db")
        self.dst_db = self.cf.get("dst_db")
        self.queue_movers = []
        self.args = args

    def init_optparse(self, parser=None):
        p = skytools.DBScript.init_optparse(self, parser)
        p.add_option('--list-databases', action='store_true', dest="listdb",
                     help='list local databases')
        p.add_option('--register', action='store_true',
                     help='register consumer on queue')
        p.add_option('--unregister', action='store_true',
                     help='unregister consumer from queue')
        return p

    def init_workers(self):
        """Create the QueueMover objects, one per queue per local database."""
        db = self.get_database("src_db")
        curs = db.cursor()
        curs.execute("select datname from pg_database where datname ~ 'fl_p' order by datname")
        for part in curs.fetchall():
            datname = part['datname']
            self.log.info("adding movers for %s" % datname)

            pdb = self.get_database(datname)
            pcurs = pdb.cursor()
            pcurs.execute("select queue_name from pgq.queue order by queue_name")
            for q in pcurs.fetchall():
                qname = q['queue_name']
                qm = QueueMover(self.dst_db, datname, qname, self.log, self.args)
                self.queue_movers.append(qm)

    def startup(self):
        """Handle commands here. __init__ does not have error logging."""
        if self.options.listdb:
            self.list_databases()
            sys.exit(0)
        if self.options.register:
            self.register_consumer()
            sys.exit(0)
        if self.options.unregister:
            self.unregister_consumer()
            sys.exit(0)

        # startup code is a good place to init our workers
        self.init_workers()
        return skytools.DBScript.startup(self)

    def list_databases(self):
        """Discover local databases and print config entries for them."""
        db = self.get_database("src_db")
        curs = db.cursor()
        curs.execute("select datname from pg_database where datname ~ 'fl_p' order by datname")
        for part in curs.fetchall():
            datname = part['datname']
            print "%s = dbname=%s port=5432 user=fl" % (datname, datname)

    def register_consumer(self):
        self.init_workers()
        for qm in self.queue_movers:
            self.log.info("register consumer %s for queue %s in database %s" %
                          (qm.service_name, qm.queue_name, qm.dbname))
            qm.register_consumer()

    def unregister_consumer(self):
        self.init_workers()
        for qm in self.queue_movers:
            self.log.info("unregister consumer %s for queue %s in database %s" %
                          (qm.service_name, qm.queue_name, qm.dbname))
            qm.unregister_consumer()

    def work(self):
        sleep = 0
        for qm in self.queue_movers:
            try:
                sleep += qm.work()
            except Exception, e:
                self.log.error("mover %s.%s failed: %s" %
                               (qm.dbname, qm.queue_name, e))
                sys.exit(1)
        self.log.debug("work is done, sleep = %d" % sleep)
        return sleep

    def start(self):
        self.log.info("Starting process")
        skytools.DBScript.start(self)


if __name__ == '__main__':
    script = Movers(sys.argv[1:])
    script.start()