Created
December 6, 2013 20:07
-
-
Save HeinrichHartmann/7831282 to your computer and use it in GitHub Desktop.
Distributed File Downloader using ZMQ. A list of URLs is read from a file, distributed to workers, gathered and written to a directory.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#
# Distributed File Downloader
# ===========================
#
# A list of URLs is read from a file, distributed to workers, gathered and written to a directory.
#
# INSTALLATION & USAGE
# --------------------
# sudo apt-get install python-pip python-dev build-essential git screen
# sudo pip install pyzmq requests
#
# screen python DistributedUrlDownloader.py distribute
# screen python DistributedUrlDownloader.py work
# screen python DistributedUrlDownloader.py gather
#
import base64
import os
import shutil
import sys
import time
import traceback

import requests
import zmq
# File the distributor reads URLs from, one URL per line.
REQUEST_FILE = "urls.txt"
# Directory that downloaded files are stored under (see getFileName).
OUT_DIR = "OUT"

# Work channel: the distributor binds WORK_OUT, workers connect to WORK_IN.
WORK_OUT = "tcp://*:60123"
WORK_IN = "tcp://127.0.0.1:60123"
# Result channel: workers connect to RESULT_OUT, the gatherer binds RESULT_IN.
RESULT_OUT = "tcp://127.0.0.1:60124"
RESULT_IN = "tcp://*:60124"
import thread | |
def main():
    """Run the pipeline stage named by the first command-line argument.

    Modes:
        distribute   read REQUEST_FILE and push its URLs to the workers
        work [host]  download URLs; without ``host`` the local endpoints
                     WORK_IN / RESULT_OUT are used, otherwise ports 60123
                     and 60124 on ``host``
        gather       receive results and store them under OUT_DIR

    A missing or unknown mode, or ``work`` with more than one extra
    argument, prints the usage line instead of silently doing nothing.
    """
    prog = sys.argv[0] if sys.argv else "<script>"
    args = sys.argv[1:]
    mode = args[0] if args else None

    if mode == "distribute":
        distributeWork(WORK_OUT, REQUEST_FILE)
    elif mode == "work" and len(args) == 1:
        doWork(WORK_IN, RESULT_OUT)
    elif mode == "work" and len(args) == 2:
        host = args[1]
        doWork("tcp://" + host + ":60123", "tcp://" + host + ":60124")
    elif mode == "gather":
        gatherResults(RESULT_IN, OUT_DIR)
    else:
        # "$0" is shell syntax and was printed literally; show the real name.
        print("Usage: " + prog + " distribute | work [host] | gather")
def distributeWork(out_addr, filename):
    """Push every not-yet-downloaded URL listed in a file to the workers.

    Args:
        out_addr: ZMQ endpoint the PUSH socket binds to.
        filename: Path of a text file holding one URL per line.

    Blank lines are ignored. A URL is skipped when fileOk() reports that
    its target file (see getFileName) is already present. Ctrl-C stops
    the distribution; any other error on a line is reported and the loop
    moves on to the next line.
    """
    context = zmq.Context()
    work_out = context.socket(zmq.PUSH)
    # High-water mark of 1 on the send side.
    work_out.setsockopt(zmq.SNDHWM, 1)
    work_out.set_hwm(1)
    work_out.bind(out_addr)

    with open(filename) as url_file:
        for line in url_file:
            try:
                # remove ending \n of URL
                url = line.strip()
                if not url:
                    continue
                if fileOk(getFileName(url)):
                    print("Skipping image, already found: " + url)
                    continue
                print("Sending out: " + url)
                work_out.send(url)
            except KeyboardInterrupt:
                break
            except Exception:
                print("Error parsing line\n> " + line)
                traceback.print_exc()
def doWork(work_in_addr, res_out_addr):
    """Worker loop: pull URLs, download them, push the results onwards.

    Args:
        work_in_addr: ZMQ endpoint to connect the PULL socket to.
        res_out_addr: ZMQ endpoint to connect the PUSH socket to.

    Each result is sent as the JSON list ``[url, image_string]``. Payloads
    of at most 100 characters are reported as too small and dropped.
    Ctrl-C ends the loop; any other exception is printed and the loop
    continues with the next URL.
    """
    ctx = zmq.Context()
    print("Reading from: " + work_in_addr)
    print("Writing to: " + res_out_addr)

    puller = ctx.socket(zmq.PULL)
    puller.setsockopt(zmq.RCVHWM, 1)
    puller.setsockopt(zmq.RCVBUF, 1)  # in bytes
    puller.set_hwm(1)
    puller.connect(work_in_addr)

    pusher = ctx.socket(zmq.PUSH)
    pusher.set_hwm(1)
    pusher.connect(res_out_addr)

    while True:
        try:
            url = puller.recv_string()
            if url is None:
                break  # Keyboard interrupt
            print("Processing: " + url)
            image_string = downloadImage(url)
            if len(image_string) <= 100:
                print("> Error: File too small.")
            else:
                pusher.send_json([url, image_string])
        except KeyboardInterrupt:
            break
        except Exception:
            print("> Error.")
            traceback.print_exc()
def downloadImage(url, timeout=30):
    """Fetch ``url`` and return the response body as base64 text.

    Args:
        url: Address to download.
        timeout: Seconds passed to ``requests.get`` so that a stalled
            server cannot block the worker forever.

    Returns:
        The body encoded as an ASCII base64 string (no line wrapping),
        suitable for embedding in a JSON message.

    Raises:
        requests.RequestException: On connection problems, timeouts or an
            HTTP error status (an error page is not image data).
    """
    print("Downloading url: " + url)
    time.sleep(1)  # BE NICE!
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    # The base64 module replaces the Python-2-only 'base64' string codec.
    return base64.b64encode(response.content).decode("ascii")
def gatherResults(res_addr, filename):
    """Receive downloaded files from the workers and store them on disk.

    Args:
        res_addr: ZMQ endpoint the PULL socket binds to.
        filename: Unused; kept for interface compatibility. The target
            path of each result is derived by getFileName(url).

    Every message is the JSON list ``[url, image_string]``. A result whose
    target file already passes fileOk() is skipped instead of being
    overwritten. Ctrl-C ends the loop; any other exception is printed and
    the loop continues with the next message.
    """
    context = zmq.Context()
    res_socket = context.socket(zmq.PULL)
    res_socket.bind(res_addr)

    while True:
        try:
            url, image_string = res_socket.recv_json()
            print("Received url: " + url)
            out_file = getFileName(url)
            if fileOk(out_file):
                print("> Skipping: File already present.")
                # Actually skip; previously the file was rewritten anyway.
                continue
            writeImageToFile(image_string, out_file)
            if not fileOk(out_file):
                print("> Error: Writing file failed.")
        except KeyboardInterrupt:
            break
        except Exception:
            print("> Error.")
            traceback.print_exc()
def writeImageToFile(image_string, filename):
    """Decode a base64 payload and write the raw bytes to ``filename``.

    Args:
        image_string: Base64 text; line-wrapped input is accepted too.
        filename: Destination path. Missing parent directories are
            created; a bare file name is written to the current directory.

    Raises:
        ValueError: If ``image_string`` is not valid base64
            (``binascii.Error`` is a subclass on Python 3; Python 2
            raises ``TypeError`` instead). Nothing is written in that case.
        IOError / OSError: If the directory or file cannot be created.
    """
    # Decode first so a malformed payload never leaves an empty file behind.
    image_bytes = base64.b64decode(image_string)

    # make dirs if not available
    dirname = os.path.dirname(filename)
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname)

    # Binary mode: text mode would mangle the data on some platforms.
    with open(filename, "wb") as fh:
        fh.write(image_bytes)
def getFileName(url):
    """Return the path under OUT_DIR where the content of ``url`` is stored."""
    # The URL itself is used as the path relative to OUT_DIR;
    # modify this method if other paths are intended.
    target_path = os.path.join(OUT_DIR, url)
    return target_path
def fileOk(filename, min_size=100):
    """Tell whether ``filename`` is a regular file larger than ``min_size``.

    Args:
        filename: Path to check.
        min_size: Size in bytes the file has to exceed (default 100).

    Returns:
        True if the path is an existing regular file bigger than
        ``min_size`` bytes; False otherwise, including for directories and
        for files that vanish while being checked.
    """
    try:
        return os.path.isfile(filename) and os.path.getsize(filename) > min_size
    except OSError:
        # The file was removed (or became unreadable) between the two calls.
        return False
# Run the command-line dispatcher only when executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment