Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save ebernhardson/375a74924179cba7ea351c21b5d05fc2 to your computer and use it in GitHub Desktop.
Save ebernhardson/375a74924179cba7ea351c21b5d05fc2 to your computer and use it in GitHub Desktop.
/**
 * Builds the new M0Prep dataframe by merging freshly observed query pairs with the
 * previous M0Prep partition and re-aggregating the suggestion counts.
 *
 * <p>Steps:
 * <ol>
 *   <li>Restrict {@code dfOld} to the partition {@code params.glentDfM0PrepPartOld}.</li>
 *   <li>Keep only rows of {@code df} whose {@code q2.ts} is newer than the latest
 *       {@code q1_ts} already present in that partition (to avoid double counting) and
 *       whose {@code q1.ts} lies strictly between {@code params.logTsFrom} and
 *       {@code params.logTsTo}.</li>
 *   <li>Flatten the {@code q1.*}/{@code q2.*} fields, give each new pair a
 *       {@code suggCount} of 1, and append the previous partition when it is non-empty.</li>
 *   <li>Group identical pairs, taking the max of the timestamp and hit totals and the
 *       sum of {@code suggCount}, then tag every row with
 *       {@code params.glentDfM0PrepPartNew}.</li>
 * </ol>
 *
 * @param df     candidate pairs exposing {@code q1.*}, {@code q2.*} and {@code q1q2LevenDist}
 * @param dfOld  previous M0Prep dataframe; must contain a {@code part} column.
 *               NOTE(review): assumed to use the same flattened column names this method
 *               emits (minus {@code part}) - confirm against the writer of M0Prep.
 * @param params supplies the old/new partition values and the log timestamp window
 * @return the new M0Prep dataframe with columns in the order of the final select
 */
public Dataset<Row> buildPairsForM0Prep(Dataset<Row> df, Dataset<Row> dfOld, GlentParams params) {
    // limit to previous portion of M0Prep dataframe; "part" is dropped because every
    // output row is re-tagged with the new partition value after aggregation
    Dataset<Row> previous = dfOld
        .where(col("part").equalTo(params.glentDfM0PrepPartOld))
        .drop(col("part"));

    // isEmpty() launches a Spark job, so evaluate it once and reuse the answer
    boolean hasPrevious = !previous.isEmpty();

    // to avoid double counting: only accept pairs newer than anything already in M0Prep
    Column oldTsCondition = lit(true);
    if (hasPrevious) {
        // a global aggregate always yields exactly one row
        Row maxRow = previous.agg(max("q1_ts").alias("tsmax")).first();
        // the aggregate is aliased, so it must be looked up as "tsmax", not "q1_ts"
        int fieldIdx = maxRow.fieldIndex("tsmax");
        // a null maximum (no usable timestamps) means there is no cutoff to apply
        if (!maxRow.isNullAt(fieldIdx)) {
            // latest timestamp of the records in previous portion of M0Prep
            oldTsCondition = col("q2.ts").gt(maxRow.get(fieldIdx));
        }
    }

    Dataset<Row> fresh = df
        .where(oldTsCondition
            .and(col("q1.ts").gt(params.logTsFrom))
            .and(col("q1.ts").lt(params.logTsTo)))
        .select(
            col("q1.query").alias("q1_query"),
            col("q1.queryNorm").alias("q1_queryNorm"),
            col("q1.queryType").alias("q1_queryType"),
            col("q1.wikiid").alias("q1_wikiid"),
            col("q1.lang").alias("q1_lang"),
            col("q1.source").alias("q1_source"),
            col("q2.query").alias("q2_query"),
            col("q2.queryNorm").alias("q2_queryNorm"),
            col("q2.queryType").alias("q2_queryType"),
            col("q2.wikiid").alias("q2_wikiid"),
            col("q2.lang").alias("q2_lang"),
            col("q2.source").alias("q2_source"),
            col("q1.ts").alias("q1_ts"),
            col("q1.hitsTotal").alias("q1_hitsTotal"),
            col("q2.hitsTotal").alias("q2_hitsTotal"),
            col("q1q2LevenDist"),
            lit(1).alias("suggCount"));

    // add previous M0Prep; matching by name keeps the merge correct even if the stored
    // column order differs from the select above
    Dataset<Row> combined = hasPrevious ? fresh.unionByName(previous) : fresh;

    // creates new M0Prep
    // NOTE(review): q1_lang/q2_lang are grouping keys so they survive the aggregation;
    // this presumes the language is fixed for a given pair - confirm.
    return combined
        .groupBy("q1_query", "q1_queryNorm", "q1_queryType", "q1_wikiid", "q1_lang", "q1_source",
            "q2_query", "q2_queryNorm", "q2_queryType", "q2_wikiid", "q2_lang", "q2_source",
            "q1q2LevenDist")
        .agg(
            max("q1_ts").alias("q1_ts"),
            max("q1_hitsTotal").alias("q1_hitsTotal"),
            max("q2_hitsTotal").alias("q2_hitsTotal"),
            sum("suggCount").alias("suggCount"))
        .withColumn("part", lit(params.glentDfM0PrepPartNew))
        .select(
            col("q1_query"), col("q1_queryNorm"), col("q1_queryType"),
            col("q1_wikiid"), col("q1_lang"), col("q1_source"),
            col("q2_query"), col("q2_queryNorm"), col("q2_queryType"),
            col("q2_wikiid"), col("q2_lang"), col("q2_source"),
            col("q1_ts"), col("q1_hitsTotal"), col("q2_hitsTotal"),
            col("q1q2LevenDist"), col("suggCount"), col("part"));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment