Created
May 24, 2012 08:12
-
-
Save santail/2780170 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.elasticsearch.common.logging.ESLogger | |
import org.elasticsearch.action.admin.indices.exists.IndicesExistsRequest | |
import org.elasticsearch.cluster.ClusterState | |
import org.elasticsearch.cluster.metadata.IndexMetaData | |
import org.elasticsearch.common.xcontent.XContentFactory._ | |
import org.elasticsearch.action.bulk.{BulkResponse, BulkRequestBuilder} | |
import org.elasticsearch.action.ActionListener | |
import java.util.concurrent.atomic.AtomicInteger | |
import org.elasticsearch.river.RiverSettings | |
import org.elasticsearch.common.xcontent.support.XContentMapValues._ | |
import org.elasticsearch.common.xcontent.{XContentFactory, XContentBuilder} | |
import actors.Actor | |
import org.elasticsearch.client.{Requests, Client} | |
import org.elasticsearch.index.query.QueryBuilders | |
import org.elasticsearch.search.sort.SortOrder | |
import org.elasticsearch.search.SearchHit | |
import java.sql.{SQLException, DriverManager, ResultSet, Connection, Statement} | |
import org.elasticsearch.action.index.IndexRequest | |
import org.elasticsearch.client.transport.TransportClient | |
import org.elasticsearch.common.transport.TransportAddress | |
import org.elasticsearch.node.NodeBuilder | |
import org.elasticsearch.node.internal.InternalNode | |
import org.elasticsearch.transport.TransportService | |
import org.elasticsearch.common.settings.ImmutableSettings | |
/**
 * River worker that pulls rows from an Oracle database via JDBC and bulk-indexes
 * them into Elasticsearch. A DataDigger actor fetches rows and sends one
 * IndexRequest message per row to an Indexer actor, which batches them into
 * bulk requests with bounded in-flight back-pressure.
 *
 * @param client    Elasticsearch client used for indexing and admin calls
 * @param logger    river-scoped logger
 * @param settings  river settings (query, index, docType, bulkSize, ...)
 * @param riverName name of the river instance (not used internally here)
 */
class OracleJdbcIndexer(client: Client, logger: ESLogger, settings: RiverSettings, riverName: String) extends Runnable
with ConnectionHandler with QueryExecutor {
  // Number of index actions collected into a single bulk request.
  private val bulkSize = nodeIntegerValue(settings.settings.get("bulkSize"), 10000)
  // Back-pressure limit: pause submission once this many bulks are in flight.
  private val maxBulksProcessedSimultaneously: Int = nodeIntegerValue(settings.settings.get("maxBulksProcessedSimultaneously"), 50)
  // Number of bulk requests currently awaiting a response.
  private val onGoingBulks: AtomicInteger = new AtomicInteger
  // Name of the result-set column used as the document id.
  private val id: String = nodeStringValue(settings.settings.get("id"), "id")
  private val indexName: String = nodeStringValue(settings.settings.get("index"), "index")
  private val docType: String = nodeStringValue(settings.settings.get("docType"), "docType")
  // Delay between delta runs; <= 0 disables incremental indexing.
  private val nextExecutionInSeconds: Int = nodeIntegerValue(settings.settings.get("nextExecutionInSeconds"), 0)
  // Incremental query; its '?' placeholder is replaced with the last modified date.
  private val deltaQuery: String = nodeStringValue(settings.settings.get("deltaQuery"), null)
  private val modifiedDateColumnName: String = nodeStringValue(settings.settings.get("modifiedDateColumnName"), "change_date")
  private val elasticClient: ElasticSearchClient = new ElasticSearchClient(indexName, docType, client)
  private val indexer: Actor = new Indexer
  private val digger: Actor = new DataDigger
  // FIX: read by both actors, written by stop() from another thread -> must be volatile.
  @volatile private var stopped: Boolean = false
  def getSettings: RiverSettings = settings
  /** Ensures index and mapping exist, then starts the fetch and index actors. */
  def run() {
    elasticClient.ensureIndexExists()
    elasticClient.ensureMappingExists(mappingBuilder)
    digger.start()
    indexer.start()
  }
  /** Signals both actors to finish their current loop iteration and exit. */
  def stop() {
    stopped = true
  }
  private def logTimeTaken(fetchStart: Long, count: Int, indexingStart: Long) {
    val took: Long = System.currentTimeMillis - indexingStart
    val totalTime: Long = took / 1000
    // Guard against division by zero when indexing finished in under a second.
    val speedPerSec: String = if (totalTime == 0) "" + count else "" + count / totalTime
    logger.info("Total time for this run was " + (System.currentTimeMillis - fetchStart) + "ms. Indexing took " + (took) + "ms. average indexing speed was " + speedPerSec + " documents/s. Total number of indexed documents: " + count)
  }
  /** Actor that runs the full query once, then periodically re-runs the delta query. */
  class DataDigger extends Actor {
    def act() {
      sendIndexRequests(nodeStringValue(settings.settings.get("query"), null))
      if (deltaQuery == null || nextExecutionInSeconds <= 0) {
        exit()
      }
      loopWhile {!stopped} {
        Thread.sleep(1000 * nextExecutionInSeconds)
        if (!stopped) {
          val changeDate: String = elasticClient.findLastModifiedDate(modifiedDateColumnName)
          // Nothing indexed yet -> no delta baseline; skip this cycle.
          if (changeDate != null) sendIndexRequests(deltaQuery.replace("?", changeDate))
        }
      }
    }
    /**
     * Streams the result set of `query` and forwards one IndexRequest per row
     * to the indexer actor, terminated by a "stop" token.
     * FIX: connection and result set are now closed in finally blocks instead of
     * being leaked when fetching or message dispatch throws.
     */
    def sendIndexRequests(query: String) {
      if (query == null) {
        throw new IllegalArgumentException("no query given!")
      }
      val conn: Connection = getConnection
      val fetchStart: Long = System.currentTimeMillis
      logger.info("start database fetch")
      try {
        val rs: ResultSet = getRowsToIndex(conn, query)
        try {
          logger.info("database rows fetched, it took " + (System.currentTimeMillis - fetchStart) + "ms.\nStart indexing operation.")
          val indexingStart: Long = System.currentTimeMillis
          val columnNames: List[String] = getColumnNames(rs)
          var counter = 0
          while (rs.next()) {
            indexer ! createIndexRequest(rs, columnNames)
            counter += 1
          }
          indexer ! "stop"
          logTimeTaken(fetchStart, counter, indexingStart)
        } finally {
          rs.close()
        }
      } finally {
        if (conn != null) conn.close()
      }
    }
  }
  /** Actor that drains IndexRequest messages into bulk requests and submits them. */
  class Indexer extends Actor {
    def act() {
      loopWhile {!stopped} {
        processData()
      }
    }
    def processData() {
      var currentBulk: BulkRequestBuilder = createBulk()
      while (currentBulk.numberOfActions > 0) {
        // FIX: count the bulk as in-flight BEFORE submitting it, so a fast
        // response cannot decrement first and drive the counter negative.
        if (onGoingBulks.incrementAndGet > maxBulksProcessedSimultaneously) {
          logger.info("too many ongoing bulks! waiting 1s")
          Thread.sleep(1000)
        }
        currentBulk.execute(new ActionListener[BulkResponse] {
          def onResponse(bulkResponse: BulkResponse) {
            onGoingBulks.decrementAndGet()
          }
          def onFailure(e: Throwable) {
            // FIX: a failed bulk must also release its in-flight slot, otherwise
            // the counter only ever grows and eventually throttles indexing forever.
            onGoingBulks.decrementAndGet()
            logger.warn("failed to process bulk", e)
          }
        })
        currentBulk = createBulk()
      }
    }
    /**
     * Receives up to bulkSize IndexRequest messages from the mailbox into a bulk
     * builder. Any non-IndexRequest message (e.g. the "stop" token sent by the
     * digger) terminates collection early.
     */
    def createBulk(): BulkRequestBuilder = {
      val currentBulk: BulkRequestBuilder = client.prepareBulk
      while (currentBulk.numberOfActions < bulkSize) {
        ? match {
          case request: IndexRequest => currentBulk.add(request)
          case _ => return currentBulk
        }
      }
      currentBulk
    }
  }
  /**
   * Builds the type mapping from the "mappings" river setting:
   * { field -> { mappingParam -> value } }.
   */
  def mappingBuilder(): XContentBuilder = {
    import scala.collection.JavaConversions._
    import java.util.Map
    val mappings: Map[String, Map[String, String]] = settings.settings.get("mappings").asInstanceOf[Map[String, Map[String, String]]]
    val builder: XContentBuilder = XContentFactory.jsonBuilder.startObject.startObject(docType).startObject("properties")
    for (e <- mappings.entrySet()) {
      val property: XContentBuilder = builder.startObject(e.getKey)
      for (param <- e.getValue.entrySet) {
        property.field(param.getKey, param.getValue)
      }
      property.endObject()
    }
    builder.endObject.endObject.endObject
  }
  /** Builds an index request for the current row, using the configured id column. */
  def createIndexRequest(rs: ResultSet, columnNames: List[String]): IndexRequest = {
    Requests.indexRequest(indexName).`type`(docType).id(rs.getString(id)).source(transformToJSON(rs, columnNames))
  }
  /** Serialises the current row to a flat JSON object with lower-cased field names. */
  private def transformToJSON(rs: ResultSet, columnNames: List[String]): XContentBuilder = {
    val builder: XContentBuilder = jsonBuilder.startObject
    columnNames.foreach(columnName => builder.field(columnName.toLowerCase, rs.getString(columnName)))
    builder.endObject
  }
}
/**
 * JDBC query helpers for streaming large result sets.
 */
trait QueryExecutor {
  /**
   * Executes the query with a large fetch size so the driver streams rows
   * instead of materialising the whole result set. The caller owns and must
   * close both the connection and the returned result set.
   */
  def getRowsToIndex(conn: Connection, query: String): ResultSet = {
    val stmt: Statement = conn.createStatement
    stmt.setFetchSize(15000)
    stmt.executeQuery(query)
  }
  /** Returns the column names of the result set, in JDBC column order (1-based). */
  def getColumnNames(rs: ResultSet): List[String] = {
    // FIX: fetch metadata once instead of once per column, and build the list
    // front-to-back instead of repeated O(n) `:+` appends (O(n^2) overall).
    val meta = rs.getMetaData
    (1 to meta.getColumnCount).map(i => meta.getColumnName(i)).toList
  }
}
/**
 * Provides a JDBC connection configured from the river settings.
 */
trait ConnectionHandler {
  /**
   * Opens a JDBC connection using the "dbUrl" and "dbDriver" settings.
   * Auto-commit is disabled so large fetches can stream via a cursor.
   *
   * @throws RuntimeException if "dbUrl" is missing or the connection cannot be opened
   */
  def getConnection: Connection = {
    val dbUrl: String = nodeStringValue(getSettings.settings().get("dbUrl"), null)
    if (dbUrl == null) {
      throw new RuntimeException("please provide dbUrl in river request.")
    }
    try {
      Class.forName(nodeStringValue(getSettings.settings().get("dbDriver"), "oracle.jdbc.driver.OracleDriver"))
      val connection: Connection = DriverManager.getConnection(dbUrl)
      connection.setAutoCommit(false)
      connection
    }
    catch {
      // FIX: previously swallowed the SQLException and returned null, deferring
      // the failure to an opaque NullPointerException at the first use of the
      // connection; fail fast here with the real cause attached.
      case e: SQLException => throw new RuntimeException("could not open connection to " + dbUrl, e)
    }
  }
  // Implemented by the mixing-in class to expose its river settings.
  def getSettings: RiverSettings
}
/**
 * Builds a TransportClient pointed at the transport address published by a
 * locally started node.
 */
trait TransportClientsProvider {
  def getTransportClient: TransportClient = {
    // Boot a local node purely to discover the transport address it publishes.
    val localNode = NodeBuilder.nodeBuilder().node().asInstanceOf[InternalNode]
    val address: TransportAddress =
      localNode.injector().getInstance(classOf[TransportService]).boundAddress.publishAddress
    val transportClient = new TransportClient(ImmutableSettings.settingsBuilder().build())
    transportClient.addTransportAddress(address)
  }
}
/**
 * Thin wrapper around the Elasticsearch client for index/mapping administration
 * and delta-date lookup on a single index/type pair.
 */
class ElasticSearchClient(index: String, docType: String, client: Client) {
  /** Creates the index unless it is already present. */
  def ensureIndexExists() {
    if (!isIndexExists) createIndex()
  }
  def isIndexExists: Boolean =
    client.admin.indices.exists(new IndicesExistsRequest(index)).actionGet.exists
  /** Puts the mapping produced by `buildMapping` unless one already exists. */
  def ensureMappingExists(buildMapping: () => XContentBuilder) {
    if (!isMappingExists) createMapping(buildMapping())
  }
  /** True when the cluster state already holds a mapping for our docType. */
  def isMappingExists: Boolean = {
    val state: ClusterState =
      client.admin.cluster.prepareState.setFilterIndices(index).execute.actionGet.getState
    val indexMeta: IndexMetaData = state.getMetaData.index(index)
    if (indexMeta == null) return false
    val mapping = indexMeta.mapping(docType)
    mapping != null && mapping.`type` == docType
  }
  def createMapping(mappingBuilder: XContentBuilder) {
    client.admin.indices.preparePutMapping(index).setType(docType).setSource(mappingBuilder).execute.actionGet
  }
  def createIndex() {
    client.admin.indices.prepareCreate(index).execute.actionGet
  }
  /**
   * Looks up the most recent value of `modifiedDateColumnName` in the index,
   * or null when the index holds no documents yet.
   */
  def findLastModifiedDate(modifiedDateColumnName: String): String = {
    val response = client.prepareSearch(index)
      .addField(modifiedDateColumnName)
      .setSize(1)
      .addSort(modifiedDateColumnName, SortOrder.DESC)
      .setQuery(QueryBuilders.matchAllQuery())
      .execute.actionGet
    val topHits: Array[SearchHit] = response.hits.hits
    if (topHits.isEmpty) null else topHits(0).field(modifiedDateColumnName).value()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment