Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
(defn cascalog-map
[op-var output-fields & {:keys [stateful?]}]
(let [ser (ops/fn-spec op-var)]
(proxy [BaseOperation Function] [^Fields output-fields]
(prepare [^FlowProcess flow-process ^OperationCall op-call]
(let [op (serfn/deserialize-val ser)]
(-> op-call
(.setContext [op (if stateful? (op))]))))
(operate [^FlowProcess flow-process ^FunctionCall fn-call]
(let [[op] (.getContext fn-call)
collector (-> fn-call .getOutputCollector)
^Tuple tuple (-> fn-call .getArguments .getTuple)]
(->> (Util/coerceFromTuple tuple)
(apply op)
(Util/coerceToTuple)
(.add collector))))
(cleanup [flow-process ^OperationCall op-call]
(if stateful?
(let [[op state] (.getContext op-call)]
(op state)))))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment