Skip to content

Instantly share code, notes, and snippets.

@nguyenvietyen
Last active April 26, 2017 15:13
Show Gist options
  • Save nguyenvietyen/0a2fdfabe201bb4dd6474fcf65d079f5 to your computer and use it in GitHub Desktop.
Save nguyenvietyen/0a2fdfabe201bb4dd6474fcf65d079f5 to your computer and use it in GitHub Desktop.
package com.hypefactors.mediaflow
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.ElasticDsl.{bulk, indexInto}
import com.sksamuel.elastic4s.{ElasticsearchClientUri, TcpClient}
import com.sksamuel.elastic4s.xpack.security.XPackElasticClient
import org.apache.beam.sdk.transforms.DoFn._
import org.apache.beam.sdk.transforms.{DoFn, PTransform, ParDo}
import org.apache.beam.sdk.values.{PCollection, PDone}
import org.elasticsearch.common.settings.Settings
import scala.concurrent.Await
import scala.concurrent._
import scala.concurrent.duration._
import collection.JavaConverters._
/**
 * Beam sink transform that writes a collection of field maps to Elasticsearch.
 *
 * Created by nguyen on 4/26/17.
 *
 * The transform itself only carries configuration; the actual indexing is done by
 * [[ElasticSearchWriteFn]], which receives this instance as its spec.
 *
 * @param settings   key/value pairs used to build the Elasticsearch client settings
 * @param uri        URI of the cluster the client connects to
 * @param indexFn    derives the target index name from an element
 * @param typeFn     derives the target type name from an element
 * @param idFnOption optional function deriving the document id from an element; when
 *                   `None`, no explicit id is set on the index operation
 */
class ElasticSearchWrite(
    val settings: Map[String, Any],
    val uri: ElasticsearchClientUri,
    val indexFn: Map[String, Any] => String,
    val typeFn: Map[String, Any] => String,
    val idFnOption: Option[Map[String, Any] => String] = None
) extends PTransform[PCollection[Map[String, Any]], PDone] {

  /** Applies the writing `ParDo` to `input` and marks the pipeline branch as done. */
  override def expand(input: PCollection[Map[String, Any]]): PDone = {
    val writeFn = new ElasticSearchWriteFn(this)
    input.apply(ParDo.of(writeFn))
    PDone.in(input.getPipeline)
  }
}
/**
 * Buffers the elements of a bundle and writes them to Elasticsearch as one bulk request
 * when the bundle finishes.
 *
 * @param spec transform carrying the client settings, the cluster URI and the functions
 *             used to derive index name, type name and optional document id per element
 */
class ElasticSearchWriteFn(spec: ElasticSearchWrite) extends DoFn[Map[String, Any], Void] {
  // Elements received since the last flush.
  private val items = collection.mutable.MutableList[Map[String, Any]]()

  /** Adds the current element to the pending buffer. */
  @ProcessElement
  def processElement(context: ProcessContext): Unit = items += context.element()

  /**
   * Sends all buffered elements in a single bulk request, waiting up to five minutes for
   * it to complete. Does nothing when no elements were buffered.
   */
  @FinishBundle
  def finishBundle(context: Context): Unit = {
    // Snapshot and reset the buffer up front: a DoFn instance may be reused for further
    // bundles, and elements left behind would be indexed again on every later flush.
    val batch = items.toList
    items.clear()

    // An empty bulk request is rejected by Elasticsearch, and opening a client for it is
    // wasted work, so empty bundles are skipped entirely.
    if (batch.nonEmpty) {
      val settings = Settings.builder().put(spec.settings.asJava).build()
      val client = XPackElasticClient(settings, spec.uri)
      try {
        val indexDefinitions = batch.map { item =>
          val indexName = spec.indexFn(item)
          val typeName = spec.typeFn(item)
          val indexingOp = indexInto(indexName, typeName).fields(item)
          spec.idFnOption
            .map(idFn => indexingOp.id(idFn(item)))
            .getOrElse(indexingOp)
        }
        val bulkIndexDefinitions = bulk(indexDefinitions)
        // NOTE: elastic4s 5.3.0 throws NoNodeException if authentication fails. This can be confusing.
        // NOTE(review): the bulk response is not inspected, so per-item failures go unnoticed.
        client.execute(bulkIndexDefinitions).await(Duration(5, MINUTES))
      } finally {
        // Always release the client, even when the request fails or times out.
        client.close()
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment