Skip to content

Instantly share code, notes, and snippets.

@rubdos
Created January 16, 2016 19:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rubdos/4f7d0f3658eb15711da1 to your computer and use it in GitHub Desktop.
Save rubdos/4f7d0f3658eb15711da1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from boodschap.database import Product, productdb
from datetime import timedelta, datetime
from multiprocessing.pool import ThreadPool
import time
import urllib
import json
import threading
from queue import Queue
from couchdb import Server, ResourceNotFound
# --- Configuration ---------------------------------------------------------
# Number of products requested per search page (first placeholder in `url`).
page_size = 10
# Unix timestamp to sync back to. Replaced below when last_update.txt holds
# the time of a previous run.
until = 1452657600  # Arbitrary date in January 2016. Change to whatever offset you need.
# Search URL template; the placeholders are filled with (page_size, page).
url = "http://world.openfoodfacts.org/cgi/search.pl?search_terms=&sort_by=last_modified_t&page_size=%s&json=1&page=%s"
threadPool = ThreadPool(10)  # Threadpool shared by the download loops and the per-product parse workers
importerThreadCount = 10  # Amount of workers for the CouchDB import
simultaneousDownloads = 1  # Number of download loops run in parallel; also the page stride of each loop

# --- Last update bookkeeping -----------------------------------------------
# Read the previous update time (if any), then record the current time.
# NOTE: the new timestamp is written before the sync itself has run, so a run
# that fails later on still advances the recorded time.
try:
    with open("last_update.txt", "r") as f:
        contents = f.read()
except FileNotFoundError:
    # First run: nothing recorded yet, keep the default `until` from above.
    contents = ""
if contents.strip():
    # Subtract 15 minutes. You never know you missed something.
    # Integer arithmetic keeps `until` an int (3600/4 would yield a float).
    until = int(contents) - 15 * 60
    print("Previous update: " + contents)
# Mode "w" truncates the file, so no stale digits of an older, longer value
# can survive behind the new timestamp.
with open("last_update.txt", "w") as f:
    f.write(str(int(time.time())))

# Parsed products waiting to be stored in CouchDB by the importer threads.
q = Queue()
def parse_product(result):
    """Queue one search result for import and report its modification time.

    A result with a non-empty ``product_name`` is wrapped in a document
    (name, barcode, and a ``meta`` section that also carries the untouched
    original record) and put on the module-level queue ``q``. Results
    without a name are not queued.

    Returns the result's ``last_modified_t`` converted to ``int``, whether
    or not the product was queued.
    """
    name = result.get('product_name', '')
    if len(name) > 0:
        # Retrieval date and update date share a single UTC timestamp.
        stamp = datetime.utcnow().isoformat("T") + "Z"
        meta = {
            'source': url,
            'license': "ODbL + ODcL",
            'retrieval_date': stamp,
            'update_date': stamp,
            'original': result,
        }
        q.put({
            'name': name,
            'barcode': result['code'],
            'meta': meta,
        })
        print(name + "\t\t\tadded to Queue")
    return int(result['last_modified_t'])
def execute_transfers(thread_id):
    """Download search pages and hand their products to ``parse_product``.

    Starting at page ``thread_id + 1`` and advancing by
    ``simultaneousDownloads`` pages per iteration, each page of the search
    URL is fetched, decoded as JSON, and its ``products`` list is mapped
    over ``parse_product`` on the shared ``threadPool``. The loop stops once
    the smallest ``last_modified_t`` seen so far is no longer greater than
    the module-level ``until`` cutoff, or when a page contains no products.

    NOTE(review): this function itself runs on ``threadPool`` and then
    submits more work to the same pool; that presumably only works while
    ``simultaneousDownloads`` stays below the pool size -- confirm before
    raising it.
    """
    # `import urllib` alone does not guarantee that the `request` submodule
    # is loaded, so import it explicitly here.
    import urllib.request

    print(thread_id)
    last_date = int(time.time())
    page = thread_id + 1
    while last_date > until:
        our_url = url % (page_size, page)
        # Close the HTTP response as soon as the body has been read.
        with urllib.request.urlopen(our_url) as req:
            p = json.loads(req.read().decode())
        products = p['products']
        if not products:
            # No products on this page: there is nothing left to fetch, and
            # min() of an empty list would raise ValueError.
            break
        result = threadPool.map(parse_product, products)
        new_last_date = min(result)
        if not last_date > new_last_date:
            # The oldest timestamp did not move backwards on this page.
            print("Warning, infinite loop")
        else:
            last_date = new_last_date
        page += simultaneousDownloads
def importer():
    """Worker loop that stores queued products in the CouchDB ``products`` db.

    Runs forever: takes one item at a time from the module-level queue
    ``q``, looks its barcode up in the ``products/all`` view, and either
    updates the first matching document or saves the item as a new one.

    Every item is marked done exactly once, even when storing it fails.
    Otherwise a single failure would kill this worker before
    ``q.task_done()`` and leave ``q.join()`` blocked forever.
    """
    server = Server()
    productdb = server['products']
    while True:
        item = q.get()
        try:
            print(item['name'] + "\t\t\t\t\t popped from Queue")
            barcode = item['barcode']
            existingRecords = productdb.view('products/all')[barcode]
            if len(existingRecords.rows) > 0:
                # Merge the fresh data into the stored document and write it back.
                doc = productdb[existingRecords.rows[0].id]
                doc.update(item)
                productdb[doc.id] = doc
            else:
                productdb.save(item)
        except Exception as exc:
            # Worker boundary: report the failure and keep serving the queue
            # instead of letting this thread die.
            print("Import failed for barcode %s: %r" % (item.get('barcode'), exc))
        finally:
            q.task_done()
# Launch the CouchDB import workers as daemon threads so they do not keep the
# process alive once the main thread is finished.
for _ in range(importerThreadCount):
    worker = threading.Thread(target=importer, daemon=True)
    worker.start()

# Run the download loops on the shared pool; map() returns when all are done.
threadPool.map(execute_transfers, range(simultaneousDownloads))

# Wait until every queued product has been marked done by an importer.
q.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment