Skip to content

Instantly share code, notes, and snippets.

@nemosupremo
Created February 21, 2018 00:03
Show Gist options
  • Save nemosupremo/5ab73d650a2cf0ae6f2c49b8dddf4767 to your computer and use it in GitHub Desktop.
Save nemosupremo/5ab73d650a2cf0ae6f2c49b8dddf4767 to your computer and use it in GitHub Desktop.
from __future__ import division
import string
import struct
import wallaroo
import datetime
from device_file import device_file
from sitemap import dev_site_map
# Static settings for the pipeline's Kafka source; read by application_setup.
config = {
    "kafka": {
        # Topic the source consumes from.
        "in": "adx-device-events-prod",
        # Broker host name; the port is supplied where the source is built.
        "brokers": "kafka01.internal",
        # Log level string handed to the Kafka source configuration.
        "log_level": "Warn",
    },
}
# Built once at import time by calling dev_site_map (from the sitemap module)
# with None. NOTE(review): nothing in the visible part of this file reads
# dev_map -- confirm whether it is still needed.
dev_map = dev_site_map(None)
def application_setup(args):
    """Build and return the "Analytics Pipeline" Wallaroo application.

    A single pipeline named "compute" is defined:

    * source: the Kafka topic, broker host and log level taken from the
      module-level ``config`` (broker port "9092"), decoded with ``decoder``;
    * computation: ``compute2``;
    * sink: a TCP sink on localhost, port "5555", encoded with ``encoder``.

    ``args`` is currently unused: the Kafka settings are read from ``config``
    instead of being parsed from the command-line arguments.
    """
    # NOTE: earlier experiments (parsing Kafka source/sink options from
    # ``args``, a TCP source on 127.0.0.1:1234, an extra ``add`` step and a
    # Kafka sink) used to live here as commented-out code.
    ab = wallaroo.ApplicationBuilder("Analytics Pipeline")

    kafka_settings = config["kafka"]
    broker_list = [(kafka_settings["brokers"], "9092")]
    kafka_source = wallaroo.KafkaSourceConfig(
        kafka_settings["in"],
        broker_list,
        kafka_settings["log_level"],
        decoder,
    )
    ab.new_pipeline("compute", kafka_source)

    ab.to(compute2)

    tcp_sink = wallaroo.TCPSinkConfig("localhost", "5555", encoder)
    ab.to_sink(tcp_sink)

    return ab.build()
@wallaroo.decoder(header_length=4, length_fmt=">I")
def decoder(bs):
    """Decode a message payload into a float.

    The first four bytes of ``bs`` are unpacked as a big-endian 32-bit float
    (struct format ``'>f'``); any bytes after those are ignored. A payload
    shorter than four bytes yields ``0.0`` instead of raising.
    """
    if len(bs) >= 4:
        (value,) = struct.unpack('>f', bs[:4])
        return value
    # Too short to hold a float: fall back to a neutral value.
    return 0.0
@wallaroo.computation(name="compute step2")
def compute2(message):
    """Return the constant string "xyz" for every message.

    ``message`` is not inspected; the output is the same for any input.
    """
    result = "xyz"
    return result
@wallaroo.encoder
def encoder(data):
    """Render ``data`` as one newline-terminated line of text.

    Returns ``str(data)`` with a single "\\n" appended.
    """
    text = str(data)
    return text + "\n"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment