@eslick
Created March 4, 2011 20:34
An example custom HBase Sink for Flume written in Clojure
;;
;; Flume HBase User Sink
;;
(gen-class
 :name compass.flume.UserSink
 :extends "com.cloudera.flume.core.EventSink$Base"
 :prefix "sink-"
 :main false
 :constructors {[String] []
                [String String] []}
 :state state
 :init constructor
 ;; `builder` returns the SinkBuilder proxy, so it is declared with that type
 :methods [^{:static true} [getSinkBuilders [] java.util.ArrayList]
           ^{:static true} [builder [] com.cloudera.flume.conf.SinkFactory$SinkBuilder]])
(defn sink-constructor
  "Return [superclass-constructor-args state]; the state map holds the
  table name and the flush threshold."
  ([^String table-name]
     [[] {:name table-name
          :threshold *flush-size*}])
  ([^String table-name ^String threshold]
     [[] {:name table-name
          :threshold (Integer/parseInt threshold)}]))

(defn sink-table [this]
  (:name (.state this)))

(defn sink-threshold [this]
  (:threshold (.state this)))
;; Import the generated class before it is used as a type hint
(import '[compass.flume UserSink])

(defn sink-open
  "Force a connection to HBase when the sink is opened."
  [^UserSink sink]
  (client/get (sink-table sink) 0))

(defn sink-append
  [^UserSink sink event]
  ;; If the event parses, push the message onto the queue
  (when-let [rec (event->message event)]
    (queue-push rec))
  ;; When the queue is full, write the messages to HBase
  (when (queue-full? (sink-threshold sink))
    (client/put-multi (sink-table sink)
                      (map as-row+fmap (queue-records)))
    (queue-reset)))

(defn sink-close
  "Flush any queued messages to HBase before the sink closes."
  [^UserSink sink]
  (when (queue-has-data?)
    (client/put-multi (sink-table sink)
                      (map as-row+fmap (queue-records)))
    (queue-reset)))
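;; Flume and Java classes used by the builder functions; harmless if the
;; ns form (not shown) already imports them.
(import '[com.cloudera.flume.conf Context SinkFactory$SinkBuilder]
        '[com.cloudera.util Pair]
        '[java.util ArrayList])

(defn sink-builder
  []
  (proxy [SinkFactory$SinkBuilder] []
    (build [^Context ctx args]
      (UserSink. (first args)))))

(defn sink-getSinkBuilders
  "Return the (name, builder) pairs that Flume reads when loading the plugin."
  []
  (doto (ArrayList.)
    (.add (Pair. "UserSink" (sink-builder)))))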
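
;;
;; Hypothetical queue helpers (not part of the original listing)
;;
;; sink-append and sink-close call queue-push, queue-full?, queue-records,
;; queue-has-data? and queue-reset, and sink-constructor reads *flush-size*,
;; but none of these are defined in the listing. The definitions below are
;; only a minimal sketch of what they might look like, assuming the queue is
;; a vector held in an atom. The name message-queue and the default of 100
;; are placeholders, not values taken from the original code. In a real
;; namespace they would need to appear before the sink functions.
(def ^:dynamic *flush-size* 100) ; assumed default flush threshold

(def message-queue (atom []))    ; assumed storage: records in arrival order

(defn queue-push
  "Append one parsed record to the queue."
  [rec]
  (swap! message-queue conj rec))

(defn queue-records
  "Return the records currently queued."
  []
  @message-queue)

(defn queue-has-data?
  "True when at least one record is queued."
  []
  (boolean (seq @message-queue)))

(defn queue-full?
  "True when the queue holds at least `threshold` records."
  [threshold]
  (>= (count @message-queue) threshold))

(defn queue-reset
  "Discard all queued records."
  []
  (reset! message-queue []))

;; Under these assumptions the queue is shared by every sink in the process,
;; and a record pushed between queue-records and queue-reset would be lost;
;; a production version would probably swap the queue out atomically.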