Skip to content

Instantly share code, notes, and snippets.

@HeartSaVioR
Created May 8, 2020 15:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save HeartSaVioR/fab85734b5be85198c48f45004c8e0ca to your computer and use it in GitHub Desktop.
Save HeartSaVioR/fab85734b5be85198c48f45004c8e0ca to your computer and use it in GitHub Desktop.
Mismatched pair of getter/setter in Encoders.bean
package net.heartsavior.spark.trial;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import java.util.Objects;
public class ProblematicApp {
/**
 * Starts a streaming query that tracks, per key, the largest value seen so far.
 *
 * <p>Input comes from the {@code rate} source; each {@code value} is grouped by
 * {@code value % 10}. The per-group state is a {@link StateInfo} and each invocation
 * emits an {@link UpdateInfo}; both are encoded with {@code Encoders.bean}. Results are
 * written to the console sink in {@code update} output mode.
 *
 * @param args command-line arguments (unused)
 * @throws StreamingQueryException if the streaming query terminates with an error
 */
public static void main(String[] args) throws StreamingQueryException {
    SparkSession spark = SparkSession
            .builder()
            .appName("Sample")
            .master("local[*]")
            .getOrCreate();

    // Chain directly off load() so the pipeline stays fully typed; going through a raw
    // Dataset variable would turn the final assignment into an unchecked conversion.
    Dataset<Long> df2 = spark.readStream()
            .format("rate")
            .option("rowsPerSecond", "1000")
            .option("numPartitions", "1")
            .load()
            .select("value")
            .as(Encoders.LONG());

    // The declared variable type supplies the lambda's target type, so no cast is needed.
    MapGroupsWithStateFunction<Long, Long, StateInfo, UpdateInfo> stateUpdateFunc =
            (key, values, state) -> {
                StateInfo stateObj;
                long maxValue = Long.MIN_VALUE;

                if (state.exists()) {
                    // Continue from the maximum stored by a previous invocation.
                    stateObj = state.get();
                    maxValue = stateObj.getMaxValue();
                } else {
                    // First time this key is seen: create its state.
                    stateObj = new StateInfo();
                    stateObj.setKey(key);
                }

                while (values.hasNext()) {
                    long value = values.next();
                    if (value > maxValue) {
                        maxValue = value;
                    }
                }

                stateObj.setMaxValue(maxValue);
                state.update(stateObj);

                // Emit a snapshot of the state that was just stored.
                UpdateInfo update = new UpdateInfo();
                update.setKey(stateObj.getKey());
                update.setMaxValue(stateObj.getMaxValue());
                return update;
            };

    Dataset<UpdateInfo> updates = df2.groupByKey(
            // This cast is kept: it names the functional interface for groupByKey.
            (MapFunction<Long, Long>) value -> value % 10,
            Encoders.LONG()
    ).mapGroupsWithState(
            stateUpdateFunc,
            Encoders.bean(StateInfo.class),
            Encoders.bean(UpdateInfo.class)
    );

    StreamingQuery query = updates
            .writeStream()
            .outputMode("update")
            .format("console")
            .start();

    query.awaitTermination();
}
public static class KeyValueRecord {
private long key;
private long value;
public long getKey() {
return key;
}
public void setKey(long key) {
this.key = key;
}
public long getValue() {
return value;
}
public void setValue(long value) {
this.value = value;
}
@Override
public String toString() {
return com.google.common.base.Objects.toStringHelper(this)
.add("key", key)
.add("value", value)
.toString();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
KeyValueRecord that = (KeyValueRecord) o;
return key == that.key &&
value == that.value;
}
@Override
public int hashCode() {
return Objects.hash(key, value);
}
}
/**
 * Bean used as the per-key state in the streaming query: a key and the maximum value
 * recorded for it.
 *
 * <p>Besides the two read/write properties it deliberately exposes one getter-only
 * property ({@code aTimestamp}); do not add a setter for it or remove it, since the
 * getter/setter mismatch is what this sample exists to demonstrate.
 */
public static class StateInfo {
    private long key;
    private long maxValue;

    /**
     * Dummy accessor with no matching setter, kept to trigger the issue that shows up
     * when a bean property has only a getter.
     *
     * @return the current wall-clock time in milliseconds
     */
    public long getATimestamp() {
        return System.currentTimeMillis();
    }

    /** @return the key this state belongs to */
    public long getKey() {
        return this.key;
    }

    /** @param key the key this state belongs to */
    public void setKey(long key) {
        this.key = key;
    }

    /** @return the maximum value stored in this state */
    public long getMaxValue() {
        return this.maxValue;
    }

    /** @param maxValue the maximum value to store in this state */
    public void setMaxValue(long maxValue) {
        this.maxValue = maxValue;
    }
}
/**
 * Bean emitted by the state update function: a key together with the maximum value
 * currently recorded for it. Every property has both a getter and a setter.
 */
public static class UpdateInfo {
    private long key;
    private long maxValue;

    /** @return the key this update refers to */
    public long getKey() {
        return this.key;
    }

    /** @param key the key this update refers to */
    public void setKey(long key) {
        this.key = key;
    }

    /** @return the maximum value reported by this update */
    public long getMaxValue() {
        return this.maxValue;
    }

    /** @param maxValue the maximum value reported by this update */
    public void setMaxValue(long maxValue) {
        this.maxValue = maxValue;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment