Skip to content

Instantly share code, notes, and snippets.

@demikl
Last active December 28, 2015 20:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save demikl/7559175 to your computer and use it in GitHub Desktop.
Save demikl/7559175 to your computer and use it in GitHub Desktop.
Example of data import from PostgreSQL to Couchbase. Used during Couchbase Meetup Number 1, Paris.
import psycopg2, psycopg2.extras, json
import eventlet,eventlet.pools, eventlet.db_pool
from multiprocessing import Process as mps, JoinableQueue
# Import de la lib memcache_client en remplaçant les I/O bloquantes par celles d'eventlet
mc = eventlet.import_patched('memcache_client.memcache')
# Taille des pools : connexions couchbase et processus workers
CB_SIZE = 50
PS_SIZE = 10
# Fonction exécutée par les processus workers
def ps_run( queue ):
# Pool de green-threads pour paralléliser les tâches effectuant des I/O
evPool = eventlet.GreenPool(CB_SIZE*2)
# Pool de connexions Couchbase utilisées par les green-threads
cbPool = eventlet.pools.Pool(
min_size=CB_SIZE, max_size=CB_SIZE, # Toutes les connexions sont établies dès la création du pool, et non à la demande
create=lambda: mc.Client("127.0.0.1", 11212) ) # Créateur de connexion
# Dispatching des éléments reçus sur la queue vers les green-threads
for cbKey in evPool.imap(
lambda r:workerCB(cbPool,r), # fonction exécutée par chaque green-thread
iter(queue.get()) ): # itération sur les éléments reçus via la queue
# Acquittement du message sur la queue
queue.task_done()
# Tâche parallélisée dans chaque worker
def workerCB( cbPool, row ):
with cbPool.item() as cb: # Récupération d'une connexion dans le pool
cb.set( row["ref"], json.dumps( row ) ) # Stockage dans Couchbase en format JSON
return cbKey
# Processus principal : extraction données PG et envoi vers workers
def run():
# Queue de messages entre processus
q = JoinableQueue(CB_SIZE * PS_SIZE * 3)
# Pool de processus workers
ps = [ mps(target=ps_run, args=(q,)) for _ in range(PS_SIZE) ]
for p in ps:
p.start()
# Connexion PG et requête SQL
conn = psycopg2.connect(
"dbname='mydb' user='me' host='localhost'",
cursor_factory=psycopg2.extras.DictCursor ) # Permet d'obtenir les données SQL directement au format Dict de Python
c = conn.cursor()
c.execute( """SELECT ref, title, price
FROM matable WHERE idcatalog=42""" )
# Itération sur les résultats SQL
for row in c:
# Envoi des données dans la queue
q.put( (row,) )
# Attente fin execution des taches puis arret processus
q.join()
for p in ps: p.terminate()
# Aiguillage au lancement du script vers la fonction destinée au processus principal
if __name__ == '__main__':
run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment