Created
November 28, 2018 15:29
-
-
Save IlyaHalsky/72fadeedebed8d79cada2363106613b9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ee.cone.core.c4security.depviews | |
import com.typesafe.scalalogging.{LazyLogging, Logger} | |
import ee.cone.c4actor.BranchProtocol.BranchResult | |
import ee.cone.c4actor.QProtocol.{Firstborn, TxRef} | |
import ee.cone.c4actor.Types.SrcId | |
import ee.cone.c4actor._ | |
import ee.cone.c4actor.dep.DepTypes.DepRequest | |
import ee.cone.c4actor.dep._ | |
import ee.cone.c4actor.dep.reponse.filter.DepCommonResponseForwardApp | |
import ee.cone.c4actor.dep.request.{CurrentTimeConfig, CurrentTimeConfigApp} | |
import ee.cone.c4actor.dep_impl.{AskByPKsApp, DepHandlersApp, DepResponseFiltersApp} | |
import ee.cone.c4assemble.Types.{DPIterable, Each, Index, Values} | |
import ee.cone.c4assemble._ | |
import ee.cone.c4gate.AlienProtocol.ToAlienWrite | |
import ee.cone.c4gate.CurrentSessionKey | |
import ee.cone.c4gate.HttpProtocol.HttpPost | |
import ee.cone.c4ui.dep.SessionAttrAskUtility | |
import ee.cone.c4proto.Protocol | |
import ee.cone.c4ui.CurrentPathKey | |
import ee.cone.core.c4base._ | |
import ee.cone.core.c4security.views.DepAssembleProfilerProtocol._ | |
import ee.cone.core.c4security.views.{DepAssembleProfilerProtocol, ProfileSettings, TablesBuilderImpl} | |
import ee.cone.core.ui.c4view.{InputMeta, InputMetaKey, ViewTagsApp} | |
import okio.ByteString | |
import scala.collection.immutable | |
/*NoGen*/ trait DepAssembleProfilerApp | |
extends ProtocolsApp | |
with DepAssembleProfileTransaction | |
with AssemblesApp | |
with DepViewRequestsApp | |
with DepHandlersApp | |
with DepAskFactoryApp | |
with ViewTagsApp | |
with QAdapterRegistryApp | |
with AskByPKFactoryApp | |
with DepFactoryApp | |
with AskByPKsApp | |
with ModelAccessFactoryApp | |
with SessionAttrAskUtility | |
with DepCommonResponseForwardApp | |
with DepResponseFiltersApp | |
with CommonIdInjectApps | |
with DepAssembleProfilerLocalViewApp | |
with CurrentTimeConfigApp | |
with OrigMetaAttrHandlersRegistryMix { | |
override def currentTimeConfig: List[CurrentTimeConfig] = CurrentTimeConfig(DepAssembleProfileViewTime.key, DepAssembleProfileViewTime.refreshRate) :: super.currentTimeConfig | |
def appUrl: SrcId = DepAssembleProfilerLocalViewObj.url | |
def disableProfileCl: Product = StopProfile("main", false) | |
def snapshotTaskSigner: Signer[SnapshotTask] | |
override def viewRequests: List[UlrRequestContainer[_ <: DepRequest]] = | |
UlrRequestContainer(appUrl :: Nil, classOf[DepAssembleProfilerRequest])(s ⇒ DepAssembleProfilerRequest(s._1)) :: super.viewRequests | |
override def protocols: List[Protocol] = DepAssembleProfilerProtocol :: super.protocols | |
private val drawAsk = depAskFactory.forClasses(classOf[DepAssembleProfilerRequest], classOf[Int]) | |
override def depHandlers: List[DepHandler] = injectContext[DepAssembleProfilerRequest](drawAsk, _.sessionId) :: handler2.handler :: handler.handler :: super.depHandlers | |
override def askByPKs: List[AbstractAskByPK] = handler2.byPk ::: handler.byPk ::: super.askByPKs | |
override def assembles: List[Assemble] = new DepAssembleProfilerAssemble( | |
qAdapterRegistry | |
) :: super.assembles | |
/*override def depFilters: List[DepResponseForwardFilter] = | |
depCommonResponseForward.forwardSessionIds(classOf[DepAssembleProfileDrawRequest]) :: | |
super.depFilters*/ | |
private val handler = DepAssembleProfilerDrawHandler(depFactory, depAskFactory, askByPKFactory, viewTags, new TablesBuilderImpl(viewTags), sessionAttrAskFactory, modelAccessFactory) | |
private val handler2 = DepAssembleProfileHandler(depFactory, depAskFactory, askByPKFactory, viewTags, new TablesBuilderImpl(viewTags), qAdapterRegistry, modelAccessFactory, sessionAttrAskFactory, origMetaAttrHandlerRegistry, snapshotTaskSigner) | |
def toUpdate: ToUpdate | |
private def clToLong: List[Class[_]] ⇒ Set[Long] = _.map(cl ⇒ qAdapterRegistry.byName(cl.getName).id).toSet | |
def assembleProfiler: AssembleProfiler = DepAssembleProfiler( | |
"main", | |
toUpdate, | |
10, | |
Some(100), | |
clToLong(classOf[ToAlienWrite] :: classOf[BranchResult] :: classOf[HttpPost] :: | |
classOf[DepAssembleProfilerMeta] :: classOf[UpdatesViewSetting] :: classOf[PauseProfile] :: classOf[StopProfile] :: Nil | |
), | |
clToLong(classOf[DepAssembleProfilerMeta] :: Nil).head, | |
appUrl, | |
qAdapterRegistry | |
) | |
} | |
case class DepConsoleAssembleProfiler( | |
durationMS: Long, | |
count: Option[Long] = None | |
) extends AssembleProfiler with LazyLogging { | |
def createSerialJoiningProfiling(localOpt: Option[Context]): SerialJoiningProfiling = { | |
val meta: Option[List[OrigMetaAttr]] = localOpt.map(local ⇒ TxTransformOrigMetaKey.of(local)) | |
if (meta.isDefined) | |
logger.info(s"TxMeta: ${meta.get.toString}") | |
ConsoleSerialJoiningProfiling(durationMS, count, 0) | |
} | |
def addMeta(profiling: SerialJoiningProfiling, updates: immutable.Seq[QProtocol.Update]): immutable.Seq[QProtocol.Update] = updates | |
} | |
case class DepAssembleProfiler( | |
threadName: String, | |
toUpdate: ToUpdate, | |
durationMS: Long, | |
count: Option[Long] = None, | |
ignore: Set[Long], | |
metaId: Long, | |
appUrl: String, | |
qAdapterRegistry: QAdapterRegistry | |
) extends AssembleProfiler with LazyLogging { | |
lazy val decoder: DepRequest => AnyOrigProtocol.AnyOrig = AnyAdapter.encode(qAdapterRegistry) | |
def createSerialJoiningProfiling(localOpt: Option[Context]): SerialJoiningProfiling = | |
localOpt match { | |
case Some(local) ⇒ | |
val isStopped = ByPK(classOf[StopProfile]).of(local).nonEmpty | |
if (!isStopped) { | |
val writeTo = ProfileSettings.of(local) | |
val writeJoiners = writeTo.exists(_._2) | |
val inputMeta = InputMetaKey.of(local) | |
if (inputMeta.exists(_.url.endsWith(appUrl))) | |
NoneSerialJoiningProfiling | |
else { | |
val stack: List[SrcId] = Nil //Thread.currentThread.getStackTrace.toList.take(5).map(_.toString) | |
val sessionKey: SrcId = CurrentSessionKey.of(local) | |
val pathKey = CurrentPathKey.of(local) | |
val txNumber = WriteModelKey.of(local).count(_.valueTypeId == metaId) | |
val txMeta = TxTransformOrigMetaKey.of(local) | |
//val stat = ByPK(classOf[UpdatesProfileTotal]).of(local).getOrElse("main", UpdatesProfileTotal("main", 0, 0, 0)) | |
DepSerialJoiningProfiling( | |
sessionKey, | |
stack, | |
System.currentTimeMillis(), | |
System.nanoTime(), | |
Nil, | |
pathKey, | |
inputMeta, | |
txNumber, | |
txMeta, | |
writeTo.map(_._1), | |
writeJoiners | |
) | |
} | |
} else { | |
NoneSerialJoiningProfiling | |
} | |
case None ⇒ ConsoleSerialJoiningProfiling(durationMS, count, 0) | |
} | |
def addMeta(profiling: SerialJoiningProfiling, updates: immutable.Seq[QProtocol.Update]): immutable.Seq[QProtocol.Update] = { | |
profiling match { | |
case ConsoleSerialJoiningProfiling(_, _, _) ⇒ | |
updates | |
case DepSerialJoiningProfiling(sessionId, _, startTimeMillis, startTimeNano, joinStack, pathKey, inputMetaOpt, txCount, txMeta, to, _) ⇒ | |
if (updates.map(_.valueTypeId).forall(ignore)) { | |
updates | |
} else { | |
val srcId = if (to.isDefined) to.get else RandomSrcId() | |
val meta = DepAssembleProfilerMeta( | |
srcId, | |
threadName, | |
txCount, | |
startTimeMillis, | |
System.nanoTime() - startTimeNano, | |
sessionId, | |
joinStack, | |
pathKey, | |
updates.map(c ⇒ if (ignore.apply(c.valueTypeId)) c.copy(value = ByteString.EMPTY) else c).toList, | |
inputMetaOpt.map(c ⇒ InputMetaProto(c.userName, c.caption, c.path, c.action, c.sessionId, c.url, c.body)), | |
txMeta.map(meta ⇒ decoder(meta.orig)) | |
) | |
val txRef = TxRef(srcId, "") | |
(meta :: txRef :: Nil).flatMap(LEvent.update).map(toUpdate.toUpdate) ++ updates | |
} | |
case _ ⇒ updates | |
} | |
} | |
} | |
case class DepSerialJoiningProfiling( | |
sessionId: String, | |
stackTrace: List[String], | |
startTimeMillis: Long, | |
startTimeNano: Long, | |
joinStack: List[JoinLogEntry] = Nil, | |
uiPath: String, | |
inputMetaOpt: Option[InputMeta], | |
txNumber: Int, | |
txMeta: List[OrigMetaAttr], | |
writeTo: Option[String], | |
profileJoiners: Boolean | |
) extends SerialJoiningProfiling { | |
def time: Long = if (profileJoiners) System.nanoTime else 0L | |
def handle( | |
join: Join, | |
calcStart: Long, | |
findChangesStart: Long, | |
patchStart: Long, | |
joinRes: DPIterable[Index], | |
transition: WorldTransition | |
): WorldTransition = | |
if (profileJoiners) | |
transition.copy(profiling = copy(joinStack = JoinLogEntry(join.name, join.assembleName, joinRes.size, (time - calcStart) / 10000) :: joinStack)) | |
else | |
transition | |
} | |
/*NoGen*/ trait ProfileLogger { | |
lazy val logger: Logger = Logger("Profile") | |
} | |
case class ConsoleSerialJoiningProfiling(durationMS: Long, count: Option[Long] = None, soFar: Long) extends SerialJoiningProfiling with ProfileLogger { | |
def time: Long = System.nanoTime | |
def handle( | |
join: Join, | |
calcStart: Long, | |
findChangesStart: Long, | |
patchStart: Long, | |
joinRes: DPIterable[Index], | |
transition: WorldTransition | |
): WorldTransition = { | |
val timeNano: Long = (System.nanoTime - calcStart) / 10000 | |
val timeFront: Double = timeNano / 100.0 | |
val countT = joinRes.size | |
if (timeFront > durationMS && count.exists(_ < countT)) | |
if (soFar < 100000) | |
logger.info(s"${join.assembleName} rule ${join.name} ${getColoredCount(countT)} items for ${getColoredPeriod(timeFront)} ms") | |
else | |
logger.info(s"${join.assembleName} rule ${join.name} ${getColoredCount(countT)} items for ${getColoredPeriod(timeFront)} ms, so far ${soFar / 100.0} ms total") | |
transition.copy(profiling = copy(soFar = soFar + timeNano)) | |
} | |
def getColoredPeriod: Double ⇒ String = { | |
case i if i < 200 ⇒ ColoredString("g")(i.toString) | |
case i if i >= 200 && i < 500 ⇒ ColoredString("y")(i.toString) | |
case i if i >= 500 ⇒ ColoredString("r")(i.toString) | |
} | |
def getColoredCount: Int ⇒ String = { | |
case i if i < 100 ⇒ ColoredString("g")(i.toString) | |
case i if i >= 100 && i < 1000 ⇒ ColoredString("y")(i.toString) | |
case i if i >= 1000 ⇒ ColoredString("r")(i.toString) | |
} | |
} | |
case object NoneSerialJoiningProfiling extends SerialJoiningProfiling { | |
def time: Long = 0L | |
def handle(join: Join, calcStart: Long, findChangesStart: Long, patchStart: Long, joinRes: DPIterable[Index], transition: WorldTransition): WorldTransition = transition | |
} | |
case class TotalTransactions( | |
threadId: SrcId, | |
transactionsIds: List[SrcId] | |
) { | |
val keepIds: Set[String] = transactionsIds.toSet | |
} | |
case class TransactionsIdContainer(srcId: SrcId) | |
case class TransactionProfile( | |
transactionId: SrcId, | |
threadId: SrcId, | |
startedAt: Long, | |
profiles: List[ProfileMetaWithTransactionId], | |
setting: Option[UpdatesViewSetting], | |
trType: String | |
) { | |
val keepIds: Set[String] = profiles.map(_.meta.srcId).toSet | |
} | |
case class RichPauseProfile( | |
pauseProfile: PauseProfile | |
) { | |
val pauseSet: Set[String] = pauseProfile.toKeep.toSet | |
} | |
case class ProfileMetaWithTransactionId( | |
transactionId: SrcId, | |
meta: DepAssembleProfilerMeta, | |
txRef: TxRef | |
) | |
case class KillProfiles(srcId: SrcId, profiles: List[TransactionProfile]) extends TxTransform { | |
def transform(local: Context): Context = | |
TxAdd(profiles.flatMap(t ⇒ t.profiles.flatMap(p ⇒ | |
p.meta :: p.txRef :: Nil | |
) ::: t.setting.toList | |
).flatMap(LEvent.delete) | |
)(local) | |
} | |
case class KillProfilesStop(srcId: SrcId) extends TxTransform { | |
def transform(local: Context): Context = { | |
val settings: Iterable[UpdatesViewSetting] = ByPK(classOf[UpdatesViewSetting]).of(local).values | |
val meta: Iterable[DepAssembleProfilerMeta] = ByPK(classOf[DepAssembleProfilerMeta]).of(local).values | |
val txRef: Iterable[TxRef] = ByPK(classOf[TxRef]).of(local).values | |
val stops: Iterable[StopProfile] = ByPK(classOf[StopProfile]).of(local).values | |
val events = | |
(settings ++ meta ++ txRef).toList.flatMap(LEvent.delete) ++ | |
stops.map(_.copy(completedStop = true)).toList.flatMap(LEvent.update) | |
TxAdd(events)(local) | |
} | |
} | |
@assemble class DepAssembleProfilerAssemble(qAdapterRegistry: QAdapterRegistry) extends Assemble with LazyLogging { | |
type ThreadId = SrcId | |
type TransactionId = SrcId | |
type KillMe = SrcId | |
type PauseAll = All | |
type StopAll = All | |
/* def RoundStats( | |
srcId: SrcId, | |
stat: Each[UpdatesProfileTotal] | |
): Values[(SrcId, UpdatesProfileTotalRounded)] = | |
WithPKList(UpdatesProfileTotalRounded(stat.srcId, stat.ui / 10 * 10, stat.time / 10 * 10, stat.business / 10 * 10))*/ | |
def PauseToAll( | |
srcId: SrcId, | |
pause: Each[PauseProfile] | |
): Values[(PauseAll, RichPauseProfile)] = | |
WithAll(RichPauseProfile(pause)) :: Nil | |
def StopToAll( | |
srcId: SrcId, | |
stop: Each[StopProfile] | |
): Values[(StopAll, StopProfile)] = | |
WithAll(stop) :: Nil | |
def CollectProfiles( | |
profileId: SrcId, | |
@by[StopAll] stops: Values[StopProfile], | |
profile: Each[DepAssembleProfilerMeta], | |
txRef: Each[TxRef] | |
): Values[(TransactionId, ProfileMetaWithTransactionId)] = | |
WithPKIf(stops.isEmpty, ProfileMetaWithTransactionId(txRef.txId, profile, txRef)) | |
def CreateTotalUpdatesProfile( | |
transactionId: SrcId, | |
settings: Values[UpdatesViewSetting], | |
@by[TransactionId] profiles: Values[ProfileMetaWithTransactionId] | |
): Values[(SrcId, TransactionProfile)] = | |
if (profiles.isEmpty) Nil | |
else | |
WithPKList( | |
TransactionProfile( | |
transactionId, | |
profiles.head.meta.threadId, | |
profiles.map(_.meta.startedAtMillis).min, | |
profiles.toList.sortBy(_.meta.txAddNumber), | |
settings.headOption, | |
if (profiles.exists(_.meta.inputMeta.isDefined)) "user" else "bg" | |
) | |
) | |
def TransactionProfileToThread( | |
transaction: SrcId, | |
transactionProfile: Each[TransactionProfile] | |
): Values[(ThreadId, TransactionProfile)] = | |
(transactionProfile.threadId → transactionProfile) :: Nil | |
def TotalTransactionsCreation( | |
threadId: SrcId, | |
filters: Values[ProfileFilters], | |
@by[StopAll] stops: Values[StopProfile], | |
@by[PauseAll] pauses: Values[RichPauseProfile], | |
@by[ThreadId] transactionProfiles: Values[TransactionProfile] | |
): Values[(SrcId, TotalTransactions)] = { | |
val filter: TransactionProfile ⇒ Boolean = pr ⇒ filters.headOption.isEmpty || filters.headOption.get.dd.forall(_ == pr.trType) | |
if (stops.nonEmpty || transactionProfiles.isEmpty) | |
Nil | |
else | |
pauses.toList match { | |
case Nil ⇒ | |
WithPKList( | |
TotalTransactions( | |
threadId, | |
transactionProfiles.filter(filter).sortBy(_.transactionId).reverse.take(30).map(_.transactionId).toList | |
) | |
) | |
case a :: _ ⇒ | |
WithPKList( | |
TotalTransactions( | |
threadId, | |
transactionProfiles.filter(filter).sortBy(_.transactionId).reverse.map(_.transactionId).filter(a.pauseSet).toList | |
) | |
) | |
} | |
} | |
type DepLive = SrcId | |
def KeepAlive( | |
srcId: SrcId, | |
a: Each[TotalTransactions] | |
): Values[(DepLive, TransactionsIdContainer)] = | |
for { | |
id ← a.transactionsIds | |
} yield WithPK(TransactionsIdContainer(id)) | |
def KillOffDepAssembleProfilerMeta( | |
threadId: SrcId, | |
pr: Each[TransactionProfile], | |
@by[DepLive] a: Values[TransactionsIdContainer] | |
): Values[(KillMe, TransactionProfile)] = | |
if (a.isEmpty) { | |
(pr.threadId → pr) :: Nil | |
} else Nil | |
def CreateKiller( | |
threadId: SrcId, | |
@by[KillMe] profiles: Values[TransactionProfile], | |
@by[StopAll] stops: Values[StopProfile] | |
): Values[(SrcId, TxTransform)] = | |
if (stops.isEmpty) | |
ListIf(profiles.size > 100, WithPKList(KillProfiles(threadId, profiles.toList))) | |
else | |
WithPKList(KillProfiles(threadId, profiles.toList)) | |
def KillIfStop( | |
threadId: SrcId, | |
firstBorn: Each[Firstborn], | |
@by[StopAll] stops: Values[StopProfile] | |
): Values[(SrcId, TxTransform)] = | |
stops.toList match { | |
case a :: Nil if !a.completedStop ⇒ WithPKList(KillProfilesStop(a.srcId)) | |
case _ ⇒ Nil | |
} | |
} | |
/*def SimplifyUpdates( | |
key: SrcId, | |
@by[PauseAll] pauses: Values[RichPauseProfile], | |
@by[StopAll] stops: Values[StopProfile], | |
updates: Each[DepAssembleProfilerMeta], | |
viewSettings: Values[UpdatesViewSetting] | |
): Values[(CollectiveId, UpdateProfile)] = | |
if (stops.isEmpty) | |
pauses.headOption match { | |
case None ⇒ | |
/*val stup: Map[Long, Int] = updates.updates.groupBy(_.valueTypeId).transform{case (k, v) ⇒ v.map(_.value.size()).sum} | |
logger.info(key) | |
for {i ← stup} yield { | |
logger.info(i.toString()) | |
}*/ | |
/*val isTime = updates.updates.map(_.valueTypeId).forall(timeIds) | |
val isUi = updates.updates.map(_.valueTypeId).forall(uiIds) | |
logger.info((key,isTime, isUi).toString()) | |
val time = updates.updates.find(_.valueTypeId == timeAdapter.id).map(u ⇒ timeAdapter.decode(u.value).currentTimeSeconds).getOrElse(0L) | |
logger.info((key, time, updates.updates.map(_.valueTypeId).distinct).toString)*/ | |
val updateIds = updates.updates.map(_.valueTypeId).distinct | |
val metaProper: List[DepAssembleProfilerMeta] = updates.updates.filter(u ⇒ u.valueTypeId == metaAdapter.id).map(u ⇒ metaAdapter.decode(u.value)) | |
ListIf(!updateIds.exists(ignoreIds) && metaProper.nonEmpty, | |
List(collectiveId → UpdateProfile( | |
updates.srcId, | |
updates.updates.size, | |
updates.updates.map(_.value.size).sum, | |
updateIds, | |
metaProper, | |
viewSettings.headOption | |
) | |
) | |
) | |
case Some(pause) if pause.pauseSet.contains(updates.srcId) ⇒ | |
val updateIds = updates.updates.map(_.valueTypeId).distinct | |
val metaProper: List[DepAssembleProfilerMeta] = updates.updates.filter(u ⇒ u.valueTypeId == metaAdapter.id).map(u ⇒ metaAdapter.decode(u.value)) | |
ListIf(!updateIds.exists(ignoreIds) && metaProper.nonEmpty, | |
List(collectiveId → UpdateProfile( | |
updates.srcId, | |
updates.updates.size, | |
updates.updates.map(_.value.size).sum, | |
updateIds, | |
metaProper, | |
viewSettings.headOption | |
) | |
) | |
) | |
case _ ⇒ Nil | |
} | |
else | |
Nil | |
def CollectProfiles( | |
key: SrcId, | |
@by[CollectiveId] updatesSummary: Values[UpdateProfile] | |
): Values[(SrcId, UpdatesProfile)] = { | |
@tailrec def headToKeep(res: UpdatesProfile, in: List[UpdateProfile]): UpdatesProfile = | |
if (in.isEmpty) res else { | |
val will = UpdatesProfile( | |
res.srcId, | |
in.head :: res.items, | |
res.txCount + 1, | |
res.objCount + in.head.objCount, | |
res.byteCount + in.head.byteCount | |
) | |
if (will.txCount > 30 || | |
will.objCount > 10000 || | |
will.byteCount > 100000000 | |
) res | |
else headToKeep(will, in.tail) | |
} | |
List(WithPK(headToKeep( | |
UpdatesProfile(key, Nil, 0L, 0L, 0L), | |
updatesSummary.filterNot(_.valueTypeIds.forall(skipIds)) | |
.sortBy(_.srcId).toList.reverse | |
) | |
) | |
) | |
} | |
def KeepAliveUpdates( | |
key: SrcId, | |
stops: Values[StopProfile], | |
pauses: Values[PauseProfile], | |
updatesListSummary: Each[UpdatesProfile] | |
): Values[(SrcId, KeepUpdates)] = | |
if (stops.isEmpty) | |
if (pauses.isEmpty) | |
for { | |
item ← updatesListSummary.items | |
} yield WithPK(KeepUpdates(item.srcId)) | |
else | |
pauses.head.toKeep.map(id ⇒ WithPK(KeepUpdates(id))) | |
else | |
Nil*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment