Skip to content

Instantly share code, notes, and snippets.

@JimLee1996
Created February 24, 2020 13:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JimLee1996/76bf27d154851244c152d9459c9aee92 to your computer and use it in GitHub Desktop.
Save JimLee1996/76bf27d154851244c152d9459c9aee92 to your computer and use it in GitHub Desktop.
import json
import time
import asyncio
from aionet import ImageFetcher
from aiokafka import AIOKafkaConsumer
from config import KAFKA_SERVERS, KAFKA_IN_TOPIC, KAFKA_GROUP_ID
from config import KAFKA_CIRCLE, KAFKA_HISTORY_TIMEOUT
import logging
# Module-level logger named after this module. Its level is pinned to INFO
# here, so the warning/info statistics emitted by the consumer loop pass the
# logger-level check while DEBUG records from this module are dropped.
logger = logging.getLogger(__name__)
logger.setLevel("INFO")
async def transactional_process(loop, output_queue):
    """Consume image messages from Kafka, download each image and enqueue it.

    For every message read from ``KAFKA_IN_TOPIC`` the age of the message is
    computed from its ``sendTime`` field (treated as epoch milliseconds).
    Messages older than ``KAFKA_HISTORY_TIMEOUT`` seconds are dropped and only
    counted; all others are handed to a background download task.  Once at
    least ``KAFKA_CIRCLE`` seconds have passed since the last report, a summary
    of dropped messages and finished downloads is logged.  The check runs
    after handling a message, so nothing is reported while the topic is idle.

    Args:
        loop: Event loop passed through to ``AIOKafkaConsumer`` and
            ``ImageFetcher``.
        output_queue: Object with a ``put(item)`` method that receives
            ``(img, pass_id, img_path, recv_time, download_time)`` tuples;
            both timestamps are epoch milliseconds.

    Raises:
        Exception: Whatever a finished download task raised; it is re-raised
            when the task's result is collected for the periodic report.
    """
    consumer = AIOKafkaConsumer(
        KAFKA_IN_TOPIC,
        loop=loop,
        bootstrap_servers=KAFKA_SERVERS,
        group_id=KAFKA_GROUP_ID,
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )
    await consumer.start()
    fetcher = ImageFetcher(loop=loop)

    async def process(msg, output_queue):
        """Download the image referenced by ``msg``; return elapsed milliseconds."""
        # 1. Parse the Kafka message.
        recv_time = int(time.time() * 1000)
        pass_id = msg.value['passId']
        img_url = msg.value['imgURL']
        img_path = msg.value['path']
        # 2. Download the image asynchronously.
        img = await fetcher.get_img(img_url)
        download_time = int(time.time() * 1000)
        # 3. Hand the image over to the pending-work queue.
        item = (img, pass_id, img_path, recv_time, download_time)
        # NOTE(review): put() is called without await, so this presumably
        # expects a thread/process queue rather than asyncio.Queue -- confirm
        # against the caller.
        output_queue.put(item)
        return download_time - recv_time

    try:
        drops = []  # ages (seconds) of messages skipped as too old
        tasks = []  # download tasks not yet reported
        start_time = time.time()
        async for msg in consumer:
            delta = int(time.time() - msg.value['sendTime'] / 1000)
            if delta > KAFKA_HISTORY_TIMEOUT:
                drops.append(delta)
            else:
                tasks.append(asyncio.create_task(process(msg, output_queue)))

            if time.time() - start_time >= KAFKA_CIRCLE:
                # Report the stale messages that were skipped.
                if drops:
                    # Use the real extremes: arrival order does not guarantee
                    # that the first/last dropped message is oldest/newest.
                    logger.warning(
                        "未处理的过时消息 %d 条, 最远为 %d 秒前, 最近为 %d 秒前",
                        len(drops), max(drops), min(drops))
                    drops = []
                # Collect the download tasks that have finished.
                if tasks:
                    # Partition instead of removing while iterating, which
                    # would skip the element following each removed task.
                    finished, pending = [], []
                    for task in tasks:
                        (finished if task.done() else pending).append(task)
                    tasks = pending
                    if finished:
                        # result() re-raises a failed download's exception.
                        duration = sum(task.result() for task in finished)
                        logger.info(
                            "下载待处理图片 %d 张, 平均用时 %d 毫秒",
                            len(finished), round(duration / len(finished)))
                start_time = time.time()
    finally:
        await consumer.stop()
        await fetcher.stop()
def run_async(async_main, output_queue):
    """Run ``async_main(loop, output_queue)`` to completion on an event loop.

    The current event loop is used when one is available; otherwise (for
    example in a non-main thread, or on Python versions where
    ``asyncio.get_event_loop()`` no longer creates a loop implicitly) a new
    loop is created and installed as the current one.

    Args:
        async_main: Coroutine function called as
            ``async_main(loop, output_queue)``.
        output_queue: Passed through unchanged to ``async_main``.

    Raises:
        Exception: Any exception raised by ``async_main`` is re-raised once
            the loop has stopped.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No current event loop in this thread: create and register one.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    m_task = loop.create_task(async_main(loop, output_queue))
    # Stop the loop as soon as the main task finishes, fails or is cancelled.
    m_task.add_done_callback(lambda _task: loop.stop())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Cancel the main task and spin the loop again so the task can run
        # its cleanup (finally blocks) before control returns.
        m_task.cancel()
        loop.run_forever()
    finally:
        # Surface the task's exception, if any.  Only query a finished task:
        # calling result() on a pending one raises InvalidStateError and
        # would mask whatever interrupted run_forever().
        if m_task.done() and not m_task.cancelled():
            m_task.result()
def run_consumer(output_queue):
    """Start the Kafka image consumer and feed its results to ``output_queue``.

    Hands ``transactional_process`` to ``run_async`` as the main coroutine,
    passing ``output_queue`` through unchanged.
    """
    run_async(async_main=transactional_process, output_queue=output_queue)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment