Last active
December 18, 2015 01:49
-
-
Save nfisher/5706682 to your computer and use it in GitHub Desktop.
pipejine shutdown
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns pipedream.core | |
(:gen-class) | |
(:require [clojure.tools.logging :as log] | |
[pipejine.core :as pipe])) | |
(defn pipeline
  "Wires up and runs a four-queue example pipeline, then blocks until
  `.await` on q4's `:producers-done` value returns.

  Data flow, as set up below:
    q1 -> q2 (inc of each item) and q3 (dec of each item)
    q2 -> q4 (square of every element of the value handed to the consumer)
    q3 -> q4 (item divided by 2, after a 10 ms sleep)

  q1 is seeded with the integers 0..19 and then marked done.
  Takes no arguments."
  []
  (let [q1 (pipe/new-queue {:name "q1"
                            :queue-size 5
                            :number-of-consumer-threads 5
                            :number-of-producers 1})
        ;; partition queues should only have one thread!
        q2 (pipe/new-queue {:name "q2"
                            :queue-size 2
                            :number-of-consumer-threads 1
                            :number-of-producers 1
                            :partition 2})
        q3 (pipe/new-queue {:name "q3"
                            :queue-size 3
                            :number-of-consumer-threads 3
                            :number-of-producers 1})
        q4 (pipe/new-queue {:name "q4"
                            :queue-size 10
                            :number-of-consumer-threads 1
                            :number-of-producers 2
                            :partition :all
                            :debug true})
        logger (pipe/spawn-logger q1 q2 q3 q4)
        ;; q1 workers put stuff on q2 and on q3
        fan-out (fn [n]
                  (pipe/produce q2 (inc n))
                  (pipe/produce q3 (dec n)))
        ;; q2 workers log what they received and square each element onto q4
        square-batch (fn [batch]
                       (log/info "q2 got: " batch)
                       (doseq [item batch]
                         (pipe/produce q4 (* item item))))
        ;; q3 workers pause briefly, then halve the item onto q4
        halve (fn [n]
                (Thread/sleep 10)
                (pipe/produce q4 (/ n 2)))]
    (pipe/spawn-consumers q1 fan-out)
    (pipe/spawn-consumers q2 square-batch)
    (pipe/spawn-consumers q3 halve)
    ;; declare which queues feed which
    (pipe/producer-of q1 q2 q3)
    (pipe/producer-of q2 q4)
    (pipe/producer-of q3 q4)
    (pipe/spawn-supervisor q4 (fn [] (log/info "pipeline exhausted!")))
    ;; example of read-seq, could just as well be another consumer (as above)
    (future (log/info "***" (first (pipe/read-seq q4))))
    ;; Seed q1 with data
    (doseq [i (range 20)]
      (Thread/sleep 10)
      (log/info "prod: " i)
      (pipe/produce q1 i))
    ;; Mark that we're done putting data in q1
    (pipe/produce-done q1)
    ;; wait for q4 to finish...not sure if this is the best way
    (.await (:producers-done q4))
    ;; (logger)
    ))
(defn -main
  "Program entry point: runs the example pipeline, prints a greeting, and
  finally shuts down the agent thread pools.

  `args` (the command-line arguments) are accepted but ignored.
  Returns nil."
  [& args]
  (try
    ;; expected that pipeline stops after all queues are shutdown
    (pipeline)
    (println "Hello, World!")
    (finally
      ;; `pipeline` starts a `future`, which runs on the agent thread pool.
      ;; Shut that pool down as the very last step, even when `pipeline`
      ;; throws, so leftover pool threads do not delay JVM exit.
      (shutdown-agents))))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Leiningen project definition for the pipedream example.
(defproject pipedream "0.1.0-SNAPSHOT"
  ;; Placeholder FIXME metadata replaced with the gist's title and location.
  :description "Example pipejine pipeline demonstrating shutdown"
  :url "https://gist.github.com/nfisher/5706682"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [pipejine "0.1.1"]]
  ;; Namespace whose -main is the entry point.
  :main pipedream.core)
The updated version works as you suggested. Thanks! :)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Also, add (shutdown-agents) as the final thing in -main