Skip to content

Instantly share code, notes, and snippets.

@koen-dejonghe
Created January 27, 2020 11:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save koen-dejonghe/5c0fa6793082dbca21f1a1a97853efa5 to your computer and use it in GitHub Desktop.
Save koen-dejonghe/5c0fa6793082dbca21f1a1a97853efa5 to your computer and use it in GitHub Desktop.
package akka.stream.alpakka.elasticsearch.scaladsl
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import akka.actor.Cancellable
import akka.stream.alpakka.elasticsearch.{ElasticsearchSourceSettings, ReadResult}
import akka.stream.scaladsl.{Flow, Source}
import com.typesafe.scalalogging.LazyLogging
import org.elasticsearch.client.RestClient
import spray.json.JsObject
import scala.collection.mutable
import scala.concurrent.duration._
import scala.language.postfixOps
object ElasticsearchPollingSource extends LazyLogging {

  /** Formats the date suffix of a daily index name, e.g. `2020.01.27`. Immutable, so it is shared. */
  private val IndexDateFormatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy.MM.dd")

  /**
   * Poll an ES index and push documents downstream.
   *
   * On every tick a range query over `@timestamp` is run against the index of the current
   * date and the hits are emitted. Consecutive windows overlap, so document ids that were
   * emitted recently are dropped by a bounded deduplication buffer.
   *
   * @param indexPrefix Prefix of the index to poll. The date of the tick is appended.
   *                    For example: prefix = "logstash-apache" will poll "logstash-apache-yyyy.MM.dd"
   *                    where "yyyy.MM.dd" is the date when the tick occurs.
   * @param typeName Type of the ES query
   * @param tickInterval Interval between ticks, in seconds
   * @param overlap Extra seconds added to each window so consecutive windows overlap;
   *                documents fetched more than once are removed by the deduplication stage
   * @param lag Number of seconds to allow documents to appear on the ES index; each window
   *            ends at `now - lag`
   * @param bufferSize Number of most recently emitted document ids remembered for
   *                   deduplication; must be positive
   * @param settings Elasticsearch source settings
   * @param elasticsearchClient REST client
   * @return Source with Spray JSON read results; the materialized [[Cancellable]] comes from
   *         the underlying tick source
   * @throws IllegalArgumentException if `bufferSize` is not positive
   */
  def create(indexPrefix: String,
             typeName: String,
             tickInterval: Int,
             overlap: Int,
             lag: Int,
             bufferSize: Int,
             settings: ElasticsearchSourceSettings)(
      implicit elasticsearchClient: RestClient
  ): Source[ReadResult[JsObject], Cancellable] = {
    // Fail fast at construction time instead of silently disabling deduplication
    // (or failing only when the stream is materialized).
    require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")

    // Evaluated on every tick, so the polled index follows the calendar date.
    // NOTE(review): ZonedDateTime.now() uses the JVM default time zone, which may differ from
    // the zone used to name the indices (often UTC) — confirm. Also, only the current day's
    // index is queried, so around midnight the part of a window that falls on the previous
    // day is not covered.
    def indexOfToday: String = {
      val date = IndexDateFormatter.format(ZonedDateTime.now())
      s"$indexPrefix-$date"
    }

    // The window is widened by one interval (plus overlap) per tick that was dropped by the
    // conflate stage below, so documents those skipped ticks would have fetched are still read.
    def query(missedTicks: Int): String = {
      val window = lag + ((tickInterval + overlap) * (1 + missedTicks))
      s"""
         |{
         |  "range" : {
         |    "@timestamp" : {
         |      "gt" : "now-${window}s/s",
         |      "lte" : "now-${lag}s/s"
         |    }
         |  }
         |}
         |""".stripMargin
    }

    // When the query stage backpressures, ticks are collapsed into a count of missed ticks.
    // see https://doc.akka.io/docs/akka/current/stream/stream-cookbook.html#collecting-missed-ticks
    val missedTicks =
      Flow[String].conflateWithSeed(seed = _ => 0) { (missed, _) =>
        logger.warn(s"missed ticks: ${missed + 1}")
        missed + 1
      }

    Source
      .tick(Duration.Zero, tickInterval.seconds, "tick")
      .via(missedTicks)
      .flatMapConcat { missed: Int =>
        ElasticsearchSource.create(
          indexName = indexOfToday,
          typeName = typeName,
          query = query(missed),
          settings = settings
        )
      }
      .statefulMapConcat { () =>
        // A fresh buffer per materialization, so separate runs do not share state.
        val seenIds = new LtdUniQueue[String](bufferSize)
        message => if (seenIds ?+ message.id) List(message) else Nil
      }
  }

  /**
   * Insertion-ordered set that retains at most `maxSize` elements. When an insertion through
   * [[?+]] pushes the size past the limit, the oldest element is evicted.
   *
   * Note: because this extends `mutable.LinkedHashSet`, inserting through the inherited
   * methods (`add`, `+=`, ...) bypasses the size limit; use [[?+]].
   *
   * @param maxSize maximum number of retained elements; must be positive
   * @throws IllegalArgumentException if `maxSize` is not positive
   */
  class LtdUniQueue[A](maxSize: Int) extends mutable.LinkedHashSet[A] {
    import scala.util.chaining._

    require(maxSize > 0, s"maxSize must be positive, got $maxSize")

    /**
     * Insert `elem` unless it is already present.
     *
     * @return true if `elem` was not among the retained elements (it is now the newest),
     *         false if it was already present
     */
    def ?+(elem: A): Boolean =
      add(elem).tap { added =>
        // Evict only once the limit is exceeded; `>=` would keep just maxSize - 1 elements
        // and, for maxSize == 1, evict the element that was just added.
        if (added && size > maxSize) remove(head)
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment