Created
December 3, 2018 21:13
-
-
Save mping/1e784c1c81108dd61d301863a06b3e28 to your computer and use it in GitHub Desktop.
Kafka props in clojure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns com.heroku.sdk.CljKeyStore | |
"Dynamically generated class to allow creating custom key stores by directly passing the certs and keys | |
instead of relying on env vars" | |
(:gen-class | |
:name com.heroku.sdk.CljKeyStore | |
:extends com.heroku.sdk.EnvKeyStore | |
:methods [#^{:static true} [fromTrustWithRandomPassword [String] com.heroku.sdk.EnvKeyStore] | |
#^{:static true} [fromKeyAndCertWithRandomPassword [String String] com.heroku.sdk.EnvKeyStore]] | |
:constructors {[String String] [String String], | |
[String String String] [String String String]})) | |
(defn- rand-bigint [] | |
(.toString (java.math.BigInteger. 130 (java.security.SecureRandom.)) 32)) | |
(defn -fromTrustWithRandomPassword [^String trust] | |
(com.heroku.sdk.CljKeyStore. trust (rand-bigint))) | |
(defn -fromKeyAndCertWithRandomPassword [^String key ^String cert] | |
(com.heroku.sdk.CljKeyStore. key cert (rand-bigint))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns worker.config | |
(:require [clojure.string :as str] | |
[com.heroku.sdk.CljKeyStore]) ;;required to trigger AOT compilation of CljKeyStore first | |
(:import [java.util Properties] | |
[java.net URI] | |
[com.heroku.sdk CljKeyStore])) | |
(defn- merge-kafka-url | |
[cfg ^Properties props ^URI uri] | |
(case (.getScheme uri) | |
"kafka" | |
(.put props :security.protocol "PLAINTEXT") | |
"kafka+ssl" | |
(let [env-trust-store (CljKeyStore/fromTrustWithRandomPassword (:kafka_trusted_cert cfg)) | |
env-key-store (CljKeyStore/fromKeyAndCertWithRandomPassword (:kafka_client_cert_key cfg) (:kafka_client_cert cfg))] | |
(doto props | |
(.put :security.protocol "SSL") | |
(.put :ssl.truststore.type (.type env-trust-store)) | |
(.put :ssl.truststore.location (.getAbsolutePath (.storeTemp env-trust-store))) | |
(.put :ssl.truststore.password (.password env-trust-store)) | |
(.put :ssl.keystore.type (.type env-key-store)) | |
(.put :ssl.keystore.location (.getAbsolutePath (.storeTemp env-key-store))) | |
(.put :ssl.keystore.password (.password env-key-store))))) | |
props) | |
;; https://github.com/heroku/heroku-kafka-demo-java/blob/master/src/main/java/com/heroku/kafka/demo/KafkaConfig.java | |
(defn ^Properties build-properties | |
"Returns a `java.util.Properties` instance for usage with Kafka client. | |
If the `:kafka` url has the `kafka+ssl` scheme, uses the following config keys: | |
- :kafka_url | |
- :kafka_trusted_cert | |
- :kafka_client_cert_key | |
- :kafka_client_cert | |
" | |
[cfg] | |
(let [properties (Properties.)] | |
(loop [mapped [] | |
props properties | |
hosts (str/split (:kafka_url cfg) #",")] | |
(if (empty? hosts) | |
(do | |
(.put props :bootstrap.servers (str/join "," mapped)) | |
props) | |
(let [host (first hosts) | |
uri (URI. host) | |
fmt-uri (format "%s:%d" (.getHost uri) (.getPort uri))] | |
(recur (conj mapped fmt-uri) | |
(merge-kafka-url cfg props uri) | |
(rest hosts))))))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment