Last active
June 14, 2019 08:15
-
-
Save ytaras/a2f7ec60ff9e6e3128a561f969baecfd to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.beam.examples.usercount; | |
import com.google.common.base.MoreObjects; | |
import com.google.common.base.Objects; | |
import java.io.Serializable; | |
import org.apache.beam.examples.usercount.ChangeStateTraker.ChangeResult; | |
import org.apache.beam.sdk.coders.AvroCoder; | |
import org.apache.beam.sdk.coders.Coder; | |
import org.apache.beam.sdk.coders.DefaultCoder; | |
import org.apache.beam.sdk.state.StateSpec; | |
import org.apache.beam.sdk.state.StateSpecs; | |
import org.apache.beam.sdk.state.ValueState; | |
import org.apache.beam.sdk.transforms.DoFn; | |
import org.apache.beam.sdk.values.KV; | |
public class ChangeStateTraker<K, V> extends DoFn<KV<K, V>, ChangeResult<K, V>> { | |
public static class ChangeResult<K, V> implements Serializable { | |
private V previousValue; | |
private V newValue; | |
private K key; | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) { | |
return true; | |
} | |
if (o == null || getClass() != o.getClass()) { | |
return false; | |
} | |
ChangeResult<?, ?> that = (ChangeResult<?, ?>) o; | |
return Objects.equal(previousValue, that.previousValue) && | |
Objects.equal(newValue, that.newValue) && | |
Objects.equal(key, that.key); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hashCode(previousValue, newValue, key); | |
} | |
@Override | |
public String toString() { | |
return MoreObjects.toStringHelper(this) | |
.add("previousValue", previousValue) | |
.add("newValue", newValue) | |
.add("key", key) | |
.toString(); | |
} | |
public V getPreviousValue() { | |
return previousValue; | |
} | |
public ChangeResult<K, V> setPreviousValue(V previousValue) { | |
this.previousValue = previousValue; | |
return this; | |
} | |
public V getNewValue() { | |
return newValue; | |
} | |
public ChangeResult<K, V> setNewValue(V newValue) { | |
this.newValue = newValue; | |
return this; | |
} | |
public K getKey() { | |
return key; | |
} | |
public ChangeResult<K, V> setKey(K key) { | |
this.key = key; | |
return this; | |
} | |
} | |
@StateId("previouslySeen") | |
private final StateSpec<ValueState<V>> previousValue; | |
public ChangeStateTraker(Coder<V> coder) { | |
previousValue = StateSpecs.value(coder); | |
} | |
@ProcessElement | |
public void process(@Element KV<K, V> element, | |
@StateId("previouslySeen") ValueState<V> valueState, | |
OutputReceiver<ChangeResult<K, V>> receiver) { | |
if(valueState.read() == null) { | |
valueState.write(element.getValue()); | |
receiver.output(newResult(element)); | |
} else { | |
if (!valueState.read().equals(element.getValue())) { | |
receiver.output(existingResult(valueState.read(), element)); | |
valueState.write(element.getValue()); | |
} | |
} | |
} | |
private ChangeResult<K,V> existingResult(V read, KV<K,V> element) { | |
return new ChangeResult<K, V>() | |
.setNewValue(element.getValue()) | |
.setKey(element.getKey()) | |
.setPreviousValue(read); | |
} | |
private ChangeResult<K,V> newResult(KV<K,V> element) { | |
return new ChangeResult<K, V>() | |
.setKey(element.getKey()) | |
.setNewValue(element.getValue()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.beam.examples.usercount.transformexample; | |
import org.apache.beam.examples.usercount.domain.ProgressEvent; | |
import org.apache.beam.examples.usercount.Sout; | |
import org.apache.beam.sdk.transforms.Count; | |
import org.apache.beam.sdk.transforms.Distinct; | |
import org.apache.beam.sdk.transforms.MapElements; | |
import org.apache.beam.sdk.transforms.PTransform; | |
import org.apache.beam.sdk.transforms.ParDo; | |
import org.apache.beam.sdk.transforms.windowing.SlidingWindows; | |
import org.apache.beam.sdk.transforms.windowing.Window; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.values.TypeDescriptors; | |
import org.joda.time.Duration; | |
public class DistinctStreamsExample extends PTransform<PCollection<ProgressEvent>, PCollection<KV<String, Long>>> { | |
@Override | |
public PCollection<KV<String, Long>> expand(PCollection<ProgressEvent> input) { | |
return input | |
.apply("Get movie id", | |
MapElements.into(TypeDescriptors.kvs( | |
TypeDescriptors.strings(), | |
TypeDescriptors.strings() | |
)).via(x -> KV.of(x.getUserId(), x.getTitleId()))) | |
.apply("Assign windows", Window.into( | |
SlidingWindows.of(Duration.standardMinutes(10)) | |
.every(Duration.standardSeconds(60)))) | |
.apply("Distinct", Distinct.create()) | |
.apply("Count", Count.perKey()) | |
.apply("After count", ParDo.of(new Sout<>("After count"))) | |
; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.beam.examples.usercount.transformexample; | |
import org.apache.beam.examples.usercount.Sout; | |
import org.apache.beam.examples.usercount.domain.ProgressEvent; | |
import org.apache.beam.examples.usercount.domain.ProgressEventKey; | |
import org.apache.beam.sdk.transforms.Count; | |
import org.apache.beam.sdk.transforms.Latest; | |
import org.apache.beam.sdk.transforms.PTransform; | |
import org.apache.beam.sdk.transforms.ParDo; | |
import org.apache.beam.sdk.transforms.Values; | |
import org.apache.beam.sdk.transforms.WithKeys; | |
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; | |
import org.apache.beam.sdk.transforms.windowing.AfterWatermark; | |
import org.apache.beam.sdk.transforms.windowing.Sessions; | |
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; | |
import org.apache.beam.sdk.transforms.windowing.Window; | |
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.sdk.values.TypeDescriptor; | |
import org.joda.time.Duration; | |
/** | |
* Return latest value for a session window with timestamp assigned to a window. If window is not | |
* closed in 120 seconds, emit preliminary result | |
*/ | |
public class SessionWindowExample extends | |
PTransform<PCollection<ProgressEvent>, PCollection<ProgressEvent>> { | |
@Override | |
public PCollection<ProgressEvent> expand(PCollection<ProgressEvent> input) { | |
PCollection<KV<ProgressEventKey, ProgressEvent>> keyd = input | |
.apply("Assign Windows", Window.<ProgressEvent>into( | |
Sessions | |
.withGapDuration(Duration.standardSeconds(10))) | |
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW) | |
.triggering( | |
AfterWatermark.pastEndOfWindow() | |
.withEarlyFirings( | |
AfterProcessingTime.pastFirstElementInPane() | |
.plusDelayOf(Duration.standardSeconds(120)))) | |
.accumulatingFiredPanes() | |
.withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS) | |
.withAllowedLateness(Duration.standardMinutes(5))) | |
.apply("Assign key", WithKeys | |
.of(ProgressEvent::createKey) | |
.withKeyType(TypeDescriptor.of(ProgressEventKey.class))); | |
keyd | |
.apply(Count.perKey()) | |
.apply("Counts", ParDo.of(new Sout<>("Counts in session"))) | |
; | |
return keyd | |
.apply("Get latest", Latest.perKey()) | |
.apply(Values.create()) | |
.apply("Debug", ParDo.of(new Sout<>("Latest per session"))) | |
; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Pipeline fragment: the collection these steps are applied to is not part of this snippet.
// Put all elements into the global window before the change detector runs
// (presumably so its per-key state is not split across windows; confirm against the full pipeline).
.apply(Window.into(new GlobalWindows())) | |
// Sout is defined elsewhere; going by the step names it logs elements as they pass through.
.apply("Log before state", ParDo.of(new Sout<>("Before state"))) | |
// Change detection via ChangeStateTraker; the stored value is encoded with BigEndianIntegerCoder.
.apply("Detect changes", ParDo.of(new ChangeStateTraker<>( | |
BigEndianIntegerCoder.of()))) | |
.apply("Log after state", ParDo.of(new Sout<>("After state")))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment