Building a reusable Data Model for Approximate String Matching with Spark

Context

At the Paid Search team in GetYourGuide, we manage a collection of over 30 million active digital ads, displayed in every major ads provider and in each of our 19 supported languages, to announce thousands of activities across the globe. We take our motto "Find the best things to do wherever you’re going" very seriously. Whenever someone types a query into their preferred search engine, we make a huge effort to accomplish two goals:

  1. being as descriptive as possible
  2. being as contextually relevant to our user inputs as possible.

Our ads inventory helps us with the former; for the latter we need appropriate keywords: a word or phrase that describes our product closely enough, but from the user's perspective. Basically, we make assumptions ahead of time about which particular words our users type when looking for something we have an ad for. So far we've made about 24 million "assumptions". My team spends a lot of time looking for innovative ways to manage, organize, optimize, test, and, more importantly, improve and expand our keywords at scale.

Even though it might appear that we already have a sufficient number of keywords, we have discovered that catching the immense complexity of human search queries, drawing on many sources, is a never-ending process if we want to improve our relevance and reach. We have many keywords that refer to a landmark, for example "Eiffel Tower", or to a place like "Cancún", but those are very generic assumptions: you might be looking for many things other than visiting those places. To add an intent, we append other words like "visit" or "things to do in". Maybe there are specific things you might want to do at that place that are in our inventory, like taking a tour or buying tickets, so we add those too. Doing that improves the relevance of our ads significantly.

Having made all these assumptions, we can cover a good range of possible search terms; however, they leave out queries that we might not have considered or even thought of at first, e.g. "dinner cruise cancún with kids", which is something an actual person typed, not just an assumption. By observing these variations we can catch a lot of what our users search for, identify patterns that might apply to other locations with similar activities, and show our ads exactly when they are relevant.

To catch those queries we use a few sources like Google's Search Terms Report, which also gives metadata about specific search terms, how they were matched to our broader, current inventory, and how they performed. However, how can we identify those patterns and decide what to include or exclude from that report at scale? How can we avoid adding keywords that are far too similar to our current inventory (like differing by a single letter, or having the same meaning and intent but with the words in a different order)? What about searches that landed on our inventory by accident and are better ignored? Our Marketing Managers are able to make all those decisions, but at our scale it would be a huge endeavour to go through a search term report with millions of rows, so we came up with a scheme to maximize the possible impact of its contents.

Fuzzy Matching with Levenshtein distance. Does it scale?

Our first instinct was to find everything that closely resembles our inventory but is not exactly equal to it, using an algorithm called "Levenshtein distance", which compares two strings and calculates the "edit distance" between them, i.e. the number of single-character edits the target string needs to become equal to the source; 0 means they are equal.

Spark's implementation is quite simple and doesn't offer more advanced features like sorting the words of the keyword or keeping a set of all the words used (unlike some other implementations).

For our use case, however, it seemed good enough at first glance, since we could implement the missing features with a UDF (see the sketch after the example below).

Let's see a quick example:

import org.apache.spark.sql.functions.{col, levenshtein}
import spark.implicits._ // provides toDF on local Seqs; spark is the active SparkSession (implicit in notebooks/shells)

val keywords = Seq(
  "cancun",
  "rowling",
  "peter parker")
.toDF("keyword")

val searchTerms = Seq(
  "carcur",
  "bowling",
  "parker peter "
).toDF("target")

keywords
  .crossJoin(searchTerms)
  .withColumn("distance", levenshtein(col("keyword"), col("target")))
  .show(truncate = false)


/* Result:

+------------+------------+--------+
|keyword     |target      |distance|
+------------+------------+--------+
|cancun      |carcur      |2       |
|cancun      |bowling     |6       |
|cancun      |peter parker|11      |
|rowling     |carcur      |7       |
|rowling     |bowling     |1       |
|rowling     |peter parker|11      |
|peter parker|carcur      |9       |
|peter parker|bowling     |12      |
|peter parker|peter parker|0       |
+------------+------------+--------+

*/
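As mentioned above, the word-level features that Spark's levenshtein lacks could be approximated with a UDF. Here is a minimal sketch along those lines, reusing the keywords and searchTerms DataFrames from the example; the sortWords helper is our own illustration, not part of Spark:

import org.apache.spark.sql.functions.{col, levenshtein, udf}

// Hypothetical helper: lowercase, split on whitespace and sort the words,
// so that phrases with the same words in a different order compare as equal.
val sortWords = udf { s: String =>
  s.trim.toLowerCase.split("\\s+").sorted.mkString(" ")
}

keywords
  .crossJoin(searchTerms)
  .withColumn(
    "distance",
    levenshtein(sortWords(col("keyword")), sortWords(col("target")))
  )
  .show(truncate = false)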

After trying this approach we noticed 3 things:

  1. To find outliers we need to cross join against all the search terms that are not exact matches with our keyword inventory, which means at least 24 million comparisons even if there's only one new search term to consider (see the sketch after this list).

  2. Rather unintuitively, the distance metric is not granular enough to express similarity: the algorithm looks at changes to individual characters, whereas to recognize text we humans look at groups of characters. It would be better to compare n-grams built from the keywords and search terms, which is unfortunately not available in the Spark implementation.

  3. The Levenshtein implementation in Spark is not cheap to run. We made some efforts to reduce the number of comparisons, but still ended up with a runtime of about 60 hours for only a few thousand entries. We considered doing the comparisons in code to avoid a full cross join, as we could discard everything below the distance threshold we defined, but it was too much effort compared with the alternatives.
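To illustrate point 1, one way to express that pre-filtering (a sketch, not necessarily what we ran in production; newSearchTerms is our own name) is to drop the exact matches with a left anti join before paying for the cross join:

import org.apache.spark.sql.functions.{col, levenshtein}

// Keep only search terms that have no exact counterpart in the keyword inventory,
// then cross join just those against the inventory.
val newSearchTerms = searchTerms
  .join(keywords, searchTerms("target") === keywords("keyword"), "left_anti")

keywords
  .crossJoin(newSearchTerms)
  .withColumn("distance", levenshtein(col("keyword"), col("target")))
  .show(truncate = false)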

So, what now?

Locality-Sensitive Hashing (LSH) to the rescue.

LSH is part of Spark's MLlib and lets us do approximate similarity joins (i.e. fuzzy matching) using multiple algorithms, including MinHash, which is used very often in NLP applications and has linear time complexity. Using it dramatically reduced our runtime from ~60 hours to ~4 hours. The only downside is that it might return a few false positives; however, the impact of this can be largely reduced by choosing the right parameters.

MinHash estimates how similar two binary sparse vectors are to each other, i.e. how much the sets they represent overlap (their Jaccard similarity). So, first we need to transform our keyword inventory into such vectors.
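To build some intuition before the Spark code: MinHash approximates the Jaccard distance between two sets. Here it is computed exactly on two small character tri-gram sets in plain Scala; the trigrams helper exists only for this illustration:

// Build the set of character tri-grams of a string (illustration only).
def trigrams(s: String): Set[String] = s.toLowerCase.sliding(3).toSet

val a = trigrams("visit cancun")
val b = trigrams("visit cancun with kids")

// Jaccard distance = 1 - |A ∩ B| / |A ∪ B|; 0.0 means the sets are identical.
val jaccardDistance = 1.0 - a.intersect(b).size.toDouble / a.union(b).size.toDouble

println(f"$jaccardDistance%.2f") // 0.50 for this pair

Incidentally, 0.5 is also the value you will see for this pair in the final similarity join below.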

Step 0: Load the starting data.

val keywords = loadKeywords(...)

keywords.show(truncate = false)

/* Keyword inventory

+---------------------------------------+
|keyword                                |
+---------------------------------------+
|things to do in Acapulco               |
|visit Acapulco                         |
|Acapulco tour                          |
|things to do in Agua Azul Waterfalls   |
|visit Agua Azul Waterfalls             |
|Agua Azul Waterfalls tour              |
|things to do in Akumal                 |
|visit Akumal                           |
|Akumal tour                            |
|visit Basilica of Our Lady of Guadalupe|
|Basilica of Our Lady of Guadalupe tour |
|things to do in Bonampak Ruins         |
|tickets Bonampak Ruins                 |
|visit Bonampak Ruins                   |
|Bonampak Ruins tour                    |
|things to do in Cabo San Lucas         |
|visit Cabo San Lucas                   |
|Cabo San Lucas tour                    |
|things to do in Campeche               |
|visit Campeche                         |
+---------------------------------------+
only showing top 20 rows

*/

Step 1: Split the keywords into n-grams

As we want granularity at the level of groups of characters, we split each keyword into characters and then into character tri-grams.

import org.apache.spark.ml.feature.{RegexTokenizer, NGram}

val tokenizer = new RegexTokenizer()
  .setInputCol("keyword") // the column name in our keywords DataFrame
  .setOutputCol("tokens")
  .setToLowercase(true)
  .setPattern("") // an empty pattern splits the string into individual characters

val trigrams = new NGram()
  .setN(3)
  .setInputCol("tokens")
  .setOutputCol("tri-grams")

val keywordsTokenized = tokenizer.transform(keywords)

val keywordTrigrams = trigrams.transform(keywordsTokenized)

keywordTrigrams.show()

/*
+--------------------+--------------------+--------------------+
|             keyword|              tokens|           tri-grams|
+--------------------+--------------------+--------------------+
|things to do in A...|[t, h, i, n, g, s...|[t h i, h i n, i ...|
|      visit Acapulco|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
|       Acapulco tour|[a, c, a, p, u, l...|[a c a, c a p, a ...|
|things to do in A...|[t, h, i, n, g, s...|[t h i, h i n, i ...|
|visit Agua Azul W...|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
|Agua Azul Waterfa...|[a, g, u, a,  , a...|[a g u, g u a, u ...|
|things to do in A...|[t, h, i, n, g, s...|[t h i, h i n, i ...|
|        visit Akumal|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
|         Akumal tour|[a, k, u, m, a, l...|[a k u, k u m, u ...|
|visit Basilica of...|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
|Basilica of Our L...|[b, a, s, i, l, i...|[b a s, a s i, s ...|
|things to do in B...|[t, h, i, n, g, s...|[t h i, h i n, i ...|
|tickets Bonampak ...|[t, i, c, k, e, t...|[t i c, i c k, c ...|
|visit Bonampak Ruins|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
| Bonampak Ruins tour|[b, o, n, a, m, p...|[b o n, o n a, n ...|
|things to do in C...|[t, h, i, n, g, s...|[t h i, h i n, i ...|
|visit Cabo San Lucas|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
| Cabo San Lucas tour|[c, a, b, o,  , s...|[c a b, a b o, b ...|
|things to do in C...|[t, h, i, n, g, s...|[t h i, h i n, i ...|
|      visit Campeche|[v, i, s, i, t,  ...|[v i s, i s i, s ...|
+--------------------+--------------------+--------------------+
*/

Step 2: Convert NGrams to binary sparse vectors

We use HashingTF instead of CountVectorizer because it doesn't need to ingest the data to extract a vocabulary; it only transforms it with a hashing function. HashingTF internally uses a table to keep track of the hashes it produces: the larger the table, the fewer collisions, and fewer collisions mean a more sensitive result. However, if the table is too large it might require far more resources, so we pick a number that is large enough to avoid collisions but no larger than we need; here that is 2^21 features (2 << 20), which is above Spark's default. If you work with a smaller data set you can reduce the number of features to suit your needs.

We also need to remove all rows whose vectors are empty, otherwise MinHash will break.

import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.ml.linalg.Vector


val vectorizer = new HashingTF()
  .setInputCol("tri-grams")
  .setOutputCol("vectors")
  .setNumFeatures(2 << 20)


val invalidVectorRemover: UserDefinedFunction = udf({ v: Vector =>
    v.numNonzeros > 0
  }, DataTypes.BooleanType)


val vectorizedKeywords = vectorizer
  .transform(keywordTrigrams)
  .filter(invalidVectorRemover(col("vectors")))
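As a quick aside on the feature-space size discussed above: with a deliberately tiny numFeatures, many distinct tri-grams are hashed onto the same index, which is exactly the kind of collision that hurts sensitivity. A throwaway snippet, reusing keywordTrigrams from Step 1, makes this visible:

import org.apache.spark.ml.feature.HashingTF

// With only 16 buckets, unrelated tri-grams collide onto the same indices,
// so very different keywords start producing similar-looking vectors.
val tinyVectorizer = new HashingTF()
  .setInputCol("tri-grams")
  .setOutputCol("tiny-vectors")
  .setNumFeatures(16)

tinyVectorizer
  .transform(keywordTrigrams)
  .select("keyword", "tiny-vectors")
  .show(truncate = false)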

Step 3: Fit the LSH Model!

We use 4 hash tables for the MinHashLSH operations. Increasing the number of tables improves accuracy but also increases the communication cost at runtime. The model first needs to be fit on the vectors before it can transform them into the hash values used for the fuzzy match.

import org.apache.spark.ml.feature.MinHashLSH

val minHashLSH = new MinHashLSH()
  .setInputCol("vectors")
  .setOutputCol("lsh")
  .setNumHashTables(4)

val model = minHashLSH.fit(vectorizedKeywords)

model
.transform(vectorizedKeywords)
.show()

/*
+--------------------+--------------------+--------------------+
|             keyword|             vectors|                 lsh|
+--------------------+--------------------+--------------------+
|things to do in A...|(2097152,[206019,...|[[7.2576143E7], [...|
|      visit Acapulco|(2097152,[35902,4...|[[7.2576143E7], [...|
|       Acapulco tour|(2097152,[69673,5...|[[7.2576143E7], [...|
|things to do in A...|(2097152,[206019,...|[[9232508.0], [34...|
|visit Agua Azul W...|(2097152,[35902,5...|[[9232508.0], [34...|
|Agua Azul Waterfa...|(2097152,[69673,5...|[[9232508.0], [2....|
|things to do in A...|(2097152,[200203,...|[[1.15904996E8], ...|
|        visit Akumal|(2097152,[35902,2...|[[3.57158387E8], ...|
|         Akumal tour|(2097152,[69673,5...|[[3.57158387E8], ...|
|visit Basilica of...|(2097152,[35902,4...|[[3.1436822E7], [...|
|Basilica of Our L...|(2097152,[45640,6...|[[3.1436822E7], [...|
|things to do in B...|(2097152,[12528,6...|[[1.15904996E8], ...|
|tickets Bonampak ...|(2097152,[12528,6...|[[1.00243949E8], ...|
|visit Bonampak Ruins|(2097152,[12528,3...|[[4.26830719E8], ...|
| Bonampak Ruins tour|(2097152,[12528,6...|[[4.26830719E8], ...|
|things to do in C...|(2097152,[206019,...|[[1.15904996E8], ...|
|visit Cabo San Lucas|(2097152,[35902,3...|[[4.41644288E8], ...|
| Cabo San Lucas tour|(2097152,[69673,3...|[[4.41644288E8], ...|
|things to do in C...|(2097152,[39171,9...|[[1.15904996E8], ...|
|      visit Campeche|(2097152,[35902,3...|[[1.86233861E8], ...|
+--------------------+--------------------+--------------------+
*/
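Besides similarity joins, the fitted MinHashLSHModel can also answer approximate nearest-neighbour queries for a single vector, which is handy for spot checks. A small sketch, reusing model and vectorizedKeywords from above (probe is just an illustrative name):

import org.apache.spark.ml.linalg.Vector

// Take the vector of the first transformed row as a probe key and look up its
// approximate nearest neighbours in the keyword inventory.
val probe = vectorizedKeywords
  .select("vectors")
  .head()
  .getAs[Vector]("vectors")

model
  .approxNearestNeighbors(vectorizedKeywords, probe, 5)
  .select("keyword", "distCol") // distCol is the default distance column name
  .show(truncate = false)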

Step 4: Create a reusable Model using a Pipeline.

Now we have a DataFrame and a fitted model that can do approximate similarity joins; however, we would need to run the same steps on any other DataFrame we want to use in the join.

There's a way to do all of that in one go if we create a pipeline, which lets us apply all the steps sequentially to a DataFrame and also save a fitted model, ready for similarity joins, to disk. This is very convenient, because fitting the model can take a considerable amount of time if it has to ingest a lot of data.

So we are going to need 2 classes:

BEWARE:

  • If the data changes a lot, it is better to re-fit the model every time for better results.

  • If you want to run this code in Databricks, this code must be compiled as a jar and uploaded as a library. Otherwise the loadModel function will not run.

// CustomModel.scala

import org.apache.spark.ml.feature._
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.sql.DataFrame

object CustomModel {

  val MODEL_PATH = "PATH_TO_MODEL"



  final private val hashNumFeatures = 2 << 20
  final private val numMinHashTables = 4

  def setupStages(targetColumn: String): Array[PipelineStage] = {

    val tokenizer = new RegexTokenizer()
      .setInputCol(targetColumn)
      .setOutputCol("tokens")
      .setToLowercase(true)
      .setPattern("")

    val trigrams = new NGram()
      .setN(3)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("tri-grams")

    val vectorizer = new HashingTF()
      .setInputCol(trigrams.getOutputCol)
      .setOutputCol("vectors")
      .setNumFeatures(hashNumFeatures)

    val vectorRemover = new EmptyVectorsRemover()
      .setInputCol(vectorizer.getOutputCol)

    val minHashLSH = new MinHashLSH()
      .setInputCol(vectorizer.getOutputCol)
      .setOutputCol("lsh")
      .setNumHashTables(numMinHashTables)

    Array(
      tokenizer,
      trigrams,
      vectorizer,
      vectorRemover,
      minHashLSH
    )
  }

  def fit(targetColumn: String, dataFrame: DataFrame): PipelineModel = {
    new Pipeline()
      .setStages(setupStages(targetColumn))
      .fit(dataFrame)
  }

  def saveModel(pipeline: PipelineModel): Unit = {
    pipeline.write
      .overwrite()
      .save(MODEL_PATH)
  }


  def loadModel(): PipelineModel = {
    PipelineModel.load(MODEL_PATH)
  }
}

And, since Spark doesn't ship a transformer to remove empty vectors, we need to create a custom one that wraps our UDF:

// EmptyVectorsRemover.scala

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.param.{Param, ParamMap}
import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Dataset}


class EmptyVectorsRemover(override val uid: String) extends Transformer with DefaultParamsWritable {
  def this() = this(Identifiable.randomUID("removeemptyvectors"))

  final val inputCol = new Param[String](this, "inputCol", "Vector Column to filter")

  def setInputCol(value: String): this.type = set(inputCol, value)

  setDefault(inputCol, "vectors")

  val invalidVectorRemover: UserDefinedFunction = udf({ v: Vector =>
    v.numNonzeros > 0
  }, DataTypes.BooleanType)

  def transform(df: Dataset[_]): DataFrame = {
    df.select(col("*")).filter(invalidVectorRemover(col($(inputCol))))
  }

  def copy(extra: ParamMap): EmptyVectorsRemover = {
    defaultCopy(extra)
  }

  def transformSchema(schema: StructType): StructType = {
    schema
  }
}

object EmptyVectorsRemover extends DefaultParamsReadable[EmptyVectorsRemover] {
  override def load(path: String): EmptyVectorsRemover = super.load(path)
}
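As a quick, illustrative sanity check of the custom transformer, reusing vectorizer and keywordTrigrams from the earlier steps: the row count after the transform should never be larger than before, since rows whose hashed vector has no non-zero entries are dropped.

// Vectorize without the manual filter, then let the transformer do the filtering.
val remover = new EmptyVectorsRemover().setInputCol("vectors")
val allVectors = vectorizer.transform(keywordTrigrams)

println(allVectors.count())
println(remover.transform(allVectors).count()) // never larger than the count above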

Step 5: Approximate Similarity Join!

Now we can easily use the pipeline to transform any DataFrame we want, and load a model from disk to run our similarity joins.

The join takes a distance threshold, and we name the resulting distance column "confidence". The scale goes from 0 to 1: 0 means equal to the source, and the closer to 1, the bigger the difference.

import org.apache.spark.ml.feature.MinHashLSHModel
import org.apache.spark.sql.functions.col

val keywords = loadKeywords(...)
val pipelineModel = CustomModel.fit("keyword", keywords)

// NOTE: Remember to upload code as a library if you want to save and load a fitted model!
CustomModel.saveModel(pipelineModel)

val model = CustomModel.loadModel()

val inputs = Seq(
  "murales Diego Rivera",
  "Cathedral",
  "visit cancun with kids",
  "acapul"
).toDF("keyword")

val join = model
  .stages
  .last
  .asInstanceOf[MinHashLSHModel]
  .approxSimilarityJoin(
    model.transform(keywords),
    model.transform(inputs),
    0.8,
    distCol = "confidence"
  )
  .select(
    col("datasetA.keyword").alias("source"),
    col("datasetB.keyword").alias("target"),
    col("confidence")
  )
  .orderBy("confidence")

join.show(truncate = false)

/*

+-------------------------------------------------------------------+----------------------+------------------+
|source                                                             |target                |confidence        |
+-------------------------------------------------------------------+----------------------+------------------+
|visit Cancun                                                       |visit cancun with kids|0.5               |
|Acapulco tour                                                      |acapul                |0.6363636363636364|
|visit Acapulco                                                     |acapul                |0.6666666666666667|
|Murales De Diego Rivera En La Secretaria De Educacion Publica tour |murales Diego Rivera  |0.7096774193548387|
|visit Murales De Diego Rivera En La Secretaria De Educacion Publica|murales Diego Rivera  |0.7142857142857143|
|Cancun tour                                                        |visit cancun with kids|0.7916666666666666|
|visit Metropolitan Cathedral in Mexico City                        |Cathedral             |0.825             |
|Metropolitan Cathedral in Mexico City tour                         |Cathedral             |0.825             |
|things to do in Metropolitan Cathedral in Mexico City              |Cathedral             |0.851063829787234 |
|things to do in Cancun                                             |visit cancun with kids|0.8571428571428572|
+-------------------------------------------------------------------+----------------------+------------------+

*/

Conclusion

Using MinHashLSH might look a bit daunting at first, as it seemingly has many steps; however, once understood it's quite simple to set up, straightforward to use, and gives striking performance in return.

The final step is removing everything with a confidence level close to 0, i.e. all search terms that are already too similar to our inventory, so that, within the upper bound set by the join threshold, we can focus on whatever performs well according to the Search Terms Report.
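A minimal sketch of that last filter, reusing the join DataFrame from Step 5; the 0.3 lower bound is purely illustrative, not a tuned value:

import org.apache.spark.sql.functions.col

// Drop near-duplicates of the existing inventory (confidence close to 0) and
// keep the rest as candidate keywords for review.
val candidates = join.filter(col("confidence") > 0.3)

candidates.show(truncate = false)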
