Created
July 13, 2016 15:35
-
-
Save oskarth/cd4b49ed0c798f5a0720a058080e2188 to your computer and use it in GitHub Desktop.
Flush fun lightning talk
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns flushfun.core | |
(:require [clojure.core.async :refer [chan go go-loop close! <!! timeout]] | |
[amazonica.aws.sqs :as sqs])) | |
(def queue (sqs/find-queue {:endpoint "eu-west-1"} "flushfun")) | |
(defn commit-fn [message] (fn [] (sqs/delete-message (assoc message :queue-url queue)))) | |
(defn flush-messages [{:keys [messages commits]}] | |
(println "FLUSHING" (map :body messages)) | |
(doseq [commit commits] | |
(commit))) | |
(defn cache-updater [cache] | |
(fn [{:keys [commits messages]} {:keys [commit] :as incoming-message}] | |
(when (empty? messages) | |
(go (<!! (timeout 5000)) | |
(let [to-flush (dosync (let [curr @cache] | |
(ref-set cache {:commits [] :messages []}) | |
curr))] | |
(flush-messages to-flush)))) | |
{:commits (conj commits commit) | |
:messages (conj messages incoming-message)})) | |
(def update-cache | |
(let [cache (ref {:commits [] :messages []}) | |
update-fn (cache-updater cache)] | |
(fn [message] (dosync (alter cache update-fn message))))) | |
(defn recv [] | |
(go-loop [] | |
(let [{:keys [messages]} (sqs/receive-message :queue-url queue)] | |
(doseq [message messages] | |
(update-cache (assoc message :commit (commit-fn message)))) | |
(recur)))) | |
(defn spam [] | |
(doseq [m (range 100)] | |
(timeout 10) (println "SEND" m) (sqs/send-message queue m))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns flushfun.core | |
(:require [clojure.core.async :refer [chan go go-loop close! <!! timeout]] | |
[amazonica.aws.sqs :as sqs])) | |
nil | |
flushfun.core> (recv) | |
#object[clojure.core.async.impl.channels.ManyToManyChannel 0xfd1ed5c "clojure.core.async.impl.channels.ManyToManyChannel@fd1ed5c"] | |
flushfun.core> (spam) | |
SEND 0 | |
SEND 1 | |
SEND 2 | |
SEND 3 | |
SEND 4 | |
SEND 5 | |
SEND 6 | |
SEND 7 | |
SEND 8 | |
SEND 9 | |
SEND 10 | |
SEND 11 | |
SEND 12 | |
SEND 13 | |
SEND 14 | |
SEND 15 | |
SEND 16 | |
SEND 17 | |
SEND 18 | |
SEND 19 | |
SEND 20 | |
SEND 21 | |
SEND 22 | |
SEND 23 | |
SEND 24 | |
SEND 25 | |
SEND 26 | |
SEND 27 | |
SEND 28 | |
SEND 29 | |
SEND 30 | |
SEND 31 | |
SEND 32 | |
SEND 33 | |
SEND 34 | |
FLUSHING (0 1 13 14 2 23 28 3 4 5 6 7 8) | |
SEND 35 | |
SEND 36 | |
SEND 37 | |
SEND 38 | |
SEND 39 | |
SEND 40 | |
SEND 41 | |
SEND 42 | |
SEND 43 | |
SEND 44 | |
SEND 45 | |
SEND 46 | |
SEND 47 | |
FLUSHING (10 11 15 16 17 19 20 21 22 24 25 34 36 42 9) | |
SEND 48 | |
SEND 49 | |
SEND 50 | |
SEND 51 | |
SEND 52 | |
SEND 53 | |
SEND 54 | |
SEND 55 | |
SEND 56 | |
SEND 57 | |
SEND 58 | |
SEND 59 | |
SEND 60 | |
SEND 61 | |
SEND 62 | |
SEND 63 | |
SEND 64 | |
SEND 65 | |
SEND 66 | |
SEND 67 | |
SEND 68 | |
SEND 69 | |
SEND 70 | |
SEND 71 | |
FLUSHING (12 18 26 27 29 30 32 33 35 38 39 40 41 43 44 45 46 47 48 49 52 54 55 58 63 64) | |
SEND 72 | |
SEND 73 | |
SEND 74 | |
SEND 75 | |
SEND 76 | |
SEND 77 | |
SEND 78 | |
SEND 79 | |
SEND 80 | |
SEND 81 | |
SEND 82 | |
SEND 83 | |
SEND 84 | |
FLUSHING (50 51 53 56 57 59 60 61 62 65 69 70 78) | |
SEND 85 | |
SEND 86 | |
SEND 87 | |
SEND 88 | |
SEND 89 | |
SEND 90 | |
SEND 91 | |
SEND 92 | |
SEND 93 | |
SEND 94 | |
SEND 95 | |
SEND 96 | |
SEND 97 | |
SEND 98 | |
SEND 99 | |
nil | |
flushfun.core> FLUSHING (31 37 66 67 68 71 74 75 76 77 81 82 83 85 88 92 94) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment