Fabian Hueske (fhueske)

fhueske / UserTest.java
Last active August 18, 2020 07:49
Simple test for checking side outputs from onTimer() method.
package com.ververica;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
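The preview stops at the imports. In Flink, the `Context` of a `KeyedCoProcessFunction` offers `output(OutputTag, value)`, and `OnTimerContext` extends that `Context`. This means `onTimer()` can write to a side output, which is later read with `getSideOutput(tag)` on the resulting `SingleOutputStreamOperator`. So that it runs without Flink, the sketch below is only a toy model of that behavior, not the gist's test. The class `TimerSideOutputModel`, its methods, and the matching rule are all hypothetical. Under that rule, a key from the first input that is not matched by the second input before its timer fires goes to the side output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (not Flink's API) of a two-input keyed function whose timer
// callback writes to a side output instead of the main output.
public class TimerSideOutputModel {
    // main and side outputs, kept apart like two Flink streams
    final List<String> mainOut = new ArrayList<>();
    final List<String> sideOut = new ArrayList<>();
    // pending timers: firing timestamp -> keys that registered it
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    // per-key flag: has the second input matched this key yet?
    private final Map<String, Boolean> matched = new HashMap<>();

    // first input: remember the key and register a timer at ts + timeout
    void processElement1(String key, long ts, long timeout) {
        matched.put(key, false);
        timers.computeIfAbsent(ts + timeout, t -> new ArrayList<>()).add(key);
    }

    // second input: a pending key is matched and emitted on the main output
    void processElement2(String key) {
        if (Boolean.FALSE.equals(matched.get(key))) {
            matched.put(key, true);
            mainOut.add("matched:" + key);
        }
    }

    // fire every timer with timestamp <= watermark, in timestamp order
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String key : timers.pollFirstEntry().getValue()) {
                onTimer(key);
            }
        }
    }

    // analogue of onTimer(): keys still unmatched go to the side output
    private void onTimer(String key) {
        if (Boolean.FALSE.equals(matched.remove(key))) {
            sideOut.add("timeout:" + key);
        }
    }

    public static void main(String[] args) {
        TimerSideOutputModel m = new TimerSideOutputModel();
        m.processElement1("a", 0L, 10L);
        m.processElement1("b", 5L, 10L);
        m.processElement2("a");
        m.advanceWatermark(20L);
        System.out.println("main=" + m.mainOut + " side=" + m.sideOut);
    }
}
```

A `TreeMap` keyed by firing timestamp lets the model release timers in timestamp order as the watermark advances, loosely mirroring event-time timers. A test in this style then asserts separately on the main and the side output.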
fhueske / .scala
Created June 15, 2017 13:51
Elasticsearch5 Sink
/*
* Copyright 2015 data Artisans GmbH
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
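* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/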
fhueske / dgen.py
Created June 7, 2017 19:23
Simple data generator with skew
import random
numRecords = 10000000
numProds = 100
numCust = 1000
# probability that a parameter combination is in each class
# (thresholds are cumulative: medProb = freqProb + 0.35)
freqProb = 0.1
medProb = freqProb + 0.35
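Because `medProb` is defined as `freqProb + 0.35`, the two values read as cumulative thresholds. Under that reading, a uniform draw below `freqProb` selects the first class, a draw below `medProb` selects the second, and anything else falls into the remaining class. The preview ends before the values are used, so the following is only a minimal sketch under that assumption. `SkewClasses`, `classify`, `histogram`, and the three-class split are hypothetical.

```java
import java.util.Random;

// Sketch: cumulative thresholds in the style of dgen.py's freqProb / medProb.
public class SkewClasses {
    static final double FREQ_PROB = 0.1;              // P(class 0)
    static final double MED_PROB = FREQ_PROB + 0.35;  // cumulative: P(class 0 or 1)

    // map a uniform draw r in [0, 1) to 0 (frequent), 1 (medium) or 2 (rest)
    static int classify(double r) {
        if (r < FREQ_PROB) return 0;
        if (r < MED_PROB) return 1;
        return 2;
    }

    // draw n values with a fixed seed and count how many land in each class
    static int[] histogram(int n, long seed) {
        Random rnd = new Random(seed);
        int[] counts = new int[3];
        for (int i = 0; i < n; i++) {
            counts[classify(rnd.nextDouble())]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int n = 100_000;
        int[] counts = histogram(n, 42L);
        // shares should come out near 0.10, 0.35 and 0.55
        for (int c = 0; c < counts.length; c++) {
            System.out.printf("class %d: %.3f%n", c, counts[c] / (double) n);
        }
    }
}
```

Keeping the thresholds cumulative means each draw needs at most two comparisons, and adding a class only requires appending another threshold.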
// get TypeInformation
TypeInformation<ProtocolEvent> pType = TypeExtractor.createTypeInfo(ProtocolEvent.class);
// print TypeInformation
printTypeInfo(pType);
// prints the type information recursively
public static void printTypeInfo(TypeInformation<?> t) {
System.out.println(t.getClass() + " : " + t.getTypeClass());
if (t instanceof CompositeType) {
System.out.println("(");
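The preview stops just before the recursion. In Flink, the natural step would be to visit `getTypeAt(i)` for every `i` below `getArity()` of the `CompositeType`. So that the idea runs without Flink, the sketch below swaps Flink's `TypeInformation` for plain `java.lang.reflect` field inspection, which is a different mechanism. The leaf rule (primitives, arrays, and `java.*` classes are treated as atomic) and all names, including the sample classes, are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Reflection-based analogue of printTypeInfo (not Flink's TypeInformation API).
public class TypeTreePrinter {

    // Render a class and, for user-defined classes, its instance fields recursively.
    static String describe(Class<?> c) {
        StringBuilder sb = new StringBuilder();
        describe(c, "", sb);
        return sb.toString();
    }

    private static void describe(Class<?> c, String indent, StringBuilder sb) {
        sb.append(indent).append(c.getSimpleName()).append('\n');
        // primitives, arrays and JDK classes are leaves (like atomic types)
        if (c.isPrimitive() || c.isArray() || c.getName().startsWith("java.")) {
            return;
        }
        sb.append(indent).append("(\n");
        // note: no cycle detection, so a self-referencing class recurses forever
        for (Field f : c.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers())) continue;
            describe(f.getType(), indent + "  ", sb);
        }
        sb.append(indent).append(")\n");
    }

    // hypothetical stand-ins for a user's event POJOs
    static class Address { String city; int zip; }
    static class Event { long id; Address address; }

    public static void main(String[] args) {
        System.out.print(describe(Event.class));
    }
}
```

Java does not guarantee the order of `getDeclaredFields()`, so the order of fields in the printout may vary between JVMs.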
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple1<String>> stream = ...
DataStream<Tuple1<String>> filters = ...
stream
.keyBy(0)
.connect(filters.keyBy(0)) // partition both streams on same field and connect them
.flatMap(new StreamFilter()) // apply CoFlatMap function to update filters and to filter data
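`StreamFilter` itself is not shown in the preview. Because both streams are keyed on field 0 before `connect`, data and filter records with the same key reach the same parallel instance. This is what lets one function keep per-key filter information and apply it to the data. The plain-Java sketch below models only that logic. It assumes, hypothetically, that a filter record blocks its key from then on, and it treats each record as just its key string. `FilterModel` is hypothetical; its `flatMap1`/`flatMap2` names mirror Flink's `CoFlatMapFunction` methods, but this is not Flink code.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Toy model of a two-input filter: input 2 updates the filter, input 1 is filtered.
public class FilterModel {
    // keys currently blocked; in Flink this would live in per-key state
    private final Set<String> blocked = new HashSet<>();

    // data input: forward the record unless its key is blocked
    void flatMap1(String record, Consumer<String> out) {
        if (!blocked.contains(record)) {
            out.accept(record);
        }
    }

    // filter input: assumed to mean "block this key from now on"; emits nothing
    void flatMap2(String filterKey, Consumer<String> out) {
        blocked.add(filterKey);
    }

    public static void main(String[] args) {
        FilterModel f = new FilterModel();
        List<String> out = new ArrayList<>();
        f.flatMap1("x", out::add);   // passes: no filter yet
        f.flatMap2("x", out::add);   // blocks key "x"
        f.flatMap1("x", out::add);   // dropped
        f.flatMap1("y", out::add);   // passes
        System.out.println(out);     // prints [x, y]
    }
}
```

The order in which the two inputs arrive matters: a data record seen before its filter record is still forwarded, which also holds for the connected Flink streams.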