Skip to content

Instantly share code, notes, and snippets.

@alep

alep/loader.py Secret

Created July 22, 2019 00:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alep/0234f89cc0245b5af949be504e5e8905 to your computer and use it in GitHub Desktop.
import collections
import csv
import gevent
import gipc
from geventhttpclient import HTTPClient
from geventhttpclient.url import URL
import logging
from flask import Blueprint
from flask_restplus import Api, Resource
from .base import download
from .item import Item
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Flask blueprint under which the loader routes are registered.
loader = Blueprint("loader", __name__)
# flask-restplus API bound to the blueprint; resources in this module attach
# to it through ``@api.route``.
api = Api(loader, title="Loading API object", description="Loads items")
def post(reader):
    """Read items from ``reader`` and POST each one to the warm-up endpoint.

    This function is used as the target of the child process started by
    ``LoaderResource.post``. Every message obtained from ``reader.get()`` is a
    ``(headers, json, done)`` tuple; a message whose ``done`` flag is true
    marks the end of the stream and is not posted.

    Requests are issued concurrently through a bounded greenlet pool. The
    function only returns once every spawned request has finished and the
    HTTP client has been closed.

    Args:
        reader: Object exposing ``get()`` that yields
            ``(headers, json, done)`` tuples (the read end of a gipc pipe
            in this file).
    """
    # Imported here because a plain ``import gevent`` does not guarantee that
    # the ``gevent.pool`` submodule has been loaded.
    from gevent.pool import Pool

    num_worker_threads = 10
    logger.info('[post] started posting process')
    url = URL('http://127.0.0.1:8000/anything')  # using httpbin to test
    http = HTTPClient.from_url(url, concurrency=num_worker_threads)

    def _worker(client, headers, body):
        """POST one item and log a failure for any non-2xx answer."""
        response = client.post(url.request_uri, body=body, headers=headers)
        # Drain the body: geventhttpclient hands the connection back to its
        # pool once the response has been read completely.
        response.read()
        g = gevent.getcurrent()
        logger.info('sending {}'.format(g.minimal_ident))
        if response.status_code >= 300:
            logger.error('[warm-up] request failed: {response}'.format(response=response))

    pool = Pool(num_worker_threads)
    count = 0
    try:
        while True:
            headers, body, done = reader.get()
            if done:
                logger.info('[warm-up] breaking')
                break
            # Count only real items, not the end-of-stream sentinel.
            count += 1
            pool.spawn(_worker, http, headers, body)
        # Wait for in-flight requests; otherwise the process could exit
        # while greenlets are still sending.
        pool.join()
        logger.info('[warm-up] Done: {}'.format(count))
    finally:
        logger.info('[warm-up] Close')
        http.close()
    logger.info('[warm-up] Exit')
def itemswriter(writer, filename):
    """Stream the rows of a CSV file to ``writer`` as postable items.

    Each CSV row is wrapped in an ``Item``; its headers and JSON payload are
    sent as ``(headers, json, False)``. The stream is always terminated with
    the sentinel ``({}, "", True)`` -- also when reading or converting a row
    fails -- so that the consumer blocked on the other end is released
    instead of waiting forever. The original exception is re-raised after the
    sentinel has been sent.

    Args:
        writer: Object exposing ``put(obj)`` (the write end of a gipc pipe in
            this file).
        filename: Path of the CSV file to read.
    """
    sentinel = ({}, "", True)
    try:
        with open(filename) as data:
            for item_data in csv.reader(data):
                # NOTE(review): ``key`` is not defined anywhere in the visible
                # file, so this line raises NameError for the first row unless
                # it is provided elsewhere -- confirm the intended value.
                item = Item(key, item_data)
                headers, json = item.headers(), item.to_json()
                writer.put((headers, json, False))
    except Exception:
        # Tell the consumer the stream is over before propagating the error.
        writer.put(sentinel)
        raise
    writer.put(sentinel)
@api.route("/")
class LoaderResource(Resource):
    """REST resource that triggers a warm-up load of the downloaded items."""

    def post(self):
        """Download the item file and stream it to a separate posting process.

        A gipc pipe connects a writer greenlet (``itemswriter``) in this
        process with the module-level ``post`` function running in a child
        process.

        Returns:
            ``{"status": "ok"}`` when the child process exited with code 0 and
            the writer greenlet finished without an exception; otherwise
            ``{'status': 'failed'}``.
        """
        logger.info("[loader] starting warm-up")
        filename = download()
        with gipc.pipe() as (r, w):
            # Inside a method body ``post`` resolves to the module-level
            # function, not to this method.
            p = gipc.start_process(target=post, args=(r,))
            wg = gevent.spawn(itemswriter, w, filename)
            try:
                p.join()
            except BaseException as exc:
                # Always stop both sides, but only turn ordinary errors into
                # a "failed" response; KeyboardInterrupt, GreenletExit and
                # similar must keep propagating.
                wg.kill(block=True)
                p.terminate()
                if not isinstance(exc, Exception):
                    raise
                logger.exception("[loader] waiting for the posting process failed")
                return {'status': 'failed'}
            logger.info('done join')
            if p.exitcode != 0:
                # The child died abnormally; the writer may still be busy
                # with the pipe, so stop it before the handles are closed.
                wg.kill(block=True)
                logger.error("[loader] posting process exited with code %s", p.exitcode)
                return {'status': 'failed'}
            # The child exits cleanly only after it has seen the end-of-stream
            # message, so the writer is done (or about to be) at this point.
            wg.join()
            if not wg.successful():
                logger.error("[loader] item writer failed: %r", wg.exception)
                return {'status': 'failed'}
        return {"status": "ok"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment