// Last active: May 7, 2023 15:45
// summary : Generate stock time series
// keywords : scala, elasticsearch, feed, bigdata, fractal, mandelbrot
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0
// id : e10d61c2-2916-43ed-a9db-af50e9ed1665
// created-on : 2019-11-24T15:53:31Z
// managed-by :
// execution : Scala Ammonite script - run it as follows: 'amm'
import $ivy.`com.sksamuel.elastic4s::elastic4s-core:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-client-esjava:7.3.1`
import $ivy.`com.sksamuel.elastic4s::elastic4s-json-json4s:7.3.1`
import $ivy.`org.json4s::json4s-native:3.6.7`
import $ivy.`org.json4s::json4s-ext:3.6.7`
import org.json4s.{DefaultFormats, native}
import org.json4s.ext.JavaTimeSerializers
import java.time.{Instant, OffsetDateTime, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter
import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties, Response}
import com.sksamuel.elastic4s.http.JavaClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.json4s.ElasticJson4s.Implicits._
import com.sksamuel.elastic4s.requests.mappings._
import com.sksamuel.elastic4s.requests.mappings.FieldType._
import scala.concurrent._
import scala.util.Properties.{envOrNone, propOrNone}
import scala.concurrent.duration._
import com.sksamuel.elastic4s.requests.count.CountResponse
import com.sksamuel.elastic4s.requests.indexes.admin.DeleteIndexResponse
import com.sksamuel.elastic4s.requests.indexes.admin.RefreshIndexResponse
class Generator(elasticEndPoints:String) {
// json4s implicits for the elastic4s json4s integration imported from ElasticJson4s.Implicits:
// the serialization backend (json4s-native) and the formats, extended with java.time serializers.
// Types are declared explicitly. Implicit members with inferred types are linted by recent
// Scala 2.13 compilers and rejected by Scala 3. The declared types match what was inferred.
implicit val serialization: org.json4s.native.Serialization.type = native.Serialization
implicit val formats: org.json4s.Formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
// Prefix of the per-day index names built in insertBulk.
// NOTE(review): insertBulk appends another "-" before the date suffix, so the resulting names look
// like "stocks--yyyy-MM-dd". They still match indexNamePattern; confirm the double dash is intended.
val indexBaseName = "stocks-"
// Index name pattern the template registered by doCreateIndexMapping applies to.
val indexNamePattern = "stocks-*"
// Name under which doCreateIndexMapping registers the index template.
val mappingName = "stocks-mapping"
// Elasticsearch client for the given endpoints, built with elastic4s' default JavaClient settings.
// NOTE(review): an earlier comment here said the default configuration is customized to insert a
// huge amount of data in an unclean but fast way, but no customization is applied in this line.
// Confirm whether a tuned configuration was meant to be passed here.
val client = ElasticClient(JavaClient(ElasticProperties(elasticEndPoints)))
/** Current wall-clock time in milliseconds since the Unix epoch; used to time the generation run. */
def now(): Long = Instant.now().toEpochMilli
def doClean(): Future[Response[DeleteIndexResponse]] = {
client.execute {
def doRefresh(): Future[Response[RefreshIndexResponse]] = {
client.execute {
def doCount(): Future[Response[CountResponse]] = {
client.execute {
/** Renders an instant as its calendar day ("yyyy-MM-dd") in the Europe/Paris zone; insertBulk uses it as the index date suffix. */
val dateFormat: DateTimeFormatter = {
  val parisZone = ZoneId.of("Europe/Paris")
  DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(parisZone)
}
def insertBulk(entries: Seq[StockEntry]) = client.execute {
bulk {
for {entry <- entries} yield {
val dateSuffix = dateFormat.format(entry.timestamp)
val indexName = indexBaseName + "-" + dateSuffix
def doCreateIndexMapping() = client.execute {
createIndexTemplate(mappingName, indexNamePattern).mappings(
properties() as Seq(
/** Millisecond-precision timestamp formatter in the Europe/Paris zone, e.g. "2019-11-18 18:53:48,864". */
val timestampFormat: DateTimeFormatter =
  DateTimeFormatter
    .ofPattern("yyyy-MM-dd HH:mm:ss,SSS")
    .withZone(ZoneId.of("Europe/Paris"))
case class StockEntry(
value: Double
def generate(fromDate:OffsetDateTime, toDate:OffsetDateTime, stockNames:List[String], dailyTransactionsCount:Int) {
val startedAt = now()
def writeData(): Future[Unit] = Future {
val iterator = logsDir.list(_.extension == Some(".log")).toSeq.flatMap(_.lineIterator).filter(_.contains(" AccessLog:"))
val groupedIterator = iterator.grouped(100)
val parallelismLevel = 10
while (groupedIterator.hasNext) {
print("*" * parallelismLevel)
val groups = groupedIterator.take(parallelismLevel).map { group =>
val futureResponse = for {
cleaned <- doClean() // delete any existing indexBaseName
created <- doCreateIndexMapping() // create the right index pattern
responses <- writeData() // bulk operation insert all events
refreshed <- doRefresh()
count <- doCount()
} yield {
Await.result(futureResponse, 30.minutes) // because we don't want to exit the script before the future has completed
futureResponse map { countResponse =>
val duration = (now() - startedAt) / 1000
println(s"$countResponse documents inserted in $duration seconds")
def main(elasticEndPoints:String = s""): Unit = {
def envOrPropOrNone(key: String): Option[String] = {
val stockToGenerateCount=3
val names = 'A'.to('Z').combinations(3).map(_.mkString).toList.take(stockToGenerateCount)
new Generator(elasticEndPoints).generate(
fromDate = OffsetDateTime.parse("2009-01-01T00:00:00.000Z"),
toDate = OffsetDateTime.parse("2019-11-24T23:59:59.999Z"),
stockNames = names,
dailyTransactionsCount = 100
