-
-
Save nemosupremo/8d7d7ca00615031802f11cae5322b470 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import division | |
import string | |
import struct | |
import wallaroo | |
from datetime import datetime | |
#from device_events import DeviceEvents, DeviceEventsWrapper | |
#from device_file import device_file | |
#from sitemap import dev_site_map | |
# Number of buckets compute_safe fans records into; the partition functions
# reduce their hash modulo this value.
CONCURRENCY = 13

# Kafka source settings read by application_setup.
config = dict(
    kafka={
        "in": "adx-device-events-prod",
        "brokers": "kafka01.infra.internal",
        "log_level": "Warn",
    },
)

# Assigned inside application_setup (via `global dev_map`).
dev_map = None
def application_setup(args):
    """Build the Wallaroo application graph for the analytics pipeline.

    Pipeline: Kafka source (decoded by ``decoder``) -> ``compute_safe``
    -> ``join`` state partitioned by ``partition_a`` -> TCP sink on
    127.0.0.1:5555 (serialized by ``encoder``).

    :param args: arguments passed in by Wallaroo; currently unused because
        the Kafka settings come from the module-level ``config`` dict.
    :returns: the result of ``ApplicationBuilder.build()``.
    """
    global dev_map
    # NOTE(review): the `from sitemap import dev_site_map` import is commented
    # out at the top of this file, so this line raises NameError unless that
    # import is restored. No uncommented code visible here reads dev_map.
    dev_map = dev_site_map(None).instance

    kafka = config["kafka"]
    source = wallaroo.KafkaSourceConfig(
        kafka["in"],
        [(kafka["brokers"], "9092")],
        kafka["log_level"],
        decoder,
    )

    ab = wallaroo.ApplicationBuilder("Analytics Pipeline")
    ab.new_pipeline("compute", source)
    ab.to(compute_safe)
    # partition_a returns hash(...) % CONCURRENCY, i.e. only keys in
    # range(CONCURRENCY); declaring more keys would create state partitions
    # that can never receive a message.
    # NOTE(review): DeviceEventsWrapper's import is also commented out at the
    # top of this file and must be restored for this call to resolve.
    ab.to_state_partition(join, DeviceEventsWrapper, "analyze",
                          partition_a, list(range(CONCURRENCY)))
    ab.to_sink(wallaroo.TCPSinkConfig("127.0.0.1", "5555", encoder))
    return ab.build()
# Running count of messages seen by decoder; used only for progress output.
gdec = 0


@wallaroo.decoder(header_length=4, length_fmt=">I")
def decoder(bs):
    """Pass the framed payload through unchanged, logging progress.

    Frames use a 4-byte big-endian unsigned length header (``">I"``);
    presumably Wallaroo hands this function the payload after that header.
    Every 10th message prints a progress line.

    :param bs: the raw message payload.
    :returns: ``bs`` unmodified.
    """
    global gdec
    gdec = gdec + 1
    if not gdec % 10:
        print("Message Point: %d" % gdec)
    return bs
@wallaroo.computation(name="compute step2")
def compute2(message):
    """Debug step: print the first 20 elements of ``message`` and emit "xyz".

    Not wired into the pipeline built by application_setup.
    """
    head = message[:20]
    print(head)
    return "xyz"
@wallaroo.computation_multi("parse")
def compute_safe(message):
    """Fan placeholder records out into CONCURRENCY buckets.

    The current body ignores ``message`` and emits fixed dummy data. Each
    record is ``(key, [set, set, [row_dict]])`` with
    ``key = (site, day, macid)``. A record is placed in bucket
    ``hash(key) % CONCURRENCY``, so all records in one bucket share that
    hash residue.

    :param message: the decoded input message (unused).
    :returns: a list of length CONCURRENCY; each entry is the list of records
        for that bucket, or None when the bucket is empty.
    """
    file_day = "4343"
    day = int(file_day)  # converted once instead of twice per record
    sites = ['a', 'b', 'c']
    games = ['d', 'e', 'f']

    buckets = [[] for _ in range(CONCURRENCY)]
    for site_x in sites:
        for macid in games:
            key = (site_x, day, macid)
            buckets[hash(key) % CONCURRENCY].append(
                (key,
                 [set("a"),
                  set("b"),
                  [{'time_epoch': "123",
                    'device_id': "xyz",
                    'signal_strength': "lmao"}]]))

    # Empty buckets are emitted as None rather than []; records are non-empty
    # tuples, so no element filtering is needed.
    return [bucket or None for bucket in buckets]
@wallaroo.state_computation_multi("join")
def join(data, state):
    """Placeholder join of one bucket of records against partition state.

    A ``state_computation_multi`` must return a tuple ``(outputs, persist)``:
    ``outputs`` is a *list* of output messages, and ``persist`` tells Wallaroo
    whether ``state`` changed and must be written to the state log. Returning
    a bare value instead (e.g. the string "xyz") was observed to segfault the
    worker.

    :param data: one bucket emitted by compute_safe (a list of records), or
        None for an empty bucket.
    :param state: the partition's state object (presumably a
        DeviceEventsWrapper instance, per application_setup — confirm).
    :returns: ``(["xyz"], False)``, a single placeholder output. State is
        not modified, so it is not persisted.
    """
    # TODO: look up / update ``state`` from ``data`` once the join is written.
    print(data)
    return (["xyz"], False)
@wallaroo.partition
def partition_a(data):
    """Map one bucket from compute_safe to a join partition key.

    :param data: a list of ``(key, payload)`` records, or None (compute_safe
        emits None for an empty bucket).
    :returns: ``hash(first_record_key) % CONCURRENCY``, or 0 when there is no
        record to hash.
    """
    # Covers both None and an empty list; the latter would otherwise raise
    # IndexError on data[0].
    if not data:
        return 0
    return hash(data[0][0]) % CONCURRENCY
@wallaroo.partition
def partition_b(data):
    """Partition by the hash of ``data[0]`` reduced modulo CONCURRENCY."""
    first = data[0]
    return hash(first) % CONCURRENCY
@wallaroo.state_computation(name="analyze")
def analyze(data, state):
    """Fold ``data`` into ``state`` and emit a copy of the resulting row.

    :returns: ``(row, True)``, a single output (a dict copy of the updated
        object's ``rowDict``) plus a flag asking Wallaroo to persist state.
    """
    updated = state.update(data)
    row = dict(updated.rowDict)
    persist = True
    return row, persist
@wallaroo.encoder
def encoder(data):
    """Print ``data`` and serialize it as one newline-terminated line."""
    print(data)
    line = str(data)
    return line + "\n"
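Specifically, instead of `return "xyz"`, it should be `return (list("xyz"), False)`.
(Note that `list("xyz")` is `['x', 'y', 'z']`, i.e. three separate outputs; use `return (["xyz"], False)` to emit the single string `"xyz"`.)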
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The segfault is happening because the return type from `join` is incorrect. For both `state_computation` and `state_computation_multi`, the return type is a tuple of `(output, True/False)`, where the second element denotes whether the state should be persisted (so you can avoid writing an unchanged state to the log, for example, where performance matters). The first element is a single output object for `state_computation`, or a list of outputs for `state_computation_multi`.