Skip to content

Instantly share code, notes, and snippets.

@santail
Created May 24, 2012 08:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save santail/2780170 to your computer and use it in GitHub Desktop.
import org.elasticsearch.common.logging.ESLogger
import org.elasticsearch.action.admin.indices.exists.IndicesExistsRequest
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.xcontent.XContentFactory._
import org.elasticsearch.action.bulk.{BulkResponse, BulkRequestBuilder}
import org.elasticsearch.action.ActionListener
import java.util.concurrent.atomic.AtomicInteger
import org.elasticsearch.river.RiverSettings
import org.elasticsearch.common.xcontent.support.XContentMapValues._
import org.elasticsearch.common.xcontent.{XContentFactory, XContentBuilder}
import actors.Actor
import org.elasticsearch.client.{Requests, Client}
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.search.sort.SortOrder
import org.elasticsearch.search.SearchHit
import java.sql.{SQLException, DriverManager, ResultSet, Connection, Statement}
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.transport.TransportAddress
import org.elasticsearch.node.NodeBuilder
import org.elasticsearch.node.internal.InternalNode
import org.elasticsearch.transport.TransportService
import org.elasticsearch.common.settings.ImmutableSettings
/**
 * Pulls rows from an Oracle database over JDBC and indexes them into
 * Elasticsearch as bulk requests.
 *
 * Two cooperating actors do the work:
 *  - [[DataDigger]] runs the configured SQL query, turns every row into an
 *    [[IndexRequest]] and mails it to the indexer, then (optionally) polls
 *    for deltas every `nextExecutionInSeconds` seconds;
 *  - [[Indexer]] drains its mailbox into bulks of `bulkSize` documents and
 *    submits them asynchronously, throttling itself when more than
 *    `maxBulksProcessedSimultaneously` bulks are in flight.
 *
 * All tunables are read from the river settings document.
 */
class OracleJdbcIndexer(client: Client, logger: ESLogger, settings: RiverSettings, riverName: String) extends Runnable
  with ConnectionHandler with QueryExecutor {

  // --- configuration, read once from the river settings document ---
  private val bulkSize = nodeIntegerValue(settings.settings.get("bulkSize"), 10000)
  private val maxBulksProcessedSimultaneously: Int = nodeIntegerValue(settings.settings.get("maxBulksProcessedSimultaneously"), 50)
  // number of bulk requests currently awaiting a response from the cluster
  private val onGoingBulks: AtomicInteger = new AtomicInteger
  private val id: String = nodeStringValue(settings.settings.get("id"), "id")
  private val indexName: String = nodeStringValue(settings.settings.get("index"), "index")
  private val docType: String = nodeStringValue(settings.settings.get("docType"), "docType")
  private val nextExecutionInSeconds: Int = nodeIntegerValue(settings.settings.get("nextExecutionInSeconds"), 0)
  private val deltaQuery: String = nodeStringValue(settings.settings.get("deltaQuery"), null)
  private val modifiedDateColumnName: String = nodeStringValue(settings.settings.get("modifiedDateColumnName"), "change_date")
  private val elasticClient: ElasticSearchClient = new ElasticSearchClient(indexName, docType, client)
  private val indexer: Actor = new Indexer
  private val digger: Actor = new DataDigger
  // bug fix: written by stop() from another thread and read inside both
  // actors' loops — without @volatile the actors may never see the update
  @volatile private var stopped: Boolean = false

  def getSettings: RiverSettings = settings

  /** Prepares index and mapping, then starts the digger/indexer actor pair. */
  def run() {
    elasticClient.ensureIndexExists()
    elasticClient.ensureMappingExists(mappingBuilder)
    digger.start()
    indexer.start()
  }

  /** Requests a cooperative shutdown; actors observe the flag on their next iteration. */
  def stop() {
    stopped = true
  }

  private def logTimeTaken(fetchStart: Long, count: Int, indexingStart: Long) {
    val took: Long = System.currentTimeMillis - indexingStart
    val totalSeconds: Long = took / 1000
    // guard against division by zero for sub-second runs
    val speedPerSec: String = if (totalSeconds == 0) "" + count else "" + count / totalSeconds
    logger.info("Total time for this run was " + (System.currentTimeMillis - fetchStart) + "ms. Indexing took " + (took) + "ms. average indexing speed was " + speedPerSec + " documents/s. Total number of indexed documents: " + count)
  }

  /** Actor that executes the SQL queries and streams the rows to the indexer. */
  class DataDigger extends Actor {
    def act() {
      // initial full load
      sendIndexRequests(nodeStringValue(settings.settings.get("query"), null))
      if (deltaQuery == null || nextExecutionInSeconds <= 0) {
        exit()
      }
      // periodic delta loads until stop() is called
      loopWhile(!stopped) {
        // NOTE: blocks this actor's thread between runs
        Thread.sleep(1000 * nextExecutionInSeconds)
        if (!stopped) {
          val changeDate: String = elasticClient.findLastModifiedDate(modifiedDateColumnName)
          if (changeDate != null) sendIndexRequests(deltaQuery.replace("?", changeDate))
        }
      }
    }

    /**
     * Runs `query`, converts every row into an IndexRequest mailed to the
     * indexer, and terminates the batch with a "stop" message.
     *
     * @throws IllegalArgumentException if no query was configured
     */
    def sendIndexRequests(query: String) {
      if (query == null) {
        throw new IllegalArgumentException("no query given!")
      }
      // NOTE(review): getConnection returns null when it swallows a
      // SQLException, which would NPE in getRowsToIndex — TODO confirm
      val conn: Connection = getConnection
      val fetchStart: Long = System.currentTimeMillis
      logger.info("start database fetch")
      try {
        val rs: ResultSet = getRowsToIndex(conn, query)
        try {
          logger.info("database rows fetched, it took " + (System.currentTimeMillis - fetchStart) + "ms.\nStart indexing operation.")
          val indexingStart: Long = System.currentTimeMillis
          val columnNames: List[String] = getColumnNames(rs)
          var counter = 0
          while (rs.next()) {
            indexer ! createIndexRequest(rs, columnNames)
            counter += 1
          }
          indexer ! "stop"
          logTimeTaken(fetchStart, counter, indexingStart)
        } finally {
          rs.close() // bug fix: previously leaked on any exception while indexing
        }
      } finally {
        conn.close() // bug fix: previously leaked on any exception
      }
    }
  }

  /** Actor that drains its mailbox into bulk requests and submits them asynchronously. */
  class Indexer extends Actor {
    def act() {
      loopWhile(!stopped) {
        processData()
      }
    }

    def processData() {
      var currentBulk: BulkRequestBuilder = createBulk()
      while (currentBulk.numberOfActions > 0) {
        // bug fix: claim the in-flight slot BEFORE submitting — the original
        // incremented after execute(), so a fast response could decrement
        // the counter before it was incremented
        if (onGoingBulks.incrementAndGet > maxBulksProcessedSimultaneously) {
          logger.info("too many ongoing bulks! waiting 1s")
          Thread.sleep(1000)
        }
        currentBulk.execute(new ActionListener[BulkResponse] {
          def onResponse(bulkResponse: BulkResponse) {
            onGoingBulks.decrementAndGet()
          }

          def onFailure(e: Throwable) {
            // bug fix: release the slot on failure too — otherwise failed
            // bulks permanently inflate the counter and, once it exceeds
            // maxBulksProcessedSimultaneously, the throttle never ends
            onGoingBulks.decrementAndGet()
            logger.warn("failed to process bulk", e)
          }
        })
        currentBulk = createBulk()
      }
    }

    /**
     * Receives mailbox messages until `bulkSize` IndexRequests have been
     * collected, or any non-IndexRequest message (the "stop" marker) arrives,
     * whichever comes first. Blocks while the mailbox is empty.
     */
    def createBulk(): BulkRequestBuilder = {
      val currentBulk: BulkRequestBuilder = client.prepareBulk
      while (currentBulk.numberOfActions < bulkSize) {
        ? match {
          case request: IndexRequest => currentBulk.add(request)
          case _ => return currentBulk
        }
      }
      currentBulk
    }
  }

  /**
   * Builds the type mapping from the optional "mappings" section of the
   * river settings, shaped as { fieldName -> { mappingParam -> value } }.
   */
  def mappingBuilder(): XContentBuilder = {
    import scala.collection.JavaConversions._
    import java.util.Map
    val mappings: Map[String, Map[String, String]] = settings.settings.get("mappings").asInstanceOf[Map[String, Map[String, String]]]
    val builder: XContentBuilder = XContentFactory.jsonBuilder.startObject.startObject(docType).startObject("properties")
    if (mappings != null) { // robustness fix: no "mappings" section configured previously NPE'd
      for (e <- mappings.entrySet()) { // fix: `for (val e <- ...)` is deprecated syntax
        val property: XContentBuilder = builder.startObject(e.getKey)
        for (param <- e.getValue.entrySet) {
          property.field(param.getKey, param.getValue)
        }
        property.endObject()
      }
    }
    builder.endObject.endObject.endObject
  }

  /** Builds an index request keyed by the configured id column of the current row. */
  def createIndexRequest(rs: ResultSet, columnNames: List[String]): IndexRequest = {
    Requests.indexRequest(indexName).`type`(docType).id(rs.getString(id)).source(transformToJSON(rs, columnNames))
  }

  /** Serializes the current row as a flat JSON object with lower-cased field names. */
  private def transformToJSON(rs: ResultSet, columnNames: List[String]): XContentBuilder = {
    val builder: XContentBuilder = jsonBuilder.startObject
    columnNames.foreach(columnName => builder.field(columnName.toLowerCase, rs.getString(columnName)))
    builder.endObject
  }
}
/** JDBC helpers for fetching rows to index and inspecting their columns. */
trait QueryExecutor {

  /**
   * Executes `query` on `conn` with a large fetch size so the driver streams
   * rows in big chunks instead of one round trip per row.
   */
  def getRowsToIndex(conn: Connection, query: String): ResultSet = {
    val stmt: Statement = conn.createStatement
    stmt.setFetchSize(15000)
    stmt.executeQuery(query)
  }

  /**
   * Returns the column names of `rs` in ordinal order (JDBC columns are
   * 1-based).
   */
  def getColumnNames(rs: ResultSet): List[String] = {
    // fix: hoist getMetaData out of the loop (it was fetched twice per
    // iteration) and build the list in one pass instead of O(n^2) appends
    val meta = rs.getMetaData
    (1 to meta.getColumnCount).map(i => meta.getColumnName(i)).toList
  }
}
/** Mixin that opens the JDBC connection configured in the river settings. */
trait ConnectionHandler {

  /**
   * Opens a connection to the `dbUrl` configured in the river settings,
   * registering the configured driver class first (defaults to the Oracle
   * driver) and disabling auto-commit.
   *
   * NOTE(review): any SQLException is swallowed and `null` is returned,
   * which defers the failure to a NullPointerException at the first use of
   * the connection — consider propagating the error instead.
   */
  def getConnection: Connection = {
    try {
      val riverSettings = getSettings.settings()
      val url: String = nodeStringValue(riverSettings.get("dbUrl"), null)
      if (url == null) {
        throw new RuntimeException("please provide dbUrl in river request.")
      }
      // loading the driver class registers it with DriverManager
      Class.forName(nodeStringValue(riverSettings.get("dbDriver"), "oracle.jdbc.driver.OracleDriver"))
      val conn = DriverManager.getConnection(url)
      conn.setAutoCommit(false)
      conn
    } catch {
      case ignored: SQLException => null
    }
  }

  /** Supplied by the mixing-in class; source of the river configuration. */
  def getSettings: RiverSettings
}
/** Mixin that builds a TransportClient pointed at a locally started node. */
trait TransportClientsProvider {
// Starts a local node, digs its TransportService out of the internal Guice
// injector to learn the published transport address, then connects a fresh
// TransportClient to that address.
// NOTE(review): the node created by NodeBuilder.node() is never stored or
// closed, so it lives (and holds its resources) for the rest of the JVM —
// TODO confirm whether it is intentionally kept alive to serve the
// published address, or should be closed once the address has been read.
def getTransportClient: TransportClient = {
val transportAddress: TransportAddress = NodeBuilder.nodeBuilder().node()
.asInstanceOf[InternalNode].injector().getInstance(classOf[TransportService]).boundAddress.publishAddress
new TransportClient(ImmutableSettings.settingsBuilder().build())
.addTransportAddress(transportAddress)
}
}
/**
 * Thin convenience wrapper around the Elasticsearch [[Client]] for one
 * index/type pair: index and mapping bootstrap plus a "last modified" probe
 * used by the delta queries.
 */
class ElasticSearchClient(index: String, docType: String, client: Client) {

  /** Creates the index unless it is already present. */
  def ensureIndexExists() {
    if (!isIndexExists) createIndex()
  }

  /** True when the target index exists in the cluster. */
  def isIndexExists: Boolean =
    client.admin.indices.exists(new IndicesExistsRequest(index)).actionGet.exists

  /** Puts the mapping produced by `buildMapping` unless one is already registered. */
  def ensureMappingExists(buildMapping: () => XContentBuilder) {
    if (!isMappingExists) createMapping(buildMapping())
  }

  /** True when the index already carries a mapping for `docType`. */
  def isMappingExists: Boolean = {
    val state: ClusterState = client.admin.cluster.prepareState.setFilterIndices(index).execute.actionGet.getState
    val indexMeta: IndexMetaData = state.getMetaData.index(index)
    if (indexMeta == null) {
      false
    } else {
      val mapping = indexMeta.mapping(docType)
      mapping != null && mapping.`type` == docType
    }
  }

  /** Registers `mappingBuilder` as the mapping for `docType`. */
  def createMapping(mappingBuilder: XContentBuilder) {
    client.admin.indices.preparePutMapping(index).setType(docType).setSource(mappingBuilder).execute.actionGet
  }

  /** Creates the index with default settings. */
  def createIndex() {
    client.admin.indices.prepareCreate(index).execute.actionGet
  }

  /**
   * Returns the greatest value of `modifiedDateColumnName` stored in the
   * index (via a size-1 descending-sorted match-all search), or null when
   * the index holds no documents.
   */
  def findLastModifiedDate(modifiedDateColumnName: String): String = {
    val topHits: Array[SearchHit] = client.prepareSearch(index)
      .addField(modifiedDateColumnName)
      .setSize(1)
      .addSort(modifiedDateColumnName, SortOrder.DESC)
      .setQuery(QueryBuilders.matchAllQuery())
      .execute
      .actionGet
      .hits
      .hits
    if (topHits.isEmpty) null else topHits(0).field(modifiedDateColumnName).value()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment