Skip to content

Instantly share code, notes, and snippets.

@samedhi
Created May 6, 2018 18:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samedhi/7765488682548c4cd4f220b2b320b0d2 to your computer and use it in GitHub Desktop.
Save samedhi/7765488682548c4cd4f220b2b320b0d2 to your computer and use it in GitHub Desktop.
Getting a `java.lang.IllegalArgumentException: No matching method: addCallback, compiling:(/Users/stephen/inferno/src/inferno/server.clj:111:6)`
(ns inferno.server
(:require
[clojure.core.async :as async])
(:import
[com.google.api.core ApiFuture]
[com.google.api.core ApiFutureCallback]
[com.google.api.core ApiFutures]
[com.google.api.gax.rpc ApiException]
[com.google.cloud ServiceOptions]
[com.google.cloud.pubsub.v1 SubscriptionAdminClient TopicAdminClient]
[com.google.cloud.pubsub.v1 Publisher]
[com.google.protobuf ByteString]
[com.google.pubsub.v1 ProjectName]
[com.google.pubsub.v1 ProjectSubscriptionName]
[com.google.pubsub.v1 ProjectTopicName]
[com.google.pubsub.v1 PubsubMessage]
[com.google.pubsub.v1 PushConfig]
[com.google.pubsub.v1 Subscription]))
(defn default-project-id
  "Returns whatever `ServiceOptions/getDefaultProjectId` reports as the
  default project id."
  []
  (ServiceOptions/getDefaultProjectId))
(defn project-name
  "Builds a `ProjectName` for `project-id` using the static `ProjectName/of`
  factory."
  [project-id]
  (ProjectName/of project-id))
(defn topic-admin-client
  "Creates and returns a new `TopicAdminClient` (one fresh client per call)."
  []
  (TopicAdminClient/create))
(defn project-topic-name
  "Builds a `ProjectTopicName` from `project-id` and `topic-name` using the
  static `ProjectTopicName/of` factory."
  [project-id topic-name]
  (ProjectTopicName/of project-id topic-name))
(defmacro try-catch-api
  "Evaluates `body` and returns its value. If an `ApiException` is thrown it
  is caught and converted into a map with the keys:

    :error        always true
    :status-code  the code obtained via getStatusCode -> getCode
    :retryable?   the result of `.isRetryable` on the exception

  Any other kind of exception propagates unchanged."
  [& body]
  `(try
     ~@body
     (catch ApiException ex#
       {:error       true
        :status-code (-> ex# .getStatusCode .getCode)
        :retryable?  (.isRetryable ex#)})))
;; TOPICS
(defn create-topic!
  "Creates the topic `topic-name` in project `project-id`.

  Returns the value of `.createTopic`, or the error map produced by
  `try-catch-api` when an `ApiException` is thrown."
  [project-id topic-name]
  (try-catch-api
    ;; A fresh admin client is created for this call; close it once the call
    ;; has returned so its underlying resources are not leaked.
    (with-open [client (topic-admin-client)]
      (.createTopic client (project-topic-name project-id topic-name)))))
(defn delete-topic!
  "Deletes the topic `topic-name` from project `project-id`.

  Returns the value of `.deleteTopic`, or the error map produced by
  `try-catch-api` when an `ApiException` is thrown."
  [project-id topic-name]
  (try-catch-api
    ;; A fresh admin client is created for this call; close it once the call
    ;; has returned so its underlying resources are not leaked.
    (with-open [client (topic-admin-client)]
      (.deleteTopic client (project-topic-name project-id topic-name)))))
(defn list-topics
  "Returns a fully realized sequence of the names (`.getName`) of every topic
  listed for project `project-id`.

  `ApiException`s are not caught here; they propagate to the caller."
  [project-id]
  ;; The original called the undefined `topic-admin`; the factory defined in
  ;; this namespace is `topic-admin-client`.
  (with-open [client (topic-admin-client)]
    (->> (.listTopics client (project-name project-id))
         .iterateAll
         (map #(.getName %))
         ;; Realize every element before `with-open` closes the client, since
         ;; the iteration may still need the client to fetch results.
         doall)))
;; SUBSCRIPTIONS
(defn project-subscription-name
  "Builds a `ProjectSubscriptionName` from `project-id` and
  `subscription-name` using the static `ProjectSubscriptionName/of` factory."
  [project-id subscription-name]
  (ProjectSubscriptionName/of project-id subscription-name))
(defn subscription-admin-client
  "Creates and returns a new `SubscriptionAdminClient` (one fresh client per
  call)."
  []
  (SubscriptionAdminClient/create))
(defn create-subscription!
  "Creates the subscription `subscription-name` on topic `topic-name` in
  project `project-id`.

  Arities:
    [project-id topic-name subscription-name]
      uses `PushConfig/getDefaultInstance` as the push config and an ack
      deadline of 10.
    [project-id topic-name subscription-name push-config]
      uses an ack deadline of 10.
    [project-id topic-name subscription-name push-config ack-deadline]
      passes every argument through to `.createSubscription`.

  Returns the value of `.createSubscription`, or the error map produced by
  `try-catch-api` when an `ApiException` is thrown."
  ([project-id topic-name subscription-name]
   ;; The shorter arities previously delegated to the undefined
   ;; `create-subscription` (missing the trailing `!`).
   (create-subscription! project-id
                         topic-name
                         subscription-name
                         (PushConfig/getDefaultInstance)))
  ([project-id topic-name subscription-name push-config]
   (create-subscription! project-id topic-name subscription-name push-config 10))
  ([project-id topic-name subscription-name push-config ack-deadline]
   (try-catch-api
     ;; Close the per-call admin client once the request has completed.
     (with-open [client (subscription-admin-client)]
       (.createSubscription
         client
         (project-subscription-name project-id subscription-name)
         (project-topic-name project-id topic-name)
         push-config
         ack-deadline)))))
(defn delete-subscription!
  "Deletes the subscription `subscription-name` from project `project-id`.

  Returns the value of `.deleteSubscription`, or the error map produced by
  `try-catch-api` when an `ApiException` is thrown."
  [project-id subscription-name]
  (try-catch-api
    ;; A fresh admin client is created for this call; close it once the call
    ;; has returned so its underlying resources are not leaked.
    (with-open [client (subscription-admin-client)]
      (.deleteSubscription
        client
        (project-subscription-name project-id subscription-name)))))
(defn list-subscriptions
  "Returns a fully realized sequence of the names (`.getName`) of every
  subscription listed for project `project-id`, or the error map produced by
  `try-catch-api` when an `ApiException` is thrown."
  [project-id]
  (try-catch-api
    ;; The original called the undefined `subscription-admin`; the factory
    ;; defined in this namespace is `subscription-admin-client`.
    (with-open [client (subscription-admin-client)]
      (->> (.listSubscriptions client (project-name project-id))
           .iterateAll
           (map #(.getName %))
           ;; Force the lazy seq here: otherwise iteration would happen after
           ;; the `try` has been exited (so an ApiException raised while
           ;; iterating would not be caught) and after the client is closed.
           doall))))
;; PUBLISHERS
(defn publisher
  "Builds a `Publisher` for topic `topic-name` in project `project-id` by
  calling `Publisher/newBuilder` on the project topic name and then `.build`."
  [project-id topic-name]
  (-> (project-topic-name project-id topic-name)
      (Publisher/newBuilder)
      (.build)))
(defn api-future-callback
  "Wraps two one-argument functions in an `ApiFutureCallback`.

  `success` is called with the value handed to `onSuccess`; `failure` is
  called with the throwable handed to `onFailure`.

  The original implementation had the two handlers crossed (`onFailure`
  invoked `success` and vice versa); `send!` below is updated in the same
  edit so the pair stays consistent."
  [success failure]
  (reify ApiFutureCallback
    (onSuccess [_ result]
      (success result))
    (onFailure [_ throwable]
      (failure throwable))))

(defn send!
  "Publishes the string `s` (encoded as UTF-8 bytes) through `publisher`.

  Returns a core.async channel that receives exactly one value:
    - the value delivered to `onSuccess` when publishing succeeds, or
    - {:error true :failure :on-failure-called :throwable t} when
      `onFailure` is invoked with throwable `t`.

  If an `ApiException` is thrown synchronously while building or submitting
  the message, the error map from `try-catch-api` is returned instead of a
  channel."
  [publisher s]
  (try-catch-api
    (let [data           (ByteString/copyFromUtf8 s)
          message        (-> (PubsubMessage/newBuilder)
                             (.setData data)
                             (.build))
          ;; Named `publish-future` to avoid shadowing `clojure.core/future`.
          publish-future (.publish publisher message)
          c              (async/chan)]
      ;; `addCallback` needs the future as its first argument; omitting it
      ;; caused "No matching method: addCallback" at compile time.
      ;; NOTE(review): newer api-common releases appear to require an explicit
      ;; Executor as a third argument - confirm against the version in use.
      (ApiFutures/addCallback
        publish-future
        (api-future-callback
          #(async/put! c %)
          #(async/put! c {:error true
                          :failure :on-failure-called
                          :throwable %})))
      c)))
;; SUBSCRIBERS
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment