
Google I/O 2012 - Go Concurrency Patterns ported to Clojure. Video: http://www.youtube.com/watch?v=f6kdp27TYZs

golang-vs-clojure-async.md

My efforts to port http://www.youtube.com/watch?v=f6kdp27TYZs to Clojure.

A boring function (6:22)

func boring(msg string) {
  for i := 0; ; i++ {
    fmt.Println(msg, i)
    time.Sleep(time.Second)
  }
}

(defn boring [msg]
  (loop [i 0]
    (println msg i)
    (Thread/sleep 1000)
    (recur (inc i))))

Output:

(boring "boring!")
; boring! 0
; boring! 1
; boring! 2
; boring! 3
; ...

Slightly less boring (7:00)

Added a random sleep interval of 0 to 999 ms.

func boring(msg string) {
    for i := 0; ; i++ {
        fmt.Println(msg, i)
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

func main() {
    boring("boring!")
}

(defn boring [msg]
  (loop [i 0]
    (println msg i)
    (Thread/sleep (rand-int 1000))
    (recur (inc i))))

(defn -main [& args]
  (boring "boring!"))

main runs forever.


Launch boring in goroutine (8:13)

func main() {
    go boring("boring!")
}

(defn -main [& args]
  (go (boring "boring!")))

main now returns immediately.


Ignoring it a little less (8:19)

Now, while the goroutine is executing concurrently, we sleep for two seconds before exiting.

func main() {
    go boring("boring!")
    fmt.Println("I'm listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You're boring; I'm leaving.")
}

(defn -main [& args]
  (go (boring "boring!"))
  (println "I'm listening.")
  (Thread/sleep 2000)
  (println "You're boring; I'm leaving."))

Output:

I'm listening.
boring! 0
boring! 1
boring! 2
boring! 3
boring! 4
You're boring; I'm leaving.

Using channels (12:01)

func boring(msg string, c chan string) {
    for i := 0; ; i++ {
        c <- fmt.Sprintf("%s %d", msg, i) // Expression to be sent can be any val.
        time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
    }
}

func main() {
    c := make(chan string)
    go boring("boring!", c)
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c) // Receive expression is just a value.
    }
    fmt.Println("You're boring; I'm leaving")
}

(defn boring [msg c]
  (loop [i 0]
    (>!! c (str msg " " i))
    (recur (inc i))))

(defn -main [& args]
  (let [c (chan)]
    (go (boring "boring!" c))
    (dotimes [_ 5]
      (println (<!! c)))
    (println "You're boring; I'm leaving.")))

Generator: function that returns a channel (14:26)

func boring(msg string) <-chan string { // Returns receive-only channel of strs.
    c := make(chan string)
    go func() { // We launch the goroutine from inside the function.
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c // Return channel to the caller
}

func main() {
    c := boring("boring!") // Function returning a channel.
    for i := 0; i < 5; i++ {
        fmt.Printf("You say: %q\n", <-c)
    }
    fmt.Println("You're boring; I'm leaving.")
}

(defn boring [msg]
  (let [c (chan)]
    (go (loop [i 0]
          (>! c (str msg " " i))
          (Thread/sleep (rand-int 1000))
          (recur (inc i))))
    c))

(defn -main [& args]
  (let [c (boring "boring!")]
    (dotimes [_ 5]
      (println (<!! c)))
    (println "You're boring; I'm leaving.")))

Channels as a handle on a service (16:24)

func main() {
    joe := boring("Joe")
    ann := boring("Ann")
    for i := 0; i < 5; i++ {
        fmt.Println(<-joe)
        fmt.Println(<-ann)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

(defn -main [& args]
  (let [joe (boring "Joe")
        ann (boring "Ann")]
    (dotimes [_ 5]
      (println (<!! joe))
      (println (<!! ann)))
    (println "You're both boring; I'm leaving.")))

Multiplexing (17:37)

func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() { for { c <- <-input1 } }()
    go func() { for { c <- <-input2 } }()
    return c
}

func main() {
    c := fanIn(boring("Joe"), boring("Ann"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
    fmt.Println("You're both boring; I'm leaving.")
}

I improved the Clojure version of fan-in by letting it accept any number of channels. It'll sequentially iterate over them forever.

(defn fan-in [& ports]  ; joe ->\___c
  (let [c (chan)]       ; ann ->/   
    (go (doseq [port (cycle ports)]
          (>! c (<! port))))             
    c))

(defn -main [& args]
  (let [c (fan-in (boring "Joe")
                  (boring "Ann"))]
    (dotimes [_ 10]
      (println (<!! c)))
    (println "You're both boring; I'm leaving.")))
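
For comparison, here is a rough sketch of how the Go fanIn could also be made to accept any number of channels. It starts one forwarding goroutine per input, so the inputs are not read in a fixed order the way the cycle-based version above reads them. The `emit` helper is a made-up stand-in for boring that stops after a few messages; it is not from the talk.

```go
package main

import "fmt"

// fanIn forwards every value from each input onto one output channel.
// Each input gets its own goroutine, so a slow input never holds up the others.
// The output channel is never closed; that is fine for a sketch like this one.
func fanIn(inputs ...<-chan string) <-chan string {
	out := make(chan string)
	for _, in := range inputs {
		go func(in <-chan string) {
			for v := range in {
				out <- v
			}
		}(in)
	}
	return out
}

// emit is a hypothetical helper: it sends n numbered messages, then closes.
func emit(name string, n int) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for i := 0; i < n; i++ {
			c <- fmt.Sprintf("%s %d", name, i)
		}
	}()
	return c
}

func main() {
	c := fanIn(emit("Joe", 3), emit("Ann", 3), emit("Bob", 3))
	for i := 0; i < 9; i++ {
		fmt.Println(<-c) // arrival order across senders is not deterministic
	}
}
```

Messages from a single sender still arrive in order; only the interleaving between senders varies from run to run.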

Using Thread/sleep inside of a go block is questionable. Using >!! and <!! inside a go block is simply wrong. Please switch to <! and >! and if you aren't simulating actual work, remove Thread/sleep. Failing to do so can seriously mess with the core.async thread pool.

Doing blocking operations inside a go is a serious anti-pattern.

> Doing blocking operations inside a go is a serious anti-pattern.

@halgari What do you mean by this? Surely the purpose of entering a go block is so that you can do something that would otherwise block the flow of execution in such a way that it doesn't actually seem to block. For instance, reading off a socket.

Reading off of channels is fine; reading off an arbitrary JVM socket is not, as that will block the entire thread. The way this should be performed is thus:

1) spin up a (thread) block for every JVM socket (or groups of sockets), and then create core.async channels for each socket.

2) have the thread put data either via >!! or put! into the channel.

3) service the channels via (go) blocks.

Use thread for IO bound tasks, go for CPU bound tasks. With this sort of model you'll find that most of your application will end up looking like a core of go blocks that are surrounded by "side-effecting" thread blocks. Keep IO operations at the edges of your application and do just enough to get the data into channels. Then do everything else with go blocks and <! and >!.
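
The shape of those three steps can be sketched on the Go side of this gist too. Go has no thread/go split, since any goroutine may block, so this only illustrates the structure: one reader per blocking source does just enough to get data into a channel, and everything else consumes channels. The names `lines` and `linesOf` are invented for the sketch, and a string reader stands in for a real socket.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// lines starts one goroutine that performs the blocking reads on r and
// forwards each line to the returned channel. The channel is closed when
// the reader hits EOF or an error.
func lines(r io.Reader) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		sc := bufio.NewScanner(r)
		for sc.Scan() {
			out <- sc.Text()
		}
	}()
	return out
}

// linesOf wraps a string as the input source (assumption: it stands in
// for a socket so the sketch runs without any network).
func linesOf(s string) <-chan string {
	return lines(strings.NewReader(s))
}

func main() {
	// The consumer only ever touches the channel, never the reader itself.
	for l := range linesOf("one\ntwo\nthree\n") {
		fmt.Println("got:", l)
	}
}
```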

You could also use Pulsar. It has the same API as core.async, but none of the limitations mentioned by @halgari (except that you would need to call Fiber/sleep or Strand/sleep instead of Thread/sleep). Other than that, <! and <!! etc. are interchangeable in Pulsar (and either could appear in a function called by the go block; they don't need to appear in the same expression). You could then also do IO in go blocks.

To simulate work inside a go, you can use (<! (timeout 1000)) instead of Thread/sleep.

The formatting for the first snippet of Go code is borked. Please fix manually or use the "go fmt" tool for formatting Go source code in a standardized way.

I think your multiplexing example defeats the purpose here: if the second channel has data while fan-in is still waiting on the first one, the second channel just stays blocked. How about using

(defn fan-in [& ports]
  (let [c (chan)]
    (go (while true (let [[v _] (alts! ports)]
          (>! c v))))
    c))
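
For reference, the closest Go counterpart to alts! is the select statement, which proceeds with whichever channel operation is ready. A minimal sketch of a two-input fanIn written that way, assuming neither input is ever closed (a closed input would make this loop spin on zero values):

```go
package main

import "fmt"

// fanIn forwards whichever input is ready first, much like alts! over two ports.
func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:
				c <- s
			case s := <-input2:
				c <- s
			}
		}
	}()
	return c
}

func main() {
	joe := make(chan string)    // never sends: stands in for a stalled producer
	ann := make(chan string, 1) // already has a message waiting
	ann <- "Ann 0"
	fmt.Println(<-fanIn(joe, ann)) // prints "Ann 0" even though joe is silent
}
```

With the sequential version, the same setup would wait on joe forever and never deliver Ann's message.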
