Demo Python script for replicating changes from PostgreSQL to Solr via the pglogical_output logical decoding plugin.
CREATE TABLE book_lines
(
    id serial primary key,
    book_line text not null
);
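Once the script below has created the 'solrdemo' replication slot, changes to book_lines are captured for decoding. A minimal sketch for generating some test changes from a separate session (the database name, user, and table come from the script and DDL here; the helper and sample text are illustrative):

#!/usr/bin/env python3
# Illustrative helper: insert a few rows into book_lines so the decoder
# below has something to replay to Solr. Run from a separate session once
# the replication slot exists.
import psycopg2

conn = psycopg2.connect('dbname=solrdemo user=postgres')
curs = conn.cursor()

sample_lines = [
    "It was the best of times, it was the worst of times,",
    "it was the age of wisdom, it was the age of foolishness,",
]

for line in sample_lines:
    # id is assigned by the serial column; book_line is the only payload
    curs.execute("INSERT INTO book_lines (book_line) VALUES (%s)", (line,))

conn.commit()
curs.close()
conn.close()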
#!/usr/bin/env python3
import psycopg2
import json
import requests
import time
import pprint
import logging

solr_collection_url = "http://localhost:8983/solr/gettingstarted"
# Connect to Pg.
# For now we'll use the SQL interface to logical decoding.
def connect():
    global conn
    global curs
    conn = psycopg2.connect('dbname=solrdemo user=postgres')
    curs = conn.cursor()
def create_slot():
    print("creating slot")
    try:
        curs.execute("SELECT * FROM pg_create_logical_replication_slot('solrdemo', 'pglogical_output');")
        conn.commit()
        print("slot created")
    except psycopg2.ProgrammingError as ex:
        conn.rollback()
        print("slot already exists, re-using")
def decode_stream():
    while True:
        curs.execute("""
            SELECT *
            FROM pg_logical_slot_get_changes(
                'solrdemo', NULL, NULL,
                'expected_encoding', 'UTF8',
                'min_proto_version', '1',
                'max_proto_version', '1',
                'startup_params_format', '1',
                'proto_format', 'json',
                'no_txinfo', 't'
            );
            """)
        while True:
            tup = curs.fetchone()
            if tup is None:
                break
            (lsn, xid, data) = tup
            data = json.loads(data)
            logging.debug("fetched " + repr(data))
            msgtype = data['action']
            logging.debug("msg: " + msgtype)
            if msgtype == 'S':
                # ignore startup msg in this demo
                pass
            elif msgtype == 'B':
                # begin: start accumulating changes for this xact
                xact_start = time.time()
                solr_message = []
                counts = { 'I': 0, 'U': 0, 'D': 0 }
            elif msgtype in ('I', 'U', 'D'):
                counts[msgtype] += 1
                process_change(solr_message, data)
            elif msgtype == 'C':
                # commit: send the accumulated changes to Solr
                if len(solr_message) == 0:
                    logging.debug("Skipping empty xact")
                    continue
                xact_commit = time.time()
                logging.info("Took {}ms to replay xact with {} members (I={}, U={}, D={}) to commit".format(
                    int((xact_commit - xact_start) * 1000),
                    len(solr_message),
                    counts['I'], counts['U'], counts['D']
                ))
                send_message(solr_message)
            else:
                raise ValueError("Unexpected message type " + msgtype)
        # SQL interface doesn't let us block, so poll
        logging.debug("Ran out of data, sleeping...")
        time.sleep(1)
def process_change(solr_message, data):
    """
    Transform a change to Solr JSON.

    Solr relies on duplicate JSON keys and Python doesn't like them,
    so we string-template the message.
    """
    action = data['action']
    newtuple = None
    oldkey = None
    if action in ('I', 'U'):
        newtuple = data['newtuple']
    if action == 'D':
        # The non-null fields are the key.
        # We should really send this differently...
        oldkey = dict([ (k, v) for (k, v) in data['oldtuple'].items() if v is not None ])
    #
    # We use Solr with a schema and declare the uniqueKey in the Solr
    # schema, so we don't have to declare any uniqueKey here.
    #
    # We're using a dynamic Solr structure with uniqueKey 'id'
    # defined in its schema.xml.
    #
    if action in ('I', 'U'):
        solr_message.append('"add": ' + json.dumps({ "doc": newtuple }))
    # TODO: Do we have to generate a query to delete?
    # We don't know the Solr document id.
    if action == 'D':
        solr_message.append('"delete": ' + json.dumps(oldkey))
def send_message(solr_message):
    """Send the Solr JSON to the endpoint, with commit=true parameter."""
    req_params = {
        'commit': 'true'
    }
    post_body = "{" + ',\n'.join(solr_message) + "}"
    logging.debug(post_body)
    post_start = time.time()
    r = requests.post(
        solr_collection_url + "/update/json",
        params = req_params,
        data = post_body
    )
    post_finish = time.time()
    solr_response = json.loads(r.text)
    logging.info("Took {}ms to send to solr".format(int((post_finish - post_start) * 1000)))
    logging.debug("response: " + r.text)
    if int(solr_response['responseHeader']['status']) != 0:
        logging.error("Failed to import a document: " + r.text)
    else:
        logging.debug("Solr applied {} changes in {}ms".format(
            len(solr_message), int(solr_response['responseHeader']['QTime'])))
if __name__ == '__main__':
    # level: DEBUG, INFO, ...
    logging.basicConfig(level='INFO')
    # shut requests up
    logging.getLogger('requests.packages.urllib3.connectionpool').setLevel('ERROR')
    connect()
    create_slot()
    decode_stream()
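For reference, send_message POSTs a single body whose top-level keys may repeat (one "add" per inserted or updated row, one "delete" per deleted row), which is why it is assembled by string templating rather than json.dumps. For a transaction that inserts one row and deletes another, the body would look roughly like this (values are illustrative; the delete key assumes the default replica identity, so only the primary key is non-null in oldtuple):

{"add": {"doc": {"id": 1, "book_line": "It was the best of times, it was the worst of times,"}},
"delete": {"id": 2}}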