Skip to content

Instantly share code, notes, and snippets.

@ytaras
Last active June 14, 2019 08:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ytaras/a2f7ec60ff9e6e3128a561f969baecfd to your computer and use it in GitHub Desktop.
Save ytaras/a2f7ec60ff9e6e3128a561f969baecfd to your computer and use it in GitHub Desktop.
package org.apache.beam.examples.usercount;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import java.io.Serializable;
import org.apache.beam.examples.usercount.ChangeStateTraker.ChangeResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
/**
 * Stateful {@link DoFn} that reports value changes for keyed elements.
 *
 * <p>For every incoming {@code KV<K, V>} the last written value is looked up in a
 * {@link ValueState} cell (Beam scopes such state per key and window):
 * <ul>
 *   <li>if nothing is stored yet, the value is stored and a {@link ChangeResult} with a
 *       {@code null} previous value is emitted;</li>
 *   <li>if the stored value differs from the incoming one (by {@code equals}), a
 *       {@link ChangeResult} carrying both values is emitted and the state is updated;</li>
 *   <li>if the stored value equals the incoming one, nothing is emitted.</li>
 * </ul>
 *
 * <p>A stored {@code null} is indistinguishable from "no state yet", so elements with a
 * {@code null} value are always reported as new.
 *
 * @param <K> key type of the input elements
 * @param <V> value type of the input elements; compared with {@code equals}
 */
public class ChangeStateTraker<K, V> extends DoFn<KV<K, V>, ChangeResult<K, V>> {

  /**
   * Mutable, fluent-style holder describing one detected change for a key.
   *
   * @param <K> key type
   * @param <V> value type
   */
  public static class ChangeResult<K, V> implements Serializable {
    /** Value stored before the change; {@code null} when the key had no stored value. */
    private V previousValue;
    /** Value carried by the element that triggered this result. */
    private V newValue;
    /** Key of the element that triggered this result. */
    private K key;

    /** Two results are equal when they have the same class and equal key, previous and new values. */
    @Override
    public boolean equals(Object o) {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      ChangeResult<?, ?> that = (ChangeResult<?, ?>) o;
      return Objects.equal(previousValue, that.previousValue)
          && Objects.equal(newValue, that.newValue)
          && Objects.equal(key, that.key);
    }

    /** Hash over the same three fields used by {@link #equals(Object)}. */
    @Override
    public int hashCode() {
      return Objects.hashCode(previousValue, newValue, key);
    }

    @Override
    public String toString() {
      return MoreObjects.toStringHelper(this)
          .add("previousValue", previousValue)
          .add("newValue", newValue)
          .add("key", key)
          .toString();
    }

    /** @return the value stored before the change, or {@code null} if there was none */
    public V getPreviousValue() {
      return previousValue;
    }

    /**
     * @param previousValue the value stored before the change
     * @return this instance, for chaining
     */
    public ChangeResult<K, V> setPreviousValue(V previousValue) {
      this.previousValue = previousValue;
      return this;
    }

    /** @return the value of the element that triggered this result */
    public V getNewValue() {
      return newValue;
    }

    /**
     * @param newValue the value of the element that triggered this result
     * @return this instance, for chaining
     */
    public ChangeResult<K, V> setNewValue(V newValue) {
      this.newValue = newValue;
      return this;
    }

    /** @return the key of the element that triggered this result */
    public K getKey() {
      return key;
    }

    /**
     * @param key the key of the element that triggered this result
     * @return this instance, for chaining
     */
    public ChangeResult<K, V> setKey(K key) {
      this.key = key;
      return this;
    }
  }

  /** State cell holding the most recently written value. */
  @StateId("previouslySeen")
  private final StateSpec<ValueState<V>> previousValue;

  /**
   * @param coder coder used to persist values of type {@code V} in state
   */
  public ChangeStateTraker(Coder<V> coder) {
    previousValue = StateSpecs.value(coder);
  }

  /**
   * Compares the incoming value with the stored one and emits a {@link ChangeResult} when the
   * value is new or different.
   *
   * @param element the keyed element being processed
   * @param valueState state cell with the last written value
   * @param receiver sink for detected changes
   */
  @ProcessElement
  public void process(@Element KV<K, V> element,
      @StateId("previouslySeen") ValueState<V> valueState,
      OutputReceiver<ChangeResult<K, V>> receiver) {
    // Read the state exactly once: each read() may hit the runner's state backend,
    // and the stored value cannot change between the check and its use here.
    V stored = valueState.read();
    V incoming = element.getValue();

    if (stored == null) {
      valueState.write(incoming);
      receiver.output(newResult(element));
      return;
    }

    if (!stored.equals(incoming)) {
      receiver.output(existingResult(stored, element));
      valueState.write(incoming);
    }
  }

  /** Builds a result for a key that already had a stored value {@code read}. */
  private ChangeResult<K, V> existingResult(V read, KV<K, V> element) {
    return new ChangeResult<K, V>()
        .setNewValue(element.getValue())
        .setKey(element.getKey())
        .setPreviousValue(read);
  }

  /** Builds a result for a key with no stored value; the previous value stays {@code null}. */
  private ChangeResult<K, V> newResult(KV<K, V> element) {
    return new ChangeResult<K, V>()
        .setKey(element.getKey())
        .setNewValue(element.getValue());
  }
}
package org.apache.beam.examples.usercount.transformexample;
import org.apache.beam.examples.usercount.domain.ProgressEvent;
import org.apache.beam.examples.usercount.Sout;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
/**
 * Counts, per user id, the distinct title ids seen inside sliding windows that are
 * 10 minutes long and start every 60 seconds.
 *
 * <p>The counted pairs are finally passed through a {@code Sout} step labelled
 * "After count" (presumably a debug printer - see that class).
 */
public class DistinctStreamsExample extends PTransform<PCollection<ProgressEvent>, PCollection<KV<String, Long>>> {

  /**
   * @param input stream of progress events
   * @return one {@code (userId, distinctTitleCount)} pair per user and window
   */
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<ProgressEvent> input) {
    // (userId, titleId) pairs; the step keeps its original name so pipeline graphs do not change.
    PCollection<KV<String, String>> userTitlePairs = input.apply(
        "Get movie id",
        MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(event -> KV.of(event.getUserId(), event.getTitleId())));

    SlidingWindows slidingWindows = SlidingWindows
        .of(Duration.standardMinutes(10))
        .every(Duration.standardSeconds(60));
    PCollection<KV<String, String>> windowedPairs =
        userTitlePairs.apply("Assign windows", Window.into(slidingWindows));

    // Drop duplicate (user, title) pairs so that each title counts once per user and window.
    PCollection<KV<String, String>> uniquePairs =
        windowedPairs.apply("Distinct", Distinct.create());

    PCollection<KV<String, Long>> titlesPerUser =
        uniquePairs.apply("Count", Count.perKey());

    return titlesPerUser.apply("After count", ParDo.of(new Sout<>("After count")));
  }
}
package org.apache.beam.examples.usercount.transformexample;
import org.apache.beam.examples.usercount.Sout;
import org.apache.beam.examples.usercount.domain.ProgressEvent;
import org.apache.beam.examples.usercount.domain.ProgressEventKey;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Latest;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
/**
 * Emits the latest {@link ProgressEvent} per key for each session window, with output
 * timestamps taken from the end of the window.
 *
 * <p>Sessions close after a 10 second gap. While a session is still open, an early
 * (preliminary) pane fires 120 seconds of processing time after the first element in the
 * pane; panes accumulate, the on-time pane always fires, and late data is accepted for
 * 5 minutes. As a side branch, per-key element counts are sent to a {@code Sout} step
 * labelled "Counts in session".
 */
public class SessionWindowExample extends
    PTransform<PCollection<ProgressEvent>, PCollection<ProgressEvent>> {

  /**
   * @param input stream of progress events
   * @return the latest event per key and session, after passing through the
   *     "Latest per session" {@code Sout} step
   */
  @Override
  public PCollection<ProgressEvent> expand(PCollection<ProgressEvent> input) {
    // Full windowing strategy: session windows plus trigger, accumulation and lateness settings.
    Window<ProgressEvent> sessionWindowing = Window
        .<ProgressEvent>into(Sessions.withGapDuration(Duration.standardSeconds(10)))
        .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
        .triggering(
            AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(120))))
        .accumulatingFiredPanes()
        .withOnTimeBehavior(OnTimeBehavior.FIRE_ALWAYS)
        .withAllowedLateness(Duration.standardMinutes(5));

    WithKeys<ProgressEventKey, ProgressEvent> keyByEvent = WithKeys
        .of(ProgressEvent::createKey)
        .withKeyType(TypeDescriptor.of(ProgressEventKey.class));

    PCollection<ProgressEvent> windowedEvents = input.apply("Assign Windows", sessionWindowing);
    PCollection<KV<ProgressEventKey, ProgressEvent>> keyedEvents =
        windowedEvents.apply("Assign key", keyByEvent);

    // Side branch: the counts only feed the Sout step; its output is intentionally discarded.
    keyedEvents
        .apply(Count.perKey())
        .apply("Counts", ParDo.of(new Sout<>("Counts in session")));

    // Main branch: latest event per key, stripped of its key.
    return keyedEvents
        .apply("Get latest", Latest.perKey())
        .apply(Values.create())
        .apply("Debug", ParDo.of(new Sout<>("Latest per session")));
  }
}
.apply(Window.into(new GlobalWindows()))
.apply("Log before state", ParDo.of(new Sout<>("Before state")))
.apply("Detect changes", ParDo.of(new ChangeStateTraker<>(
BigEndianIntegerCoder.of())))
.apply("Log after state", ParDo.of(new Sout<>("After state")))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment