Apurva Mehta apurvam
apurvam / KafkaConsumerToSupportTransactions.java
Created November 23, 2016 18:50
Updating the KafkaConsumer to support transactions
public interface Consumer<K,V> extends Closeable {
/**
* Returns the currently consumed but uncommitted offsets.
*/
Map<TopicPartition, OffsetAndMetadata> uncommittedOffsets();
/**
* Returns the consumer group id.
*/
apurvam / TransactionMetadata.java
Last active November 28, 2016 23:39
TransactionMetadata class
public class TransactionResponse {
/**
* All the error codes that may be returned by transactional methods.
*/
public enum TransactionError {
PRODUCER_FENCED,
NO_PID_MAPPING,
NOT_COORDINATOR_FOR_APP_ID,
INVALID_TXN_REQUEST,
apurvam / TransactionalKafkaProducer.java
Last active November 30, 2016 00:00
Extending the Kafka Producer to support transactions.
public interface Producer<K,V> extends Closeable {
/**
* Needs to be called before any of the other transaction methods. Assumes that
* the producer.app.id is specified in the producer configuration.
*
* This method does the following:
* 1. Ensures any transactions initiated by previous instances of the producer
* are committed or rolled back.
* 2. Gets the internal producer id and epoch, used in all future transactional
apurvam / KafkaTransactionsExample.java
Last active November 30, 2016 21:26
An example for using Kafka Transactions
public class KafkaTransactionsExample {
public static void main(String[] args) {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
// Note that the 'transaction.app.id' configuration _must_ be specified in the
// producer config in order to use transactions.
KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
apurvam / show-topics.sh
Created January 24, 2018 01:26
An example of the updates to the 'show topics' command.
ksql> show topics;
Kafka Topic                 | Registered | Partitions | Partition Replicas | Consumers | Consumer Groups
--------------------------------------------------------------------------------------------------------
__confluent.support.metrics | false      | 1          | 1                  | 0         | 0
_schemas                    | false      | 1          | 1                  | 0         | 0
connect-configs             | false      | 1          | 1                  | 0         | 0
connect-offsets             | false      | 25         | 1                  | 0         | 0
connect-statuses            | false      | 5          | 1                  | 0         | 0
groupbydevice               | true       | 4          | 1                  | 0         | 0
apurvam / print-topic.sh
Last active January 25, 2018 00:31
An example of how to dump the contents of a topic using 'print'.
ksql> print 'pageviews_kafka_topic_json';
Format:JSON
{"ROWTIME":1516753301093,"ROWKEY":"1","viewtime":1516753300961,"pageid":"Page_53","userid":"User_9"}
{"ROWTIME":1516753301458,"ROWKEY":"11","viewtime":1516753301458,"pageid":"Page_38","userid":"User_5"}
{"ROWTIME":1516753301660,"ROWKEY":"21","viewtime":1516753301660,"pageid":"Page_25","userid":"User_2"}
^C
apurvam / topk.sql
Created January 30, 2018 00:57
KSQL TopK example
SELECT order_zipcode, TOPK(order_total, 5) FROM orders WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode;
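To make the semantics of the query above concrete, here is a minimal in-memory sketch of what it appears to compute: the k largest order totals per zip code within each one-hour tumbling window. It is an illustration only, not how KSQL implements TOPK. The `Order` class, the `topKPerWindow` method, and the `zipcode@windowStart` key format are all hypothetical names introduced for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TopKSketch {
    static final long HOUR_MS = 3_600_000L;

    /** Hypothetical stand-in for one row of the orders stream. */
    static final class Order {
        final String zipcode;
        final double total;
        final long timestampMs;

        Order(String zipcode, double total, long timestampMs) {
            this.zipcode = zipcode;
            this.total = total;
            this.timestampMs = timestampMs;
        }
    }

    /**
     * Groups orders by (zipcode, one-hour tumbling window) and keeps the k
     * largest totals per group, returned in descending order.
     * Keys have the assumed form "zipcode@windowStartMs".
     */
    static Map<String, List<Double>> topKPerWindow(List<Order> orders, int k) {
        Map<String, PriorityQueue<Double>> heaps = new HashMap<>();
        for (Order o : orders) {
            // A tumbling window is identified by its start time.
            long windowStart = o.timestampMs - (o.timestampMs % HOUR_MS);
            String key = o.zipcode + "@" + windowStart;
            // Min-heap: the smallest retained value is evicted first.
            PriorityQueue<Double> heap =
                heaps.computeIfAbsent(key, x -> new PriorityQueue<>());
            heap.add(o.total);
            if (heap.size() > k) {
                heap.poll();
            }
        }
        Map<String, List<Double>> result = new HashMap<>();
        for (Map.Entry<String, PriorityQueue<Double>> e : heaps.entrySet()) {
            List<Double> sorted = new ArrayList<>(e.getValue());
            sorted.sort(Collections.reverseOrder());
            result.put(e.getKey(), sorted);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order("94301", 10.0, 0L),
            new Order("94301", 30.0, 1_000L),
            new Order("94301", 20.0, 2_000L),
            new Order("94301", 99.0, HOUR_MS)); // falls in the next window
        System.out.println(topKPerWindow(orders, 5));
    }
}
```

The min-heap keeps memory per group bounded by k, which is why it is a common choice for top-k aggregation over unbounded input.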
apurvam / topkdistinct.sql
Last active January 31, 2018 02:06
KSQL TOPKDISTINCT example
SELECT pageid, TOPKDISTINCT(viewtime, 5) FROM pageviews_users GROUP BY pageid;
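As a rough illustration of the query above, the sketch below keeps the k largest distinct view times per page, so repeated values count once. This is an assumption about the intended semantics based on the function name, not KSQL's implementation; `TopKDistinctSketch` and `topKDistinct` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class TopKDistinctSketch {
    /**
     * For each page id, returns up to k distinct view times,
     * largest first. Rows are (pageid, viewtime) pairs.
     */
    static Map<String, List<Long>> topKDistinct(List<Map.Entry<String, Long>> rows, int k) {
        Map<String, TreeSet<Long>> best = new HashMap<>();
        for (Map.Entry<String, Long> row : rows) {
            TreeSet<Long> set =
                best.computeIfAbsent(row.getKey(), p -> new TreeSet<>());
            set.add(row.getValue());   // a sorted set ignores duplicates
            if (set.size() > k) {
                set.pollFirst();       // drop the smallest retained value
            }
        }
        Map<String, List<Long>> result = new HashMap<>();
        best.forEach((page, set) ->
            result.put(page, new ArrayList<>(set.descendingSet())));
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> rows = List.of(
            Map.entry("Page_53", 5L),
            Map.entry("Page_53", 5L),  // duplicate view time
            Map.entry("Page_53", 7L),
            Map.entry("Page_53", 3L));
        System.out.println(topKDistinct(rows, 2));
    }
}
```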
apurvam / ksql-print-metrics-example.txt
Last active January 31, 2018 02:13
Example output for the ksql-print-metrics-tool
$ ./bin/ksql-print-metrics
messages-consumed-avg: 96416.96196183885
messages-consumed-min: 88900.3329377909
error-rate: 0.0
num-persistent-queries: 2.0
messages-consumed-per-sec: 193024.78294586178
messages-produced-per-sec: 193025.4730374501
num-active-queries: 2.0
num-idle-queries: 0.0
messages-consumed-max: 103397.81191436431
apurvam / safe-drop-example.sh
Created April 24, 2018 22:15
Example of how KSQL protects against dropping tables and streams which are in use.
ksql> show queries;
Query ID | Kafka Topic | Query String
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CSAS_PAGEVIEWS_ENRICHED | PAGEVIEWS_ENRICHED | CREATE STREAM pageviews_enriched AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid;
CSAS_PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT * FROM pageviews_enriched WHERE gender = 'FEMALE';
CSAS_PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kaf