Skip to content

Instantly share code, notes, and snippets.

@ixixi
Last active August 29, 2015 14:03
Show Gist options
  • Save ixixi/0ff51a853a78b5e710cd to your computer and use it in GitHub Desktop.
Save ixixi/0ff51a853a78b5e710cd to your computer and use it in GitHub Desktop.
norikra-udf-expdecay
package in.ixixi.norikra.udf
import com.espertech.esper.epl.agg.aggregator.AggregationMethod
import scala.collection.JavaConversions.{seqAsJavaList,mutableMapAsJavaMap}
import java.util.Calendar
import scala.math._
/**
 * Esper aggregation method that sums event values weighted by exponential decay.
 *
 * Every call to `enter`/`leave` receives an argument array of the form
 * `(timestamp, value, halfLife[, include])`:
 *  - `timestamp` is compared against the wall clock in milliseconds,
 *  - `value` is the raw amount contributed by the event,
 *  - `halfLife` is multiplied by 1000 before being compared with millisecond
 *    differences, i.e. it is expressed in seconds,
 *  - `include` is optional; when present and false the event is ignored.
 *
 * Values are accumulated per timestamp; `getValue` applies the decay relative
 * to the current time at the moment it is called.
 */
class AggregationScore extends AggregationMethod {
  /** Sum of the raw values of all events currently held, keyed by timestamp. */
  val counts = new collection.mutable.HashMap[Long, Double].withDefaultValue(0)

  /** Half-life taken from the most recently processed event. */
  var halfLife: Double = 0.0

  // Number of events currently held per timestamp. A bucket is dropped when its
  // last event leaves, rather than when its sum happens to reach <= 0: negative
  // values or floating-point residue make the sum an unreliable emptiness test.
  private val entries = new collection.mutable.HashMap[Long, Int].withDefaultValue(0)

  def getValueType: Class[_] = classOf[Double]

  /** Adds an event's value to the bucket of its timestamp (unless filtered out). */
  def enter(value: AnyRef): Unit = {
    if (filter(value)) {
      val (timestamp, rawValue) = convertValue(value)
      counts(timestamp) += rawValue
      entries(timestamp) += 1
    }
  }

  /** Removes an event's value from the bucket of its timestamp (unless filtered out). */
  def leave(value: AnyRef): Unit = {
    if (filter(value)) {
      val (timestamp, rawValue) = convertValue(value)
      val remaining = entries(timestamp) - 1
      if (remaining <= 0) {
        // Last (or an unmatched) event for this timestamp: drop the bucket entirely.
        counts.remove(timestamp)
        entries.remove(timestamp)
      } else {
        counts(timestamp) -= rawValue
        entries(timestamp) = remaining
      }
    }
  }

  /** Returns the decayed sum over all buckets as a boxed `java.lang.Double`. */
  def getValue: AnyRef = {
    val now = System.currentTimeMillis
    val score = counts.iterator.map { case (timestamp, rawValue) =>
      rawValue * exponentialDecay(timestamp, now)
    }.sum
    java.lang.Double.valueOf(score)
  }

  /** Discards all accumulated state. */
  def clear(): Unit = {
    counts.clear()
    entries.clear()
  }

  // Checks that the aggregation input is an argument array with at least the
  // three mandatory entries, so a malformed call fails with a clear message.
  private def arguments(value: AnyRef): Array[AnyRef] = value match {
    case args: Array[AnyRef] if args.length >= 3 => args
    case args: Array[AnyRef] =>
      throw new IllegalArgumentException(
        "expected (timestamp, value, halfLife[, include]) but got " + args.length + " argument(s)")
    case other =>
      throw new IllegalArgumentException("expected an argument array but got: " + other)
  }

  // Converts the first two arguments to (timestamp, value); also records the
  // third argument as the current half-life.
  private def convertValue(value: AnyRef): (Long, Double) = {
    val args = arguments(value)
    halfLife = args(2).toString.toDouble
    (args(0).toString.toLong, args(1).toString.toDouble)
  }

  // When the optional fourth argument is given, the event is kept only if it is true.
  private def filter(value: AnyRef): Boolean = {
    val args = arguments(value)
    args.length == 3 || args(3).toString.toBoolean
  }

  /**
   * Exponential decay factor for an amount recorded at `timestamp`, observed at `now`.
   *
   * Computes `2 ^ ((timestamp - now) / (halfLife * 1000))`, so the factor halves
   * every `halfLife` seconds. A `halfLife` of 0 divides by zero and yields 0 or NaN.
   */
  def exponentialDecay(timestamp: Long, now: Long): Double =
    math.pow(2, (timestamp - now) / (halfLife * 1000))
}
package in.ixixi.norikra.udf
import com.espertech.esper.client.hook.AggregationFunctionFactory
import com.espertech.esper.epl.agg.aggregator.AggregationMethod
import com.espertech.esper.epl.agg.service.AggregationValidationContext
/**
 * Esper aggregation function factory that creates [[AggregationScore]] aggregators.
 */
class ExponentioalDecayFactory extends AggregationFunctionFactory {
  /** The function name is not needed by this factory, so it is ignored. */
  def setFunctionName(functionName: String): Unit = {}

  /** No argument validation is performed yet. */
  def validate(validationContext: AggregationValidationContext): Unit = {
    // Argument checking could probably be expressed neatly with pattern matching.
  }

  def getValueType: Class[_] = classOf[Double]

  /** Creates a fresh aggregator instance. */
  def newAggregator: AggregationMethod = new AggregationScore
}
require 'java'
require 'norikra/udf'
module Norikra
  module UDF
    # Norikra UDF plugin entry for the exponential-decay score aggregation.
    class ExponentitalDecayScore < Norikra::UDF::AggregationSingle
      class << self
        # Loads the jar that contains the Scala implementation.
        def init
          require 'norikra-udf-expdecay.jar'
        end
      end

      # Returns the pair [function name, factory class name].
      def definition
        function_name = "exp_decay_score"
        factory_class = "in.ixixi.norikra.udf.ExponentioalDecayFactory"
        [function_name, factory_class]
      end
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment