import logging
import time
from json import dumps, loads

from kafka import KafkaConsumer, KafkaProducer

from smart_fashion_client.api.request_util import extract_extension_for_key
from smart_fashion_client.s3_upload import helpers
from smart_fashion_client.api.body_parts_segmentation.logic.body_parts_segmentation import human_parsing_processing

log = logging.getLogger(__name__)

results_json = []
def get_human_parsing_process(message, count):
    """Run human parsing on the person image referenced by a Kafka message.

    :param message: deserialized Kafka message containing 'imagePersonURL'
    :param count: running message count, used as the owner id
    :return: dict describing the uploaded segmentation results
    """
    image_person_url = message['imagePersonURL']
    owner_id = count

    body_segments, schp_segments = human_parsing_processing(image_person_url)

    # Reverse the channel order (BGR -> RGB) so the arrays save with the
    # proper colours.
    schp_segments = schp_segments[:, :, ::-1]
    body_segments = body_segments[:, :, ::-1]

    # Upload the combined-features segmentation to S3.
    key, format_key = extract_extension_for_key(image_person_url=image_person_url,
                                                user_id=owner_id,
                                                module="combine_features")
    body_segments_url = helpers.upload_numpy_array_to_s3(body_segments, key, format_key)

    # Upload the SCHP (Self-Correction Human Parsing) segmentation to S3.
    key, format_key = extract_extension_for_key(image_person_url=image_person_url,
                                                user_id=owner_id,
                                                module="SCHP")
    schp_segments_url = helpers.upload_numpy_array_to_s3(schp_segments, key, format_key)

    single_result = {
        "clothesImageURL": "https://bap-fashion-shop.s3.amazonaws.com/monster_energy.jpg",
        "ownerId": int(owner_id),
        "combineFeaturesURL": str(body_segments_url),
        "schpURL": str(schp_segments_url),
        "imagePersonURL": str(image_person_url)
    }
    return single_result
# Consume person-image messages from the 'bpsIn' topic as JSON.
consumer = KafkaConsumer(
    'bpsIn',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# Producer for publishing JSON-serialized results.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: dumps(x).encode('utf-8'))
count = 0
start = time.time()

for message in consumer:
    message = message.value
    count += 1
    result = get_human_parsing_process(message, count)
    results_json.append(result)
    print(results_json)
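
# NOTE: `producer` and `start` are defined above but never used. A minimal
# sketch of how the loop could publish each result back to Kafka is given
# below. The output topic name 'bpsOut' is an assumption for illustration,
# not part of the original gist:
#
#     for message in consumer:
#         message = message.value
#         count += 1
#         result = get_human_parsing_process(message, count)
#         results_json.append(result)
#         producer.send('bpsOut', value=result)  # hypothetical output topic
#     producer.flush()  # ensure buffered messages are delivered
#     print("processed {} messages in {:.1f}s".format(count, time.time() - start))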