Skip to content

Instantly share code, notes, and snippets.

@tdantas
Created November 1, 2019 09:42
Show Gist options
  • Save tdantas/f092564ac6a902bbf01d1ab9f228791a to your computer and use it in GitHub Desktop.
Save tdantas/f092564ac6a902bbf01d1ab9f228791a to your computer and use it in GitHub Desktop.
code review queue
(ns accounting.queue
(:require [mount.core :refer [defstate]]
[accounting.config :refer [config]]
[cognitect.aws.client.api :as aws]
[cognitect.aws.credentials :as credentials]
[cheshire.core :as json] ))
;; Operations a queue handle must support. The Queue record in this
;; namespace is the implementation visible here.
(defprotocol QueueProtocol
;; Fetch messages from the queue. The polling loop reads the :Messages
;; key of the returned value.
(messages [queue])
;; Delete the message identified by `receipt` from the queue.
(delete [queue receipt])
;; Start the background polling loop for this queue.
(run [queue])
;; Cancel the background polling loop started by `run`, if any.
(stop [queue]))
;; Build the AWS client from the application config map.
;; Destructures :aws_region, :aws_key and :aws_secret_key from the config;
;; the actual client options are elided in this snippet.
;; Fix: the parameter vector was missing its closing `]`, which made the
;; whole form (and therefore the namespace) unreadable.
(defn build-client [{:keys [aws_region aws_key aws_secret_key]}]
  (aws/client ...))
;; Look up the queue called `name` using `client`; body elided in this snippet.
;; NOTE(review): init-queue falls back to create-queue when this returns a
;; falsey value -- confirm the real implementation returns nil on "not found".
(defn find-queue [client name] ... )
;; Create the queue called `name` using `client`; body elided in this snippet.
;; NOTE(review): the result flows through init-queue into the Queue record's
;; :endpoint field -- presumably the queue URL; confirm.
(defn create-queue [client name] ...)
;; Return the existing queue named `queue-name`, creating it only when the
;; lookup yields nothing (nil/false).
(defn init-queue [client queue-name]
  (if-let [existing (find-queue client queue-name)]
    existing
    (create-queue client queue-name)))
;; Delete a message through aws/invoke. Takes :client and :endpoint (bound
;; locally as queue-url) from the queue map plus the message `receipt`;
;; the request payload is elided in this snippet.
(defn delete* [{client :client queue-url :endpoint} receipt] (aws/invoke client ...))
;; Fetch messages through aws/invoke. Takes :client and :endpoint (bound
;; locally as queue-url) from the queue map; the request payload is elided
;; in this snippet. The polling loop reads :Messages from the response.
(defn messages* [{client :client queue-url :endpoint}] (aws/invoke client ...))
;; workers - responsible to handle the message
;; Message handler, dispatched on the envelope's :type coerced to a keyword
;; (so both "invoice-created" and :invoice-created select the same method).
(defmulti worker
  (fn [envelope]
    (let [{kind :type} envelope]
      (keyword kind))))
;; Envelopes of type :invoice-created are handed to generate-invoice as-is.
(defmethod worker :invoice-created
  [invoice-event]
  (generate-invoice invoice-event))
;; Fallback for envelopes whose :type has no dedicated method: just report
;; that something arrived.
;; The original called log/info, but this namespace never requires a `log`
;; alias, so the file would not compile. println is used instead, matching
;; the error reporting already done in `handle`.
(defmethod worker :default [envelope]
  (println "envelope received"))
;; Process received messages: parse each :Body as JSON (keyword keys),
;; dispatch it to `worker`, and delete it from the queue only when the
;; worker returned without throwing. Failures are printed and the message
;; is left on the queue.
;;
;; `messages` may be a collection of message maps or a single message map.
;; Iterating a single map directly would walk its key/value entries, leaving
;; `receipt` and `body` nil for every entry, so a lone map is wrapped first.
(defn- handle [queue messages]
  (let [batch (if (map? messages) [messages] messages)]
    (doseq [{receipt :ReceiptHandle body :Body} batch]
      (try
        (worker (json/parse-string body true))
        (delete queue receipt)
        (catch Exception e
          (println (ex-message e)))))))
;; Start a background future that polls `queue` and processes every batch
;; it receives. Returns the future so the caller can cancel it later.
(defn run* [queue]
  (future
    (loop []
      (let [response (messages queue)]
        ;; `handle` iterates a collection of messages itself, so pass the
        ;; whole batch rather than one message at a time.
        (handle queue (:Messages response)))
      ;; future-cancel only interrupts the thread; leave the loop once the
      ;; interrupt flag is set so cancellation actually ends polling.
      (when-not (.isInterrupted (Thread/currentThread))
        (recur)))))
;; Queue handle: the AWS `client`, the queue `endpoint`, and `runner`, the
;; future executing the polling loop (nil until `run` has been called).
(defrecord Queue [client endpoint runner]
  QueueProtocol
  (messages [queue] (messages* queue))
  (delete [queue receipt] (delete* queue receipt))
  ;; Store the future under the :runner key. Using the bare field `runner`
  ;; as the key would assoc under its current value (nil), so `stop` would
  ;; never find the future.
  (run [queue] (assoc queue :runner (run* queue)))
  ;; Cancel the polling future if one exists and is still running.
  (stop [_]
    (when (and runner (not (future-done? runner)))
      (future-cancel runner))))
;; Create one shared AWS client, then for every name under :aws_sqs_queues
;; resolve (or create) the queue, wrap it in a Queue record and start it.
;; Returns a vector of the started queues.
(defn build-queues [{queues :aws_sqs_queues :as cfg}]
  (let [client      (build-client cfg)
        start-queue (fn [queue-name]
                      (-> {:client client
                           :endpoint (init-queue client queue-name)}
                          map->Queue
                          run))]
    (mapv start-queue queues)))
;; Mount state holding the started queues under :queues; every queue is
;; stopped when the state is stopped.
(defstate queue
  :start {:queues (build-queues config)}
  :stop (run! stop (:queues queue)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment