Skip to content

Instantly share code, notes, and snippets.

@hanishi
Created February 27, 2023 22:38
Show Gist options
  • Save hanishi/dc7a5d3c6a41a2ada38287f4e5c31139 to your computer and use it in GitHub Desktop.
Save hanishi/dc7a5d3c6a41a2ada38287f4e5c31139 to your computer and use it in GitHub Desktop.
package actors
import actors.CpcActuator.Actor.{ FetchMeasurementKey, TimeoutKey, Work }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer, TimerScheduler }
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.util.Timeout
import com.google.inject.Provides
import play.api.Configuration
import play.api.libs.concurrent.ActorModule
import usecase.SamplePublisherService
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{ Failure, Success }
/**
 * Pricing snapshot carried between the actuator's behaviours.
 *
 * @param minPrice lower bound applied to a bid when the controller output is not positive
 * @param cpc      cost-per-click value the next bid is computed from
 * @param maxCpc   upper bound applied to a bid when the controller output is positive
 */
case class CpcActuator(
    minPrice: Float = 0.0f,
    cpc: Float = 0.0f,
    maxCpc: Float = 0.0f
)
/**
 * Protocol, behaviours and Guice/Play wiring of the CPC actuator actor.
 *
 * The actor cycles through three steps: fetch measurements from the sample
 * publisher, ask the PI controller for an output, apply that output as a new
 * bid, then schedule the next fetch.
 */
object CpcActuator {
  /** Overshoot count above which the controller output is lowered before it is applied. */
  val OVERSHOOTS_LIMIT = 5

  /** Undershoot count above which the controller output is raised before it is applied. */
  val UNDERSHOOTS_LIMIT = 5

  /** Distance between the initial CPC and the CPC ceiling used by the sample publisher. */
  val CPC_STEP = 10

  /** Amount added to / subtracted from the controller output once a shoot limit is exceeded. */
  private val SHOOT_COMPENSATION = 5

  /** Messages understood by the actuator. */
  sealed trait Command

  /** Internal: a bid was sent; `message` is the summary line to log. */
  private case class UpdateSucceeded(message: String) extends Command

  /** Internal: the publisher snapshot arrived. */
  private case class MeasurementsFetched(minPrice: Float, setpoint: Int, measurement: Int, cpc: Float, maxCpc: Float)
      extends Command

  /** Internal: the publisher snapshot could not be obtained. */
  private case class FetchMeasurementFailed(throwable: Throwable) extends Command

  /**
   * Holds the collaborators shared by all behaviours of one actuator instance.
   *
   * @param samplePublisher service used to read measurements and to place bids
   * @param controller      PI controller actor asked for the next output
   * @param context         context of the actuator actor
   * @param buffer          stash for messages that arrive while measurements are being fetched
   * @param timers          scheduler for the fetch timer and the timeout timer
   */
  private class Actor(
      samplePublisher: SamplePublisherService,
      controller: ActorRef[PIController.Command],
      context: ActorContext[Command],
      buffer: StashBuffer[Command],
      timers: TimerScheduler[Command]
  )(
      implicit timeout: Timeout,
      ec: ExecutionContext
  ) {
    val TIMEOUT_DURATION: FiniteDuration = 30.minutes

    /**
     * Re-arms the timeout timer and replays stashed messages into [[active]].
     *
     * Starting a timer with a key that is already active replaces the previous
     * timer, so no explicit cancel is needed.
     *
     * NOTE(review): the `Terminated` message sent by this timer is not matched by
     * any behaviour in this file (it is unhandled in `active` and stashed in
     * `fetchingMeasurements`), so the timeout currently has no effect — confirm
     * the intended reaction.
     */
    def idle(cpcActuator: CpcActuator): Behavior[Command] = {
      timers.startSingleTimer(TimeoutKey, Terminated, TIMEOUT_DURATION)
      buffer.unstashAll(active(cpcActuator))
    }

    /** Applies controller outputs as bids and schedules the next measurement fetch. */
    def active(cpcActuator: CpcActuator): Behavior[Command] = Behaviors.receiveMessagePartial {
      case Work(input) =>
        update(cpcActuator, input).pipe(context.pipeToSelf) {
          case Success(summary) =>
            UpdateSucceeded(summary)
          case Failure(throwable) =>
            // Runs inside the actor, so this fails the actor and triggers supervision.
            throw throwable
        }
        Behaviors.same
      case UpdateSucceeded(message) =>
        context.log.info(message)
        // Same-key start replaces any pending fetch timer.
        timers.startSingleTimer(FetchMeasurementKey, FetchMeasurement, 6.seconds)
        Behaviors.same
      case FetchMeasurement =>
        requestMeasurements()
        fetchingMeasurements(cpcActuator)
    }

    /**
     * Waits for the publisher snapshot, asks the controller for an output and
     * stashes everything else until the snapshot has arrived.
     */
    private def fetchingMeasurements(cpcActuator: CpcActuator = CpcActuator()): Behavior[Command] =
      Behaviors.receiveMessage {
        case MeasurementsFetched(minPrice, setpoint, measurement, cpc, maxCpc) =>
          context.log.info(s"setpoint=$setpoint, measurement=$measurement, cpc=$cpc")
          context.askWithStatus(
            controller,
            replyTo => PIController.Output(setpoint, measurement, replyTo)
          ) {
            case Success(PIController.Input(value, undershoots, overshoots)) =>
              if (undershoots > UNDERSHOOTS_LIMIT) Work(value + SHOOT_COMPENSATION)
              else if (overshoots > OVERSHOOTS_LIMIT) Work(value - SHOOT_COMPENSATION)
              else Work(value)
            case Failure(exception) =>
              throw exception
          }
          idle(cpcActuator.copy(minPrice = minPrice, cpc = cpc, maxCpc = maxCpc))
        case FetchMeasurementFailed(throwable) =>
          throw throwable
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /**
     * Starts an asynchronous measurement fetch and pipes the outcome back to this
     * actor as [[MeasurementsFetched]] or [[FetchMeasurementFailed]].
     */
    private def requestMeasurements(): Unit =
      fetchMeasurements().pipe(context.pipeToSelf) {
        case Success((minPrice, setpoint, measurement, cpc, maxCpc)) =>
          MeasurementsFetched(minPrice, setpoint, measurement, cpc, maxCpc)
        case Failure(throwable) =>
          FetchMeasurementFailed(throwable)
      }

    /**
     * Reads the publisher snapshot.
     *
     * @return (minPrice, averageClicks as setpoint, clicksEveryMinute as measurement, cpc, maxCpc)
     */
    private def fetchMeasurements(): Future[(Float, Int, Int, Float, Float)] =
      samplePublisher
        .getSamplePublisher()
        .map { publisher =>
          (
            publisher.minPrice,
            publisher.averageClicks,
            publisher.clicksEveryMinute,
            publisher.cpc,
            publisher.maxCpc
          )
        }

    /**
     * Turns a controller output into a new bid and sends it to the publisher.
     *
     * The new CPC is `0.02 * input + current cpc`; it is capped at `maxCpc` when
     * the input is positive and floored at `minPrice` otherwise.
     *
     * @return an already-completed future holding the summary line to log
     */
    private def update(cpcActuator: CpcActuator, input: Float): Future[String] = {
      context.log.debug(s"controller output: $input")
      val cpc = 0.02f * input + cpcActuator.cpc
      val result = if (input > 0) Math.min(cpc, cpcActuator.maxCpc) else Math.max(cpc, cpcActuator.minPrice)
      samplePublisher.bid(result)
      Future.successful(s"CPC:${cpcActuator.cpc} -> $result, minPrice: ${cpcActuator.minPrice}, maxPrice: ${cpcActuator.maxCpc}")
    }
  }

  /** Timer message that triggers the next measurement fetch. */
  case object FetchMeasurement extends Command

  /** Play `ActorModule` that builds the supervised actuator behaviour. */
  object Actor extends ActorModule {
    override type Message = Command

    /**
     * Creates the actuator behaviour: stash + timers, a freshly spawned PI
     * controller child, and an initial measurement fetch. The behaviour is
     * restarted on failure.
     */
    @Provides def apply(configuration: Configuration, samplePublisher: SamplePublisherService)(
        implicit ec: ExecutionContext
    ): Behavior[Command] =
      Behaviors
        .supervise[Command](
          Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) { buffer =>
            Behaviors.setup {
              context =>
                Behaviors.withTimers { timers =>
                  implicit val timeout: Timeout =
                    configuration.getOptional[Int](ASK_TIME_OUT).getOrElse(DEFAULT_ASK_TIMEOUT).seconds
                  val actor = new Actor(
                    samplePublisher,
                    context.spawnAnonymous(PIController.Actor(configuration)),
                    context,
                    buffer,
                    timers
                  )
                  // Kick off the first fetch; its result is handled by fetchingMeasurements.
                  actor.requestMeasurements()
                  actor.fetchingMeasurements()
                }
            }
          }
        )
        .onFailure(SupervisorStrategy.restart)

    /** Internal: controller output to apply as a bid. */
    private case class Work(value: Float) extends Command

    /** Timer key of the measurement-fetch timer. */
    private case object FetchMeasurementKey

    /** Timer key of the timeout timer. */
    private case object TimeoutKey
  }

  /** Message sent by the timeout timer. */
  private case object Terminated extends Command
}
package actors
import actors.PIController.Command
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, StashBuffer }
import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.pattern.StatusReply
import play.api.Configuration
import java.time.LocalDateTime
import java.time.temporal.ChronoUnit
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.chaining.scalaUtilChainingOps
import scala.util.{ Failure, Success }
/**
 * Immutable gains, limits and running state of a discrete controller with
 * proportional, integral and derivative terms.
 *
 * `update` returns a new instance with the three terms recomputed; `out` sums
 * them and clamps the result to the output limits. `undershoots` and
 * `overshoots` are carried along but are not modified by `update`.
 *
 * @param kp              proportional gain
 * @param ki              integral gain
 * @param kd              derivative gain
 * @param tau             derivative low-pass filter time constant
 * @param limMin          lower output limit
 * @param limMax          upper output limit
 * @param limMinInt       lower integrator limit
 * @param limMaxInt       upper integrator limit
 * @param t               sample time (in seconds)
 * @param proportion      current proportional term
 * @param integrator      current integral term
 * @param differentiator  current derivative term
 * @param prevError       error of the previous update
 * @param prevMeasurement measurement of the previous update
 * @param undershoots     undershoot counter reported to the caller
 * @param overshoots      overshoot counter reported to the caller
 */
case class PIController(
    kp: Float,
    ki: Float,
    kd: Float,
    /* Derivative low-pass filter time constant */
    tau: Float,
    /* Output limits */
    limMin: Float,
    limMax: Float,
    /* Integrator limits */
    limMinInt: Float,
    limMaxInt: Float,
    /* Sample time (in seconds) */
    t: Float,
    proportion: Float = 0.0f,
    integrator: Float = 0.0f,
    differentiator: Float = 0.0f,
    prevError: Float = 0.0f,
    prevMeasurement: Int = 0,
    undershoots: Int = 0,
    overshoots: Int = 0
) extends Command {

  /** Prints the three terms and returns their sum clamped to `[limMin, limMax]`. */
  def out: Float = {
    val unclamped = proportion + integrator + differentiator
    println(s"P=$proportion, I=$integrator, D=$differentiator, out=${unclamped}")
    clamp(unclamped, limMin, limMax)
  }

  /**
   * Computes the next controller state for one sample.
   *
   * @param setpoint    target value
   * @param measurement observed value
   * @return a copy with recomputed terms, `prevError` and `prevMeasurement`
   */
  private def update(setpoint: Int, measurement: Int): PIController = {
    val error = setpoint - measurement
    print(
      s"""
         |setpoint: $setpoint
         |measurement: $measurement
         |----------------------------------------
         |error: $error
         |measurement: $measurement,
         |prevMeasurement: $prevMeasurement
         |""".stripMargin)
    val errorAsFloat = error.toFloat
    copy(
      proportion = proportionalTerm(errorAsFloat),
      integrator = integrate(errorAsFloat),
      differentiator = differentiate(measurement),
      prevError = errorAsFloat,
      prevMeasurement = measurement
    )
  }

  /** Upper bound is checked first, then the lower bound; anything else passes through. */
  private def clamp(value: Float, lower: Float, upper: Float): Float =
    value match {
      case v if v > upper => upper
      case v if v < lower => lower
      case v              => v
    }

  /** Proportional term: the error scaled by `kp`. */
  private def proportionalTerm(error: Float): Float = error * kp

  /** Trapezoidal integration of the error, clamped to the integrator limits. */
  private def integrate(error: Float): Float =
    clamp(integrator + 0.5f * ki * t * (error + prevError), limMinInt, limMaxInt)

  /** Filtered derivative taken on the measurement rather than on the error. */
  private def differentiate(measurement: Int): Float = {
    val measurementDelta = measurement - prevMeasurement
    val numerator = 2.0f * kd * measurementDelta + (2.0f * tau - t) * differentiator
    -numerator / (2.0f * tau + t)
  }
}
/**
 * Default tuning, message protocol and actor wiring for [[PIController]].
 */
object PIController {
  val Kp = 2.0f
  val Ki = 0.5f
  val Kd = 0.25f
  val LIM_MIN = -10.0f
  val LIM_MAX = 10.0f
  val LIM_MIN_INTEGRATOR = -5.0f
  val LIM_MAX_INTEGRATOR = 5.0f
  val TAU = 0.04f

  // Original note: with the setpoint being the average click count per 300 seconds
  // and feedback arriving every 60 seconds, the ratio is 60/300 = 0.2.
  // NOTE(review): the constant is 0.02, not 0.2. The CPC actuator schedules its next
  // fetch 6 seconds after an update, which would give 6/300 = 0.02 — confirm which
  // interval is intended and update the note or the value.
  val SAMPLE_TIME_S = 0.02f

  /** Builds a controller from the default constants, wrapped in an already-completed future. */
  def load(): Future[PIController] =
    Future.successful(
      PIController(
        Kp,
        Ki,
        Kd,
        TAU,
        LIM_MIN,
        LIM_MAX,
        LIM_MIN_INTEGRATOR,
        LIM_MAX_INTEGRATOR,
        SAMPLE_TIME_S
      )
    )

  /** Messages understood by the controller actor. */
  sealed trait Command

  /** Request: feed one setpoint/measurement sample and reply with the resulting [[Input]]. */
  case class Output(setpoint: Int, measurement: Int, replyTo: ActorRef[StatusReply[Input]]) extends Command

  /** Reply: clamped controller output plus the controller's undershoot/overshoot counters. */
  case class Input(value: Float, undershoots: Int, overshoots: Int) extends Command

  /** Internal: the controller was loaded. */
  private case class InitializationSucceeded(controller: PIController) extends Command

  /** Internal: loading the controller failed. */
  private case class InitializationFailed(throwable: Throwable) extends Command

  /**
   * Behaviours of one controller actor instance.
   *
   * @param context actor context, used for logging
   * @param buffer  stash for requests that arrive before initialization finished
   * @param started creation time, used only to print the elapsed seconds
   */
  private class Actor(
      context: ActorContext[Command],
      buffer: StashBuffer[Command],
      started: LocalDateTime = LocalDateTime.now()
  ) {
    /** Stashes everything until the controller has been loaded; fails the actor if loading failed. */
    private def initializing(): Behavior[Command] = Behaviors.receiveMessage {
      case InitializationSucceeded(controller) => buffer.unstashAll(active(controller))
      case InitializationFailed(throwable) =>
        context.log.error("Initialization Failed")
        throw throwable
      case other =>
        buffer.stash(other)
        Behaviors.same
    }

    /** Handles [[Output]] requests; any other message is left unhandled. */
    private def active(controller: PIController): Behavior[Command] =
      Behaviors.receiveMessagePartial {
        case Output(setpoint, measurement, replyTo) =>
          val updated = controller.update(setpoint, measurement)
          // Report every field from the state produced by this sample, not the previous one.
          replyTo ! StatusReply.Success(Input(updated.out, updated.undershoots, updated.overshoots))
          idle(updated)
      }

    /** Prints the seconds elapsed since `started`, then continues in [[active]]. */
    private def idle(controller: PIController): Behavior[Command] = {
      println(
        s"""duration: ${ChronoUnit.SECONDS.between(started, LocalDateTime.now())}
           |----------------------------------------
           |""".stripMargin)
      active(controller)
    }
  }

  /** Declared as part of the protocol; no behaviour in this object matches it. */
  case object Reset extends Command

  /** Factory for the supervised controller behaviour. */
  object Actor {
    /**
     * Creates the controller behaviour: loads the controller asynchronously,
     * stashes requests until it is available, and restarts on failure.
     */
    def apply(configuration: Configuration)(implicit ec: ExecutionContext): Behavior[Command] =
      Behaviors
        .supervise[Command] {
          Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) { buffer =>
            Behaviors.setup { context =>
              load().pipe(context.pipeToSelf) {
                case Success(controller) => InitializationSucceeded(controller)
                case Failure(exception)  => InitializationFailed(exception)
              }
              new Actor(context, buffer).initializing()
            }
          }
        }
        .onFailure(SupervisorStrategy.restart)
  }
}
package actors
import actors.CpcActuator.CPC_STEP
import actors.SamplePublisher.Command
import akka.actor.typed.scaladsl.{ Behaviors, TimerScheduler }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.pattern.StatusReply
import com.google.inject.Provides
import play.api.libs.concurrent.ActorModule
import scala.concurrent.duration.DurationInt
import scala.util.Random
/**
 * Immutable state of the simulated publisher.
 *
 * @param minPrice          price at or below which a bid produces no clicks
 * @param cpc               most recent bid
 * @param maxCpc            CPC ceiling reported to the actuator
 * @param averageClicks     running click figure, read by the actuator as its setpoint
 * @param clicksEveryMinute click figure, read by the actuator as its measurement
 * @param count             number of `incrementAverageClicks` calls so far (plus the initial value)
 */
case class SamplePublisher(
    minPrice: Float,
    cpc: Float,
    maxCpc: Float,
    averageClicks: Int = 0,
    clicksEveryMinute: Int = 0,
    count: Int = 0
) extends Command {

  /**
   * Advances `count` by one. When the current `count` is a multiple of five,
   * `averageClicks` also grows by the integer mean of five random draws in [0, 100).
   */
  def incrementAverageClicks(): SamplePublisher = {
    val nextCount = count + 1
    count % 5 match {
      case 0 =>
        val meanOfDraws = Seq.fill(5)(Random.nextInt(100)).sum / 5
        copy(averageClicks = averageClicks + meanOfDraws, count = nextCount)
      case _ =>
        copy(count = nextCount)
    }
  }

  /**
   * Records a new bid.
   *
   * A bid at or below `minPrice` only updates `cpc`. Otherwise a random click
   * count scaled by `cpc / minPrice` is drawn; it is added to
   * `clicksEveryMinute` when `count` is a multiple of five, and otherwise
   * replaces it only if larger.
   */
  def bid(cpc: Float): SamplePublisher =
    cpc match {
      case price if price <= minPrice =>
        copy(cpc = price)
      case price =>
        val clicks = (Random.nextInt(100) * (price / minPrice)).toInt
        val nextClicks =
          if (count % 5 == 0) clicksEveryMinute + clicks
          else Math.max(clicksEveryMinute, clicks)
        copy(clicksEveryMinute = nextClicks, cpc = price)
    }

  /**
   * Adds `value` to `clicksEveryMinute` when the current millisecond is even and
   * `cpc` exceeds `maxCpc`; otherwise adds it to `averageClicks`.
   */
  def boost(value: Int): SamplePublisher = {
    val onEvenMillisecond = System.currentTimeMillis() % 2 == 0
    if (onEvenMillisecond && cpc > maxCpc) copy(clicksEveryMinute = clicksEveryMinute + value)
    else copy(averageClicks = averageClicks + value)
  }
}
/**
 * Message protocol and actor wiring for the simulated publisher.
 */
object SamplePublisher {
  /** Messages understood by the publisher actor. */
  sealed trait Command

  /** Request: reply with the current publisher state. */
  case class GetSamplePublisher(replyTo: ActorRef[StatusReply[SamplePublisher]]) extends Command

  /** Command: record a new bid. */
  case class Bid(cpc: Float) extends Command

  /** Timer message: apply a click boost of `value`. */
  case class Pace(value: Int) extends Command

  /** Behaviour of one publisher actor instance. */
  private class Actor(timers: TimerScheduler[Command]) {
    /** Serves state requests, bids and pace boosts, carrying the state in the behaviour. */
    private def active(publisher: SamplePublisher): Behavior[Command] =
      Behaviors.receiveMessagePartial {
        case GetSamplePublisher(replyTo) =>
          replyTo ! StatusReply.Success(publisher)
          // Starting a timer under an existing key replaces the pending one,
          // so each request re-rolls the next pace change.
          timers.startSingleTimer(PaceChange, Pace(Random.between(100, 1000)), Random.between(0, 60).seconds)
          active(publisher.incrementAverageClicks())
        case Bid(cpc) =>
          active(publisher.bid(cpc))
        case Pace(value) =>
          active(publisher.boost(value))
      }
  }

  /** Play `ActorModule` that builds the publisher behaviour with random initial state. */
  object Actor extends ActorModule {
    override type Message = Command

    /**
     * Creates the publisher behaviour. `minPrice` is drawn from [20, 25), the
     * initial `cpc` lies up to 10 above it, and `maxCpc` is `cpc + CPC_STEP`.
     */
    @Provides def apply(): Behavior[Command] = Behaviors.withTimers { timers =>
      val minPrice = Random.between(20, 25).toFloat
      val cpc = minPrice + Random.between(0, 10)
      val initialAverageClicks = Random.nextInt(100)
      // Scale first, truncate last — the same formula `bid` uses. Truncating the
      // ratio before multiplying would reduce it to 1 for every generated price.
      val initialClicks = (Random.nextInt(100) * (cpc / minPrice)).toInt
      new Actor(timers).active(
        SamplePublisher(
          minPrice,
          cpc,
          cpc + CPC_STEP,
          initialAverageClicks,
          initialClicks,
          count = 1
        )
      )
    }
  }

  /** Used as the timer key for the pending [[Pace]] message. */
  case object PaceChange extends Command
}
package usecase
import actors.ASK_TIME_OUT
import actors.SamplePublisher.{Bid, Command, GetSamplePublisher}
import akka.actor.typed.scaladsl.AskPattern.{Askable, schedulerFromActorSystem}
import akka.actor.typed.{ActorRef, Scheduler}
import akka.util.Timeout
import com.google.inject.{Inject, Provides}
import play.api.Configuration
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt
/**
 * Thin facade over the sample publisher actor.
 *
 * NOTE(review): Guice's `@Provides` is meant for provider methods; on a class it
 * presumably has no effect — confirm whether it can be dropped.
 *
 * @param configuration   source of the ask timeout (in seconds)
 * @param samplePublisher publisher actor the calls are forwarded to
 */
@Provides class SamplePublisherService @Inject()(configuration: Configuration, samplePublisher: ActorRef[Command])(
    implicit scheduler: Scheduler,
    ex: ExecutionContext
) {
  import actors.DEFAULT_ASK_TIMEOUT

  // Fall back to the default when the key is absent, as the actuator wiring does,
  // instead of failing construction on a missing setting.
  implicit val timeout: Timeout =
    configuration.getOptional[Int](ASK_TIME_OUT).getOrElse(DEFAULT_ASK_TIMEOUT).seconds

  /** Asks the publisher actor for its current state. */
  def getSamplePublisher(): scala.concurrent.Future[actors.SamplePublisher] =
    samplePublisher.askWithStatus(GetSamplePublisher)

  /** Sends a new bid to the publisher actor without waiting for a reply. */
  def bid(price: Float): Unit = samplePublisher ! Bid(price)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment