Bill Bejeck bbejeck

StreamsJoinWithRepartitioning.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
StreamsDSLAndProcessorExample.java
ProcessorAPIExample.java
StreamsDSLExample.java
OptimizingStreams.java
// imports and license left out for clarity
public class OptimizingStreams {
    public static void main(String[] args) {
        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
        properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
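The optimization setting above only takes effect if the same `Properties` are also passed to `StreamsBuilder#build(Properties)`; the no-argument `build()` returns an unoptimized topology, and the setting defaults to `none`. The stdlib-only sketch below spells out the literal values behind the `StreamsConfig` constants as I understand them (`topology.optimization`, `all`, `none`); the class and helper names are hypothetical.

```java
import java.util.Properties;

class OptimizationConfigSketch {
    // Literal values assumed to sit behind the StreamsConfig constants used above
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization"; // StreamsConfig.TOPOLOGY_OPTIMIZATION
    static final String OPTIMIZE = "all";                                 // StreamsConfig.OPTIMIZE
    static final String NO_OPTIMIZATION = "none";                         // StreamsConfig.NO_OPTIMIZATION (default)

    // Hypothetical helper: copy the base config and switch optimization on
    static Properties withOptimization(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(TOPOLOGY_OPTIMIZATION, OPTIMIZE);
        return props;
    }

    // Hypothetical helper: an absent setting means "none", i.e. no optimization
    static boolean optimizationEnabled(Properties props) {
        return OPTIMIZE.equals(props.getProperty(TOPOLOGY_OPTIMIZATION, NO_OPTIMIZATION));
    }
}
```

In the real application the same `Properties` object would then go to both `builder.build(properties)` and the `KafkaStreams` constructor.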
bbejeck / punctuateMethodForProcessor.java
Last active Oct 18, 2017
Updating the Prediction Model
// process() is left out for clarity; it places airline data in a list
// and stores that list in the state store, keyed by airport code
@Override
public void punctuate(long timestamp) {
    // a KeyValueIterator holds state-store resources and must be closed when done
    KeyValueIterator<String, List<String>> allFlights = flights.all();
    while (allFlights.hasNext()) {
        KeyValue<String, List<String>> kv = allFlights.next();
        List<String> flightList = kv.value;
        String key = kv.key;
        if (flightList.size() >= 100) {
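The snippet is cut off inside the `if`, so what happens to a full batch is not shown. Below is a stdlib-only sketch of the same batching idea, assuming that an airport with at least 100 buffered records is handed to a retraining step and its buffer is then cleared. A `HashMap` stands in for the `flights` state store, and `retrain` is a hypothetical callback. In the Kafka Streams API itself, `Processor#punctuate(long)` is deprecated as of 1.0 in favor of scheduling a `Punctuator` through `ProcessorContext#schedule`, and a store's `KeyValueIterator` should be closed, for example with try-with-resources.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

class PunctuateSketch {
    static final int BATCH_THRESHOLD = 100; // threshold taken from the snippet above

    // stand-in for the "flights" state store: airport code -> buffered raw records
    private final Map<String, List<String>> flights = new HashMap<>();
    // hypothetical hook that retrains the model for one airport
    private final BiConsumer<String, List<String>> retrain;

    PunctuateSketch(BiConsumer<String, List<String>> retrain) {
        this.retrain = retrain;
    }

    // mirrors what process() is described as doing: append a record to its airport's list
    void process(String airportCode, String record) {
        flights.computeIfAbsent(airportCode, k -> new ArrayList<>()).add(record);
    }

    // periodic scan: airports with a full batch are retrained and their buffer is emptied
    void punctuate() {
        Iterator<Map.Entry<String, List<String>>> it = flights.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, List<String>> entry = it.next();
            if (entry.getValue().size() >= BATCH_THRESHOLD) {
                retrain.accept(entry.getKey(), new ArrayList<>(entry.getValue()));
                it.remove();
            }
        }
    }

    int buffered(String airportCode) {
        List<String> list = flights.get(airportCode);
        return list == null ? 0 : list.size();
    }
}
```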
bbejeck / predictionPredictMethod.java
Last active Sep 15, 2017
Showing the Prediction Process
public static String predict(DataRegression dataRegression) {
    try (OnlineLogisticRegression logisticRegression = new OnlineLogisticRegression()) {
        FlightData flightData = new FlightData(dataRegression.data);
        logisticRegression.readFields(new DataInputStream(new ByteArrayInputStream(dataRegression.coefficients)));
        double prediction = logisticRegression.classifyScalar(flightData.vector);
        String arrivalPrediction = prediction > 0.5 ? "on-time" : "late";
        return String.format("%s predicted to be %s", new Flight(dataRegression.data), arrivalPrediction);
    } catch (Exception e) {
        LOG.error("Problems with predicting " + dataRegression.data, e);
        // a null result is dropped downstream by filter((k, v) -> v != null)
        return null;
    }
}
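`predict()` deserializes the stored coefficients from a `byte[]`, scores the flight's feature vector, and turns the score into a label with a 0.5 cut-off. Mahout's `OnlineLogisticRegression` is not reproduced here; the stdlib-only sketch below swaps in a plain logistic function (the sigmoid of the dot product of coefficients and features) and a hypothetical wire format (an `int` count followed by that many `double`s) that is not Mahout's serialization format. The helper names are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class PredictionSketch {
    // Hypothetical format: int count, then that many doubles (NOT Mahout's format)
    static byte[] writeCoefficients(double[] coefficients) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(coefficients.length);
            for (double c : coefficients) {
                out.writeDouble(c);
            }
        } catch (IOException e) {
            // in-memory streams should not fail; rethrow unchecked just in case
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads back what writeCoefficients produced
    static double[] readCoefficients(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            double[] coefficients = new double[in.readInt()];
            for (int i = 0; i < coefficients.length; i++) {
                coefficients[i] = in.readDouble();
            }
            return coefficients;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Logistic score: sigmoid of the dot product (assumes equal-length arrays)
    static double score(double[] coefficients, double[] features) {
        double dot = 0.0;
        for (int i = 0; i < features.length; i++) {
            dot += coefficients[i] * features[i];
        }
        return 1.0 / (1.0 + Math.exp(-dot));
    }

    // Same rule as predict(): strictly above 0.5 means on-time
    static String label(double probability) {
        return probability > 0.5 ? "on-time" : "late";
    }
}
```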
bbejeck / mappingValues.java
Last active Jul 25, 2019
Mapping the Values with Prediction
dataByAirportStream.join(regressionsByAirPortTable,
                         (k, v) -> k,
                         DataRegression::new)
                   .mapValues(Predictor::predict)
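`join` here is the `KStream`-to-`GlobalKTable` inner join: the `KeyValueMapper` `(k, v) -> k` picks the table key to look up (in this case the stream record's own key, the airport code), the `ValueJoiner` (`DataRegression::new`) combines the two values, and a stream record with no matching table entry produces no output. Because each application instance holds the whole global table, no repartitioning is needed before the lookup. The sketch below models those semantics over plain Java collections; `GlobalTableJoinSketch` and its `KV` record are hypothetical names, and records require Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

class GlobalTableJoinSketch {
    // Minimal key/value pair standing in for a stream record
    record KV<K, V>(K key, V value) {}

    // Inner join of a "stream" (a list of records) against a fully replicated "global table" (a map)
    static <K, V, GK, GV, R> List<KV<K, R>> join(List<KV<K, V>> stream,
                                                 Map<GK, GV> globalTable,
                                                 BiFunction<K, V, GK> keyMapper,
                                                 BiFunction<V, GV, R> joiner) {
        List<KV<K, R>> joined = new ArrayList<>();
        for (KV<K, V> rec : stream) {
            // the key mapper may use the record key, the value, or both
            GK lookupKey = keyMapper.apply(rec.key(), rec.value());
            GV tableValue = globalTable.get(lookupKey);
            if (tableValue != null) {
                // the result keeps the stream record's key
                joined.add(new KV<>(rec.key(), joiner.apply(rec.value(), tableValue)));
            }
            // no match: inner-join semantics, so the record is dropped
        }
        return joined;
    }
}
```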
bbejeck / buildingKStreamAndGlobalKTable.java
Created Sep 15, 2017
Builds the KStream and GlobalKTable
// configuration and Serde creation left out for clarity
KStream<String, String> dataByAirportStream = builder.stream("raw-airline-data");
GlobalKTable<String, byte[]> regressionsByAirPortTable = builder.globalTable(Serdes.String(),
        byteArraySerde,
        "onlineRegression-by-airport");
// the stream reads raw flight data, joins it with the regression coefficients, then makes a prediction
dataByAirportStream.join(regressionsByAirPortTable,
                         (k, v) -> k,
                         DataRegression::new)
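A `GlobalKTable` is populated from every partition of its topic on every application instance, and like any table it keeps only the latest value per key, with a `null` value (a tombstone) deleting the key. Assuming the `onlineRegression-by-airport` topic carries one serialized model per airport code, the stdlib-only sketch below shows how lookups only ever see the newest coefficients; `GlobalTableSketch` is a hypothetical stand-in, not the Kafka class.

```java
import java.util.HashMap;
import java.util.Map;

class GlobalTableSketch {
    // Full local copy on each instance: airport code -> latest serialized model
    private final Map<String, byte[]> table = new HashMap<>();

    // Apply one record read from the backing topic:
    // the newest value for a key wins, and a null value (tombstone) removes the key
    void apply(String key, byte[] value) {
        if (value == null) {
            table.remove(key);
        } else {
            table.put(key, value);
        }
    }

    // What a join would see for this key right now (null if absent)
    byte[] lookup(String key) {
        return table.get(key);
    }
}
```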
bbejeck / dataByAirportStream.java
Created Sep 15, 2017
Kafka Streams Topology for online predictions
dataByAirportStream.join(regressionsByAirPortTable, (k, v) -> k, DataRegression::new)
        .mapValues(Predictor::predict)
        .filter((k, v) -> v != null)
        .peek((k, v) -> System.out.println("Prediction " + v))
        .to("predictions");
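After the join, each record passes through `mapValues`, and because `predict()` returns `null` when an exception occurs, the `filter` removes those records before they are logged by `peek` and written with `to("predictions")`. The stdlib-only sketch below mimics that chain with `java.util.stream`, using `Map.Entry` pairs as stand-ins for key/value records and a list as a stand-in for the output topic; the class and method names are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class PredictionPipelineSketch {
    // predict -> drop nulls -> log -> "send", mirroring the topology above
    static List<Map.Entry<String, String>> run(List<Map.Entry<String, String>> joined,
                                               Function<String, String> predictor) {
        List<Map.Entry<String, String>> sink = new ArrayList<>(); // stand-in for the "predictions" topic
        joined.stream()
                // like mapValues: key kept, value replaced; SimpleEntry allows a null value
                .map(e -> new AbstractMap.SimpleEntry<>(e.getKey(), predictor.apply(e.getValue())))
                // like filter((k, v) -> v != null): failed predictions are dropped
                .filter(e -> e.getValue() != null)
                // like peek: a side effect per record, the record itself is unchanged
                .peek(e -> System.out.println("Prediction " + e.getValue()))
                // like to("predictions")
                .forEach(sink::add);
        return sink;
    }
}
```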