@danneu
Last active November 6, 2023 04:09
Google I/O 2012 - Go Concurrency Patterns ported to Clojure Video: http://www.youtube.com/watch?v=f6kdp27TYZs

My efforts to port http://www.youtube.com/watch?v=f6kdp27TYZs to Clojure.

A boring function (6:22)

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Second)
	}
}

(defn boring [msg]
  (loop [i 0]
    (println msg i)
    (Thread/sleep 1000)
    (recur (inc i))))

Output:

(boring "boring!")
; boring! 0
; boring! 1
; boring! 2
; boring! 3
; ...

Slightly less boring (7:00)

Added a random sleep interval of 0 to 999 ms.

func boring(msg string) {
	for i := 0; ; i++ {
		fmt.Println(msg, i)
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

func main() {
	boring("boring!")
}

(defn boring [msg]
  (loop [i 0]
    (println msg i)
    (Thread/sleep (rand-int 1000))
    (recur (inc i))))

(defn -main [& args]
  (boring "boring!"))

main runs forever.


Launch boring in goroutine (8:13)

func main() {
	go boring("boring!")
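}

;; core.async vars used in this and the following snippets
(require '[clojure.core.async :refer [chan go thread timeout <! >! <!! >!!]])

(defn -main [& args]
  (go (boring "boring!")))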

main now returns immediately.


Ignoring it a little less (8:19)

Now, while the goroutine is executing concurrently, we sleep for two seconds before exiting.

func main() {
	go boring("boring!")
	fmt.Println("I'm listening.")
	time.Sleep(2 * time.Second)
	fmt.Println("You're boring; I'm leaving.")
}

(defn -main [& args]
  (go (boring "boring!"))
  (println "I'm listening.")
  (Thread/sleep 2000)
  (println "You're boring; I'm leaving."))

Output:

I'm listening.
boring! 0
boring! 1
boring! 2
boring! 3
boring! 4
You're boring; I'm leaving.

Using channels (12:01)

func boring(msg string, c chan string) {
	for i := 0; ; i++ {
		c <- fmt.Sprintf("%s %d", msg, i) // Expression to be sent can be any val.
		time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
	}
}

func main() {
	c := make(chan string)
	go boring("boring!", c)
	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-c) // Receive expression is just a value.
	}
	fmt.Println("You're boring; I'm leaving")
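}

(defn boring [msg c]
  (loop [i 0]
    (>!! c (str msg " " i))
    (Thread/sleep (rand-int 1000)) ; random pause, as in the Go version
    (recur (inc i))))

(defn -main [& args]
  (let [c (chan)]
    ;; boring blocks (>!! and Thread/sleep), so run it on a real thread
    ;; with `thread` instead of tying up the go-block pool.
    (thread (boring "boring!" c))
    (dotimes [_ 5]
      (println (<!! c)))
    (println "You're boring; I'm leaving.")))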

Generator: function that returns a channel (14:26)

func boring(msg string) <-chan string { // Returns receive-only channel of strs.
	c := make(chan string)
	go func() { // We launch the goroutine from inside the function.
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s %d", msg, i)
			time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c // Return channel to the caller
}

func main() {
	c := boring("boring!") // Function returning a channel.
	for i := 0; i < 5; i++ {
		fmt.Printf("You say: %q\n", <-c)
	}
	fmt.Println("You're boring; I'm leaving.")
}

(defn boring [msg]
  (let [c (chan)]
    (go (loop [i 0]
          (>! c (str msg " " i))
          (<! (timeout (rand-int 1000))) ; park instead of blocking a pool thread
          (recur (inc i))))
    c))

(defn -main [& args]
  (let [c (boring "boring!")]
    (dotimes [_ 5]
      (println (<!! c)))
    (println "You're boring; I'm leaving.")))

Channels as a handle on a service (16:24)

func main() {
	joe := boring("Joe")
	ann := boring("Ann")
	for i := 0; i < 5; i++ {
		fmt.Println(<-joe)
		fmt.Println(<-ann)
	}
	fmt.Println("You're both boring; I'm leaving.")
}

(defn -main [& args]
  (let [joe (boring "Joe")
        ann (boring "Ann")]
    (dotimes [_ 5]
      (println (<!! joe))
      (println (<!! ann)))
    (println "You're both boring; I'm leaving.")))

Multiplexing (17:37)

func fanIn(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() { for { c <- <-input1 } }()
	go func() { for { c <- <-input2 } }()
	return c
}

func main() {
	c := fanIn(boring("Joe"), boring("Ann"))
	for i := 0; i < 10; i++ {
		fmt.Println(<-c)
	}
	fmt.Println("You're both boring; I'm leaving.")
}

I generalized the Clojure version of fan-in to accept any number of channels. As in the Go version, each input gets its own go block, so a slow channel never holds up the others.

(defn fan-in [& ports]  ; joe ->\___c
  (let [c (chan)]       ; ann ->/
    (doseq [port ports]
      (go (loop []
            (>! c (<! port))
            (recur))))
    c))

(defn -main [& args]
  (let [c (fan-in (boring "Joe")
                  (boring "Ann"))]
    (dotimes [_ 10]
      (println (<!! c)))
    (println "You're both boring; I'm leaving.")))

@halgari

halgari commented Sep 16, 2013

Using Thread/sleep inside of a go block is questionable. Using >!! and <!! inside a go block is simply wrong. Please switch to <! and >! and if you aren't simulating actual work, remove Thread/sleep. Failing to do so can seriously mess with the core.async thread pool.

Doing blocking operations inside a go is a serious anti-pattern.

@maxcountryman

Doing blocking operations inside a go is a serious anti-pattern.

@halgari What do you mean by this? Surely the purpose of entering a go block is so that you can do something that would otherwise block the flow of execution in such a way that it doesn't actually seem to block. For instance, reading off a socket.

@halgari

halgari commented Sep 16, 2013

Reading off of channels is fine, reading off an arbitrary JVM socket is not, as that will block the entire thread. The way this should be performed is thus:

  1. spin up a (thread) block for every JVM socket (or groups of sockets), and then create core.async channels for each socket.

  2. have the thread put data either via >!! or put! into the channel.

  3. service the channels via (go) blocks.

Use thread for IO bound tasks, go for CPU bound tasks. With this sort of model you'll find that most of your application will end up looking like a core of go blocks that are surrounded by "side-effecting" thread blocks. Keep IO operations at the edges of your application and do just enough to get the data into channels. Then do everything else with go blocks and <! and >!.
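
A minimal sketch of the three steps above, assuming `clojure.core.async` is on the classpath. The names `reader->chan` and `service` are hypothetical, and the `StringReader` only stands in for a real socket's input stream; the point is that the blocking `.readLine` call lives in a `thread` block while the go block uses parking operations only.

```clojure
(require '[clojure.core.async :refer [chan thread go-loop close! <! >!! <!!]])
(import '[java.io BufferedReader StringReader])

;; Steps 1 and 2: a real thread owns the blocking reader and pushes
;; each line into a channel with the blocking put >!!.
(defn reader->chan [^BufferedReader rdr]
  (let [c (chan)]
    (thread
      (loop []
        (if-let [line (.readLine rdr)]   ; blocking call, fine on a real thread
          (do (>!! c line) (recur))
          (close! c))))                  ; end of input: close so consumers stop
    c))

;; Step 3: a go block services the channel using only the parking take <!.
(defn service [in]
  (go-loop [acc []]
    (if-some [line (<! in)]
      (recur (conj acc (str "got: " line)))
      acc)))                             ; delivered on the go block's channel once `in` closes

;; The StringReader is a stand-in for a socket's input stream.
(def result
  (<!! (service (reader->chan (BufferedReader. (StringReader. "a\nb\nc"))))))
;; result => ["got: a" "got: b" "got: c"]
```

Closing the channel at end of input is a design choice of this sketch: it lets the go block finish instead of parking forever on a source that will never produce again.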

@pron

pron commented Sep 16, 2013

You could also use Pulsar. It has the same API as core.async but none of the limitations mentioned by @halgari, except that you would need to call Fiber/sleep or Strand/sleep instead of Thread/sleep. Other than that, <! and <!! etc. are interchangeable in Pulsar (and either could appear in a function called by the go block; they don't need to appear in the same expression). You could then also do IO in go blocks.

@brandonbloom

To simulate work inside a go, you can use (<! (timeout 1000)) instead of Thread/sleep.

@xyproto

xyproto commented Sep 16, 2013

The formatting for the first snippet of Go code is borked. Please fix manually or use the "go fmt" tool for formatting Go source code in a standardized way.

@kul

kul commented Sep 27, 2013

I think your multiplexing example defeats the purpose here: if the second channel has data while fan-in is still waiting on the first one, the second channel just stays blocked. How about using alts! instead:

(defn fan-in [& ports]
  (let [c (chan)]
    (go (while true
          (let [[v _] (alts! ports)]
            (>! c v))))
    c))
