Skip to content

Instantly share code, notes, and snippets.

@krishnabhargav
Created September 27, 2019 01:00
Show Gist options
  • Save krishnabhargav/836ab745eab7d6448b10e6fcfbce0937 to your computer and use it in GitHub Desktop.
Save krishnabhargav/836ab745eab7d6448b10e6fcfbce0937 to your computer and use it in GitHub Desktop.
Example of using the EventStore.JVM client from a Kotlin program without fully adopting the Akka actor model.
import akka.actor.ActorSystem
import eventstore.core.*
import eventstore.j.EsConnectionFactory
import eventstore.j.SettingsBuilder
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking
import scala.compat.java8.FutureConverters
import scala.concurrent.Future
import java.net.InetSocketAddress
import java.util.*
/**
 * Suspends until this Scala [Future] completes and returns its result.
 *
 * The future is bridged to a Java `CompletionStage` with [FutureConverters.toJava] and then
 * awaited through the kotlinx.coroutines `await` extension.
 *
 * The result is asserted non-null with `!!`, so a future that completes with `null` makes this
 * call throw a `NullPointerException`.
 */
private suspend fun <T> Future<T>.await() =
    FutureConverters.toJava(this).let { stage -> stage.await()!! }
/**
 * Sample domain object; `main` serializes an instance of it to JSON to build the event payload.
 *
 * @property name the employee's name
 * @property age the employee's age
 */
data class Employee(
    val name: String,
    val age: Int
)
/**
 * Demo entry point: appends one `NewEmployeeCreated` event to the `Employees` stream through the
 * EventStore JVM client, then reads the stream forward from the first event and prints the
 * payload of every event returned.
 *
 * The connection target (127.0.0.1:1113) and the credentials (admin / changeit) are hard-coded.
 * The actor system backing the connection is terminated in a `finally` block, so it is shut down
 * even when creating the connection, writing or reading throws; otherwise its background threads
 * would keep the process from exiting.
 */
fun main() {
    val settings = SettingsBuilder()
        .address(InetSocketAddress("127.0.0.1", 1113))
        .defaultCredentials("admin", "changeit")
        .heartbeatInterval(10) // NOTE(review): unit is not visible here, presumably seconds; confirm
        .build()
    val actorSystem = ActorSystem.create()

    runBlocking {
        try {
            // We don't need to fully adopt the Akka model to work with the EventStore client.
            // Internally it seems to rely on actors to manage the communication with ES; the
            // connection object exposes methods returning Future<> objects that can be awaited.
            val connection = EsConnectionFactory.create(actorSystem, settings)

            // `.get()` assumes credentials are present, which holds because they are set on the
            // builder above. Looked up once and reused for both the write and the read.
            val credentials = settings.defaultCredentials().get()

            // For Json.toJson see https://gist.github.com/krishnabhargav/7b1832eeb86aa213ba5bb239153977ea
            val jsonPayload = Json.toJson(Employee("Krishna", 34))
            val newEvent = EventData(
                "NewEmployeeCreated",
                UUID.randomUUID(),
                Content.apply(jsonPayload),
                Content.Empty()
            )

            val writeResult = connection.writeEvents(
                "Employees",
                ExpectedVersion.`Any$`(),
                listOf(newEvent),
                credentials
            ).await()
            println("Write result => Position=${writeResult.nextExpectedVersion()}")

            val eventsRead = connection.readStreamEventsForward(
                "Employees",
                EventNumber.First(),
                4096,
                false,
                credentials
            ).await()
            eventsRead.events().foreach {
                val eventData = it.data()
                println(eventData.data().value().utf8String())
            }
        } finally {
            // Kill the actor system when you are done so your process can exit; otherwise
            // background threads will prevent your process from closing. Doing this in
            // `finally` covers the failure paths as well as the success path.
            actorSystem.terminate().await()
        }
    }
}
// Alternative sketch (left commented out): writing events through the EventStore HTTP API.
//val httpClient = HttpClient.newHttpClient()
//
//sealed class ExpectedVersion(val value: Int) {
// object NoStream : ExpectedVersion(-1)
// object EmptyStream : ExpectedVersion(0)
// object Ignore : ExpectedVersion(-2)
// data class Number(val x: Int) : ExpectedVersion(x)
//}
//
//fun <T> makeWriteEventRequest(
// streamName: String,
// eventType: String,
// expectedVersion: ExpectedVersion,
// payload: T
//): HttpRequest? {
// return HttpRequest.newBuilder()
// .header("Content-Type", "application/json")
// .header("ES-EventType", eventType)
// .header("ES-EventId", UUID.randomUUID().toString())
// .header("ES-ExpectedVersion", expectedVersion.value.toString())
// .uri(URI.create("http://localhost:2113/streams/$streamName?embed=rich"))
// .POST(HttpRequest.BodyPublishers.ofString(Json.toJson(payload)))
// .build()
//
// //What do we need?
// //1. Append event to a stream. With support for version
// //2. Read events back from a stream.
// //3. Read all streams.
//// val writeEvent = makeWriteEventRequest(
//// "Employees", "NewEmployeeCreated",
//// ExpectedVersion.Number(4), Employee("Archana", 30)
//// )
//// val result =
//// httpClient.sendAsync(writeEvent, HttpResponse.BodyHandlers.ofString())?.await()!!
//// println(result.headers().map()["ES-CurrentVersion"])
//// println("Result: ${result.statusCode()}")
//}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment