nfo / stderr
Created March 6, 2017 17:30
Kafka Streams - Stream thread stopped on a stopped broker
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_23, processor=KSTREAM-SOURCE-0000000000, topic=abc, partition=23, offset=388592
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_23] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:76)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:79)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.ks
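The trace shows the internal producer's send failure propagating up and killing the StreamThread. One common mitigation in that era of Kafka Streams was to raise the embedded producer's retry count via "producer."-prefixed config keys, so a briefly stopped broker is retried instead of surfacing as a fatal exception. A minimal sketch, assuming the application id and bootstrap servers are yours to fill in:

```java
import java.util.Properties;

public class StreamsRetryConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "my-app");         // assumption: your app id
        props.put("bootstrap.servers", "broker:9092"); // assumption: your brokers
        // Kafka Streams forwards "producer."-prefixed keys to its internal
        // producer; high retries let it ride out a short broker outage
        // instead of failing the send and stopping the StreamThread.
        props.put("producer.retries", Integer.toString(Integer.MAX_VALUE));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("producer.retries"));
    }
}
```

Whether retries alone suffice depends on the failure; a multi-broker cluster with replicated topics is the real fix.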
nfo / RoutingTopology.java
Last active April 26, 2017 10:20
Kafka Streams: Chaining `KStream.branch()` calls to route records to n topics
// branch() returns one stream per predicate and drops records that match none,
// so each call needs a (k, v) -> true catch-all or branches[1] would not exist.
KStream<Key, Value>[] branches = stream.branch((k, v) -> v.getType().equals("abc"), (k, v) -> true);
branches[0].to(keySerde, valueSerde, "topic-abc");
branches = branches[1].branch((k, v) -> v.getType().equals("def"), (k, v) -> true);
branches[0].to(keySerde, valueSerde, "topic-def");
branches = branches[1].branch((k, v) -> v.getType().equals("ghi"), (k, v) -> true);
branches[0].to(keySerde, valueSerde, "topic-ghi");
// ....
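The catch-all predicate matters because `branch()` is first-match-wins and silently drops records matching no predicate. A self-contained sketch of those semantics using plain lists (the `BranchSketch` class, record type, and sample data are illustrative, not from the gist):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class BranchSketch {
    // First-match-wins split, mirroring KStream.branch() semantics:
    // each record goes to the output of the first predicate it matches;
    // records matching no predicate are silently dropped.
    static List<List<String>> branch(List<String> records, List<Predicate<String>> preds) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < preds.size(); i++) out.add(new ArrayList<>());
        for (String r : records) {
            for (int i = 0; i < preds.size(); i++) {
                if (preds.get(i).test(r)) { out.get(i).add(r); break; }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("abc", "def", "ghi", "xyz");
        // Without the (r -> true) catch-all, "def", "ghi" and "xyz" would vanish:
        List<List<String>> split = branch(records,
                Arrays.asList(r -> r.equals("abc"), r -> true));
        System.out.println(split.get(0)); // [abc]
        System.out.println(split.get(1)); // [def, ghi, xyz]
    }
}
```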
nfo / InValueTimestampExtractor.java
Created January 16, 2017 22:13
Kafka Streams - Custom timestamp extractor, from a `long` field named "timestamp"
/**
 * Handles records with a timestamp in their Avro value.
 * Expects a LONG field named "timestamp".
 * Any problem makes this extractor fall back to the record's internal timestamp.
 */
public class InValueTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // The excerpt is cut off after the null check; the body below is a
        // reconstruction matching the Javadoc, assuming an Avro GenericRecord value.
        if (record != null && record.value() instanceof GenericRecord) {
            Object ts = ((GenericRecord) record.value()).get("timestamp");
            if (ts instanceof Long) return (Long) ts;
        }
        // Fallback: the timestamp set by the producer/broker (-1 if no record)
        return record == null ? -1L : record.timestamp();
    }
}
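Wiring such an extractor in went through Streams configuration at the time. A hedged sketch: "timestamp.extractor" was the 0.10.x-era `StreamsConfig` key (newer Kafka versions renamed it "default.timestamp.extractor"), and the package name below is hypothetical:

```java
import java.util.Properties;

public class ExtractorConfig {
    public static Properties build() {
        Properties props = new Properties();
        // "timestamp.extractor" per 0.10.x StreamsConfig; newer releases use
        // "default.timestamp.extractor". Package name is hypothetical --
        // substitute wherever InValueTimestampExtractor actually lives.
        props.put("timestamp.extractor", "com.example.InValueTimestampExtractor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("timestamp.extractor"));
    }
}
```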
nfo / README.md
Created January 16, 2017 18:41
Kafka Streams - A serde for timed windows (start + end)
nfo / README.md
Last active January 16, 2017 22:54
Kafka Streams - Handling records with the same ID but updated metrics.
nfo / README.md
Created September 10, 2012 15:54
A microgem to query Amazon Web Services (S3/EC2/etc.) usage reports.
nfo / gist:1244830
Created September 27, 2011 11:09
Compiling jenkins_campfire_plugin... really?
[user@server yum.repos.d]# yum install maven2
Loaded plugins: fastestmirror, presto
Loading mirror speeds from cached hostfile
* base: centos.crazyfrogs.org
* epel: mirror.bytemark.co.uk
* extras: centos.crazyfrogs.org
* jpackage-generic: jpackage.netmindz.net
* jpackage-generic-updates: jpackage.netmindz.net
* rpmforge: apt.sw.be
* rpmforge-extras: apt.sw.be
nfo / screenme.rb
Created December 1, 2010 11:27
Sends a screenshot via HTTP (faster than logging in to LogMeIn for simple checks)
require 'rubygems'
require 'sinatra'
require 'tmpdir' # needed for Dir.mktmpdir

get '/' do
  Dir.mktmpdir('screenme') do |dir|
    file = File.join(dir, Time.now.strftime('%Y%m%d%H%M%S') + '.png')
    %x(/usr/sbin/screencapture -x #{file}) # macOS only
    send_file file
  end
end
nfo / riak_curb_logger.rb
Created November 4, 2010 19:11
Prints every HTTP request sent by the Riak client, with headers and data.
Riak::Client::CurbBackend.class_eval do
  private

  def perform_with_logs(method, uri, headers, expect, data=nil)
    puts "\e[31m#{method.upcase} #{uri}\e[0m"
    puts "\t\e[33m#{headers}\e[0m"
    # puts "\t\e[36m#{expect}\e[0m"
    puts "\t\e[34m#{data}\e[0m" if data
    # puts "\t\e[37m#{caller.last}\e[0m"
    puts
    perform_without_logs(method, uri, headers, expect, data)
  end

  # The excerpt is cut off here; the _with/_without naming implies
  # ActiveSupport's alias_method_chain wires the wrapper in:
  alias_method_chain :perform, :logs
end