Skip to content

Instantly share code, notes, and snippets.

View htimur's full-sized avatar

Timur Khamrakulov htimur

  • Germany
View GitHub Profile
{
"_id" : "shard01",
"host" : "shard01/localhost:27018,localhost:27019,localhost:27020"
}
/**
 * Builds a `Shard` from one document that carries the string fields `_id` and `host`.
 *
 * The `host` value may start with a name followed by `/`
 * (for example `shard01/localhost:27018,localhost:27019`). Everything up to and
 * including the first `/` is dropped; a value without `/` is used unchanged.
 *
 * @param item document to read `_id` and `host` from
 * @return a `Shard` built from the `_id` value and the server list prefixed with `mongodb://`
 */
def parseShardInformation(item: Document): Shard = {
  val bson = item.toBsonDocument
  val id = bson.getString("_id").getValue
  val hostSpec = bson.getString("host").getValue
  // indexOf yields -1 when there is no '/', in which case the whole value is the server list.
  val slashAt = hostSpec.indexOf('/')
  val serverList = if (slashAt >= 0) hostSpec.substring(slashAt + 1) else hostSpec
  Shard(id, s"mongodb://$serverList")
}
// Queries every document of the `shards` collection in the `config` database and
// converts each one with parseShardInformation.
val shards = {
  val shardsCollection = client.getDatabase("config").getCollection("shards")
  shardsCollection.find().map(parseShardInformation)
}
/**
 * Creates a stream over the `oplog.rs` collection of the `local` database.
 *
 * The query asks only for entries whose `op` field is one of `"i"`, `"d"` or `"u"`
 * (presumably insert, delete and update — confirm) and that have no `fromMigrate`
 * field. It is issued with a `TailableAwait` cursor and with the cursor timeout
 * disabled.
 *
 * @param client client used to reach the `local` database
 * @return a source that emits the documents produced by the query
 */
def source(client: MongoClient): Source[Document, NotUsed] = {
  val observable = client.getDatabase("local")
    .getCollection("oplog.rs")
    .find(and(
      in("op", "i", "d", "u"),
      exists("fromMigrate", false)))
    .cursorType(CursorType.TailableAwait)
    .noCursorTimeout(true)
  // NOTE(review): this needs `observable` to be accepted as a reactive-streams Publisher,
  // presumably through a conversion defined elsewhere — confirm.
  Source.fromPublisher(observable)
}
// Folds all per-shard sources into a single one: start from an empty source and
// combine it with each element of `sources` in turn using the Merge strategy.
val allShards: Source[Document, NotUsed] =
  sources.foldLeft(Source.empty[Document]) { (merged, next) =>
    Source.combine(merged, next)(Merge(_))
  }

// Runs the combined stream, printing every document it emits.
allShards.runForeach(println)
// Minimal illustration of Source.combine: two single-element sources are
// combined into one using the Merge strategy.
val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
// NOTE(review): `merged` should emit both 1 and 2; their relative order is presumably
// not guaranteed by Merge — confirm against the Akka Streams documentation.
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
// Query against the `oplog.rs` collection of the `local` database.
client.getDatabase("local")
.getCollection("oplog.rs")
// Keep only entries whose operation field is "i", "d" or "u" (presumably insert, delete,
// update — confirm) and that carry no `fromMigrate` field.
.find(and(
in(MongoConstants.OPLOG_OPERATION, "i", "d", "u"),
exists("fromMigrate", false)))
// Use a tailable-await cursor and disable the cursor timeout.
.cursorType(CursorType.TailableAwait)
.noCursorTimeout(true)
/**
 * Runs the critical business logic while measuring it with a timer obtained from
 * `registry` under the name built from `WebProxy` and `"get-requests"`.
 *
 * The timing context is stopped in a `finally` clause, so it is stopped whether the
 * business logic returns normally or throws.
 */
def doSomethingImportant() = {
  val timer = registry.timer(name(classOf[WebProxy], "get-requests"))
  val context = timer.time()
  // The placeholder is wrapped in braces so that `try` has a block to evaluate;
  // a bare comment between `try` and `finally` does not parse.
  try {
    // critical business logic
  } finally context.stop()
}
// Annotation-based variant of a timed method: no timer is created or stopped by hand here.
// NOTE(review): `@Timed` presumably asks a metrics library to time every call of the
// annotated method — confirm which library processes the annotation.
@Timed
def doSomethingImportant = {
// critical business logic
}
// Class-level use of `@Timed`: the annotation is placed on the class instead of on
// individual methods.
// NOTE(review): presumably this causes every method of the class to be timed — confirm
// against the library that processes the annotation.
@Timed
class SuperCriticalFunctionality {
// Placeholder method; it carries no annotation of its own.
def doSomethingImportant = {
// critical business logic
}
}