ZIO Kafka with offset storage in Postgres

Intro

Scala often gets a reputation for being complicated to write and set up, and for requiring a lot of boilerplate. The same goes for functional programming. My goal is to show you the opposite - how even with limited knowledge of the following topics you can productively use competing libraries in the same field, and how well they fit together; how easily you can treat Kafka topics as streams of data, how simply you can compose SQL statements into a bigger transaction, and overall how productive you can be in today's Scala ecosystem.

This article relies on only a very limited amount of knowledge of Scala, SBT, Kafka, Postgres, Docker and functional programming. If I did my homework right, you shouldn't feel intimidated at any point by any of these technologies. Before using anything, I'll try to provide the briefest description that I think is sufficient for you - dear reader. If you already know these technologies, go ahead and skip those brief descriptions, as I'll be hugely simplifying :-)

The overall program is short and concise, and I believe that no matter how experienced you are with these topics, you might discover something useful. Within the article you'll find the following:

  • how to use ZIO Kafka to produce and consume messages
  • how to serialize and deserialize our data to JSON
  • how to use Skunk, a native, functional, non-blocking Scala driver for Postgres, to set up and query a schema
  • how to manually commit Kafka partition offsets in Postgres within a transaction
  • how nicely Cats and ZIO play together

Following code on a blog post can get a bit confusing, so I recommend checking out the runnable code in my GitHub project and following along.

Setting the stage

We have bank accounts. Each bank account has an owner's name and a current balance in dollars. Our account owners want to transfer money between themselves. Peter might want to send $300 to me. I might want to send some money to my brother, Marius. We issue these commands to the bank, and once they are processed, our balances change. If something goes wrong during processing, neither sender nor receiver observes any change to their bank account, at any time.

To have a concrete example, let's define how such a command could look: {"from": "Peter", "to": "Marek", "amount": 300} - this means that Peter is sending $300 to Marek, so if the command gets processed, Peter's balance should change by -$300 (how unfortunate for Peter), while Marek's changes by +$300. We could designate the type of this command, e.g. "type": "TransferBalance", but since we support only a single command, we don't need to.

Once we issue the command, it is enqueued into Kafka. Kafka is a distributed queue - at least for this example ;-). You have probably used a Queue data structure before - think of a Kafka topic as such a queue, one that is externalized from your program, has more capabilities, scales much better, is resilient, and so on.

The bank has an application that consumes these events from Kafka, processes them, and updates the Postgres database with the new balance of each user. Postgres is a relational database - a place where we can store and manipulate our business data.

A money transfer consists of two SQL statements - one to decrement the sender's balance, the other to increment the receiver's balance. We want these to happen in a transaction - we don't want to end up in a state where we removed money from one account and never added it to the other because our app crashed. Luckily Postgres (like pretty much every other relational database) supports transactions, so our users shouldn't complain that money vanished from their account and was never received.

So if processing of a balance transfer command crashes, we won't execute the actual transfer. It's good that users won't observe any discrepancies in their accounts, but they will see that even though they issued a command to transfer funds, nothing has happened. We need to tell Kafka (our queue) that we aren't done with this processing. Since we already have transactions (in our relational database) that guarantee the consistency of accounts, could we use them? Kafka internally tracks how far you have processed, under a number called the offset. We can tell Kafka that we'll store this offset ourselves - in Postgres - and update it in the same transaction in which we update the users' funds. Thus if anything crashes while processing messages, the offset will stay unchanged (because the transaction will fail) and the messages will be processed again (once the service restarts or we fix the bug).

In this example, we'll create both the process that issues these commands (the user's side, so to speak) and the process that consumes them (the bank's side).

Preparations

We'll need some containers running. Let's start Kafka with Zookeeper and Postgres.

https://gist.github.com/14b4a67a984eb39689afd067db560f5a

The overall app will be based on ZIO. ZIO is, first of all, a functional programming library for building asynchronous and concurrent programs, though it comes with a lot more goodies.

We'll use SBT as our build tool - the most used build tool in the Scala ecosystem. Think of it as Maven or Gradle, but for Scala.

We now need a means of communicating with Kafka - to consume and produce messages. There is a Java client we could use, but that would mean dealing with unsafe, impure code, and as I mentioned, we'd like to use ZIO. Thus we reach for ZIO Kafka as our library of choice. Don't be alarmed - underneath it uses the same Java library I linked above, but it wraps it nicely, exposing a functional-programming-friendly interface. Let's add it to our build.sbt file:

https://gist.github.com/2e91fdfbb3c77a9acd2533848363f074
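For reference, the key line might look roughly like this (the version is illustrative - check for the latest release):

```scala
// build.sbt -- version shown is illustrative; check for the latest release
libraryDependencies += "dev.zio" %% "zio-kafka" % "0.14.0"
```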

It transitively pulls in zio-core and zio-streams. Core contains the main components that you need any time you're building a ZIO app. ZIO Streams represents streams of values that are pulled by some effectful computation - such as asking Kafka to hand them to us :-). In my opinion, streams and stream-based applications are still very underused and underappreciated.

Let's be good citizens and, while we're in the ZIO ecosystem, pull in a logging library. https://gist.github.com/c844994c7f3c7b91e0b991300f454c52
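Roughly, that's one more dependency line (again, the version is illustrative):

```scala
// build.sbt -- version shown is illustrative
libraryDependencies += "dev.zio" %% "zio-logging" % "0.5.8"
```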

We could integrate it with various backends, such as slf4j - but we don't need any of that for this toy app; we'll just use the console. Should we need it, though, there are nice integrations already available.

Let's serialize our data to JSON as we write it to Kafka - it's such a common format, after all. We'll use Circe. https://gist.github.com/6e4a487fcade676fc047ac6ccd9afb44

We also added macro support to the compiler options, as we'll be generating JSON codecs from case classes (we are too lazy to write them by hand).
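If you're following along without the gists, the Circe additions might look roughly like this (versions are illustrative):

```scala
// build.sbt -- Circe modules for JSON encoding/decoding; versions are illustrative
libraryDependencies ++= Seq(
  "io.circe" %% "circe-core",
  "io.circe" %% "circe-generic",
  "io.circe" %% "circe-parser"
).map(_ % "0.13.0")

// enables the @JsonCodec macro annotation on Scala 2.13
scalacOptions += "-Ymacro-annotations"
```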

When it comes to dependencies, we've now covered the parts that help us integrate with Kafka using ZIO, with Circe handling serialization (and deserialization) of messages.

How do we talk to Postgres? At the moment, there is no native ZIO driver we could use. We could pick any existing Java or Scala library and wrap it with ZIO - but that's a bit of a hassle. On the other hand, there is a native Postgres library called Skunk. Skunk is built on Cats and fs2, two other fantastic functional programming libraries that compete with ZIO.

Let's add Skunk to our dependencies:

https://gist.github.com/5d9d18326a05bcb0c4be0f4aa19cc308

Now we have two competing ecosystems (ZIO and Cats). Let them become friends via an interoperability layer: https://gist.github.com/4ece7ea6b302ffbe44e2a60ac33ea263
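Together, these last two additions might look roughly like this in build.sbt (versions are illustrative):

```scala
// build.sbt -- Skunk for Postgres plus the ZIO <-> Cats interop layer; versions are illustrative
libraryDependencies ++= Seq(
  "org.tpolecat" %% "skunk-core"       % "0.0.24",
  "dev.zio"      %% "zio-interop-cats" % "2.3.1.0"
)
```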

That's it - we have everything we need. Let's tie it all together.

Producer

Common functionality

Let's create a producer that, every second, issues a command to transfer some money from one person to another.

We can define our command as follows: https://gist.github.com/5365d1083811876520f3fac9af7d00ca

There is only one possible command - TransferBalance. I encourage you to implement more commands (e.g. CloseAccount) and see how the compiler guides you. We get automatic JSON codec derivation via the @JsonCodec annotation. This is the data that will be stored in the Kafka topic. Let's give the topic a name, define some people, and add a means of selecting a random person.

https://gist.github.com/e52fe23b680db6565d53445665980af4
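A rough sketch of these definitions (the exact names, fields, topic name and participants are assumptions based on the JSON example above):

```scala
import io.circe.generic.JsonCodec
import zio.URIO
import zio.random.{nextIntBounded, Random}

// The single command our producer issues; @JsonCodec derives the JSON encoder/decoder.
// Note: the derived codec nests the payload under the subtype name,
// e.g. {"TransferBalance": {"from": ..., "to": ..., "amount": ...}}.
@JsonCodec
sealed trait Command
object Command {
  final case class TransferBalance(from: String, to: String, amount: Int) extends Command
}

object Domain {
  // Topic name and participants are assumptions; pick whatever you like.
  val TopicName = "transfer-commands"
  val People    = Vector("Peter", "Marek", "Marius")

  // Effectfully pick a random person using ZIO's Random service.
  val randomPerson: URIO[Random, String] =
    nextIntBounded(People.size).map(People(_))
}
```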

Even though we have defined a JSON representation for our command, we need to tell Kafka about it. We'll use a Circe Printer to print it to a String and store that in Kafka. Kafka already has support for String serialization and deserialization (called a Serde), which the ZIO Kafka library conveniently wraps as zio.kafka.serde.Serde.string. We can inmapM over it - essentially we tell it how to go from String to Command (for deserialization), and how to go from Command back to String (for serialization).

https://gist.github.com/fdade1b135e5839eb5456bc7d493e553

The first function of inmapM (x => ZIO.fromEither(parser.parse(x).flatMap(_.as[Command]))) instructs how to read the String stored in Kafka. We parse it into JSON (parser.parse(x)) and decode it into our Command (.flatMap(_.as[Command])). There can be errors both in parsing to JSON (the actual string is not JSON) and in decoding into our Command structure (the JSON we read does not match our Command), and we want to fail if that happens. This error is represented in an Either, which we can lift into ZIO with the ZIO.fromEither method.

The second function of inmapM (x => ZIO.succeed(x.asJson.printWith(commandSerdePrinter))) first encodes our Command into JSON (x.asJson) using Circe. Nothing can go wrong here - we can definitely encode every instance of Command into JSON. The second part (.printWith(commandSerdePrinter)) prints that JSON to a String. Again, no error can happen, and thus we lift it into a successful ZIO.
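Putting both functions together, the Serde might look roughly like this (the printer configuration and the wrapping object are assumptions):

```scala
import io.circe.{parser, Printer}
import io.circe.syntax._
import zio.ZIO
import zio.kafka.serde.Serde

object Serdes {
  // Printer configuration is an assumption; any Circe printer will do.
  val commandSerdePrinter: Printer = Printer.noSpaces

  // Deserialize: String -> Json -> Command (either step may fail);
  // serialize:   Command -> Json -> String (cannot fail).
  val CommandSerde: Serde[Any, Command] =
    Serde.string.inmapM { x =>
      ZIO.fromEither(parser.parse(x).flatMap(_.as[Command]))
    } { x =>
      ZIO.succeed(x.asJson.printWith(commandSerdePrinter))
    }
}
```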

Producer Implementation

Now we can implement the actual producer in 30-ish lines of code.

https://gist.github.com/926388a1527a3e6763bec662ae0987e2

We define some ProducerSettings, then create a Kafka Producer with our Serde: Producer.make(producerSettings, Serde.int, CommandSerde). We generate a command with some random values and append it to our topic: p.produce(TopicName, event.hashCode(), event). Don't worry about the hashCode part - it relates to the Kafka key and is not relevant for this example. We want this program to run forever, producing a value every second: (produce *> ZIO.sleep(1.second)).forever.

Because we added a few logging statements, we need to provide a Logging layer (from the zio-logging library) via .provideCustomLayer(loggingLayer). You can learn more about ZIO layers here.
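Pieced together from the calls mentioned above, the producer might look roughly like this (the broker address, randomCommand helper, and the Logging.console() setup are assumptions that may differ between zio-logging versions):

```scala
import zio._
import zio.duration._
import zio.logging._
import zio.random.Random
import zio.kafka.producer.{Producer, ProducerSettings}
import zio.kafka.serde.Serde

import Domain._
import Serdes.CommandSerde

object ProducerApp extends App {

  // Broker address matches the local docker setup; adjust as needed.
  val producerSettings = ProducerSettings(List("localhost:9092"))
  val loggingLayer     = Logging.console()

  // Generate a random transfer between two (possibly identical) people.
  val randomCommand: URIO[Random, Command] =
    for {
      from   <- randomPerson
      to     <- randomPerson
      amount <- zio.random.nextIntBounded(1000)
    } yield Command.TransferBalance(from, to, amount)

  val program =
    Producer.make(producerSettings, Serde.int, CommandSerde).use { p =>
      val produce =
        for {
          event <- randomCommand
          _     <- log.info(s"Producing $event")
          _     <- p.produce(TopicName, event.hashCode(), event)
        } yield ()

      // emit one command every second, forever
      (produce *> ZIO.sleep(1.second)).forever
    }

  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    program.provideCustomLayer(loggingLayer).exitCode
}
```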

That's it - running this app will continuously send events into Kafka: https://gist.github.com/f4c661253193e4df4fbead9376504a79

Consumer

SQL layer

We want to store two structures in our database: account balances and topic partition offsets. Let's start with account balances: https://gist.github.com/c53a97b685b730036c28f9bb512db4f9

Balance holds the account owner's name and the owner's current balance. We'll update this balance based on the commands we process.

Let's prepare a DDL statement for creating the table that represents our Balance: https://gist.github.com/9a09d787ce3f9386f36e311708757638

We use Skunk's sql interpolator followed by .command to turn this into a Command. So far we are not executing anything; we're simply preparing all the SQL we need.

The balance will be updated during processing, so we need a statement that inserts the balance if it does not exist, or otherwise increments/decrements it by the desired amount. We prepare a parametrized statement that takes two arguments - the name, and the amount by which we want to update the balance.

https://gist.github.com/933a0f7d7a2b907d965ada80d2177e1f
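Sketched with Skunk, the two balance statements might look roughly like this (table and column names are assumptions; note that skunk.Command here is Skunk's statement type, unrelated to our domain Command):

```scala
import skunk._
import skunk.codec.all._
import skunk.implicits._

object BalanceSql {

  // DDL -- `.command` turns the fragment into a Command[Void], a statement returning no rows.
  val createTable: Command[Void] =
    sql"""
      CREATE TABLE IF NOT EXISTS balances (
        name    VARCHAR PRIMARY KEY,
        balance INT8 NOT NULL
      )
    """.command

  // Parametrized upsert taking (name, amount): insert the row,
  // or add the amount to the existing balance.
  val upsertBalance: Command[String ~ Long] =
    sql"""
      INSERT INTO balances (name, balance)
      VALUES ($varchar, $int8)
      ON CONFLICT (name) DO UPDATE
      SET balance = balances.balance + EXCLUDED.balance
    """.command
}
```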


We also want to store topic partition offsets in our relational database, so let's prepare the representation and SQL statements.

https://gist.github.com/78beb80e9e2f62b44c5f8c3f4fd764b4

In our table, we can store a TopicPartition as a varchar, with the format $topicname-$partition. Our DDL statement would be:

https://gist.github.com/79df3ca165688c96afd04905f66d53ad

Let's see how we could query these offsets for some partitions:

We create a parametrized query: https://gist.github.com/9018e012fd3e3c6f646359cce875e1f8

We are creating a query that takes a variable-length list of strings and returns tuples of String and Int (the topic-partition string and the offset from our database). Our case class holds a TopicPartition rather than a raw String, and not every String we get from the database necessarily represents a valid topic partition. Let's parse them and pick only the valid topic partitions.

https://gist.github.com/17c401165bb810b5e1d05db946e23dc2

The last missing piece is the SQL to upsert partition offsets. If a partition is missing, we insert it; otherwise we update the existing offset to the new one.

https://gist.github.com/5c0cd96b9a0cab1c87249ad90974f2dc
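The offset-related pieces - the DDL, the lookup query, the upsert (what the article refers to as Sql.updatePartitionCommand corresponds to upsertOffset below), and the parsing back into a TopicPartition - might be sketched roughly as follows. Table and column names, and the PartitionOffset case class, are assumptions:

```scala
import org.apache.kafka.common.TopicPartition
import skunk._
import skunk.codec.all._
import skunk.implicits._

object OffsetSql {

  // Parsed row: the Kafka partition plus the offset we stored for it.
  final case class PartitionOffset(topic: TopicPartition, offset: Long)

  val createTable: Command[Void] =
    sql"""
      CREATE TABLE IF NOT EXISTS offsets (
        topic       VARCHAR PRIMARY KEY,
        last_offset INT8 NOT NULL
      )
    """.command

  // Lookup for n partitions; Skunk's `.list(n)` encoder expands into n placeholders,
  // so the query is built per call with the size of the partition list. The decoder
  // already tries to parse each row, so callers only collect the Some values.
  def selectOffsets(n: Int): Query[List[String], Option[PartitionOffset]] =
    sql"""
      SELECT topic, last_offset
      FROM offsets
      WHERE topic IN (${varchar.list(n)})
    """.query((varchar ~ int8).map(parse))

  // Upsert: insert a new partition, or move the stored offset forward.
  val upsertOffset: Command[String ~ Long] =
    sql"""
      INSERT INTO offsets (topic, last_offset)
      VALUES ($varchar, $int8)
      ON CONFLICT (topic) DO UPDATE
      SET last_offset = EXCLUDED.last_offset
    """.command

  // Turn "$topic-$partition" back into a TopicPartition, dropping rows that don't parse.
  def parse(row: String ~ Long): Option[PartitionOffset] = {
    val (stored, offset) = row
    val i = stored.lastIndexOf('-')
    if (i < 0) None
    else
      stored.drop(i + 1).toIntOption.map { partition =>
        PartitionOffset(new TopicPartition(stored.take(i), partition), offset)
      }
  }
}
```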

We have now prepared SQL statements for creating the offsets table, for querying topic partition data, and for upserting it.

Consumer Implementation

Let's set expectations for our consumer's implementation. We expect to:

  • establish a session to Postgres (so we can create tables and perform updates based on events)
  • use the SQL statements that we prepared in the previous section
  • subscribe to Kafka and consume events
  • process events in batches, committing both Kafka offsets and changes to account balances in a single transaction
  • write some code to integrate Skunk (based on Cats) with ZIO Kafka

Let's start by defining our ZIO app and preparing our Postgres session. We'll represent it as a ZIO Layer. https://gist.github.com/3bb3debc8be9a375b039060f8a8f41af

What's going on here? We introduce a type alias, type SessionTask = Session[Task], for clarity. It represents a Session that can execute statements within ZIO's Task.

The Session.single call creates a Session, which comes from the Skunk library. It is wrapped in a Resource to ensure that we both acquire and close it properly. But those are Cats constructs. We convert it to the ZIO equivalent - ZManaged - via the .toManaged call. For that, we need the help of import zio.interop.catz._ as well as an implicit ZIO Runtime in scope. We can get one from ZManaged.runtime, over which we flatMap to introduce it into the current scope: ZManaged.runtime.flatMap { implicit r: Runtime[Any] =>. Lastly, we convert the ZManaged to a layer via the toLayer call.

Don't worry too much about import natchez.Trace.Implicits.noop - it's just our tracing instance.
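Assembled from the pieces above, the layer might look roughly like this (the connection parameters are assumptions matching the docker setup):

```scala
import natchez.Trace.Implicits.noop   // no-op tracing instance required by Skunk
import skunk.Session
import zio._
import zio.interop.catz._             // Cats Effect instances for ZIO's Task

object Database {
  type SessionTask = Session[Task]

  // Acquire a single Postgres session as a Resource, convert it to ZManaged,
  // and expose it as a layer.
  val sessionLayer: TaskLayer[Has[SessionTask]] =
    ZManaged.runtime
      .flatMap { implicit r: Runtime[Any] =>
        Session
          .single[Task](
            host     = "localhost",
            port     = 5432,
            user     = "postgres",
            database = "postgres",
            password = Some("postgres")
          )
          .toManaged
      }
      .toLayer
}
```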

Overall, the whole integration of ZIO and Cats took a single import and bringing an implicit Runtime into scope. That's pretty simple.

Let's define our program. We want to create a Kafka consumer with manual offset retrieval (from Postgres).

https://gist.github.com/304a07f78bd977bfa828dd966e74701a

We again introduce an implicit runtime into scope, as we're working with the Cats-based Session. We specify manual offset retrieval with Consumer.OffsetRetrieval.Manual { partitions =>. In it, we specify how to fetch offsets for a set of partitions. We first need to convert our partitions into the string representation that we use in the database ($topic-$partition), via partitions.map(partitionToString).toList.

We prepare the query for selecting offsets (the one we created in the previous section) with session.prepare(query).toManaged and use it (with our list of partitions) to stream all results: .use(_.stream(list, 64).compile.toVector). A query fetching offsets for many partitions may return many rows, and having to think in terms of streams helps us not forget that. Not every query you run against Postgres has the nice characteristic of returning only a few rows.

If you remember, in the previous section we mentioned that not everything stored in the database may be a valid TopicPartition string representation. We handle this decoding in our SQL definition, so now we only need to collect the valid ones: .map(xs => xs.collect { case Some(t) => t }). We convert the result to the structure that Consumer.OffsetRetrieval.Manual expects - a map from topic partition to offset - via .map(x => x.topic -> x.offset).toMap.

In the last step we create the Consumer based on these settings (with manual retrieval), connecting to our local Kafka instance.
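A condensed sketch of this part, reusing the OffsetSql helpers sketched earlier (the group id, broker address, and object layout are assumptions):

```scala
import org.apache.kafka.common.TopicPartition
import zio._
import zio.interop.catz._
import zio.kafka.consumer.{Consumer, ConsumerSettings}

object BankConsumer {

  // The same "$topic-$partition" encoding we use in the offsets table.
  def partitionToString(tp: TopicPartition): String =
    s"${tp.topic}-${tp.partition}"

  // Whenever Kafka asks where to resume, look the offsets up in Postgres.
  def offsetRetrieval(session: Database.SessionTask)(implicit r: Runtime[Any]): Consumer.OffsetRetrieval =
    Consumer.OffsetRetrieval.Manual { partitions =>
      val keys = partitions.map(partitionToString).toList
      session
        .prepare(OffsetSql.selectOffsets(keys.size))
        .toManaged
        .use(_.stream(keys, 64).compile.toVector)
        .map(_.collect { case Some(t) => t })            // keep only rows that parsed
        .map(_.map(x => x.topic -> x.offset).toMap)
    }

  // Consumer connected to the local Kafka, using the manual retrieval above.
  def consumer(session: Database.SessionTask)(implicit r: Runtime[Any]) =
    Consumer.make(
      ConsumerSettings(List("localhost:9092"))
        .withGroupId("bank")                             // group id is an assumption
        .withOffsetRetrieval(offsetRetrieval(session))
    )
}
```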


So far we've just been preparing things - SQL statements, the Kafka Consumer, the Postgres Session - but nothing has really happened yet. Let's create the tables and actually consume the Kafka topic to process events.

Building on the previous code:

https://gist.github.com/65bd68e94bae21403a383c0426046325

We use the Postgres Session to execute the DDL statements that set up our database, and subscribe to the Kafka topic: consumer.subscribeAnd(Subscription.topics(TopicName)). This is the same topic, Subscription.topics(TopicName), that we produce messages into, and we use the same serializer-deserializer, CommandSerde.

We tap into the stream to log every message, .tap(x => log.info(x.value.toString)), and we run the stream to exhaustion - which will never happen, as the stream of events from Kafka is infinite. At any time, the producer can append a new event to it.

Let's add another operation to the stream, where we actually process the messages and handle offset management in a single transaction. This is the beefy part - and it's way simpler than it looks. https://gist.github.com/19c4991152743155f41440dc94b27ee4

We want to process the stream in chunks of Kafka records (think of it as pulling a bunch of messages at once, in an efficient manner), via .foreachChunk { chunk =>. From the chunk's offsets we create an OffsetBatch - an instance that merges the offsets into a single data structure - via OffsetBatch(chunk.map(_.offset)).

We create a list of these offsets as the argument for our offset update command, offsets.map { case (topicPartition, offset) => ... } - it's exactly what Sql.updatePartitionCommand needs.

Mapping over the chunk, we get a chunk of actual commands - the commands our producer has pushed to Kafka: chunk.map(_.value).

We open and use a transaction: session.transaction.toManaged.use { transaction =>. Inside the transaction, we want to issue two SQL statements for every command - one to withdraw funds from the sender's account, and one to deposit funds into the receiver's. For each command in our chunk, we create these statements using ZIO.foreach_(commands) { ... }.

Executing a statement is a matter of taking the SQL we wrote, preparing it, and feeding it the values from the command we are processing. https://gist.github.com/cb4b1b52474d5f9fddd58e0cfdbcc6bf

After all statements are executed within the transaction, we also append the statement that commits the Kafka offsets, and we execute the transaction:

https://gist.github.com/c0c49435b774bb0c3930c082f06cea69
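Condensed into one place, and reusing the helpers sketched earlier (BalanceSql, OffsetSql, Serdes.CommandSerde, partitionToString), the processing loop might look roughly like this; details such as storing offset + 1 as the resume position are assumptions of the sketch:

```scala
import zio._
import zio.interop.catz._
import zio.kafka.consumer.{Consumer, OffsetBatch, Subscription}
import zio.kafka.serde.Serde
import zio.logging._

import BankConsumer.partitionToString
import Domain.TopicName
import Serdes.CommandSerde

object Processing {

  // One chunk of records == one Postgres transaction: apply every transfer,
  // then record the new offsets, and only then let the transaction commit.
  def processEvents(consumer: Consumer.Service, session: Database.SessionTask)(implicit r: Runtime[Any]) =
    consumer
      .subscribeAnd(Subscription.topics(TopicName))
      .plainStream(Serde.int, CommandSerde)
      .tap(x => log.info(x.value.toString))
      .foreachChunk { chunk =>
        val offsetBatch = OffsetBatch(chunk.map(_.offset))
        val offsetArgs = offsetBatch.offsets.toList.map { case (tp, offset) =>
          partitionToString(tp) -> (offset + 1)          // resume from the next unprocessed record
        }
        val commands = chunk.map(_.value)

        session.transaction.toManaged.use { _ =>
          val applyTransfers = ZIO.foreach_(commands) {
            case Command.TransferBalance(from, to, amount) =>
              session.prepare(BalanceSql.upsertBalance).toManaged.use { cmd =>
                // withdraw from the sender, deposit to the receiver
                cmd.execute((from, -amount.toLong)) *> cmd.execute((to, amount.toLong))
              }
          }
          val storeOffsets = ZIO.foreach_(offsetArgs) { case (topic, offset) =>
            session.prepare(OffsetSql.upsertOffset).toManaged.use(_.execute((topic, offset)))
          }
          applyTransfers *> storeOffsets
        }
      }
}
```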

That's it - that's our main ZIO computation. We feed it the Logging and Session layers and run it:

https://gist.github.com/b67608d11ee7c13979a16c81d553be54

If we run the producer and the consumer, we should see that messages are being both produced and consumed. If we check our Postgres instance, we can find our offsets (yours will differ): https://gist.github.com/64014d0ee36e27adbc77b5bdcefbcde1

Checking the balances

https://gist.github.com/f4e45df30d0d32c724807cc711268f45

Because everyone started with $0, and we handle every transfer in a transaction, at any given time the sum of all accounts should equal 0 - even if you introduce random failures into the pipeline.

https://gist.github.com/a43a8f576843970e3697ce268ddb8d6f

Next steps

If you want to have some more fun, or test it further, try implementing the following:

  • inject random failures into the computation
  • support more commands
  • disallow going to a negative balance
  • in case of an error, commit the offset, but enqueue the message again at the end of your Kafka topic (perhaps storing how many times the operation has been retried)

Conclusion

We successfully made libraries from competing ecosystems play together with just a few lines of code. We were able to stay within pure functional Scala at all times, composing simple programs into bigger, more complex ones.

We saw two kinds of streams. The first (fs2, Cats-based) came from the Skunk library, when querying a table for entries (possibly millions of rows, had the table's use case been different). The second (ZIO Stream) represented our subscription to the Kafka topic that our producer pushed events to. Our consumer program processed the events in chunks, transformed each chunk into a single transaction, and committed it together with the offsets of our Kafka topic.

Both effect and streaming libraries played together very nicely and allowed us to focus on the domain problem, which shows the lengths to which Scala library authors are willing to go for good end-user ergonomics.
