@oskarth
Created July 13, 2016 15:35
Flush fun lightning talk
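(ns flushfun.core
  (:require [clojure.core.async :refer [chan go go-loop close! <! <!! timeout]]
            [amazonica.aws.sqs :as sqs]))

;; URL of the "flushfun" SQS queue.
(def queue (sqs/find-queue {:endpoint "eu-west-1"} "flushfun"))

;; Returns a thunk that acknowledges a message by deleting it from the queue.
(defn commit-fn [message]
  (fn [] (sqs/delete-message (assoc message :queue-url queue))))

;; Handles one batch, then commits (deletes) every message in it.
(defn flush-messages [{:keys [messages commits]}]
  (println "FLUSHING" (map :body messages))
  (doseq [commit commits]
    (commit)))

;; Builds the function that `alter` applies to the cache for each message.
(defn cache-updater [cache]
  (fn [{:keys [commits messages]} {:keys [commit] :as incoming-message}]
    ;; The first message of a batch schedules a flush 5 seconds later.
    ;; Caveat: this side effect runs inside `dosync`, so a retried
    ;; transaction would schedule an extra timer.
    (when (empty? messages)
      (go (<! (timeout 5000)) ; park rather than block a go-pool thread
          (let [to-flush (dosync (let [curr @cache]
                                   (ref-set cache {:commits [] :messages []})
                                   curr))]
            (flush-messages to-flush))))
    {:commits (conj commits commit)
     :messages (conj messages incoming-message)}))

;; Adds one message to the shared batch.
(def update-cache
  (let [cache (ref {:commits [] :messages []})
        update-fn (cache-updater cache)]
    (fn [message] (dosync (alter cache update-fn message)))))

;; Polls SQS forever, attaching a commit thunk to every received message.
(defn recv []
  (go-loop []
    (let [{:keys [messages]} (sqs/receive-message :queue-url queue)]
      (doseq [message messages]
        (update-cache (assoc message :commit (commit-fn message))))
      (recur))))

;; Sends 100 numbered messages, pausing 10 ms before each send.
(defn spam []
  (doseq [m (range 100)]
    (<!! (timeout 10)) ; block the calling thread for 10 ms
    (println "SEND" m)
    (sqs/send-message queue m)))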
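
The batching done by `cache-updater` and `flush-messages` can be tried without SQS. What follows is a minimal sketch, assuming only `clojure.core`: the names `empty-batch`, `add-message`, `drain!`, `flush!`, and `acked` are hypothetical and not part of the gist, the commit thunks merely record a body instead of deleting a queue message, and the 5-second timer is left out so the flush is triggered by hand.

```clojure
;; Hypothetical names throughout; no SQS or core.async involved.
(def empty-batch {:commits [] :messages []})

(defn add-message
  "Pure step: returns the batch with one more message and its commit thunk."
  [batch {:keys [commit] :as message}]
  (-> batch
      (update :commits conj commit)
      (update :messages conj message)))

(defn drain!
  "Atomically swaps the batch in `cache` for an empty one; returns the old batch."
  [cache]
  (dosync
    (let [current @cache]
      (ref-set cache empty-batch)
      current)))

(defn flush!
  "Drains the cache, runs every commit thunk, and returns the flushed bodies."
  [cache]
  (let [{:keys [messages commits]} (drain! cache)]
    (doseq [commit commits]
      (commit))
    (mapv :body messages)))

;; Usage: each commit thunk only records which body was acknowledged.
(def acked (atom []))
(def cache (ref empty-batch))

(doseq [body ["a" "b" "c"]]
  (dosync (alter cache add-message
                 {:body body :commit #(swap! acked conj body)})))

(def flushed (flush! cache))
```

Keeping `add-message` pure means a retried transaction cannot repeat a side effect; the side effects (running the commits) happen only after `drain!` has returned the old batch.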
(ns flushfun.core
(:require [clojure.core.async :refer [chan go go-loop close! <!! timeout]]
[amazonica.aws.sqs :as sqs]))
nil
flushfun.core> (recv)
#object[clojure.core.async.impl.channels.ManyToManyChannel 0xfd1ed5c "clojure.core.async.impl.channels.ManyToManyChannel@fd1ed5c"]
flushfun.core> (spam)
SEND 0
SEND 1
SEND 2
SEND 3
SEND 4
SEND 5
SEND 6
SEND 7
SEND 8
SEND 9
SEND 10
SEND 11
SEND 12
SEND 13
SEND 14
SEND 15
SEND 16
SEND 17
SEND 18
SEND 19
SEND 20
SEND 21
SEND 22
SEND 23
SEND 24
SEND 25
SEND 26
SEND 27
SEND 28
SEND 29
SEND 30
SEND 31
SEND 32
SEND 33
SEND 34
FLUSHING (0 1 13 14 2 23 28 3 4 5 6 7 8)
SEND 35
SEND 36
SEND 37
SEND 38
SEND 39
SEND 40
SEND 41
SEND 42
SEND 43
SEND 44
SEND 45
SEND 46
SEND 47
FLUSHING (10 11 15 16 17 19 20 21 22 24 25 34 36 42 9)
SEND 48
SEND 49
SEND 50
SEND 51
SEND 52
SEND 53
SEND 54
SEND 55
SEND 56
SEND 57
SEND 58
SEND 59
SEND 60
SEND 61
SEND 62
SEND 63
SEND 64
SEND 65
SEND 66
SEND 67
SEND 68
SEND 69
SEND 70
SEND 71
FLUSHING (12 18 26 27 29 30 32 33 35 38 39 40 41 43 44 45 46 47 48 49 52 54 55 58 63 64)
SEND 72
SEND 73
SEND 74
SEND 75
SEND 76
SEND 77
SEND 78
SEND 79
SEND 80
SEND 81
SEND 82
SEND 83
SEND 84
FLUSHING (50 51 53 56 57 59 60 61 62 65 69 70 78)
SEND 85
SEND 86
SEND 87
SEND 88
SEND 89
SEND 90
SEND 91
SEND 92
SEND 93
SEND 94
SEND 95
SEND 96
SEND 97
SEND 98
SEND 99
nil
flushfun.core> FLUSHING (31 37 66 67 68 71 74 75 76 77 81 82 83 85 88 92 94)