Skip to content

Instantly share code, notes, and snippets.

@igrishaev
Created August 3, 2018 14:40
Show Gist options
  • Save igrishaev/3ee0d3d014c8d8f93afc1c37034719c9 to your computer and use it in GitHub Desktop.
Save igrishaev/3ee0d3d014c8d8f93afc1c37034719c9 to your computer and use it in GitHub Desktop.
(ns project.sqs
(:require [cheshire.core :as json])
(:import [com.amazonaws.services.sqs
AmazonSQS
AmazonSQSClientBuilder]
[com.amazonaws.auth
BasicAWSCredentials
AWSStaticCredentialsProvider]
[com.amazonaws.services.sqs.model
CreateQueueRequest
DeleteMessageBatchResultEntry
DeleteMessageBatchResult
DeleteMessageResult
SendMessageBatchResultEntry
BatchResultErrorEntry
SendMessageBatchResult
SendMessageResult
SendMessageBatchRequestEntry
SendMessageRequest
SendMessageBatchRequest
ReceiveMessageResult
DeleteMessageRequest
DeleteMessageBatchRequest
DeleteMessageBatchRequestEntry
ReceiveMessageRequest
Message])
(:refer-clojure :exclude [send]))
(def enumerate (partial map-indexed vector))
;;
;; Client
;;
(defn amazon-sqs
[aws-key aws-secret region]
(let [creds (new BasicAWSCredentials aws-key aws-secret)]
(-> (AmazonSQSClientBuilder/standard)
(.withRegion region)
(.withCredentials (new AWSStaticCredentialsProvider creds))
(.build))))
;;
;; Queue
;;
;;
;; Send
;;
(defn message-request
[queue-url {:keys [group-id deduplication-id body]}]
(-> (new SendMessageRequest)
(.withQueueUrl queue-url)
(.withMessageBody body)
(.withMessageGroupId group-id)
(.withMessageDeduplicationId deduplication-id)))
(defn message-batch-request
[queue-url messages]
(-> (new SendMessageBatchRequest)
(.withQueueUrl queue-url)
(.withEntries
(for [[index {:keys [group-id deduplication-id body]}] (enumerate messages)]
(-> (new SendMessageBatchRequestEntry)
(.withId (str index))
(.withMessageBody body)
(.withMessageGroupId group-id)
(.withMessageDeduplicationId deduplication-id))))))
(defn send-message
[client request]
(.sendMessage client request))
(defn send-message-batch
[client request]
(.sendMessageBatch client request))
;;
;; Receive
;;
(defn receive-request
[queue-url {:keys [visibility-timeout
wait-timeout
max-number-of-messages]}]
(-> (new ReceiveMessageRequest)
(.withQueueUrl queue-url)
(.withMaxNumberOfMessages (int max-number-of-messages))
(.withVisibilityTimeout (int visibility-timeout))
(.withWaitTimeSeconds (int wait-timeout))))
(defn receive-message
[client request]
(.receiveMessage client request))
;;
;; Delete
;;
(defn delete-request
[queue-url handle]
(new DeleteMessageRequest queue-url handle))
(defn delete-batch-request
[queue-url handles]
(-> (new DeleteMessageBatchRequest)
(.withQueueUrl queue-url)
(.withEntries
(for [[index handle] (enumerate handles)]
(-> (new DeleteMessageBatchRequestEntry)
(.withId (str index))
(.withReceiptHandle handle))))))
(defn delete-message
[client request]
(.deleteMessage client request))
(defn delete-message-batch
[client request]
(.deleteMessageBatch client request))
;;
;; to Clojure
;;
(defprotocol ToClojure
(->clj [obj]))
(extend-type Message
ToClojure
(->clj [obj]
{:body (.getBody obj)
:body-md5 (.getMD5OfBody obj)
:id (.getMessageId obj)
:handle (.getReceiptHandle obj)}))
(extend-type ReceiveMessageResult
ToClojure
(->clj [obj]
{:messages (mapv ->clj (.getMessages obj))}))
(extend-type SendMessageResult
ToClojure
(->clj [obj]
{:id (.getMessageId obj)
:seq-number (.getSequenceNumber obj)}))
(extend-type SendMessageBatchResult
ToClojure
(->clj [obj]
{:failed (map ->clj (.getFailed obj))
:successful (map ->clj (.getSuccessful obj))}))
(extend-type BatchResultErrorEntry
ToClojure
(->clj [obj]
{:id (.getId obj)
:message (.getMessage obj)
:sender-fault (.getSenderFault obj)}))
(extend-type SendMessageBatchResultEntry
ToClojure
(->clj [obj]
{:id (.getId obj)
:message-id (.getMessageId obj)
:seq-number (.getSequenceNumber obj)}))
(extend-type DeleteMessageResult
ToClojure
(->clj [obj]))
(extend-type DeleteMessageBatchResult
ToClojure
(->clj [obj]
{:failed (map ->clj (.getFailed obj))
:successful (map ->clj (.getSuccessful obj))}))
(extend-type DeleteMessageBatchResultEntry
ToClojure
(->clj [obj]
{:id (.getId obj)}))
;;
;; Shortcuts
;;
(defn uuid []
(str (java.util.UUID/randomUUID)))
(defn ->message
[group-id data]
{:group-id group-id
:deduplication-id (-> data hash str)
:body (json/generate-string data)})
(defn msg-encode
[msg]
(update msg :body json/generate-string))
(defn msg-decode
[msg]
(update msg :body json/parse-string true))
(def map-msg-decode
(partial mapv msg-decode))
(defn send
[client queue-url message]
(let [req (message-request queue-url message)]
(->clj (send-message client req))))
(defn send-many
[client queue-url messages]
(let [req (message-batch-request queue-url messages)]
(->clj (send-message-batch client req))))
(def receive-default
{:visibility-timeout 30
:max-number-of-messages 10
:wait-timeout 5})
(defn receive
[client queue-url & [opt]]
(let [opt (merge receive-default opt)
req (receive-request queue-url opt)
res (receive-message client req)
res (->clj res)
{:keys [messages]} res]
(update res :messages map-msg-decode)))
(defn delete
[client queue-url handle]
(let [req (delete-request queue-url handle)]
(->clj (delete-message client req))))
(defn delete-many
[client queue-url handles]
(let [req (delete-batch-request queue-url handles)]
(->clj (delete-message-batch client req))))
(defn queue-url
[client queue-name]
(-> client (.getQueueUrl queue-name) .getQueueUrl))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment