Skip to content

Instantly share code, notes, and snippets.

@benstopford
benstopford / KafkaMostBasicTest
Last active October 20, 2021 19:47
Kafka Testing at its Most Simple
package com.confluent.benstopford;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
# Submit an order. Immediately retrieving it will block until validation completes.
$ curl -X POST ... --data {"id":"1"...} http://server:8081/orders/
$ curl -X GET http://server:8081/orders/validated/1?timeout=500
//In Node.js service
var nodemailer = require('nodemailer');
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client, [ { topic: 'platinum_emails', partition: 0 } ] );
consumer.on('message', function (orderConsumerTuple) {
sendMail(orderConsumerTuple);
//Execute query in a sidecar
ksql> CREATE STREAM orders (ORDERID string, ORDERTIME bigint...) WITH (kafka_topic='orders', value_format='JSON');
-- Derive platinum_emails: rows where client_level is 'PLATINUM' and state is 'CONFIRMED'.
-- NOTE(review): the comma-separated FROM is shorthand; KSQL needs an explicit JOIN ... ON clause - confirm the join key.
ksql> CREATE STREAM platinum_emails as select * from orders, customers where client_level = 'PLATINUM' and state = 'CONFIRMED';
orders.join(customers, Tuple::new) //join customers and orders
.filter((k, tuple) → tuple.customer.level().equals(PLATINUM) //filter platinum customers
&& tuple.order.state().equals(CONFIRMED)) //only consider confirmed orders
.peek((k, tuple) → emailer.sendMail(tuple)); //send email for each cust/order tuple
//Inside our request-response thread (e.g. the webserver's thread):
//look up an order by id in the "enriched-orders-store" state store.
ReadOnlyKeyValueStore ordersStore = streams.store("enriched-orders-store",...);
ordersStore.get(orderId);
//Inside the Kafka Streams thread (extending the example above) push the enriched orders into a topic of their own.
enrichedOrders.through("enriched-orders-topic");
//Now build a table from that topic, which will be pushed into the "enriched-orders-store"
KTable enrichedOrdersTable = builder.table(..., "enriched-orders-topic", "enriched-orders-store");
//Build a stream from the orders topic
KStream ordersStream = builder.stream(..., "orders-topic");
//Build a table from the products topic, stored in a local state store called "product-store"
GlobalKTable productsTable = builder.globalTable(...,"product-topic","product-store");
//Join the orders and products to do the enrichment; the lambda maps each order to its
//productId, which is used as the lookup key into the products table
KStream enrichedOrders = ordersStream.leftJoin(productsTable, (id, order) -> order.productId,...);
@benstopford
benstopford / poker-scoring-clojure
Last active December 29, 2015 11:59
Scoring a poker hand in Clojure
(require '[clojure.test :refer :all ])
;flush
(defn flush?
  "Returns true when taking `first` of every card in `hand` yields exactly
  one distinct value; returns false otherwise (including for an empty hand)."
  [hand]
  (->> hand
       (map first)
       distinct
       count
       (= 1)))
;straight
@benstopford
benstopford / SSLBenchmark
Created September 15, 2015 14:38
Simple, hacky SSL Benchmark ported from oracle example
package kafka.tools;
/*
* Copyright (c) 2004, Oracle and/or its affiliates. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* -Redistribution of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.