Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Redis PING Implementation for JGroups
package luna.lib
import io.lettuce.core.RedisClient
import io.lettuce.core.RedisException
import io.lettuce.core.cluster.RedisClusterClient
import io.lettuce.core.cluster.api.sync.RedisClusterCommands
import org.jgroups.Address
import org.jgroups.annotations.MBean
import org.jgroups.annotations.Property
import org.jgroups.conf.ClassConfigurator
import org.jgroups.protocols.Discovery
import org.jgroups.protocols.FILE_PING
import org.jgroups.protocols.PingData
import org.jgroups.util.Responses
import java.util.Base64
@MBean(description = "Redis based discovery protocol")
open class REDIS_PING : FILE_PING() {
companion object {
private val base64Encoder = Base64.getEncoder()
private val base64Decoder = Base64.getDecoder()
init {
ClassConfigurator.addProtocol(2200, REDIS_PING::class.java)
}
}
@Property(description = "URL of Redis client")
protected var url: String = "redis://localhost:6379/0"
@Property(description = "Is redis cluster")
protected var cluster: Boolean = false
@Property(description = "Name of the group of pods")
protected var group: String = "jgroups"
@Property(description = "Don't allow cluster to operate without redis instance")
protected var strict: Boolean = false
private var clusterMembers: Map<String, String> = mapOf()
protected val redisCommands: RedisClusterCommands<String, String> by lazy {
if (cluster)
RedisClusterClient.create(url).connect().sync()
else
RedisClient.create(url).connect().sync()
}
override fun init() {
super.init()
failOpen {
redisCommands.ping()
}
}
override fun write(data: MutableList<PingData>, clustername: String) {
data.forEach { d ->
val serData = serializeWithoutView(d).toBase64()
failOpen {
redisCommands.hset(clustername, Discovery.addressAsString(d.address), serData)
}
}
}
override fun removeAll(clustername: String?) {
if (clustername == null) {
return
}
failOpen { redisCommands.del(clustername) }
}
override fun remove(clustername: String?, addr: Address?) {
if (clustername == null || addr == null) {
return
}
failOpen { redisCommands.hdel(clustername, Discovery.addressAsString(addr)) }
}
override fun readAll(members: MutableList<Address>?, clustername: String, responses: Responses) {
clusterMembers = failOpen (mapOf()) { redisCommands.hgetall(clustername) }
for ((ownAddr, serData) in clusterMembers) {
val data = try {
Discovery.deserialize(serData.fromBase64())
} catch (e: Throwable) {
log.error("Unable to read member %s %s; error: %s", local_addr, ownAddr, e)
null
}
if (data == null) {
failOpen {
redisCommands.hdel(clustername, ownAddr)
}
continue
}
if (members != null && !members.contains(data.address)) {
continue
}
responses.addResponse(data, false)
if (local_addr != null && local_addr != data.address) {
addDiscoveryResponseToCaches(data.address, data.logicalName, data.physicalAddr)
}
}
}
private fun failOpen(call: () -> Unit) {
try {
call()
} catch (e: RedisException) {
if (strict) {
throw e
}
log.warn("Running in non-strict mode ignoring redis exception: ${e.message}")
}
}
private fun <T> failOpen(default: T, call: () -> T): T {
return try {
call()
} catch (e: RedisException) {
if (strict) {
throw e
}
log.warn("Running in non-strict mode ignoring redis exception", e)
default
}
}
fun ByteArray.toBase64(): String = base64Encoder.encodeToString(this)
fun String.fromBase64(): ByteArray = base64Decoder.decode(this)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment