-
-
Save shayanjm/1ab27912f94219728fbc3cb8b4eb704b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns proj.consumer | |
(:require [clojure.data.json :as json] | |
[clojure.core.async :as >] | |
[proj.kafka :as kafka] | |
[proj.db.core :as db] | |
[clj-time.core :as t] | |
[clj-time.coerce :as c]) | |
(:use clj-kafka.consumer.zk | |
proj.config | |
proj.utils) | |
(:import [java.io ByteArrayInputStream ByteArrayOutputStream] | |
[java.util.concurrent Executors] | |
[kafka.consumer ConsumerIterator Consumer KafkaStream])) | |
;; Atom holding the active Kafka consumer; nil until `init!` stores one.
;; No type hint on the var: its value is the atom itself, not the consumer
;; inside it, so a consumer-class hint would describe the wrong object.
(defonce c
  (atom nil))
(defn init!
  "Builds a consumer from `config` with its :group forced to
  \"CONSUMERGROUP\" and stores it in the `c` atom.
  Returns the value stored."
  [config]
  (->> (assoc config :group "CONSUMERGROUP")
       (kafka/make-consumer)
       (reset! c)))
(defn init-pool
  "Returns a new fixed-size thread pool created with `num` threads."
  [num]
  (. Executors newFixedThreadPool num))
(defn consume-a-stream
  "Consumes `stream` on the calling thread, inserting one match row per
  message.

  Each message payload is decoded as UTF-8 JSON (keys keywordized) and passed
  to `db/insert-match<!`. :start_ts comes from the message; :end_ts is the
  time the message was picked up here. An exception raised while decoding or
  inserting a single message is printed and that message is skipped.

  Blocks while waiting for messages and returns nil once the stream's
  iterator reports that nothing is left."
  [stream]
  (let [it (.iterator stream)]
    ;; Guard each `.next` with `.hasNext`: calling `.next` on an exhausted
    ;; iterator (e.g. after the connector is shut down) throws, and that
    ;; call sits outside the per-message `try`, so it would kill the worker.
    (while (.hasNext it)
      (let [msg (.message (.next it))]
        (try
          (let [match-end-ts (t/now)
                ;; assumes the payload is a byte array (default decoder) --
                ;; TODO confirm; decode explicitly as UTF-8 rather than with
                ;; the platform default charset.
                match (json/read-str (String. ^bytes msg "UTF-8") :key-fn keyword)]
            (db/insert-match<! {:meta (:meta match)
                                :content (:content match)
                                :uuid (:uuid match)
                                :start_ts (c/to-sql-time (:start_ts match))
                                ;; coerce the DateTime directly; no string round-trip
                                :end_ts (c/to-sql-time match-end-ts)
                                :filter_ids (:matched_regex_ids match)}))
          (catch Exception e
            (println (str "Caught exception while processing matches: " (.getMessage e)))))))))
(defn start
  "Starts consuming `topic` with `threads` worker threads.

  Creates a fixed thread pool of `threads` threads, asks the consumer held in
  `c` for `threads` message streams on `topic`, and submits one
  `consume-a-stream` job per stream to the pool.

  Returns the thread pool so the caller can later pass it to `stop`."
  [topic threads]
  (let [thread-pool (init-pool threads)
        topic->streams (create-message-streams @c {topic threads})]
    ;; Submitting jobs is purely a side effect, so iterate eagerly with
    ;; `doseq` instead of forcing a lazy `for` into an unused binding.
    (doseq [[_ streams] topic->streams
            stream streams]
      (.execute thread-pool #(consume-a-stream stream)))
    thread-pool))
(defn stop
  "Shuts down the consumer currently held in `c`, then calls `.shutdownNow`
  on `thread-pool` and returns that call's result."
  [thread-pool]
  (let [consumer @c]
    (shutdown consumer)
    (.shutdownNow thread-pool)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment