Skip to content

Instantly share code, notes, and snippets.

@shayanjm
Last active February 6, 2017 20:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shayanjm/1ab27912f94219728fbc3cb8b4eb704b to your computer and use it in GitHub Desktop.
Save shayanjm/1ab27912f94219728fbc3cb8b4eb704b to your computer and use it in GitHub Desktop.
(ns proj.consumer
(:require [clojure.data.json :as json]
[clojure.core.async :as >]
[proj.kafka :as kafka]
[proj.db.core :as db]
[clj-time.core :as t]
[clj-time.coerce :as c])
(:use clj-kafka.consumer.zk
proj.config
proj.utils)
(:import [java.io ByteArrayInputStream ByteArrayOutputStream]
[java.util.concurrent Executors]
[kafka.consumer ConsumerIterator Consumer KafkaStream]))
(defonce ^Consumer c
(atom nil))
(defn init! [config]
(reset! c (kafka/make-consumer (assoc config :group "CONSUMERGROUP"))))
(defn init-pool [num]
(Executors/newFixedThreadPool num))
(defn consume-a-stream [stream]
(let [it (.iterator stream)
get-next-message #(.message (.next it))]
(loop [msg (get-next-message)]
(try
(let [match-end-ts (str (t/now))
match (json/read-str (String. msg) :key-fn keyword)]
(db/insert-match<! {:meta (:meta match)
:content (:content match)
:uuid (:uuid match)
:start_ts (c/to-sql-time (:start_ts match))
:end_ts (c/to-sql-time match-end-ts)
:filter_ids (:matched_regex_ids match)}))
(catch Exception e
(println (str "Caught exception while processing matches: " (.getMessage e)))))
(recur (get-next-message)))))
(defn start [topic threads]
(let [thread-pool (init-pool threads)
topic->streams (create-message-streams @c {topic threads})
jobs (doall (for [[topic streams] topic->streams
stream streams]
(.execute thread-pool #(consume-a-stream stream))))]
thread-pool))
(defn stop [thread-pool]
(shutdown @c)
(.shutdownNow thread-pool))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment