Skip to content

Instantly share code, notes, and snippets.

@IlyaHalsky
Created November 28, 2018 15:29
Show Gist options
  • Save IlyaHalsky/72fadeedebed8d79cada2363106613b9 to your computer and use it in GitHub Desktop.
Save IlyaHalsky/72fadeedebed8d79cada2363106613b9 to your computer and use it in GitHub Desktop.
package ee.cone.core.c4security.depviews
import com.typesafe.scalalogging.{LazyLogging, Logger}
import ee.cone.c4actor.BranchProtocol.BranchResult
import ee.cone.c4actor.QProtocol.{Firstborn, TxRef}
import ee.cone.c4actor.Types.SrcId
import ee.cone.c4actor._
import ee.cone.c4actor.dep.DepTypes.DepRequest
import ee.cone.c4actor.dep._
import ee.cone.c4actor.dep.reponse.filter.DepCommonResponseForwardApp
import ee.cone.c4actor.dep.request.{CurrentTimeConfig, CurrentTimeConfigApp}
import ee.cone.c4actor.dep_impl.{AskByPKsApp, DepHandlersApp, DepResponseFiltersApp}
import ee.cone.c4assemble.Types.{DPIterable, Each, Index, Values}
import ee.cone.c4assemble._
import ee.cone.c4gate.AlienProtocol.ToAlienWrite
import ee.cone.c4gate.CurrentSessionKey
import ee.cone.c4gate.HttpProtocol.HttpPost
import ee.cone.c4ui.dep.SessionAttrAskUtility
import ee.cone.c4proto.Protocol
import ee.cone.c4ui.CurrentPathKey
import ee.cone.core.c4base._
import ee.cone.core.c4security.views.DepAssembleProfilerProtocol._
import ee.cone.core.c4security.views.{DepAssembleProfilerProtocol, ProfileSettings, TablesBuilderImpl}
import ee.cone.core.ui.c4view.{InputMeta, InputMetaKey, ViewTagsApp}
import okio.ByteString
import scala.collection.immutable
/**
  * Application mix-in that plugs the dep-assemble profiler into an app.
  *
  * It contributes the profiler protocol, the profiler view request, the dep
  * handlers and ask-by-PK registrations of the two profiler handlers, the
  * [[DepAssembleProfilerAssemble]] joiner and the [[DepAssembleProfiler]] itself.
  * `snapshotTaskSigner` and `toUpdate` have to be supplied by the final app.
  */
/*NoGen*/ trait DepAssembleProfilerApp
  extends ProtocolsApp
    with DepAssembleProfileTransaction
    with AssemblesApp
    with DepViewRequestsApp
    with DepHandlersApp
    with DepAskFactoryApp
    with ViewTagsApp
    with QAdapterRegistryApp
    with AskByPKFactoryApp
    with DepFactoryApp
    with AskByPKsApp
    with ModelAccessFactoryApp
    with SessionAttrAskUtility
    with DepCommonResponseForwardApp
    with DepResponseFiltersApp
    with CommonIdInjectApps
    with DepAssembleProfilerLocalViewApp
    with CurrentTimeConfigApp
    with OrigMetaAttrHandlersRegistryMix {

  /** Registers the refresh rate of the profiler view clock in front of the inherited configs. */
  override def currentTimeConfig: List[CurrentTimeConfig] = {
    val viewTime = CurrentTimeConfig(DepAssembleProfileViewTime.key, DepAssembleProfileViewTime.refreshRate)
    viewTime :: super.currentTimeConfig
  }

  /** Url of the profiler view; also handed to the profiler so that it can skip this view's own requests. */
  def appUrl: SrcId = DepAssembleProfilerLocalViewObj.url

  /** A not-yet-completed stop request for the "main" profile. */
  def disableProfileCl: Product = StopProfile("main", false)

  def snapshotTaskSigner: Signer[SnapshotTask]

  /** Maps `appUrl` to a [[DepAssembleProfilerRequest]] built from the first element of the request tuple. */
  override def viewRequests: List[UlrRequestContainer[_ <: DepRequest]] = {
    val profilerView =
      UlrRequestContainer(List(appUrl), classOf[DepAssembleProfilerRequest])(s ⇒ DepAssembleProfilerRequest(s._1))
    profilerView :: super.viewRequests
  }

  override def protocols: List[Protocol] =
    DepAssembleProfilerProtocol :: super.protocols

  private val drawAsk = depAskFactory.forClasses(classOf[DepAssembleProfilerRequest], classOf[Int])

  /** Context injection for the view request, followed by both profiler handlers, in front of the inherited ones. */
  override def depHandlers: List[DepHandler] =
    injectContext[DepAssembleProfilerRequest](drawAsk, _.sessionId) ::
      handler2.handler ::
      handler.handler ::
      super.depHandlers

  override def askByPKs: List[AbstractAskByPK] =
    handler2.byPk ::: handler.byPk ::: super.askByPKs

  override def assembles: List[Assemble] = {
    val profilerAssemble = new DepAssembleProfilerAssemble(qAdapterRegistry)
    profilerAssemble :: super.assembles
  }

  /*override def depFilters: List[DepResponseForwardFilter] =
  depCommonResponseForward.forwardSessionIds(classOf[DepAssembleProfileDrawRequest]) ::
  super.depFilters*/

  // NOTE(review): both handlers are eager vals that read members of other mix-ins
  // (and the abstract snapshotTaskSigner) while this trait is being initialised;
  // presumably those are defs or lazy vals in the final app — confirm.
  private val handler = DepAssembleProfilerDrawHandler(
    depFactory,
    depAskFactory,
    askByPKFactory,
    viewTags,
    new TablesBuilderImpl(viewTags),
    sessionAttrAskFactory,
    modelAccessFactory
  )

  private val handler2 = DepAssembleProfileHandler(
    depFactory,
    depAskFactory,
    askByPKFactory,
    viewTags,
    new TablesBuilderImpl(viewTags),
    qAdapterRegistry,
    modelAccessFactory,
    sessionAttrAskFactory,
    origMetaAttrHandlerRegistry,
    snapshotTaskSigner
  )

  def toUpdate: ToUpdate

  /** Resolves each class to the id of the adapter registered under the class name. */
  private def clToLong(classes: List[Class[_]]): Set[Long] =
    classes.map(cl ⇒ qAdapterRegistry.byName(cl.getName).id).toSet

  /**
    * Profiler for the "main" thread name with a duration of 10, a count of 100,
    * the listed classes as ignored value types and the profiler meta class as
    * the meta type.
    */
  def assembleProfiler: AssembleProfiler = {
    val ignoredClasses: List[Class[_]] = List(
      classOf[ToAlienWrite],
      classOf[BranchResult],
      classOf[HttpPost],
      classOf[DepAssembleProfilerMeta],
      classOf[UpdatesViewSetting],
      classOf[PauseProfile],
      classOf[StopProfile]
    )
    DepAssembleProfiler(
      threadName = "main",
      toUpdate = toUpdate,
      durationMS = 10,
      count = Some(100),
      ignore = clToLong(ignoredClasses),
      metaId = clToLong(classOf[DepAssembleProfilerMeta] :: Nil).head,
      appUrl = appUrl,
      qAdapterRegistry = qAdapterRegistry
    )
  }
}
/**
  * Profiler that never stores anything: it logs the transaction meta of the
  * given context (when there is one) and always hands out a
  * [[ConsoleSerialJoiningProfiling]] starting from a zero running total.
  *
  * @param durationMS duration threshold forwarded to the console profiling
  * @param count      optional count threshold forwarded to the console profiling
  */
case class DepConsoleAssembleProfiler(
  durationMS: Long,
  count: Option[Long] = None
) extends AssembleProfiler with LazyLogging {

  def createSerialJoiningProfiling(localOpt: Option[Context]): SerialJoiningProfiling = {
    for (local ← localOpt) {
      val txMeta: List[OrigMetaAttr] = TxTransformOrigMetaKey.of(local)
      logger.info(s"TxMeta: ${txMeta.toString}")
    }
    ConsoleSerialJoiningProfiling(durationMS, count, 0)
  }

  /** Console profiling adds nothing to the transaction: the updates are returned as they came. */
  def addMeta(
    profiling: SerialJoiningProfiling,
    updates: immutable.Seq[QProtocol.Update]
  ): immutable.Seq[QProtocol.Update] =
    updates
}
/**
  * Profiler that records a [[DepAssembleProfilerMeta]] (plus a `TxRef`) next to
  * the updates of every profiled transaction.
  *
  * @param threadName       stored in every recorded meta
  * @param toUpdate         converts the recorded events into raw updates
  * @param durationMS       duration threshold of the console fallback
  * @param count            count threshold of the console fallback
  * @param ignore           value type ids that are not worth profiling: a batch made
  *                         only of them is passed through untouched, and their
  *                         payload is blanked inside a recorded meta
  * @param metaId           value type id whose occurrences in the write model give
  *                         the number stored with the profiling
  * @param appUrl           requests whose url ends with this value are not profiled
  * @param qAdapterRegistry registry used to build `decoder`
  */
case class DepAssembleProfiler(
  threadName: String,
  toUpdate: ToUpdate,
  durationMS: Long,
  count: Option[Long] = None,
  ignore: Set[Long],
  metaId: Long,
  appUrl: String,
  qAdapterRegistry: QAdapterRegistry
) extends AssembleProfiler with LazyLogging {

  // NOTE(review): called "decoder" although it is built from AnyAdapter.encode;
  // the name is part of the public surface, so it is left as is.
  lazy val decoder: DepRequest => AnyOrigProtocol.AnyOrig = AnyAdapter.encode(qAdapterRegistry)

  /**
    * Chooses the profiling for one transaction:
    *  - no context: console profiling;
    *  - a `StopProfile` exists, or the request url ends with `appUrl`: no profiling;
    *  - otherwise a [[DepSerialJoiningProfiling]] stamped with the current time.
    */
  def createSerialJoiningProfiling(localOpt: Option[Context]): SerialJoiningProfiling =
    localOpt match {
      case None ⇒
        ConsoleSerialJoiningProfiling(durationMS, count, 0)
      case Some(local) if ByPK(classOf[StopProfile]).of(local).nonEmpty ⇒
        NoneSerialJoiningProfiling
      case Some(local) ⇒
        val profileSettings = ProfileSettings.of(local)
        val recordJoiners = profileSettings.exists(_._2)
        val requestMeta = InputMetaKey.of(local)
        if (requestMeta.exists(_.url.endsWith(appUrl)))
          NoneSerialJoiningProfiling
        else {
          // Stack capture is switched off; it used to be
          // Thread.currentThread.getStackTrace.toList.take(5).map(_.toString)
          val noStack: List[SrcId] = Nil
          val session: SrcId = CurrentSessionKey.of(local)
          val path = CurrentPathKey.of(local)
          val priorMetaWrites = WriteModelKey.of(local).count(_.valueTypeId == metaId)
          val transformMeta = TxTransformOrigMetaKey.of(local)
          // The clocks are read last, after everything above has been looked up.
          DepSerialJoiningProfiling(
            sessionId = session,
            stackTrace = noStack,
            startTimeMillis = System.currentTimeMillis(),
            startTimeNano = System.nanoTime(),
            joinStack = Nil,
            uiPath = path,
            inputMetaOpt = requestMeta,
            txNumber = priorMetaWrites,
            txMeta = transformMeta,
            writeTo = profileSettings.map(_._1),
            profileJoiners = recordJoiners
          )
        }
    }

  /**
    * Prepends the updates of a freshly built profiler meta and its `TxRef` to
    * `updates`. Happens only for a [[DepSerialJoiningProfiling]] whose batch has
    * at least one update outside `ignore`; every other case returns `updates`
    * unchanged.
    */
  def addMeta(
    profiling: SerialJoiningProfiling,
    updates: immutable.Seq[QProtocol.Update]
  ): immutable.Seq[QProtocol.Update] =
    profiling match {
      case dep: DepSerialJoiningProfiling if updates.exists(u ⇒ !ignore(u.valueTypeId)) ⇒
        // Use the requested target id when there is one, a random id otherwise.
        val srcId = dep.writeTo.getOrElse(RandomSrcId())
        val meta = DepAssembleProfilerMeta(
          srcId,
          threadName,
          dep.txNumber,
          dep.startTimeMillis,
          System.nanoTime() - dep.startTimeNano,
          dep.sessionId,
          dep.joinStack,
          dep.uiPath,
          updates.map { upd ⇒
            if (ignore(upd.valueTypeId)) upd.copy(value = ByteString.EMPTY) else upd
          }.toList,
          dep.inputMetaOpt.map { in ⇒
            InputMetaProto(in.userName, in.caption, in.path, in.action, in.sessionId, in.url, in.body)
          },
          dep.txMeta.map(attr ⇒ decoder(attr.orig))
        )
        val txRef = TxRef(srcId, "")
        val profilerUpdates = (meta :: txRef :: Nil).flatMap(LEvent.update).map(toUpdate.toUpdate)
        profilerUpdates ++ updates
      case _ ⇒
        updates
    }
}
/**
  * Per-transaction profiling state produced by [[DepAssembleProfiler]].
  *
  * When `profileJoiners` is set, every handled join is prepended to `joinStack`;
  * otherwise the profiling is carried through the transaction unchanged.
  */
case class DepSerialJoiningProfiling(
  sessionId: String,
  stackTrace: List[String],
  startTimeMillis: Long,
  startTimeNano: Long,
  joinStack: List[JoinLogEntry] = Nil,
  uiPath: String,
  inputMetaOpt: Option[InputMeta],
  txNumber: Int,
  txMeta: List[OrigMetaAttr],
  writeTo: Option[String],
  profileJoiners: Boolean
) extends SerialJoiningProfiling {

  /** Current `System.nanoTime` while joiners are profiled, `0` otherwise. */
  def time: Long = if (profileJoiners) System.nanoTime else 0L

  /**
    * Logs one join: its name, assemble name, result size and the time elapsed
    * since `calcStart`, expressed as nanoseconds divided by 10000.
    */
  def handle(
    join: Join,
    calcStart: Long,
    findChangesStart: Long,
    patchStart: Long,
    joinRes: DPIterable[Index],
    transition: WorldTransition
  ): WorldTransition =
    if (!profileJoiners) transition
    else {
      val joinName = join.name
      val assembleName = join.assembleName
      // The size is taken before the clock is read, so counting is part of the measured time.
      val resultSize = joinRes.size
      val elapsed = (time - calcStart) / 10000
      val entry = JoinLogEntry(joinName, assembleName, resultSize, elapsed)
      transition.copy(profiling = copy(joinStack = entry :: joinStack))
    }
}
/** Provides a lazily created logger registered under the fixed name "Profile". */
/*NoGen*/ trait ProfileLogger {
  lazy val logger: Logger = Logger("Profile")
}
/**
  * Profiling that reports slow, large joins to the "Profile" logger and keeps a
  * running total of the time spent in joins.
  *
  * Internally time is counted as nanoseconds divided by 10000; dividing that
  * by 100.0 gives the millisecond figures that are printed.
  *
  * NOTE(review): a join is reported only when `count` is defined and smaller
  * than the result size, so the default `count = None` never logs — confirm
  * that this is intended.
  *
  * @param durationMS a join must take longer than this many milliseconds to be reported
  * @param count      a join must produce more than this many items to be reported
  * @param soFar      running total of join time, in nanoseconds / 10000
  */
case class ConsoleSerialJoiningProfiling(durationMS: Long, count: Option[Long] = None, soFar: Long)
  extends SerialJoiningProfiling with ProfileLogger {

  def time: Long = System.nanoTime

  def handle(
    join: Join,
    calcStart: Long,
    findChangesStart: Long,
    patchStart: Long,
    joinRes: DPIterable[Index],
    transition: WorldTransition
  ): WorldTransition = {
    val elapsedTicks: Long = (System.nanoTime - calcStart) / 10000
    val elapsedMs: Double = elapsedTicks / 100.0
    val resultSize = joinRes.size
    if (elapsedMs > durationMS && count.exists(_ < resultSize)) {
      // The running total is appended only once it has reached 100000 ticks.
      if (soFar < 100000) {
        logger.info(s"${join.assembleName} rule ${join.name} ${getColoredCount(resultSize)} items for ${getColoredPeriod(elapsedMs)} ms")
      } else {
        logger.info(s"${join.assembleName} rule ${join.name} ${getColoredCount(resultSize)} items for ${getColoredPeriod(elapsedMs)} ms, so far ${soFar / 100.0} ms total")
      }
    }
    transition.copy(profiling = copy(soFar = soFar + elapsedTicks))
  }

  /** Colours a duration: "g" below 200, "y" from 200 up to 500, "r" from 500 on. */
  def getColoredPeriod: Double ⇒ String = {
    case ms if ms < 200 ⇒ ColoredString("g")(ms.toString)
    case ms if ms < 500 ⇒ ColoredString("y")(ms.toString)
    // The guard stays explicit so that a NaN still matches no case at all.
    case ms if ms >= 500 ⇒ ColoredString("r")(ms.toString)
  }

  /** Colours an item count: "g" below 100, "y" from 100 up to 1000, "r" from 1000 on. */
  def getColoredCount: Int ⇒ String = items ⇒ {
    val colour =
      if (items < 100) "g"
      else if (items < 1000) "y"
      else "r"
    ColoredString(colour)(items.toString)
  }
}
/** Profiling that measures nothing: the clock is always `0` and transitions pass through untouched. */
case object NoneSerialJoiningProfiling extends SerialJoiningProfiling {

  def time: Long = 0L

  def handle(
    join: Join,
    calcStart: Long,
    findChangesStart: Long,
    patchStart: Long,
    joinRes: DPIterable[Index],
    transition: WorldTransition
  ): WorldTransition =
    transition
}
/**
  * The transactions selected for one thread id.
  *
  * @param threadId        id the selection belongs to
  * @param transactionsIds selected transaction ids, in the order they were produced
  */
case class TotalTransactions(threadId: SrcId, transactionsIds: List[SrcId]) {

  /** The same ids as a set, computed once at construction. */
  val keepIds: Set[String] = transactionsIds.toSet
}
/** Wrapper around a single id; emitted once per id of a [[TotalTransactions]]. */
case class TransactionsIdContainer(
  srcId: SrcId
)
/**
  * All profiler records collected for one transaction.
  *
  * @param transactionId id of the transaction
  * @param threadId      thread id taken from the profiler meta
  * @param startedAt     start time of the transaction
  * @param profiles      the profiler records of the transaction
  * @param setting       optional view setting stored for the transaction
  * @param trType        transaction type label
  */
case class TransactionProfile(
  transactionId: SrcId,
  threadId: SrcId,
  startedAt: Long,
  profiles: List[ProfileMetaWithTransactionId],
  setting: Option[UpdatesViewSetting],
  trType: String
) {

  /** Ids of the profiler metas held in `profiles`, computed once at construction. */
  val keepIds: Set[String] = profiles.map(record ⇒ record.meta.srcId).toSet
}
/** A [[PauseProfile]] together with its `toKeep` ids turned into a set. */
case class RichPauseProfile(pauseProfile: PauseProfile) {

  /** `pauseProfile.toKeep` as a set, computed once at construction. */
  val pauseSet: Set[String] = pauseProfile.toKeep.toSet
}
/**
  * A profiler meta paired with its `TxRef` and the transaction id.
  *
  * @param transactionId id of the transaction
  * @param meta          the recorded profiler meta
  * @param txRef         the reference stored together with `meta`
  */
case class ProfileMetaWithTransactionId(transactionId: SrcId, meta: DepAssembleProfilerMeta, txRef: TxRef)
/**
  * Transform that deletes everything stored for the given transaction
  * profiles: each profiler meta, its `TxRef` and, when present, the view setting.
  *
  * @param srcId    id of this transform
  * @param profiles the transaction profiles to remove
  */
case class KillProfiles(srcId: SrcId, profiles: List[TransactionProfile]) extends TxTransform {

  def transform(local: Context): Context = {
    val doomed = for {
      transaction ← profiles
      item ← transaction.profiles.flatMap(record ⇒ List(record.meta, record.txRef)) ::: transaction.setting.toList
    } yield item
    TxAdd(doomed.flatMap(LEvent.delete))(local)
  }
}
/**
  * Transform run for a stop request: deletes every `UpdatesViewSetting`,
  * `DepAssembleProfilerMeta` and `TxRef` found in the context and marks every
  * `StopProfile` as completed.
  *
  * NOTE(review): all `TxRef` values are deleted, not only those written by the
  * profiler — confirm nothing else relies on them.
  *
  * @param srcId id of this transform
  */
case class KillProfilesStop(srcId: SrcId) extends TxTransform {

  def transform(local: Context): Context = {
    val viewSettings: Iterable[UpdatesViewSetting] = ByPK(classOf[UpdatesViewSetting]).of(local).values
    val profilerMetas: Iterable[DepAssembleProfilerMeta] = ByPK(classOf[DepAssembleProfilerMeta]).of(local).values
    val txRefs: Iterable[TxRef] = ByPK(classOf[TxRef]).of(local).values
    val stopRequests: Iterable[StopProfile] = ByPK(classOf[StopProfile]).of(local).values

    val deletions = (viewSettings ++ profilerMetas ++ txRefs).toList.flatMap(LEvent.delete)
    val completions = stopRequests.map(_.copy(completedStop = true)).toList.flatMap(LEvent.update)
    val events = deletions ++ completions
    TxAdd(events)(local)
  }
}
// Joiner rules of the profiler: they group the recorded profiler metas by
// transaction and by thread, select the transactions worth keeping and create
// the transforms that delete the rest.
// The class is processed by the @assemble macro, so the rule signatures are
// kept exactly as they are.
@assemble class DepAssembleProfilerAssemble(qAdapterRegistry: QAdapterRegistry) extends Assemble with LazyLogging {
// Key aliases used by the rules below.
type ThreadId = SrcId
type TransactionId = SrcId
type KillMe = SrcId
type PauseAll = All
type StopAll = All
/* def RoundStats(
srcId: SrcId,
stat: Each[UpdatesProfileTotal]
): Values[(SrcId, UpdatesProfileTotalRounded)] =
WithPKList(UpdatesProfileTotalRounded(stat.srcId, stat.ui / 10 * 10, stat.time / 10 * 10, stat.business / 10 * 10))*/
// Wraps every PauseProfile into a RichPauseProfile and emits it under the PauseAll key.
def PauseToAll(
srcId: SrcId,
pause: Each[PauseProfile]
): Values[(PauseAll, RichPauseProfile)] =
WithAll(RichPauseProfile(pause)) :: Nil
// Emits every StopProfile under the StopAll key.
def StopToAll(
srcId: SrcId,
stop: Each[StopProfile]
): Values[(StopAll, StopProfile)] =
WithAll(stop) :: Nil
// Pairs a profiler meta with the TxRef of the same key and tags the pair with
// txRef.txId as transaction id. Nothing is emitted while a StopProfile exists.
def CollectProfiles(
profileId: SrcId,
@by[StopAll] stops: Values[StopProfile],
profile: Each[DepAssembleProfilerMeta],
txRef: Each[TxRef]
): Values[(TransactionId, ProfileMetaWithTransactionId)] =
WithPKIf(stops.isEmpty, ProfileMetaWithTransactionId(txRef.txId, profile, txRef))
// Builds one TransactionProfile per transaction that has profiler records:
// thread id of the first record, earliest startedAtMillis, records sorted by
// txAddNumber, and the type "user" when any record carries input meta, "bg" otherwise.
def CreateTotalUpdatesProfile(
transactionId: SrcId,
settings: Values[UpdatesViewSetting],
@by[TransactionId] profiles: Values[ProfileMetaWithTransactionId]
): Values[(SrcId, TransactionProfile)] =
if (profiles.isEmpty) Nil
else
WithPKList(
TransactionProfile(
transactionId,
profiles.head.meta.threadId,
profiles.map(_.meta.startedAtMillis).min,
profiles.toList.sortBy(_.meta.txAddNumber),
settings.headOption,
if (profiles.exists(_.meta.inputMeta.isDefined)) "user" else "bg"
)
)
// Re-keys every TransactionProfile by its thread id.
def TransactionProfileToThread(
transaction: SrcId,
transactionProfile: Each[TransactionProfile]
): Values[(ThreadId, TransactionProfile)] =
(transactionProfile.threadId → transactionProfile) :: Nil
// Selects the transactions of a thread that are to be kept.
// Nothing is selected while a StopProfile exists or the thread has no profiles.
// Without a pause: the first 30 ids in descending transactionId order.
// With a pause: every id contained in the pause set of the first pause, unlimited.
// A profile passes the filter when there are no ProfileFilters or when every
// element of the first filter's `dd` equals the profile's trType.
def TotalTransactionsCreation(
threadId: SrcId,
filters: Values[ProfileFilters],
@by[StopAll] stops: Values[StopProfile],
@by[PauseAll] pauses: Values[RichPauseProfile],
@by[ThreadId] transactionProfiles: Values[TransactionProfile]
): Values[(SrcId, TotalTransactions)] = {
val filter: TransactionProfile ⇒ Boolean = pr ⇒ filters.headOption.isEmpty || filters.headOption.get.dd.forall(_ == pr.trType)
if (stops.nonEmpty || transactionProfiles.isEmpty)
Nil
else
pauses.toList match {
case Nil ⇒
WithPKList(
TotalTransactions(
threadId,
transactionProfiles.filter(filter).sortBy(_.transactionId).reverse.take(30).map(_.transactionId).toList
)
)
case a :: _ ⇒
WithPKList(
TotalTransactions(
threadId,
transactionProfiles.filter(filter).sortBy(_.transactionId).reverse.map(_.transactionId).filter(a.pauseSet).toList
)
)
}
}
type DepLive = SrcId
// Emits one TransactionsIdContainer for every selected transaction id.
def KeepAlive(
srcId: SrcId,
a: Each[TotalTransactions]
): Values[(DepLive, TransactionsIdContainer)] =
for {
id ← a.transactionsIds
} yield WithPK(TransactionsIdContainer(id))
// Marks a TransactionProfile for removal (keyed by its thread id) when no
// TransactionsIdContainer was joined to it.
// NOTE(review): the key parameter is called threadId, but the rule is driven by
// TransactionProfile values — presumably the key is the transaction id; confirm.
def KillOffDepAssembleProfilerMeta(
threadId: SrcId,
pr: Each[TransactionProfile],
@by[DepLive] a: Values[TransactionsIdContainer]
): Values[(KillMe, TransactionProfile)] =
if (a.isEmpty) {
(pr.threadId → pr) :: Nil
} else Nil
// Creates the KillProfiles transform for the marked profiles: without a stop
// request only once more than 100 profiles are marked, with a stop request always.
def CreateKiller(
threadId: SrcId,
@by[KillMe] profiles: Values[TransactionProfile],
@by[StopAll] stops: Values[StopProfile]
): Values[(SrcId, TxTransform)] =
if (stops.isEmpty)
ListIf(profiles.size > 100, WithPKList(KillProfiles(threadId, profiles.toList)))
else
WithPKList(KillProfiles(threadId, profiles.toList))
// Creates the KillProfilesStop transform when exactly one StopProfile exists
// and it is not completed yet.
def KillIfStop(
threadId: SrcId,
firstBorn: Each[Firstborn],
@by[StopAll] stops: Values[StopProfile]
): Values[(SrcId, TxTransform)] =
stops.toList match {
case a :: Nil if !a.completedStop ⇒ WithPKList(KillProfilesStop(a.srcId))
case _ ⇒ Nil
}
}
/*def SimplifyUpdates(
key: SrcId,
@by[PauseAll] pauses: Values[RichPauseProfile],
@by[StopAll] stops: Values[StopProfile],
updates: Each[DepAssembleProfilerMeta],
viewSettings: Values[UpdatesViewSetting]
): Values[(CollectiveId, UpdateProfile)] =
if (stops.isEmpty)
pauses.headOption match {
case None ⇒
/*val stup: Map[Long, Int] = updates.updates.groupBy(_.valueTypeId).transform{case (k, v) ⇒ v.map(_.value.size()).sum}
logger.info(key)
for {i ← stup} yield {
logger.info(i.toString())
}*/
/*val isTime = updates.updates.map(_.valueTypeId).forall(timeIds)
val isUi = updates.updates.map(_.valueTypeId).forall(uiIds)
logger.info((key,isTime, isUi).toString())
val time = updates.updates.find(_.valueTypeId == timeAdapter.id).map(u ⇒ timeAdapter.decode(u.value).currentTimeSeconds).getOrElse(0L)
logger.info((key, time, updates.updates.map(_.valueTypeId).distinct).toString)*/
val updateIds = updates.updates.map(_.valueTypeId).distinct
val metaProper: List[DepAssembleProfilerMeta] = updates.updates.filter(u ⇒ u.valueTypeId == metaAdapter.id).map(u ⇒ metaAdapter.decode(u.value))
ListIf(!updateIds.exists(ignoreIds) && metaProper.nonEmpty,
List(collectiveId → UpdateProfile(
updates.srcId,
updates.updates.size,
updates.updates.map(_.value.size).sum,
updateIds,
metaProper,
viewSettings.headOption
)
)
)
case Some(pause) if pause.pauseSet.contains(updates.srcId) ⇒
val updateIds = updates.updates.map(_.valueTypeId).distinct
val metaProper: List[DepAssembleProfilerMeta] = updates.updates.filter(u ⇒ u.valueTypeId == metaAdapter.id).map(u ⇒ metaAdapter.decode(u.value))
ListIf(!updateIds.exists(ignoreIds) && metaProper.nonEmpty,
List(collectiveId → UpdateProfile(
updates.srcId,
updates.updates.size,
updates.updates.map(_.value.size).sum,
updateIds,
metaProper,
viewSettings.headOption
)
)
)
case _ ⇒ Nil
}
else
Nil
def CollectProfiles(
key: SrcId,
@by[CollectiveId] updatesSummary: Values[UpdateProfile]
): Values[(SrcId, UpdatesProfile)] = {
@tailrec def headToKeep(res: UpdatesProfile, in: List[UpdateProfile]): UpdatesProfile =
if (in.isEmpty) res else {
val will = UpdatesProfile(
res.srcId,
in.head :: res.items,
res.txCount + 1,
res.objCount + in.head.objCount,
res.byteCount + in.head.byteCount
)
if (will.txCount > 30 ||
will.objCount > 10000 ||
will.byteCount > 100000000
) res
else headToKeep(will, in.tail)
}
List(WithPK(headToKeep(
UpdatesProfile(key, Nil, 0L, 0L, 0L),
updatesSummary.filterNot(_.valueTypeIds.forall(skipIds))
.sortBy(_.srcId).toList.reverse
)
)
)
}
def KeepAliveUpdates(
key: SrcId,
stops: Values[StopProfile],
pauses: Values[PauseProfile],
updatesListSummary: Each[UpdatesProfile]
): Values[(SrcId, KeepUpdates)] =
if (stops.isEmpty)
if (pauses.isEmpty)
for {
item ← updatesListSummary.items
} yield WithPK(KeepUpdates(item.srcId))
else
pauses.head.toKeep.map(id ⇒ WithPK(KeepUpdates(id)))
else
Nil*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment