Skip to content

Instantly share code, notes, and snippets.

@halgari
Created September 24, 2013 18:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save halgari/6688934 to your computer and use it in GitHub Desktop.
Save halgari/6688934 to your computer and use it in GitHub Desktop.
(ns dataflow-prototype.core
(:require [clojure.core.async :refer :all]
[clojure.edn :refer [read-string]]))
(defprotocol Lifecycle
(start [service])
(stop [service]))
(defprotocol DataFlowNode
(process-msg [service port value]))
(defn map-service [f]
(reify
Lifecycle
(start [service]
service)
(stop [service]
service)
DataFlowNode
(process-msg [service port value]
(when (identical? port :input)
{:output [(f value)]}))))
(defn function-sink [f]
(reify
Lifecycle
(start [service]
service)
(stop [service]
service)
DataFlowNode
(process-msg [service port value]
(when (identical? port :input)
(f value))
nil)))
(def dataflow
{:start [:parser :input]
[:parser :output] :parsed
:parsed [:squarer :input]
[:squarer :output] :printer
:printer [:printer :input]})
(defn make-queues [n]
{:start (chan n)
:parsed (chan n)
:printer (chan n)})
(defn make-services []
{:parser (map-service read-string)
:squarer (map-service #(* % %))
:printer (function-sink (partial println "Printer: "))})
(defn process-output [queues dataflow node-name output]
(doseq [[port msgs] output
msg msgs]
(let [queue (dataflow [node-name port])]
(assert queue [node-name port])
(>!! (queues queue) msg))))
(defn process-input [services dataflow queue-name value]
(let [[node-name port] (dataflow queue-name)
processor (services node-name)]
(assert processor queue-name)
[node-name (process-msg processor port value)]))
(defn startup [queues services dataflow]
(doseq [[queue-name queue] queues]
(thread
(try
(loop []
(println "waiting" queue-name)
(let [value (<!! queue)
_ (println queue-name value)
[node-name output] (process-input services dataflow queue-name value)]
(println node-name output)
(process-output queues dataflow node-name output))
(recur))
(catch Throwable ex
(println ex))))))
(defn -main []
(let [queues (make-queues 10)
services (make-services)]
(startup queues services dataflow)
(>!! (:start queues) "42")
(<!! (timeout 10000))))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment