-
-
Save tdantas/f092564ac6a902bbf01d1ab9f228791a to your computer and use it in GitHub Desktop.
code review queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns accounting.queue | |
(:require [mount.core :refer [defstate]] | |
[accounting.config :refer [config]] | |
[cognitect.aws.client.api :as aws] | |
[cognitect.aws.credentials :as credentials] | |
[cheshire.core :as json] )) | |
(defprotocol QueueProtocol
  "Operations offered by a message queue and the worker that polls it."
  (messages [this]
    "Fetches pending messages from the queue.")
  (delete [this receipt]
    "Removes the message identified by `receipt` from the queue.")
  (run [this]
    "Starts consuming the queue.")
  (stop [this]
    "Stops the consumer started by `run`."))
(defn build-client
  "Builds the AWS client from the application config map.

  Destructures `aws_region`, `aws_key` and `aws_secret_key` from the config.
  NOTE(review): the arguments passed to `aws/client` are elided (`...`) in
  this excerpt, so how those three values are used cannot be confirmed here."
  [{:keys [aws_region aws_key aws_secret_key]}]
  (aws/client ...))
;; Looks up the queue called `name` through `client`; the body is elided
;; (`...`) in this excerpt. init-queue treats a falsey result as "not found"
;; and falls back to create-queue.
;; NOTE(review): presumably returns the queue URL, since the result ends up
;; under :endpoint in the Queue record -- confirm against the real body.
(defn find-queue [client name] ... ) | |
;; Creates the queue called `name` through `client`; the body is elided
;; (`...`) in this excerpt. Used by init-queue when find-queue returns a
;; falsey value.
;; NOTE(review): presumably returns the new queue's URL, since the result
;; ends up under :endpoint in the Queue record -- confirm.
(defn create-queue [client name] ...) | |
(defn init-queue
  "Returns the result of `find-queue` for `queue-name` when it is truthy;
  otherwise calls `create-queue` and returns its result."
  [client queue-name]
  (if-let [existing (find-queue client queue-name)]
    existing
    (create-queue client queue-name)))
;; Backs the `delete` protocol method of the Queue record. Takes the queue's
;; :client and :endpoint (bound here as queue-url) plus a message receipt and
;; issues a call via aws/invoke; the request itself is elided (`...`) in this
;; excerpt.
(defn delete* [{client :client queue-url :endpoint} receipt] (aws/invoke client ...)) | |
;; Backs the `messages` protocol method of the Queue record. Takes the
;; queue's :client and :endpoint (bound here as queue-url) and issues a call
;; via aws/invoke; the request itself is elided (`...`) in this excerpt.
;; run* reads the received messages from the :Messages key of the result.
(defn messages* [{client :client queue-url :endpoint}] (aws/invoke client ...)) | |
;; Workers: each method handles one kind of message envelope.
;; Dispatch is on the envelope's :type, coerced to a keyword.
(defmulti worker
  (fn [{:keys [type]}]
    (keyword type)))

;; :invoice-created envelopes are handed to generate-invoice.
(defmethod worker :invoice-created
  [envelope]
  (generate-invoice envelope))

;; Any other (or missing) type is only logged.
(defmethod worker :default
  [_envelope]
  (log/info "envelope received"))
(defn- handle
  "Processes a collection of received messages.

  For each message, parses its :Body as JSON (keywordized keys), dispatches
  the result to `worker`, and on success deletes the message from `queue`
  using its :ReceiptHandle. A failure in the worker or in the delete is
  printed and the message is left undeleted; processing continues with the
  next message.

  An InterruptedException is not swallowed: the thread's interrupt flag is
  restored and the exception is rethrown, so that cancelling the future
  running the poll loop actually stops it."
  [queue messages]
  (doseq [{receipt :ReceiptHandle body :Body} messages]
    (try
      (worker (json/parse-string body true))
      (delete queue receipt)
      (catch InterruptedException e
        ;; `catch Exception` below would otherwise absorb the cancellation
        ;; signal and let the caller's loop keep polling.
        (.interrupt (Thread/currentThread))
        (throw e))
      (catch Exception e
        (println (ex-message e))))))
(defn run*
  "Starts a background poll loop for `queue` and returns the future running it.

  Each iteration fetches a response via the `messages` protocol method and
  passes the collection under its :Messages key to `handle`. The loop has no
  exit condition of its own; it ends when the future is cancelled or an
  exception escapes an iteration."
  [queue]
  (future
    (loop []
      ;; `handle` iterates over a collection of messages itself, so it gets
      ;; the whole batch once. Calling it per message made it walk the
      ;; message's map entries instead, yielding nil receipt and body.
      (let [response (messages queue)]
        (handle queue (:Messages response)))
      (recur))))
;; SQS-backed queue.
;;   client   - AWS client used for every request
;;   endpoint - value returned by init-queue for this queue
;;   runner   - future running the poll loop, or nil when not started
(defrecord Queue [client endpoint runner]
  QueueProtocol
  (messages [queue] (messages* queue))
  (delete [queue receipt] (delete* queue receipt))
  (run [queue]
    ;; Store the future under the :runner key. Using the field value
    ;; `runner` (nil before start) as the key left the future unreachable,
    ;; so `stop` could never cancel it.
    (assoc queue :runner (run* queue)))
  (stop [_]
    ;; Cancel only a runner that exists and is still active.
    (when (and runner (not (future-done? runner)))
      (future-cancel runner))))
(defn build-queues
  "Builds one client from `cfg`, then for every name under :aws_sqs_queues
  initialises the queue, wraps it in a Queue record and starts it with `run`.
  Returns a vector of the values returned by `run`."
  [{queues :aws_sqs_queues :as cfg}]
  (let [client (build-client cfg)
        start-queue (fn [queue-name]
                      (-> {:client client
                           :endpoint (init-queue client queue-name)}
                          map->Queue
                          run))]
    (mapv start-queue queues)))
;; Mount state holding the started queues under :queues.
;; On start every configured queue is built and started; on stop each one
;; is stopped in turn.
(defstate queue
  :start {:queues (build-queues config)}
  :stop (run! stop (:queues queue)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment