[kafka reference] #kafka

Reference

session.timeout.ms: the consumer session timeout. If the group coordinator receives no heartbeat from a consumer within this window, the consumer is dropped from the group and its partitions are rebalanced.
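For example, the timeout can be set directly in the configuration dict passed to the Confluent Python client; the broker address and group id below are placeholders:

from confluent_kafka import Consumer

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',  # placeholder broker
    'group.id': 'demo-group',               # placeholder group
    'session.timeout.ms': 10000,            # drop the consumer after 10s without heartbeats
})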

Install

# closer.cgi serves a mirror-selection page, so fetch the tarball from the archive directly
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2.12-2.5.0.tgz
tar -xzf kafka_2.12-2.5.0.tgz
cd kafka_2.12-2.5.0

Start the server

Kafka uses ZooKeeper, so you need to start a ZooKeeper server first if you don't already have one. You can use the convenience script packaged with Kafka to get a quick-and-dirty single-node ZooKeeper instance.

# Both scripts run in the foreground, so start them in separate terminals (or pass -daemon)
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
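Once both are up, a quick smoke test (the topic name and settings are arbitrary): create a topic, then type lines into the console producer and watch them appear in the console consumer, each in its own terminal.

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 1 --replication-factor 1
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning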

Sample code

https://docs.confluent.io/current/clients/python.html#python-client
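The docs above cover the full client API; a minimal round trip with confluent-kafka looks roughly like this (broker address, topic, and group id are placeholders):

from confluent_kafka import Consumer, Producer

p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('test', value=b'hello')  # asynchronous; delivery is reported via poll()/flush()
p.flush()                          # block until everything queued has been delivered

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'demo-group',
    'auto.offset.reset': 'earliest',
})
c.subscribe(['test'])
msg = c.poll(10.0)  # wait up to 10 seconds for a message
if msg is not None and not msg.error():
    print(msg.value())
c.close()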

Cleanscore Batch

#!/usr/bin/env python
import logging
import os
import threading
import time
from multiprocessing import Process
from queue import Empty, Queue
import json
from concurrent.futures import ThreadPoolExecutor

from confluent_kafka import Consumer, Producer, KafkaException
import requests

import config
import utility

app_api = config.BatchJob.app_api
headers = {'Content-Type': 'application/json'}


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        logging.error('Message delivery failed: {}'.format(err))
    else:
        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


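# Per-message pipeline: parse the request JSON, score it over HTTP, produce
# the result to the response topic, then commit the offset. Offsets are
# committed manually (enable.auto.commit=False below) and only after a message
# is fully handled, so an abrupt shutdown re-delivers work instead of losing it.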
def _process_msg(q, c, p, produce_topic):
    prd_no = file_path = json_msg = None  # pre-bind so the error handlers below can log safely
    try:
        p.poll(0)  # serve delivery callbacks for previously produced messages
        msg = q.get(timeout=60)  # timeout keeps the blocking get interruptible
        msg_v = msg.value()
        json_msg = json.loads(msg_v)
        json_request = json_msg['request']

        prd_no = json_msg['prdNo']
        file_path = json_request['file_path']
        if not file_path:
            raise ValueError

    except Empty:
        return  # nothing to pick up within the timeout
    except ValueError:
        logging.error('Image file does not exist! prd_no : {}, file_path : {}'.format(prd_no, file_path))
        c.commit(msg)  # malformed request; commit so it is not re-consumed
        q.task_done()
        return
    except Exception as err:
        logging.error('Consume error : {}, prd_no : {}, message : {}'.format(err, prd_no, json_msg))
        c.commit(msg)
        q.task_done()
        return

    try:
        # 30s timeout is an assumed bound so a hung scoring API cannot stall
        # a worker thread indefinitely.
        r = requests.post('http://' + app_api + '/v2/getScore/filePath', headers=headers,
                          data=json.dumps(json_request), timeout=30)
        r_json = r.json()
        json_response = json_msg.copy()
        del json_response['request']
        json_response['response'] = r_json
        data = json.dumps(json_response)
    except Exception as err:
        logging.error('Scoring error : {}, prd_no : {}, json_request : {}'.format(err, prd_no, json_request))
        # Scoring error! Don't commit.
        q.task_done()
        return

    try:
        p.produce(produce_topic, data, callback=delivery_report)
    except (BufferError, KafkaException) as err:
        # produce() raises BufferError when the client's local queue is full
        # and KafkaException for other client-side errors. Don't commit either way.
        logging.error('Produce error : {}, prd_no : {}, json_request : {}'.format(err, prd_no, json_request))
        q.task_done()
        slack_title = 'Local queue full!!!!'
        slack_message = '<@UFN7T1K6D>? Check the batch process!!!'
        utility.slack(slack_title, slack_message)
        p.poll(5)  # give the client up to 5s to drain delivery callbacks
        return

    try:
        if r_json['status'] != 'success':
            logging.error("Scoring failed! prd_no : {}, file_path : {}".format(prd_no, file_path))
        else:
            logging.info(
                '#%sT%s - Scored: %s',
                os.getpid(), threading.get_ident(), json_response
            )
    except Exception as err:
        logging.error('JSON error : {}, prd_no : {}, json_response : {}'.format(err, prd_no, json_response))

    finally:
        q.task_done()
        c.commit(msg)


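# One consumer per worker process. The bounded queue applies backpressure
# (put() blocks once num_threads messages are in flight) while a thread pool
# scores messages concurrently and the main loop keeps polling Kafka.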
def _consume(kafka_config):
    '''
    No connection check is performed at startup, so verify that messages
    actually flow once the consumer starts!
    '''

    logging.info(
        '#%s - Starting consumer group=%s, consume_topic=%s',
        os.getpid(), kafka_config['kafka_kwargs']['group.id'], kafka_config['consume_topic'],
    )
    c = Consumer(**kafka_config['kafka_kwargs'])
    c.subscribe([kafka_config['consume_topic']])
    q = Queue(maxsize=kafka_config['num_threads'])
    p = Producer({'bootstrap.servers': config.Kafka.produce_bootstrap_servers,
                  'linger.ms': 5,
                  })

    with ThreadPoolExecutor(max_workers=config.BatchJob.max_threads_per_process) as executor:
        try:
            while True:
                logging.info('#%s - Waiting for message...', os.getpid())
                msg = c.poll(config.Kafka.poll_time)
                if msg is None:
                    continue
                if msg.error():
                    logging.error(
                        '#%s - Consumer error: %s', os.getpid(), msg.error()
                    )
                    continue
                # put() blocks once num_threads messages are in flight, which
                # throttles the poll loop until a worker frees a slot.
                q.put(msg)
                executor.submit(_process_msg, q, c, p, kafka_config['produce_topic'])
        except Exception:
            logging.exception('#%s - Worker terminated.', os.getpid())
        finally:
            # Runs once, on termination: release the consumer's group
            # membership and flush any messages still queued in the producer.
            c.close()
            p.flush()


def main(kafka_config):
    """
    Program that consumes request messages from Kafka request topic and score to response topic
    """

    workers = []
    while True:
        num_alive = len([w for w in workers if w.is_alive()])
        if kafka_config['num_workers'] == num_alive:
            time.sleep(1)
            continue
        for _ in range(kafka_config['num_workers'] - num_alive):
            p = Process(target=_consume, daemon=True, args=(kafka_config,))
            p.start()
            workers.append(p)
            logging.info('Starting worker #%s', p.pid)


if __name__ == '__main__':

    if config.BatchJob.log_to_file:
        logging.basicConfig(
            level=config.BatchJob.log_level,
            filename=config.BatchJob.log_file, filemode='a',
            format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
        )
    else:
        logging.basicConfig(
            level=config.BatchJob.log_level,
            format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
        )

    main(kafka_config={
        # At most, this should be the total number of Kafka partitions on
        # the topic.
        'num_workers': config.Kafka.num_workers,
        'num_threads': config.Kafka.num_threads,
        'consume_topic': config.Kafka.consume_topic,
        'produce_topic': config.Kafka.produce_topic,
        'kafka_kwargs': {
            'bootstrap.servers': config.Kafka.consume_bootstrap_servers,
            'group.id': config.Kafka.group_id,
            'auto.offset.reset': 'earliest',
            # Commit manually to care for abrupt shutdown.
            'enable.auto.commit': False,
        },
    })
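The script imports config and utility modules that are not part of this gist (utility.slack(title, message) is assumed to post an alert). A minimal sketch of what config.py might contain, inferred from the attributes the script reads; every value below is a placeholder assumption:

# config.py - hypothetical sketch; every value is a placeholder
import logging

class Kafka:
    consume_bootstrap_servers = 'localhost:9092'
    produce_bootstrap_servers = 'localhost:9092'
    group_id = 'cleanscore-batch'
    consume_topic = 'image.cleanscore.request'
    produce_topic = 'image.cleanscore.response'
    poll_time = 1.0        # seconds passed to Consumer.poll()
    num_workers = 4        # keep at or below the topic's partition count
    num_threads = 8        # queue depth per worker process

class BatchJob:
    app_api = 'localhost:8080'  # host:port of the scoring API
    max_threads_per_process = 8
    log_to_file = False
    log_level = logging.INFO
    log_file = '/tmp/cleanscore_batch.log'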
