-
-
Save nemosupremo/5ab73d650a2cf0ae6f2c49b8dddf4767 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import string | |
import struct | |
import wallaroo | |
import datetime | |
from device_file import device_file | |
from sitemap import dev_site_map | |
# Static settings for the pipeline's Kafka source.
config = {
    "kafka": {
        # Topic handed to the Kafka source.
        "in": "adx-device-events-prod",
        # Broker hostname only; the port is paired with it where the
        # source is constructed.
        "brokers": "kafka01.internal",
        "log_level": "Warn",
    },
}
# Built once at import time by calling dev_site_map with None.
# NOTE(review): not referenced by any other code visible in this file --
# confirm whether it is still needed.
dev_map = dev_site_map(None)
def application_setup(args):
    """Build the "Analytics Pipeline" Wallaroo application.

    A single pipeline named "compute" is defined: it reads from the Kafka
    topic and broker listed in the module-level ``config``, passes each
    decoded message through ``compute2``, and writes the encoded result to
    a TCP sink at localhost:5555.

    ``args`` is currently unused. Parsing Kafka source/sink options from it
    (``wallaroo.kafka_parse_source_options`` /
    ``wallaroo.kafka_parse_sink_options``), a TCP source and a Kafka sink
    were tried here previously and are disabled.

    Returns whatever ``ApplicationBuilder.build()`` produces.
    """
    ab = wallaroo.ApplicationBuilder("Analytics Pipeline")

    kafka = config["kafka"]
    # One (host, port) tuple per broker; the port is passed as a string.
    brokers = [(kafka["brokers"], "9092")]
    source = wallaroo.KafkaSourceConfig(
        kafka["in"], brokers, kafka["log_level"], decoder)
    ab.new_pipeline("compute", source)

    ab.to(compute2)
    ab.to_sink(wallaroo.TCPSinkConfig("localhost", "5555", encoder))
    return ab.build()
@wallaroo.decoder(header_length=4, length_fmt=">I")
def decoder(bs):
    """Decode a message payload into a float.

    The first four bytes of ``bs`` are unpacked as a big-endian 32-bit
    float; any bytes after them are ignored. A payload shorter than four
    bytes yields 0.0 rather than raising ``struct.error``.
    """
    if len(bs) < 4:
        return 0.0
    (value,) = struct.unpack(">f", bs[:4])
    return value
@wallaroo.computation(name="compute step2")
def compute2(message):
    """Return the constant string "xyz", ignoring ``message``."""
    return "xyz"
@wallaroo.encoder
def encoder(data):
    """Encode ``data`` for the sink as ``str(data)`` followed by a newline."""
    return str(data) + "\n"
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment