Skip to content

Instantly share code, notes, and snippets.

@maxpert
Created October 5, 2020 19:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxpert/73afa69bf26fba38a44761594410b7cc to your computer and use it in GitHub Desktop.
Save maxpert/73afa69bf26fba38a44761594410b7cc to your computer and use it in GitHub Desktop.
Redis PING Implementation for JGroups
package luna.lib
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisException
import io.lettuce.core.cluster.RedisClusterClient
import io.lettuce.core.cluster.api.sync.RedisClusterCommands
import org.jgroups.Address
import org.jgroups.annotations.MBean
import org.jgroups.annotations.Property
import org.jgroups.conf.ClassConfigurator
import org.jgroups.protocols.Discovery
import org.jgroups.protocols.FILE_PING
import org.jgroups.protocols.PingData
import org.jgroups.util.Responses
import java.util.Base64
@MBean(description = "Redis based discovery protocol")
open class REDIS_PING : FILE_PING() {
companion object {
private val base64Encoder = Base64.getEncoder()
private val base64Decoder = Base64.getDecoder()
init {
ClassConfigurator.addProtocol(2200, REDIS_PING::class.java)
}
}
@Property(description = "URL of Redis client")
protected var url: String = "redis://localhost:6379/0"
@Property(description = "Is redis cluster")
protected var cluster: Boolean = false
@Property(description = "Name of the group of pods")
protected var group: String = "jgroups"
@Property(description = "Don't allow cluster to operate without redis instance")
protected var strict: Boolean = false
private var clusterMembers: Map<String, String> = mapOf()
protected val redisCommands: RedisClusterCommands<String, String> by lazy {
if (cluster)
RedisClusterClient.create(url).connect().sync()
else
RedisClient.create(url).connect().sync()
}
override fun init() {
super.init()
failOpen {
redisCommands.ping()
}
}
override fun write(data: MutableList<PingData>, clustername: String) {
data.forEach { d ->
val serData = serializeWithoutView(d).toBase64()
failOpen {
redisCommands.hset(clustername, Discovery.addressAsString(d.address), serData)
}
}
}
override fun removeAll(clustername: String?) {
if (clustername == null) {
return
}
failOpen { redisCommands.del(clustername) }
}
override fun remove(clustername: String?, addr: Address?) {
if (clustername == null || addr == null) {
return
}
failOpen { redisCommands.hdel(clustername, Discovery.addressAsString(addr)) }
}
override fun readAll(members: MutableList<Address>?, clustername: String, responses: Responses) {
clusterMembers = failOpen (mapOf()) { redisCommands.hgetall(clustername) }
for ((ownAddr, serData) in clusterMembers) {
val data = try {
Discovery.deserialize(serData.fromBase64())
} catch (e: Throwable) {
log.error("Unable to read member %s %s; error: %s", local_addr, ownAddr, e)
null
}
if (data == null) {
failOpen {
redisCommands.hdel(clustername, ownAddr)
}
continue
}
if (members != null && !members.contains(data.address)) {
continue
}
responses.addResponse(data, false)
if (local_addr != null && local_addr != data.address) {
addDiscoveryResponseToCaches(data.address, data.logicalName, data.physicalAddr)
}
}
}
private fun failOpen(call: () -> Unit) {
try {
call()
} catch (e: RedisException) {
if (strict) {
throw e
}
log.warn("Running in non-strict mode ignoring redis exception: ${e.message}")
}
}
private fun <T> failOpen(default: T, call: () -> T): T {
return try {
call()
} catch (e: RedisException) {
if (strict) {
throw e
}
log.warn("Running in non-strict mode ignoring redis exception", e)
default
}
}
fun ByteArray.toBase64(): String = base64Encoder.encodeToString(this)
fun String.fromBase64(): ByteArray = base64Decoder.decode(this)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment