Test 1) Forcing the seed broker to fail with ECONNRESET. The consumer example on the repo was changed to:
// Kafka client pointed at a single seed broker on loopback port 9094,
// used (per the surrounding test notes) to force an ECONNRESET on that broker.
const kafka = new Kafka({
brokers: ['127.0.0.1:9094'],
// other configs
})
// Consumer example bootstrap. The trailing " | |" extraction artifacts on every
// line were removed — they are not valid JavaScript.
const fs = require('fs')
const ip = require('ip')
const cluster = require('cluster')
const { Kafka, logLevel } = require('../index')

// Process-level events trapped for graceful shutdown handling.
const errorTypes = ['unhandledRejection', 'uncaughtException']
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']

// Advertised host: explicit HOST_IP env var wins, otherwise the machine's address.
const host = process.env.HOST_IP || ip.address()
module ApplicationHelper | |
# Using raw files | |
# Inlines an SVG asset into the page so it can be styled with CSS.
# The trailing " | |" extraction artifacts were removed — they are not valid Ruby.
#
# filename - SVG file name relative to the svgs asset directory.
# options  - :root  => override the directory to read from (defaults to
#                      app/assets/svgs under Rails.root)
#            :class => CSS class(es) applied to the root <svg> element.
#
# Returns an html_safe string containing the SVG markup.
def embedded_svg filename, options = {}
  root = options[:root] || Rails.root.join("app", "assets", "svgs")
  file = File.read(File.join(root, filename))
  doc = Nokogiri::HTML::DocumentFragment.parse file
  svg = doc.at_css 'svg'
  # Only touch the class attribute when the caller supplied one.
  svg['class'] = options[:class] if options[:class].present?
  doc.to_html.html_safe
end
Test 1) Forcing the seed broker to fail with ECONNRESET. The consumer example on the repo was changed to:
// Kafka client pointed at a single seed broker on loopback port 9094,
// used (per the surrounding test notes) to force an ECONNRESET on that broker.
const kafka = new Kafka({
brokers: ['127.0.0.1:9094'],
// other configs
})
// Test setup: wire the optional LZ4 codec into the compression codec table.
// The trailing " | |" extraction artifacts were removed — they are not valid JavaScript.
const createProducer = require('../../producer')
const createConsumer = require('../index')
const { Types, Codecs } = require('../../protocol/message/compression')
const LZ4 = require('kafkajs-lz4')

// Register LZ4 so messages compressed with it can be produced/consumed in tests.
Codecs[Types.LZ4] = new LZ4().codec
const { | |
secureRandom, | |
createCluster, |
const { promisify } = require('util')
const snappy = require('snappy')

// Promisified snappy compress/uncompress so callers can use async/await.
// (The trailing " | |" extraction artifacts were removed — not valid JavaScript.)
const snappyCompress = promisify(snappy.compress)
const snappyDecompress = promisify(snappy.uncompress)

// Xerial snappy framing: 8-byte magic header, then size-prefixed chunks.
// NOTE(review): SIZE_OFFSET = 16 presumably skips header + version fields — confirm
// against the Xerial format spec used elsewhere in this module.
const XERIAL_HEADER = Buffer.from([130, 83, 78, 65, 80, 80, 89, 0])
const SIZE_BYTES = 4
const SIZE_OFFSET = 16
{ | |
module: { | |
loaders: [ | |
{ | |
test: /\.hbs$/, | |
include: /app\/templates/, // or whatever directory you have | |
loader: 'ember-webpack-loaders/htmlbars-loader' | |
}, | |
{ | |
test: /app\/index\.js/, // the main app file |
# Minimal Phobos handler: Phobos delivers each Kafka message to #consume.
# The trailing " | |" extraction artifacts were removed — they are not valid Ruby.
class MyHandler
  include Phobos::Handler

  def consume(payload, metadata)
    # payload - This is the content of your Kafka message, Phobos does not attempt to
    #           parse this content, it is delivered raw to you
    # metadata - A hash with useful information about this event, it contains: key,
    #           partition, offset, retry_count, topic, group_id, and listener_id
  end
end
$ phobos start
[2016-08-13T17:29:59:218+0200Z] INFO -- Phobos : <Hash> {:message=>"Phobos configured", :env=>"development"}
______ _ _ | |
| ___ \ | | | | |
| |_/ / |__ ___ | |__ ___ ___ | |
| __/| '_ \ / _ \| '_ \ / _ \/ __| | |
| | | | | | (_) | |_) | (_) \__ \ | |
\_| |_| |_|\___/|_.__/ \___/|___/ |
class MyHandler | |
include Phobos::Handler | |
# Lifecycle hook invoked once when the listener starts; receives the Kafka client.
# The trailing " | |" extraction artifacts were removed — they are not valid Ruby.
def self.start(kafka_client)
  # setup handler
end
# Lifecycle hook invoked once when the listener shuts down.
# The trailing " | |" extraction artifacts were removed — they are not valid Ruby.
def self.stop
  # teardown
end
# Phobos handler with DB checkpointing: parses the JSON payload and acks the
# event so its offset is checkpointed after successful processing.
# The trailing " | |" extraction artifacts were removed — they are not valid Ruby.
class MyHandler
  include PhobosDBCheckpoint::Handler

  def consume(payload, metadata)
    my_event = JSON.parse(payload)
    # <-- your logic (which possibly skips messages) here
    ack(my_event['id'], Time.now)
  end
end