Skip to content

Instantly share code, notes, and snippets.

@caioaao
Created February 25, 2019 16:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save caioaao/f7480cd9fa2e5f32766027c2e65bcef4 to your computer and use it in GitHub Desktop.
Save caioaao/f7480cd9fa2e5f32766027c2e65bcef4 to your computer and use it in GitHub Desktop.
(ns aeron-cluster-test.core
(:require [clojure.stacktrace :refer [print-stack-trace]])
(:import [org.agrona CloseHelper]
[org.agrona.concurrent AgentTerminationException]
[org.agrona ExpandableArrayBuffer]
[org.agrona ErrorHandler]
[io.aeron.cluster ClusteredMediaDriver ConsensusModule ConsensusModule$Context]
[io.aeron.cluster.client EgressListener AeronCluster AeronCluster$Context ]
[io.aeron.cluster.service ClusteredService ClusteredServiceContainer ClusteredServiceContainer$Context Cluster ClusteredServiceAgent ClientSession]
[io.aeron.driver MediaDriver$Context ThreadingMode]
[io.aeron.archive Archive$Context ArchiveThreadingMode]
[java.lang Throwable])
(:gen-class))
(def pst-error-handler
(reify ErrorHandler
(onError [_ throwable]
(print-stack-trace throwable))))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L49
(defn make-clustered-media-driver! []
(ClusteredMediaDriver/launch
(-> (MediaDriver$Context.)
(.threadingMode ThreadingMode/SHARED)
(.termBufferSparseFile true)
(.dirDeleteOnStart true)
(.errorHandler pst-error-handler))
(-> (Archive$Context.)
(.maxCatalogEntries 1024)
(.threadingMode ArchiveThreadingMode/SHARED)
(.deleteArchiveOnStart true))
(-> (ConsensusModule$Context.)
(.errorHandler pst-error-handler)
(.deleteDirOnStart true)
(.terminationHook #(throw (AgentTerminationException.))))))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L67
(defn stop! [clustered-media-driver container cluster]
(CloseHelper/close cluster)
(CloseHelper/close container)
(CloseHelper/close clustered-media-driver)
(-> clustered-media-driver
.consensusModule
.context
.deleteDirectory)
(-> clustered-media-driver
.archive
.context
.deleteArchiveDirectory)
[] (-> clustered-media-driver
.mediaDriver
.context
.deleteAeronDirectory))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L100
(defn make-egress-listener! [msg-counter]
(reify EgressListener
(onMessage [this session-id timestamp-ms
buffer offset length header]
(swap! msg-counter inc))))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L163
(defn launch-echo-service! []
(let [echo-service (reify ClusteredService
(onStart [this new-cluster])
(onSessionOpen [_this _session _timestamp-ms])
(onSessionClose [_this _session _timestamp-ms _close-reason])
(onSessionMessage [this session timestamp-ms buffer offset length header]
(while (< (.offer session buffer offset length) 0)
(Thread/yield)))
(onTimerEvent [_this _correlation-id _timestamp-ms])
(onTakeSnapshot [_this _snapshot-pub])
(onLoadSnapshot [_this _snapshot-image])
(onRoleChange [_this _new-role]))]
(ClusteredServiceContainer/launch
(-> (ClusteredServiceContainer$Context.)
(.clusteredService echo-service)
(.errorHandler pst-error-handler)))))
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L235
(defn connect-to-cluster! [listener]
(AeronCluster/connect
(-> (AeronCluster$Context.)
(.egressListener listener)
(.ingressChannel "aeron:udp")
(.clusterMemberEndpoints "0=localhost:9010,1=localhost:9011,2=localhost:9012"))))
(comment
;; https://github.com/real-logic/aeron/blob/master/aeron-cluster/src/test/java/io/aeron/cluster/ClusterNodeTest.java#L92
(def clustered-media-driver (make-clustered-media-driver!))
(def buff (ExpandableArrayBuffer.))
(.putStringWithoutLengthAscii buff 0 "sup")
(def msg-counter (atom 0))
(def egress-listener (make-egress-listener! msg-counter))
(def container (launch-echo-service!))
(def aeron-cluster (connect-to-cluster! egress-listener))
(while (< (.offer aeron-cluster buff 0 3) 0)
(Thread/yield))
(while (= @msg-counter 0)
(if (<= (.pollEgress aeron-cluster) 0)
(Thread/yield)))
(stop! clustered-media-driver container aeron-cluster)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment