Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Distributed File Downloader using ZMQ. List of urls is read from a file, distributed to workers, gathered and written to a directory.
#
# Distributed File Downloader
# ===========================
#
# List of urls is read from a file, distributed to workers, gathered and written to a directory.
#
# INSTALLATION & USAGE
# --------------------
# sudo apt-get install python-pip python-dev build-essential git screen
# sudo pip install pyzmq requests
#
# screen python DistributedUrlDownloader.py distribute
# screen python DistributedUrlDownloader.py work
# screen python DistributedUrlDownloader.py gather
#
import requests
import traceback
import os
import shutil
import time
import sys
import zmq
# File the distributor reads urls from, one url per line.
REQUEST_FILE = "urls.txt"
# Directory prefix that getFileName() joins every url onto.
OUT_DIR = "OUT"
# Address the distributor binds its PUSH socket to (all interfaces).
WORK_OUT = "tcp://*:60123"
# Default address a worker connects its PULL socket to when no host is given.
WORK_IN = "tcp://127.0.0.1:60123"
# Default address a worker connects its PUSH (result) socket to.
RESULT_OUT = "tcp://127.0.0.1:60124"
# Address the gatherer binds its PULL socket to (all interfaces).
RESULT_IN = "tcp://*:60124"
import thread
def main():
    """Run the role named on the command line.

    Recognised forms of ``sys.argv[1:]``:

    * ``distribute``  -- read REQUEST_FILE and push its urls out on WORK_OUT.
    * ``work``        -- pull urls from WORK_IN, push results to RESULT_OUT.
    * ``work <host>`` -- same, but talk to ports 60123/60124 on ``<host>``.
    * ``gather``      -- collect results on RESULT_IN and store them.

    Any other invocation prints the usage line instead of silently doing
    nothing.
    """
    prog = sys.argv[0] if sys.argv else "DistributedUrlDownloader.py"
    usage = "Usage: " + prog + " distribute | work [host] | gather"

    args = sys.argv[1:]
    if not args:
        print(usage)
        return

    mode = args[0]
    if mode == "distribute":
        distributeWork(WORK_OUT, REQUEST_FILE)
    elif mode == "work" and len(args) == 1:
        doWork(WORK_IN, RESULT_OUT)
    elif mode == "work" and len(args) == 2:
        # Remote distributor/gatherer: same ports, caller-supplied host.
        host = args[1]
        doWork("tcp://" + host + ":60123", "tcp://" + host + ":60124")
    elif mode == "gather":
        gatherResults(RESULT_IN, OUT_DIR)
    else:
        # Unknown mode, or "work" with too many arguments.
        print(usage)
def distributeWork(out_addr, filename):
    """Push every not-yet-downloaded url from a file to the workers.

    out_addr -- ZMQ address the PUSH socket is bound to.
    filename -- path of a text file with one url per line.

    Blank lines are ignored. Urls whose target file (see getFileName)
    already passes fileOk are skipped. A failure on one line is reported
    and the loop moves on to the next line; Ctrl-C stops the loop.
    """
    context = zmq.Context()
    work_out = context.socket(zmq.PUSH)
    # Send high-water mark of 1 (set twice, as in the original, via both
    # APIs) -- presumably so urls are handed out roughly one at a time
    # instead of being queued in bulk for the first worker.
    work_out.setsockopt(zmq.SNDHWM, 1)
    work_out.set_hwm(1)
    work_out.bind(out_addr)

    with open(filename) as url_file:
        for line in url_file:
            try:
                # remove ending \n of URL
                url = line.strip()
                if not url:
                    continue
                if fileOk(getFileName(url)):
                    print("Skipping image, already found: " + url)
                    continue
                print("Sending out: " + url)
                work_out.send(url)
            except KeyboardInterrupt:
                break
            except Exception:
                print("Error parsing line\n> " + line)
                traceback.print_exc()
def doWork(work_in_addr, res_out_addr):
    """Worker loop: receive urls, download them, push the results on.

    work_in_addr -- ZMQ address to connect the PULL (work) socket to.
    res_out_addr -- ZMQ address to connect the PUSH (result) socket to.

    Each result is sent as the JSON list [url, base64 payload]. Payloads
    of 100 characters or fewer are reported as too small and dropped.
    Any other failure is printed with its traceback and the loop carries
    on; Ctrl-C ends the loop.
    """
    ctx = zmq.Context()
    print("Reading from: " + work_in_addr)
    print("Writing to: " + res_out_addr)

    puller = ctx.socket(zmq.PULL)
    puller.setsockopt(zmq.RCVHWM, 1)
    puller.setsockopt(zmq.RCVBUF, 1)  # in bytes
    puller.set_hwm(1)
    puller.connect(work_in_addr)

    pusher = ctx.socket(zmq.PUSH)
    pusher.set_hwm(1)
    pusher.connect(res_out_addr)

    while True:
        try:
            url = puller.recv_string()
            if url is None:
                # Original note says "keyboard interrupt" -- TODO confirm
                # recv_string can actually return None here.
                break
            print("Processing: " + url)
            encoded = downloadImage(url)
            if len(encoded) <= 100:
                print("> Error: File too small.")
                continue
            pusher.send_json([url, encoded])
        except KeyboardInterrupt:
            break
        except Exception:
            print("> Error.")
            traceback.print_exc()
def downloadImage(url, timeout=30):
    """Fetch ``url`` and return its body as a base64 string.

    url     -- address to download.
    timeout -- seconds to wait for the server before giving up; without
               it a stalled server would block the worker indefinitely.

    Raises a requests exception on connection problems, on timeout, or
    when the server answers with an HTTP error status, so error pages
    are not passed on as if they were file content.
    """
    print("Downloading url: " + url)
    time.sleep(1)  # BE NICE! (pause one second before each request)
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.content.encode('base64')
def gatherResults(res_addr, filename):
    """Receive downloaded files from the workers and store them on disk.

    res_addr -- ZMQ address the PULL socket is bound to.
    filename -- currently unused; the output path of every result is
                derived from its url by getFileName().

    Every message is expected to be the JSON list [url, base64 payload].
    A result whose target file already passes fileOk is skipped rather
    than written again. Failures are printed with their traceback and the
    loop continues; Ctrl-C ends the loop.
    """
    context = zmq.Context()
    res_socket = context.socket(zmq.PULL)
    res_socket.bind(res_addr)

    while True:
        try:
            url, image_string = res_socket.recv_json()
            print("Received url: " + url)
            out_file = getFileName(url)
            if fileOk(out_file):
                print("> Skipping: File already present.")
                # Actually skip: do not overwrite a file that is already good.
                continue
            writeImageToFile(image_string, out_file)
            if not fileOk(out_file):
                print("> Error: Writing file failed.")
        except KeyboardInterrupt:
            break
        except Exception:
            print("> Error.")
            traceback.print_exc()
def writeImageToFile(image_string, filename):
    """Decode a base64 payload and write the raw bytes to ``filename``.

    image_string -- base64 text (embedded newlines are tolerated).
    filename     -- destination path; missing parent directories are
                    created. A bare file name with no directory part
                    is written to the current directory.

    Raises an exception if the payload is not valid base64 (before the
    target file is touched) or if the directory/file cannot be written.
    """
    import base64

    # Decode first so a corrupt payload never truncates an existing file.
    data = base64.b64decode(image_string)

    # make dirs if not available
    dirname = os.path.dirname(filename)
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)

    # Binary mode: the payload is raw file content, not text.
    with open(filename, 'wb') as fh:
        fh.write(data)
def getFileName(url):
    """Map ``url`` to the path of its local copy below OUT_DIR.

    write to path relative to OUT DIR
    modify this method if other paths are intended.

    The url is used as the relative path. Because urls also arrive over
    the network, two things are neutralised so the result cannot leave
    OUT_DIR: leading "/" characters (os.path.join would otherwise drop
    OUT_DIR for an absolute second argument) and ".." path segments.
    Ordinary urls such as "http://host/a.jpg" map exactly as before.
    NOTE(review): only "/" is treated as a separator -- confirm whether
    backslash-separated input needs handling on Windows.
    """
    kept = [segment for segment in url.split("/") if segment != ".."]
    # Strip after filtering: removing ".." can expose a leading "/".
    relative = "/".join(kept).lstrip("/")
    return os.path.join(OUT_DIR, relative)
def fileOk(filename):
    """Return True when ``filename`` exists and is larger than 100 bytes."""
    return os.path.exists(filename) and os.path.getsize(filename) > 100
# Run the command-line dispatcher only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment