Skip to content

Instantly share code, notes, and snippets.

@mnencia
Created April 16, 2016 08:35
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save mnencia/ca25e28635c2ceb08938b2fa074f053a to your computer and use it in GitHub Desktop.
Save mnencia/ca25e28635c2ceb08938b2fa074f053a to your computer and use it in GitHub Desktop.
pglogical decoding in Python
from __future__ import print_function
import sys
import psycopg2
import psycopg2.extras
# Open a connection using psycopg2's logical-replication connection factory
# and get a cursor from it.
conn = psycopg2.connect(
    'host=localhost user=postgres port=5495',
    connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor()

# Options passed to start_replication(); 'proto_format' asks for JSON output.
# NOTE(review): presumably these are pglogical_output start-up parameters --
# confirm names and values against the plugin documentation.
replication_options = {
    'min_proto_version': '1',
    'max_proto_version': '1',
    'startup_params_format': '1',
    'proto_format': 'json'}

# Try to start streaming from the 'pytest' slot first. If that raises
# ProgrammingError, create the slot with the pglogical_output plugin and
# retry once.
# NOTE(review): the handler presumably targets "slot does not exist"; any
# other ProgrammingError takes this path too, and the retry then fails inside
# create_replication_slot() or the second start_replication().
try:
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)
except psycopg2.ProgrammingError:
    cur.create_replication_slot('pytest', output_plugin='pglogical_output')
    cur.start_replication(
        slot_name='pytest', decode=True,
        options=replication_options)
class DemoConsumer(object):
    """Callable message consumer that prints each payload and acknowledges it."""

    def __call__(self, msg):
        """Print ``msg.payload`` and send feedback for the message.

        ``msg`` must expose ``payload``, ``data_start`` and ``cursor``; the
        cursor's ``send_feedback`` is called with ``flush_lsn`` set to the
        message's ``data_start``. Returns ``None``.
        """
        print(msg.payload)
        # Report this message's start position back through the cursor that
        # delivered it.
        msg.cursor.send_feedback(flush_lsn=msg.data_start)
democonsumer = DemoConsumer()

print("Starting streaming, press Control-C to end...", file=sys.stderr)
try:
    # Hand every replication message to the consumer until interrupted.
    cur.consume_stream(democonsumer)
except KeyboardInterrupt:
    # Control-C is the expected way to stop; remind the operator that the
    # replication slot was not dropped.
    print("The slot 'pytest' still exists. Drop it with "
          "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
          file=sys.stderr)
    print("WARNING: Transaction logs will accumulate in pg_xlog "
          "until the slot is dropped.", file=sys.stderr)
finally:
    # Release the cursor and connection on every exit path, not only on
    # Control-C, so an unexpected error does not leave them open.
    cur.close()
    conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment