Skip to content

Instantly share code, notes, and snippets.

View tulios's full-sized avatar

Túlio Ornelas tulios

View GitHub Profile
@tulios
tulios / KafkaJS_ECONNRESET_retry.md
Last active February 27, 2019 13:13
KafkaJS_ECONNRESET_retry.md

Test 1)

This test forces the seed broker to fail with ECONNRESET. The consumer example in the repo was changed to:

// Client configured with a single seed broker at 127.0.0.1:9094 — per the
// test description, presumably the broker being forced to fail with ECONNRESET.
const kafka = new Kafka({
  brokers: ['127.0.0.1:9094'],
  // other configs
})
@tulios
tulios / kafkajs_lz4.spec.js
Created October 8, 2018 18:34
Unit test for KafkaJS LZ4 compression
const createProducer = require('../../producer')
const createConsumer = require('../index')
const { Types, Codecs } = require('../../protocol/message/compression')
const LZ4 = require('kafkajs-lz4')
// Register the codec exposed by a kafkajs-lz4 instance under the LZ4 entry of
// the Codecs table imported from the compression module.
Codecs[Types.LZ4] = new LZ4().codec
const {
secureRandom,
createCluster,
@tulios
tulios / kafkajs_cluster_consumer_per_worker.js
Created October 3, 2018 17:16
kafkajs_cluster_consumer_per_worker
const fs = require('fs')
const ip = require('ip')
const cluster = require('cluster')
const { Kafka, logLevel } = require('../index')
// Names of process-level error events (presumably subscribed to via
// process.on elsewhere in the file — not visible here, confirm).
const errorTypes = ['unhandledRejection', 'uncaughtException']
// Names of OS signals (presumably trapped for shutdown handling — confirm).
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
// Host address: HOST_IP from the environment when set to a non-empty value,
// otherwise whatever ip.address() returns.
const host = process.env.HOST_IP || ip.address()
@tulios
tulios / kafkajs-snappy-codec.js
Created August 25, 2018 09:22
kafkajs snappy codec
const { promisify } = require('util')
const snappy = require('snappy')
// Promise-returning wrappers, built with util.promisify, around
// snappy.compress and snappy.uncompress.
const snappyCompress = promisify(snappy.compress)
const snappyDecompress = promisify(snappy.uncompress)
// Eight-byte marker: 0x82, the ASCII letters "SNAPPY", then a NUL byte.
const XERIAL_HEADER = Buffer.from([
  0x82, // 130
  0x53, // 'S'
  0x4e, // 'N'
  0x41, // 'A'
  0x50, // 'P'
  0x50, // 'P'
  0x59, // 'Y'
  0x00, // NUL
])
// Presumably the width, in bytes, of a length field — confirm against usage.
const SIZE_BYTES = 4
// Presumably the byte offset at which the first length field starts — confirm against usage.
const SIZE_OFFSET = 16
$ phobos start
[2016-08-13T17:29:59:218+0200Z] INFO -- Phobos : <Hash> {:message=>"Phobos configured",
:env=>"development"}
______ _ _
| ___ \ | | |
| |_/ / |__ ___ | |__ ___ ___
| __/| '_ \ / _ \| '_ \ / _ \/ __|
| | | | | | (_) | |_) | (_) \__ \
\_| |_| |_|\___/|_.__/ \___/|___/
# Example handler that mixes in PhobosDBCheckpoint::Handler.
class MyHandler
include PhobosDBCheckpoint::Handler
# Parses the payload as JSON, then calls ack with the parsed event's 'id'
# and the current time. The metadata argument is not used in this example.
def consume(payload, metadata)
my_event = JSON.parse(payload)
# <-- your logic (which possibly skips messages) here
ack(my_event['id'], Time.now)
end
end
class MyHandler
include Phobos::Handler
# Class-level hook that receives a kafka_client; the body is a placeholder
# for handler setup. Presumably invoked by Phobos before the handler starts
# consuming — confirm against the Phobos documentation.
def self.start(kafka_client)
# setup handler
end
# Class-level hook taking no arguments; the body is a placeholder for
# teardown. Presumably invoked by Phobos on shutdown — confirm against the
# Phobos documentation.
def self.stop
# teardown
end
# Minimal example handler that mixes in Phobos::Handler and defines an empty
# consume method.
class MyHandler
include Phobos::Handler
# Entry point for a single message; this example body is intentionally empty.
def consume(payload, metadata)
# payload - This is the content of your Kafka message, Phobos does not attempt to
# parse this content, it is delivered raw to you
# metadata - A hash with useful information about this event, it contains: key,
# partition, offset, retry_count, topic, group_id, and listener_id
end
end
# Gemfile
# Platform-specific gems: byebug is declared only for the :ruby platform,
# pry only for the :jruby platform.
platforms :ruby do
gem 'byebug'
end
platforms :jruby do
gem 'pry'
end
# ...
# Gemspec excerpt: picks the PostgreSQL adapter dependency based on the
# platform the gemspec is evaluated on.
Gem::Specification.new do |spec|
# ...
# When RUBY_PLATFORM matches /java/, mark the gem as a 'java' platform gem
# and depend on the JDBC PostgreSQL adapter; otherwise depend on 'pg'.
if RUBY_PLATFORM =~ /java/
spec.platform = 'java'
spec.add_dependency 'activerecord-jdbcpostgresql-adapter'
else
spec.add_dependency 'pg'
end
end