Skip to content

Instantly share code, notes, and snippets.

View service-curl.bash
# Submit an order. Immediately retrieving it will block until validation completes.
# The JSON payload is single-quoted so the shell does not strip its inner double
# quotes, and the URL is quoted so the '?' is never treated as a glob character.
$ curl -X POST ... --data '{"id":"1"...}' http://server:8081/orders/
$ curl -X GET 'http://server:8081/orders/validated/1?timeout=500'
View sidecar.js
//In Node.js service
var nodemailer = require('nodemailer');
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.Client(),
consumer = new Consumer(client, [ { topic: 'platinum_emails', partition: 0 } ] );
consumer.on('message', function (orderConsumerTuple) {
sendMail(orderConsumerTuple);
View KSQL.bash
//Execute query in a sidecar
//Declares the orders stream, then derives platinum_emails: confirmed orders from platinum-level customers.
//NOTE(review): the "from orders, customers" form is shorthand; confirm the explicit JOIN ... ON clause and join key before running.
ksql> CREATE STREAM orders (ORDERID string, ORDERTIME bigint...) WITH (kafka_topic='orders', value_format='JSON');
ksql> CREATE STREAM platinum_emails as select * from orders, customers where client_level = 'PLATINUM' and state = 'CONFIRMED';
View Email.java
orders.join(customers, Tuple::new) //join customers and orders
.filter((k, tuple) → tuple.customer.level().equals(PLATINUM) //filter platinum customers
&& tuple.order.state().equals(CONFIRMED)) //only consider confirmed orders
.peek((k, tuple) → emailer.sendMail(tuple)); //send email for each cust/order tuple
View UnbundledDatabaseSudoJavaExample2.java
//Inside our request-response thread (e.g. the webserver’s thread)
ReadOnlyKeyValueStore ordersStore = streams.store(“enriched-orders-store”,...);
ordersStore.get(orderId);
View UnbundledDatabaseSudoJavaExample2.java
//Inside the Kafka Streams thread (extending the example above) push the enriched orders into a topic of their own.
enrichedOrders.through("enriched-orders-topic");
//Now build a table from that topic, which will be pushed into the "enriched-orders-store"
KTable enrichedOrdersTable = builder.table(..., "enriched-orders-topic", "enriched-orders-store");
View UnbundledDatabaseSudoJavaExample.java
//Build a stream from the orders topic
KStream ordersStream = builder.stream(..., "orders-topic");
//Build a table from the products topic, stored in a local state store called "product-store"
GlobalKTable productsTable = builder.globalTable(..., "product-topic", "product-store");
//Join the orders and products to do the enrichment; the second argument maps each order to its product id
KStream enrichedOrders = ordersStream.leftJoin(productsTable, (id, order) -> order.productId,...);
@benstopford
benstopford / KafkaMostBasicTest
Last active Sep 14, 2020
Kafka Testing at its Most Simple
View KafkaMostBasicTest
package com.confluent.benstopford;
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.clients.producer.KafkaProducer;
@benstopford
benstopford / SSLBenchmark
Created Sep 15, 2015
Simple, hacky SSL Benchmark ported from an Oracle example
View SSLBenchmark
package kafka.tools;
/*
* Copyright (c) 2004, Oracle and/or its affiliates. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* -Redistribution of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
@benstopford
benstopford / TextAdventure
Created Apr 22, 2015
Text Adventure Game (West London Hack Night 21st April 2015)
View TextAdventure
import java.util.*;
public class TextAdventure {
public static void main(String[] args) {
new TextAdventure().run();
}
class Question {
String text;