-
-
Save alep/0234f89cc0245b5af949be504e5e8905 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections
import csv
import logging

import gevent
import gevent.pool
import gipc
from flask import Blueprint
from flask_restplus import Api, Resource
from geventhttpclient import HTTPClient
from geventhttpclient.url import URL

from .base import download
from .item import Item
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Flask blueprint that carries the loader endpoints.
loader = Blueprint("loader", __name__)

# Flask-RESTPlus API bound to the blueprint; resources below register on it.
api = Api(loader, title="Loading API object", description="Loads items")
def post(reader):
    """Consume items from ``reader`` and POST each one to the test endpoint.

    Items are ``(headers, json, done)`` tuples. Every tuple whose ``done``
    flag is false is posted by a greenlet taken from a bounded pool; the
    first tuple whose ``done`` flag is true ends the stream.

    The function returns only after every spawned request has finished and
    the HTTP client has been closed, so no in-flight request is dropped when
    the caller (a child process in this file) exits.

    Args:
        reader: Object exposing ``get()`` that returns
            ``(headers, json, done)`` tuples. In this file it is the read
            end of a ``gipc`` pipe.
    """
    num_worker_threads = 10
    logger.info('[post] started posting process')
    url = URL('http://127.0.0.1:8000/anything')  # using httpbin to test
    http = HTTPClient.from_url(url, concurrency=num_worker_threads)

    def _worker(client, headers, json):
        """POST one payload and log an error for a non-success status."""
        response = client.post(url.request_uri, body=json, headers=headers)
        g = gevent.getcurrent()
        logger.info('sending {}'.format(g.minimal_ident))
        if response.status_code > 300:
            logger.error('[warm-up] request failed: {response}'.format(response=response))

    pool = gevent.pool.Pool(num_worker_threads)
    count = 0
    try:
        while True:
            headers, json, done = reader.get()
            if done:
                logger.info('[warm-up] breaking')
                break
            # Count only real items, not the end-of-stream marker.
            count += 1
            pool.spawn(_worker, http, headers, json)
        # Wait for the requests that are still in flight; without this the
        # function could return before the last greenlets have run.
        pool.join()
        logger.info('[warm-up] Done: {}'.format(count))
    finally:
        logger.info('[warm-up] Close')
        # Release the client's connections even if reading the stream failed.
        http.close()
    logger.info('[warm-up] Exit')
def itemswriter(writer, filename):
    """Stream the rows of a CSV file to ``writer`` as postable items.

    Each CSV row is wrapped in an ``Item`` and sent as a
    ``(headers, json, False)`` tuple. The end-of-stream marker
    ``({}, "", True)`` is always sent last, even when reading or converting
    a row fails, so the consumer never blocks waiting for more data. Any
    such failure still propagates to the caller after the marker is sent.

    Args:
        writer: Object exposing ``put(obj)``. In this file it is the write
            end of a ``gipc`` pipe.
        filename: Path of the CSV file to read.
    """
    try:
        with open(filename) as data:
            for item_data in csv.reader(data):
                # NOTE(review): `key` is not defined in the visible part of
                # this file -- confirm where it is supposed to come from.
                item = Item(key, item_data)
                headers, json = item.headers(), item.to_json()
                writer.put((headers, json, False))
    finally:
        # Always terminate the stream so the reader's loop can exit.
        writer.put(({}, "", True))
@api.route("/")
class LoaderResource(Resource):
    """REST resource that runs the warm-up load."""

    def post(self):
        """Download the item file and push its rows through the poster.

        A child process runs ``post`` on the read end of a ``gipc`` pipe
        while a greenlet in this process runs ``itemswriter`` on the write
        end.

        Returns:
            ``{"status": "ok"}`` when both sides finish, or
            ``{'status': 'failed'}`` when waiting raised an error or the
            writer greenlet failed.
        """
        logger.info("[loader] starting warm-up")
        filename = download()
        with gipc.pipe() as (r, w):
            p = gipc.start_process(target=post, args=(r,))
            wg = gevent.spawn(itemswriter, w, filename)
            # If the writer dies, the child may wait for data that never
            # arrives; stop it so the join below cannot hang.
            wg.link_exception(lambda _greenlet: p.terminate())
            try:
                p.join()
                # Make sure the writer is done before the pipe is closed.
                wg.join()
            except BaseException as exc:
                # Clean up on any interruption, but only turn ordinary
                # errors into a "failed" response; KeyboardInterrupt,
                # SystemExit and greenlet kills keep propagating.
                wg.kill(block=True)
                p.terminate()
                if not isinstance(exc, Exception):
                    raise
                logger.exception('[loader] warm-up failed')
                return {'status': 'failed'}
            if not wg.successful():
                logger.error('[loader] writer failed: {}'.format(wg.exception))
                return {'status': 'failed'}
        logger.info('done join')
        return {"status": "ok"}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment