Skip to content

Instantly share code, notes, and snippets.

@zed
Created March 12, 2011 21:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zed/53154856be2005d4fa50 to your computer and use it in GitHub Desktop.
Save zed/53154856be2005d4fa50 to your computer and use it in GitHub Desktop.
Upload files using cloudfiles, with either the multiprocessing module or the concurrent.futures module.
from concurrent import futures # pip install futures
from cloudfiles.connection import ConnectionPool
def get_container(name, conn):
    """Return the container `name` for `conn`, creating it on first use.

    Results are memoised in ``get_container.cache`` under the key
    ``(name, id(conn))``, so ``conn.create_container(name)`` is called at
    most once per (container name, connection object) pair.
    """
    # NOTE: ConnectionPool ensures that `conn` is owned by one thread
    # at a time; get/set themselves on a dict are atomic in CPython for
    # tuples of strings, ints
    cache = get_container.cache
    key = (name, id(conn))
    if key not in cache:
        cache[key] = conn.create_container(name)
    return cache[key]


# Memo table shared by every call: (name, id(conn)) -> container object.
get_container.cache = {}
def uploader(connection_pool, filename):
    """Upload the local file `filename` into the 'test' container.

    A connection is borrowed from `connection_pool` for the duration of
    the upload and is always handed back, even if the upload raises.
    The remote object is created under the same name as the local path.
    """
    conn = connection_pool.get()
    try:
        container = get_container('test', conn)
        remote = container.create_object(filename)
        remote.load_from_filename(filename)
    finally:
        # Return the connection whether or not the upload succeeded.
        connection_pool.put(conn)
# One pooled connection per worker thread, so a thread never has to wait
# for a connection to become free.
nconnections = 16
username = '---'
api_key = '---'

connection_pool = ConnectionPool(username, api_key, poolsize=nconnections)

# NOTE(review): get_filenames() is not defined in this snippet; presumably
# it yields the local paths to upload -- confirm against the caller.
with futures.ThreadPoolExecutor(max_workers=nconnections) as executor:
    jobs = [
        executor.submit(uploader, connection_pool, f)
        for f in get_filenames()
    ]
    # Report failures as each upload finishes; successful uploads are silent.
    for future in futures.as_completed(jobs):
        error = future.exception()
        if error is not None:
            print(error)  # error
import multiprocessing
import cloudfiles
# Credentials passed to cloudfiles.get_connection() by get_container().
# NOTE(review): '---' looks like a placeholder -- supply real values before
# running.
USERNAME = '---'
API_KEY = '---'
def get_container():
    """Return the 'test' container, connecting on the first call only.

    The container is stored on ``get_container.value``; later calls in the
    same process return that stored object without reconnecting.
    """
    cached = get_container.value
    if cached is not None:
        return cached
    connection = cloudfiles.get_connection(USERNAME, API_KEY)
    get_container.value = connection.create_container('test')
    return get_container.value


# None means "not connected yet"; filled in lazily by get_container().
get_container.value = None
def upload_file(filename):
    """Upload the local file `filename` into the container from get_container().

    The remote object is created under the same name as the local path.
    """
    remote = get_container().create_object(filename)
    remote.load_from_filename(filename)
def main():
    """Upload every file from get_filenames() using a pool of worker processes.

    Uploads are dispatched with ``imap_unordered`` and the results are
    drained only so that an exception raised in a worker is re-raised here.
    The pool is always shut down and joined, so no worker processes are
    left behind whether the run succeeds or fails.
    """
    NPROCESSES = 4
    pool = multiprocessing.Pool(processes=NPROCESSES)
    try:
        # NOTE(review): get_filenames() is not defined in this snippet;
        # presumably it yields the local paths to upload -- confirm.
        for _ in pool.imap_unordered(upload_file, get_filenames()):
            pass
    finally:
        # On success every result has been consumed, so nothing is still
        # running; on failure this stops the remaining uploads instead of
        # leaking the worker processes.
        pool.terminate()
        pool.join()


if __name__ == "__main__":
    multiprocessing.freeze_support()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment