Last active
December 28, 2015 20:39
-
-
Save demikl/7559175 to your computer and use it in GitHub Desktop.
Example of data import from PostgreSQL to Couchbase. Used during Couchbase Meetup Number 1, Paris.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import psycopg2, psycopg2.extras, json | |
import eventlet,eventlet.pools, eventlet.db_pool | |
from multiprocessing import Process as mps, JoinableQueue | |
# Import de la lib memcache_client en remplaçant les I/O bloquantes par celles d'eventlet | |
mc = eventlet.import_patched('memcache_client.memcache') | |
# Taille des pools : connexions couchbase et processus workers | |
CB_SIZE = 50 | |
PS_SIZE = 10 | |
# Fonction exécutée par les processus workers | |
def ps_run( queue ): | |
# Pool de green-threads pour paralléliser les tâches effectuant des I/O | |
evPool = eventlet.GreenPool(CB_SIZE*2) | |
# Pool de connexions Couchbase utilisées par les green-threads | |
cbPool = eventlet.pools.Pool( | |
min_size=CB_SIZE, max_size=CB_SIZE, # Toutes les connexions sont établies dès la création du pool, et non à la demande | |
create=lambda: mc.Client("127.0.0.1", 11212) ) # Créateur de connexion | |
# Dispatching des éléments reçus sur la queue vers les green-threads | |
for cbKey in evPool.imap( | |
lambda r:workerCB(cbPool,r), # fonction exécutée par chaque green-thread | |
iter(queue.get()) ): # itération sur les éléments reçus via la queue | |
# Acquittement du message sur la queue | |
queue.task_done() | |
# Tâche parallélisée dans chaque worker | |
def workerCB( cbPool, row ): | |
with cbPool.item() as cb: # Récupération d'une connexion dans le pool | |
cb.set( row["ref"], json.dumps( row ) ) # Stockage dans Couchbase en format JSON | |
return cbKey | |
# Processus principal : extraction données PG et envoi vers workers | |
def run(): | |
# Queue de messages entre processus | |
q = JoinableQueue(CB_SIZE * PS_SIZE * 3) | |
# Pool de processus workers | |
ps = [ mps(target=ps_run, args=(q,)) for _ in range(PS_SIZE) ] | |
for p in ps: | |
p.start() | |
# Connexion PG et requête SQL | |
conn = psycopg2.connect( | |
"dbname='mydb' user='me' host='localhost'", | |
cursor_factory=psycopg2.extras.DictCursor ) # Permet d'obtenir les données SQL directement au format Dict de Python | |
c = conn.cursor() | |
c.execute( """SELECT ref, title, price | |
FROM matable WHERE idcatalog=42""" ) | |
# Itération sur les résultats SQL | |
for row in c: | |
# Envoi des données dans la queue | |
q.put( (row,) ) | |
# Attente fin execution des taches puis arret processus | |
q.join() | |
for p in ps: p.terminate() | |
# Aiguillage au lancement du script vers la fonction destinée au processus principal | |
if __name__ == '__main__': | |
run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment