Created
March 5, 2021 15:08
-
-
Save ffan07039/46b8faa4e64de28739a04e398cc3362b to your computer and use it in GitHub Desktop.
A Spark SQL UDF that looks up values in memcached, memoizing results in a local in-memory cache.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Lazily-initialised state used by readPubEntity. Both fields start as null on the
// driver and are filled in by the first readPubEntity call in whichever JVM runs it.
//
// @transient keeps both fields out of the serialized UDF closure, so they always
// arrive on the executors as null (the default value for a transient field after
// Java deserialization) and readPubEntity initialises them there. Without it, calling
// readPubEntity on the driver (e.g. to try it out in the notebook) would populate
// these fields, and the next UDF use would try to ship the driver's cache and live
// client with the closure. A network client is presumably not Serializable (TODO
// confirm for the memcached client library in use), so that would fail the job.
//
// NOTE(review): the client created on executors is never closed; nothing in this
// snippet shuts it down.
@transient var localCache: scala.collection.mutable.Map[Int, Int] = null
// Typed as Any rather than MemcachedClient; readPubEntity converts it to a
// MemcachedClient before use.
@transient var memCachedClient: Any = null
/**
 * Looks up the value stored in memcached under `id`, memoising every answer
 * (including misses) in `localCache`.
 *
 * Meant to run inside Spark executors, where `localCache` and `memCachedClient`
 * are expected to arrive as null and are created on first use.
 *
 * @param id key to look up; sent to memcached as its decimal string form
 * @return the stored value parsed as an Int, or -1 when memcached returns null
 *         for the key
 * @throws NumberFormatException if the stored value's `toString` is not a valid Int
 */
def readPubEntity(id: Int): Int = {
  // Returns the shared client, opening a connection the first time one is needed.
  // The client is resolved independently of the cache, so a failed connection
  // attempt can never leave an initialised cache paired with a null client.
  def connectedClient(): MemcachedClient = memCachedClient match {
    case existing: MemcachedClient => existing
    case _ =>
      val created = new MemcachedClient(new InetSocketAddress(configEndpoint, clusterPort))
      memCachedClient = created
      created
  }

  if (localCache == null) {
    localCache = scala.collection.mutable.Map()
  }

  // Misses are cached too (as -1), so a key absent from memcached is queried only once
  // per cache instance.
  localCache.getOrElseUpdate(id, {
    // memcached `get` returns null for an absent key.
    Option(connectedClient().get(id.toString)).fold(-1)(_.toString.toInt)
  })
}
// Function value (Int => Int) delegating to readPubEntity, in the form
// spark.udf.register accepts.
val getPubId: Int => Int = passbackId => readPubEntity(passbackId)
// Registers getPubId with Spark SQL so queries can call it as udfGetPubId(...);
// the value returned by register is kept for use from the DataFrame API.
val udfGetPubId = {
  val sqlFunctionName = "udfGetPubId"
  spark.udf.register(sqlFunctionName, getPubId)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment