Skip to content

Instantly share code, notes, and snippets.

@ringerc
Created February 8, 2016 14:34
Show Gist options
  • Save ringerc/f74a12e430866ccd9227 to your computer and use it in GitHub Desktop.
Save ringerc/f74a12e430866ccd9227 to your computer and use it in GitHub Desktop.
Demo Python script for replicating to Solr
CREATE TABLE book_lines
(
id serial primary key,
book_line text not null
);
#!/usr/bin/env python3
import psycopg2
import json
import requests
import time
import pprint
import logging
solr_collection_url = "http://localhost:8983/solr/gettingstarted"
# connect to Pg
# for now we'll use the sql interface
def connect():
global conn
global curs
conn = psycopg2.connect('dbname=solrdemo user=postgres')
curs = conn.cursor()
def create_slot():
print("creating slot")
try:
curs.execute("SELECT * FROM pg_create_logical_replication_slot('solrdemo', 'pglogical_output');")
conn.commit()
print("slot created")
except psycopg2.ProgrammingError as ex:
conn.rollback()
print("slot already exists, re-using")
def decode_stream():
while True:
curs.execute("""
SELECT *
FROM pg_logical_slot_get_changes(
'solrdemo', NULL, NULL,
'expected_encoding', 'UTF8',
'min_proto_version', '1',
'max_proto_version', '1',
'startup_params_format', '1',
'proto_format', 'json',
'no_txinfo', 't'
);
""")
while True:
tup = curs.fetchone()
if tup is None:
break
(lsn, xid, data) = tup
data = json.loads(data)
logging.debug("fetched " + repr(data))
msgtype = data['action']
logging.debug("msg: " + msgtype)
if msgtype == 'S':
# ignore startup msg in this demo
pass
elif msgtype == 'B':
xact_start = time.time()
solr_message = []
counts = { 'I': 0, 'U': 0, 'D': 0 }
elif msgtype in ('I', 'U', 'D'):
counts[msgtype] += 1
process_change(solr_message, data)
elif msgtype == 'C':
if len(solr_message) == 0:
logging.debug("Skipping empty xact")
continue
xact_commit = time.time()
logging.info("Took {}ms to replay xact with {} members (I={}, U={}, D={}) to commit".format(
int(xact_commit - xact_start),
len(solr_message),
counts['I'], counts['U'], counts['D']
))
send_message(solr_message)
else:
raise ValueError("Unexpected message type " + msgtype)
# SQL interface doesn't let us block
logging.debug("Ran out of data, sleeping...")
time.sleep(1)
def process_change(solr_message, data):
"""
Transform a change to solr json
Solr relies on duplicate json keys and Python doesn't like them
so we string template the message.
"""
action = data['action']
newtuple = None
oldkey = None
if action in ('I', 'U'):
newtuple = data['newtuple']
if action == 'D':
# The non-null fields are the key.
# we should really send this differently...
oldkey = dict([ (k, v) for (k, v) in data['oldtuple'].items() if v is not None ])
#
# We use solr with schema and declare the uniqueKey in the solr
# schema so we don't have to declare any uniqueKey here.
#
# We're using a dynamic solr structure with uniquekey 'id'
# defined in its schema.xml .
#
if action in ('I', 'U'):
solr_message.append('"add": ' + json.dumps({ "doc": newtuple }))
# TODO: Do we have to generate a query to delete?
# We don't know the solr document id
if action == 'D':
solr_message.append('"delete": ' + json.dumps(oldkey))
def send_message(solr_message):
"""Send the solr json to the endpoint, with commit=true parameter"""
req_params = {
'commit': 'true'
}
post_body = "{" + ',\n'.join(solr_message) + "}"
logging.debug(post_body)
post_start = time.time()
r = requests.post(
solr_collection_url + "/update/json?",
params = req_params,
data = post_body
)
post_finish = time.time()
solr_response = json.loads(r.text)
logging.info("Took {}ms to send to solr".format(int((post_finish-post_start)*1000)))
logging.debug("response: " + r.text)
if int(solr_response['responseHeader']['status']) != 0:
logging.error("Failed to import a document: " + r.text)
else:
logging.debug("Solr apply took {}ms".format(
len(solr_message), int(solr_response['responseHeader']['QTime'])))
if __name__ == '__main__':
# level: DEBUG, INFO, ...
logging.basicConfig(level='INFO')
# shut requests up
logging.getLogger('requests.packages.urllib3.connectionpool').setLevel('ERROR')
connect()
create_slot()
decode_stream()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment