A Spark SQL UDF that looks up memcached, backed by a per-executor local cache
import java.net.InetSocketAddress
import net.spy.memcached.MemcachedClient // spymemcached / ElastiCache cluster client

// per-executor local cache, initialized lazily on first lookup
var localCache: scala.collection.mutable.Map[Int, Int] = null
// declared as Any so that the null reference can be serialized out to the executors;
// the real client is created lazily inside each executor
// (for now we have no way to close this MemcachedClient inside the executors)
var memCachedClient: Any = null

// runs on the executors: looks up memcached and maintains the local cache;
// configEndpoint and clusterPort (the memcached cluster endpoint) are assumed
// to be defined elsewhere in the job
def readPubEntity(id: Int): Int = {
  if (localCache == null) {
    localCache = scala.collection.mutable.Map()
    memCachedClient = new MemcachedClient(new InetSocketAddress(configEndpoint, clusterPort))
  }
  if (localCache contains id) {
    localCache(id)
  } else {
    val client = memCachedClient.asInstanceOf[MemcachedClient]
    // memcached get can return null when the key is missing
    val value = client.get(id.toString)
    // cache misses as -1 so the same missing key is not fetched repeatedly
    val result = if (value == null) -1 else value.toString.toInt
    localCache(id) = result
    result
  }
}

// function object wrapping the lookup
val getPubId = (passbackId: Int) => readPubEntity(passbackId)

// register the UDF so it can be called by name from Spark SQL
val udfGetPubId = spark.udf.register("udfGetPubId", getPubId)
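
// Example usage (a minimal sketch): the registered UDF can be called by name
// from Spark SQL, or the UserDefinedFunction returned by register can be applied
// to DataFrame columns directly. The table and column names ("impressions",
// "passback_id") are hypothetical placeholders, not part of the original gist.
import org.apache.spark.sql.functions.col
val fromSql = spark.sql("SELECT passback_id, udfGetPubId(passback_id) AS pub_id FROM impressions")
val fromDf = spark.table("impressions").withColumn("pub_id", udfGetPubId(col("passback_id")))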