Skip to content

Instantly share code, notes, and snippets.

@nemosupremo
Created February 22, 2018 23:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nemosupremo/8d7d7ca00615031802f11cae5322b470 to your computer and use it in GitHub Desktop.
Save nemosupremo/8d7d7ca00615031802f11cae5322b470 to your computer and use it in GitHub Desktop.
from __future__ import division
import string
import struct
import wallaroo
from datetime import datetime
#from device_events import DeviceEvents, DeviceEventsWrapper
#from device_file import device_file
#from sitemap import dev_site_map
# Modulus applied to hashes in compute_safe and the partition functions.
CONCURRENCY = 13

# Kafka source settings consumed by application_setup().
config = {
    "kafka": {
        "in": "adx-device-events-prod",
        "brokers": "kafka01.infra.internal",
        "log_level": "Warn",
    },
}

# Assigned by application_setup(); stays None until that has run.
dev_map = None
def application_setup(args):
    """Assemble and return the "Analytics Pipeline" Wallaroo application.

    The single "compute" pipeline reads from the Kafka topic described by
    the module-level ``config``, runs ``compute_safe``, routes the results
    through the ``join`` state partition (keyed by ``partition_a``), and
    writes to a TCP sink on 127.0.0.1:5555.

    ``args`` is accepted for the Wallaroo entry-point signature but is not
    read here; the command-line parsing variants are kept below as disabled
    alternatives.

    Side effect: rebinds the module-level ``dev_map``.
    """
    global dev_map
    # NOTE(review): dev_site_map and DeviceEventsWrapper are only imported in
    # commented-out lines at the top of this file -- confirm they are in scope.
    dev_map = dev_site_map(None).instance

    # Disabled alternative: take the Kafka options from the command line.
    # (in_topic, in_brokers,
    #  in_log_level) = wallaroo.kafka_parse_source_options(args)
    # (out_topic, out_brokers, out_log_level, out_max_produce_buffer_ms,
    #  out_max_message_size) = wallaroo.kafka_parse_sink_options(args)

    builder = wallaroo.ApplicationBuilder("Analytics Pipeline")

    kafka = config["kafka"]
    broker_list = [(kafka["brokers"], "9092")]
    source = wallaroo.KafkaSourceConfig(
        kafka["in"], broker_list, kafka["log_level"], decoder)
    builder.new_pipeline("compute", source)
    # Disabled alternative: a TCP source instead of Kafka.
    # builder.new_pipeline("Celsius Conversion",
    #                      wallaroo.TCPSourceConfig("127.0.0.1", 1234, decoder))

    builder.to(compute_safe)

    partition_keys = list(range(31))
    builder.to_state_partition(
        join, DeviceEventsWrapper, "analyze", partition_a, partition_keys)
    # Disabled alternative: the analyze step keyed by partition_b.
    # builder.to_state_partition(analyze, DeviceEventsWrapper, "analyze",
    #                            partition_b, list(range(31)))

    sink = wallaroo.TCPSinkConfig("127.0.0.1", "5555", encoder)
    builder.to_sink(sink)
    # Disabled alternative: a Kafka sink built from the parsed sink options.
    # builder.to(add)
    # builder.to_sink(wallaroo.KafkaSinkConfig(out_topic, out_brokers,
    #                                          out_log_level,
    #                                          out_max_produce_buffer_ms,
    #                                          out_max_message_size,
    #                                          encoder))
    return builder.build()
# Running count of decoder() invocations.
gdec = 0


@wallaroo.decoder(header_length=4, length_fmt=">I")
def decoder(bs):
    """Return the incoming payload ``bs`` untouched.

    Increments the module-level ``gdec`` counter on every call and prints a
    progress line each time the counter reaches a multiple of ten.
    """
    global gdec
    gdec = gdec + 1
    reached_checkpoint = (gdec % 10 == 0)
    if reached_checkpoint:
        # Single-argument parenthesised print: same output as the statement form.
        print("Message Point: %d" % gdec)
    return bs
@wallaroo.computation(name="compute step2")
def compute2(message):
    """Print the first 20 items of ``message`` and emit the constant "xyz"."""
    preview = message[:20]
    print(preview)
    return "xyz"
@wallaroo.computation_multi("parse")
def compute_safe(message):
    """Produce a fixed set of dummy records, grouped into hash buckets.

    ``message`` is ignored. For every (site, game) pair a record keyed by
    ``(site, day, game)`` is placed into one of ``CONCURRENCY`` buckets,
    chosen by ``hash(key) % CONCURRENCY``.

    Returns a list with ``CONCURRENCY`` elements: each is the list of
    records that landed in that bucket, or ``None`` if the bucket is empty.
    """
    file_day = "4343"
    day = int(file_day)
    site_names = ['a', 'b', 'c']
    game_ids = ['d', 'e', 'f']

    buckets = [[] for _ in range(CONCURRENCY)]
    for site in site_names:
        for game in game_ids:
            key = (site, day, game)
            # Fresh containers per record so no two records share state.
            reading = {
                'time_epoch': "123",
                'device_id': "xyz",
                'signal_strength': "lmao",
            }
            payload = [set("a"), set("b"), [reading]]
            slot = hash(key) % CONCURRENCY
            buckets[slot].append((key, payload))

    # Drop falsy entries from each bucket; an emptied bucket becomes None.
    return [([entry for entry in bucket if entry] or None)
            for bucket in buckets]
@wallaroo.state_computation_multi("join")
def join(data, state):
    """Debug state computation: print the message and emit a placeholder.

    Args:
        data: the incoming message for this partition (may be ``None``).
        state: the partition's state object; not read or modified here.

    Returns:
        A ``(outputs, persist)`` tuple, as required of a multi state
        computation: ``outputs`` is the list of messages to emit and
        ``persist`` says whether the state should be saved. The state is
        never touched here, so ``persist`` is ``False``.
    """
    # Disabled in the original: de = state.lookup(data)
    print(data)
    # Returning a bare string instead of this tuple crashed the worker.
    # One output message is emitted, matching the original intent.
    return (["xyz"], False)
@wallaroo.partition
def partition_a(data):
    """Pick the partition key for a message.

    Args:
        data: a non-empty sequence whose first element starts with a
            hashable key (``data[0][0]``), or ``None``/empty.

    Returns:
        ``0`` when ``data`` is ``None`` or empty; otherwise
        ``hash(data[0][0]) % CONCURRENCY``.
    """
    # Covers None as well as an empty message, which would otherwise raise
    # IndexError on data[0].
    if not data:
        return 0
    return hash(data[0][0]) % CONCURRENCY
@wallaroo.partition
def partition_b(data):
    """Return ``hash(data[0]) % CONCURRENCY`` as the partition key."""
    leading = data[0]
    bucket = hash(leading) % CONCURRENCY
    return bucket
@wallaroo.state_computation(name="analyze")
def analyze(data, state):
    """Apply ``data`` to the state and emit a copy of the resulting row dict.

    Returns a ``(output, True)`` tuple: ``output`` is a plain ``dict`` built
    from the ``rowDict`` attribute of whatever ``state.update(data)`` returns,
    and the ``True`` flag is the second element of the state-computation
    return tuple.
    """
    updated = state.update(data)
    row_snapshot = dict(updated.rowDict)
    return (row_snapshot, True)
@wallaroo.encoder
def encoder(data):
    """Print ``data`` and return its ``str()`` form followed by a newline."""
    print(data)
    rendered = str(data)
    return rendered + "\n"
@nisanharamati
Copy link

The segfault is happening because the return type from join is incorrect.
For both state_computation and state_computation_multi the return type is a tuple of (output, True/False) where the second argument denotes whether state should be persisted (so you can avoid writing an unchanged state to the log, for example, where performance matters).
The first argument would be a single output object for state_computation, or a list of outputs for state_computation_multi.

@nisanharamati
Copy link

Specifically, instead of `return "xyz"` it should be `return (list("xyz"), False)`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment