@nfisher
Last active December 18, 2015 01:49
pipejine shutdown
(ns pipedream.core
  (:gen-class)
  (:require [clojure.tools.logging :as log]
            [pipejine.core :as pipe]))

(defn pipeline []
  (let [q1 (pipe/new-queue {:name "q1"
                            :queue-size 5
                            :number-of-consumer-threads 5
                            :number-of-producers 1})
        q2 (pipe/new-queue {:name "q2"
                            :queue-size 2
                            :number-of-consumer-threads 1
                            :number-of-producers 1
                            :partition 2}) ;; partition queues should only have one thread!
        q3 (pipe/new-queue {:name "q3"
                            :queue-size 3
                            :number-of-consumer-threads 3
                            :number-of-producers 1})
        q4 (pipe/new-queue {:name "q4"
                            :queue-size 10
                            :number-of-consumer-threads 1
                            :number-of-producers 2
                            :partition :all
                            :debug true})
        logger (pipe/spawn-logger q1 q2 q3 q4)]
    (pipe/spawn-consumers q1 #(do
                                (pipe/produce q2 (inc %)) ;; q1 workers put stuff on q2
                                (pipe/produce q3 (dec %)) ;; .. and q3
                                ))
    (pipe/spawn-consumers q2 #(do
                                (log/info "q2 got: " %)
                                (doseq [d %]
                                  (pipe/produce q4 (* d d)))))
    (pipe/spawn-consumers q3 #(do
                                (Thread/sleep 10)
                                (pipe/produce q4 (/ % 2))))
    (pipe/producer-of q1 q2 q3)
    (pipe/producer-of q2 q4)
    (pipe/producer-of q3 q4)
    (pipe/spawn-supervisor q4 #(log/info "pipeline exhausted!"))
    ;; example of read-seq, could just as well be another consumer (as above)
    (future (log/info "***" (first (pipe/read-seq q4))))
    (dotimes [i 20] ;; Seed q1 with data
      (Thread/sleep 10)
      (log/info "prod: " i)
      (pipe/produce q1 i))
    (pipe/produce-done q1) ;; Mark that we're done putting data in q1
    ;; wait for q4 to finish...not sure if this is the best way
    (.await (:producers-done q4))
    ;; (logger)
    ))

(defn -main
  "I don't do a whole lot ... yet."
  [& args]
  ;; expected that pipeline stops after all queues are shut down
  (pipeline)
  (shutdown-agents)
  (println "Hello, World!"))

(defproject pipedream "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.5.1"]
                 [pipejine "0.1.1"]]
  :main pipedream.core)

@martintrojer

There's no requirement to shut down queues manually: all of a queue's workers stop once all its producers are done. The supervisor also finishes when the queue shuts down.

In this case you shut down the queues before the calculation finishes, and since q4 is batching you won't see the final result. Try adding a (Thread/sleep 1000) before the shutdowns to see the calculation finish.
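
To make the "workers stop once all producers are done" rule concrete, here is a minimal sketch in plain Clojure on top of java.util.concurrent. It is not pipejine's implementation: the name run-demo, the use of a CountDownLatch for the producers-done signal, and the 10 ms poll timeout are all assumptions chosen for illustration.

```clojure
(import '(java.util.concurrent CountDownLatch LinkedBlockingQueue TimeUnit))

;; Hypothetical illustration only; pipejine's internals may differ.
(defn run-demo
  "Two producers feed one bounded queue. A single consumer drains it and
  stops by itself once both producers have signalled done and the queue
  is empty. Returns the consumed items in sorted order."
  []
  (let [queue          (LinkedBlockingQueue. 5)
        producers-done (CountDownLatch. 2) ; one count per producer
        results        (atom [])
        consumer       (future
                         (loop []
                           (if-let [item (.poll queue 10 TimeUnit/MILLISECONDS)]
                             (do (swap! results conj item)
                                 (recur))
                             ;; Nothing arrived in time: keep polling unless every
                             ;; producer is done and the queue is drained.
                             (when (or (pos? (.getCount producers-done))
                                       (not (.isEmpty queue)))
                               (recur)))))
        producer       (fn [items]
                         (future
                           (doseq [i items]
                             (.put queue i))             ; blocks while the queue is full
                           (.countDown producers-done)))] ; this producer is done
    (producer (range 0 5))
    (producer (range 5 10))
    (.await producers-done) ; wait until every producer has finished
    @consumer               ; the consumer exits on its own, no manual shutdown
    (sort @results)))
```

Calling (run-demo) returns once the consumer has drained the queue, with no explicit stop signal sent to it. Because future runs on Clojure's agent thread pool, a standalone program built this way still needs (shutdown-agents) at the end for the JVM to exit promptly.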

@martintrojer

Also, add (shutdown-agents) as the final thing in -main

@nfisher
Author

nfisher commented Jun 4, 2013

The updated version works as you suggested. Thanks! :)
