Generate stock time series / published by https://github.com/dacr/code-examples-manager #e10d61c2-2916-43ed-a9db-af50e9ed1665/7d538ba91faf74196fc19e2943fb07ed2191455
// summary : Generate stock time series
// keywords : scala, elasticsearch, feed, bigdata, fractal, mandelbrot
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : e10d61c2-2916-43ed-a9db-af50e9ed1665
// created-on : 2019-11-24T15:53:31Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follows: 'amm scriptname.sc'
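// requires : a running Elasticsearch instance reachable at the configured endpoint (defaults to http://127.0.0.1:9201 below)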
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.3.1`
import $ivy.`org.json4s::json4s-native:3.6.7`
import $ivy.`org.json4s::json4s-ext:3.6.7`
import org.json4s.{DefaultFormats, native}
import org.json4s.ext.JavaTimeSerializers
import java.time.{Instant, OffsetDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Response}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
import com.sksamuel.elastic4s.requests.mappings._
import com.sksamuel.elastic4s.requests.mappings.FieldType._
import scala.concurrent._
import scala.util.Properties.{envOrNone, propOrNone}
import scala.concurrent.duration._
import com.sksamuel.elastic4s.requests.count.CountResponse
import com.sksamuel.elastic4s.requests.indexes.admin.DeleteIndexResponse
import com.sksamuel.elastic4s.requests.indexes.admin.RefreshIndexResponse
class Generator(elasticEndPoints: String) {
  import scala.concurrent.ExecutionContext.Implicits.global

  implicit val serialization = native.Serialization
  implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all

  val indexBaseName = "stocks-"
  val indexNamePattern = "stocks-*"
  val mappingName = "stocks-mapping"

  // Customize the default configuration here if needed; we're going to insert a huge amount of data in an unclean but fast way
  val client = ElasticClient(JavaClient(ElasticProperties(elasticEndPoints)))
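  // Note (assumption about the elastic4s API, not specific to this script): ElasticProperties also
  // accepts a comma separated list of endpoints, e.g. "http://host1:9200,http://host2:9200",
  // which is handy when targeting a multi-node cluster.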
  def now(): Long = System.currentTimeMillis()

  def doClean(): Future[Response[DeleteIndexResponse]] = {
    client.execute {
      deleteIndex(indexNamePattern)
    }
  }

  def doRefresh(): Future[Response[RefreshIndexResponse]] = {
    client.execute {
      refreshIndex(indexNamePattern)
    }
  }

  def doCount(): Future[Response[CountResponse]] = {
    client.execute {
      count(indexNamePattern)
    }
  }
  val dateFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.of("Europe/Paris"))

  def insertBulk(entries: Seq[StockEntry]) = client.execute {
    bulk {
      for { entry <- entries } yield {
        val dateSuffix = dateFormat.format(entry.timestamp)
        val indexName = indexBaseName + dateSuffix // indexBaseName already ends with '-'
        indexInto(indexName).doc(entry)
      }
    }
  }
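  // Example: with the Europe/Paris formatter above, an entry timestamped 2019-01-15T10:00:00Z
  // is indexed into the daily index "stocks-2019-01-15".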
  def doCreateIndexMapping() = client.execute {
    createIndexTemplate(mappingName, indexNamePattern).mappings(
      properties() as Seq(
        dateField("timestamp"),
        keywordField("name"),
        doubleField("value"),
      )
    )
  }
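  // For reference, the request above corresponds roughly to this legacy index template
  // (sketch; the exact JSON produced by elastic4s may differ slightly):
  //   PUT _template/stocks-mapping
  //   {
  //     "index_patterns": ["stocks-*"],
  //     "mappings": { "properties": {
  //       "timestamp": { "type": "date" },
  //       "name":      { "type": "keyword" },
  //       "value":     { "type": "double" }
  //     } }
  //   }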
  // 2019-11-18 18:53:48,864
  val timestampFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS").withZone(ZoneId.of("Europe/Paris"))

  case class StockEntry(
    timestamp: OffsetDateTime,
    name: String,
    value: Double
  )
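  // A generated entry serializes (via json4s + JavaTimeSerializers) into a document roughly like
  // (sketch; the actual timestamp formatting depends on the serializers):
  //   { "timestamp": "2019-01-15T10:00:00Z", "name": "ABC", "value": 101.37 }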
  def generate(fromDate: OffsetDateTime, toDate: OffsetDateTime, stockNames: List[String], dailyTransactionsCount: Int): Unit = {
    val startedAt = now()
    // Generate a simple random walk per stock name, with dailyTransactionsCount entries spread evenly over each day
    def writeData(): Future[Unit] = Future {
      val random = scala.util.Random
      val lastValues = scala.collection.mutable.Map.empty[String, Double]
      val dayCount = java.time.temporal.ChronoUnit.DAYS.between(fromDate, toDate).toInt
      val entriesIterator =
        for {
          dayOffset <- Iterator.range(0, dayCount + 1)
          day = fromDate.plusDays(dayOffset.toLong)
          transaction <- Iterator.range(0, dailyTransactionsCount)
          name <- stockNames.iterator
        } yield {
          val timestamp = day.plusSeconds(transaction.toLong * 86400L / dailyTransactionsCount)
          val previous = lastValues.getOrElse(name, 100d)
          val value = math.max(1d, previous + random.nextGaussian())
          lastValues.update(name, value)
          StockEntry(timestamp, name, value)
        }
      val groupedIterator = entriesIterator.grouped(100)
      val parallelismLevel = 10
      while (groupedIterator.hasNext) {
        print("*" * parallelismLevel)
        val groups = groupedIterator.take(parallelismLevel).map(group => insertBulk(group)).toList
        Future.sequence(groups).await
      }
    }
    val futureResponse = for {
      cleaned   <- doClean()              // delete any existing stocks-* indexes
      created   <- doCreateIndexMapping() // create the index template with the right mappings
      responses <- writeData()            // bulk insert all generated stock entries
      refreshed <- doRefresh()
      count     <- doCount()
    } yield count
    // block here because we don't want to exit the script before the future has completed
    val countResponse = Await.result(futureResponse, 30.minutes)
    val duration = (now() - startedAt) / 1000
    println(s"${countResponse.result.count} documents inserted in $duration seconds")
  }
}
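// Once the script has run, the inserted data can be checked from Kibana dev tools or curl, for example
// (sketch; adjust the endpoint to your setup):
//   GET stocks-*/_count
//   GET stocks-*/_search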
@main
def main(elasticEndPoints: String = "http://127.0.0.1:9201"): Unit = {
  def envOrPropOrNone(key: String): Option[String] = {
    envOrNone(key).orElse(propOrNone(key))
  }
  val stockToGenerateCount = 3
  val names = 'A'.to('Z').combinations(3).map(_.mkString).toList.take(stockToGenerateCount)
  new Generator(elasticEndPoints).generate(
    fromDate = OffsetDateTime.parse("2009-01-01T00:00:00.000Z"),
    toDate = OffsetDateTime.parse("2019-11-24T23:59:59.999Z"),
    stockNames = names,
    dailyTransactionsCount = 100
  )
}
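// Usage sketch (the script file name is whatever this gist was saved as, e.g. 'stocks-generator.sc'):
//   amm stocks-generator.sc                                          # uses the default endpoint
//   amm stocks-generator.sc --elasticEndPoints http://127.0.0.1:9200
// Ammonite maps @main parameters to --<parameterName> command line options.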