Skip to content

Instantly share code, notes, and snippets.

@darkseed
Forked from beiske/BestMatchSearcher.scala
Created April 7, 2016 14:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save darkseed/bfe702225a2879aa31d0046e3e6ea555 to your computer and use it in GitHub Desktop.
Save darkseed/bfe702225a2879aa31d0046e3e6ea555 to your computer and use it in GitHub Desktop.
Code for getting started with Elasticsearch and Lire
import java.nio.file.Files
import java.nio.file.Paths
import scala.Array.canBuildFrom
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.ExecutionContext.Implicits.global
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import com.sksamuel.elastic4s.ElasticClient
import dispatch.Future
import dispatch.Future
import dispatch.enrichFuture
/**
 * Runs a similarity search for each local JPEG against the `images3` index and reports the
 * image whose first three hits have the highest average score.
 *
 * Args: </path/to/jpg/images> <Maximum number of images to search with>
 */
object BestMatchSearcher extends App {
  val bulkSize = 50
  val path = Paths.get(args(0))
  val max = Integer.parseInt(args(1))

  /**
   * Returns the `.jpg` files directly inside `path` as a lazy stream of documents.
   *
   * The directory listing is read eagerly so the underlying `DirectoryStream` can be closed
   * right away; the (much larger) file contents are still only read when the corresponding
   * stream element is forced.
   */
  def getData(): Stream[ImageDocument] = {
    val directory = Files.newDirectoryStream(path)
    val jpgPaths =
      try directory.asScala.filter(_.toFile().getName().endsWith(".jpg")).toList
      finally directory.close()
    jpgPaths.toStream.map(childPath => new ImageDocument(childPath))
  }

  val settings = ImmutableSettings.settingsBuilder()
    .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
    .put("transport.found.api-key", "api-key")
    .put("cluster.name", "cluster_id")
    .put("client.transport.ignore_cluster_name", false)
    .build()

  /** Transport client; construction blocks until the cluster reports at least yellow health. */
  val client = {
    val address = new InetSocketTransportAddress("cluster_id-eu-west-1.foundcluster.com", 9343)
    val c = ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
    println("Waiting for clusterstatus yellow.")
    val clusterStatus = c.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
    println(s"Got clusterstatus $clusterStatus")
    c
  }

  /**
   * Asynchronously queries the index with `image` and yields `(image name, average score)`.
   *
   * The average is the sum of the first three hit scores divided by 3, so a query that
   * returns fewer than three hits is scored as if the missing hits had score 0.
   */
  def search(image: ImageDocument): Future[Tuple2[String, Float]] = {
    val response = Future {
      client.client.prepareSearch("images3").setQuery(image.toQuery).addField("name").get()
    }
    val topAverageScore = response.map(_.getHits().getHits().take(3).map(_.getScore()).toSeq.sum / 3)
    topAverageScore.map(image.name -> _)
  }

  /**
   * Blocks until both results are available and returns the one with the higher score,
   * announcing `b` when it overtakes `a`. Ties keep `a`. Rethrows if either search failed.
   */
  def observeWinner(a: Future[Tuple2[String, Float]], b: Future[Tuple2[String, Float]]): Future[Tuple2[String, Float]] = {
    val (_, leaderScore) = a()
    val (challenger, challengerScore) = b()
    if (leaderScore < challengerScore) {
      println(s"New leader: $challenger, score: $challengerScore")
      b
    } else {
      a
    }
  }

  val winner: Future[Tuple2[String, Float]] = {
    val stream = getData().take(max).map(search(_))
    // Force the whole stream on a separate thread so every search is started up front while
    // this thread reports leaders as results arrive. The work has to live in run(): code
    // written directly in an anonymous Thread body executes in its constructor, i.e. on the
    // calling thread.
    new Thread(new Runnable {
      def run(): Unit = { stream.size; () }
    }).start()
    try {
      // reduceOption: an empty directory (or max <= 0) must not blow up with the client open.
      stream.reduceOption(observeWinner).getOrElse(
        Future.failed[Tuple2[String, Float]](
          new NoSuchElementException(s"No .jpg images to search with in $path (max = $max)")))
    } catch {
      // A failed search surfaces here via the blocking calls in observeWinner; turn it into
      // a failed result so the completion handler below still closes the client.
      case scala.util.control.NonFatal(e) => Future.failed[Tuple2[String, Float]](e)
    }
  }

  // Report first, then close: the callback runs on a daemon thread and closing the client
  // may let the JVM exit, so a separate reporting callback could be cut off.
  winner.onComplete { outcome =>
    try {
      outcome match {
        case scala.util.Success((image, score)) => println(s"Winner is: $image with score $score")
        case scala.util.Failure(error) => println(s"Search failed: $error")
      }
    } finally {
      client.close()
    }
  }
}
// sbt build definition for the Elasticsearch/Lire image examples.
// Settings are separated by blank lines, which older sbt 0.13.x releases require in .sbt files.

name := "image-indexer"

version := "1.0-SNAPSHOT"

scalaVersion := "2.10.2"

// Found transport module for the hosted cluster, plus the elastic4s Scala client.
libraryDependencies ++= Seq(
  "no.found.elasticsearch" % "elasticsearch-transport-module" % "0.8.7-1.0.0",
  "com.sksamuel.elastic4s" % "elastic4s_2.10" % "1.0.3.0"
)

// Setting provided by the sbteclipse plugin.
EclipseKeys.withSource := true

// Run the application in a separate JVM.
fork in run := true
import java.nio.file.Files
import java.nio.file.Path
import sun.misc.BASE64Encoder
/**
 * An image held in memory together with the name it is indexed under.
 *
 * @param name label stored in the `name` field of the indexed document
 * @param data raw image bytes; sent Base64-encoded
 */
case class ImageDocument(name: String, data: Array[Byte]) {

  /** Reads the file at `path` fully into memory and uses its file name as `name`. */
  def this(path: Path) = this(path.toFile().getName(), Files.readAllBytes(path))

  /**
   * Base64 text of `data` on a single line.
   *
   * Uses the public `java.util.Base64` API (Java 8+) instead of the JDK-internal
   * `sun.misc.BASE64Encoder`; the basic encoder emits no line breaks, so the result equals
   * the old encoder's output with its `\n`/`\r` characters stripped.
   */
  private def base64Data: String = java.util.Base64.getEncoder.encodeToString(data)

  /**
   * Escapes `raw` for use inside a JSON string literal, so that names containing quotes,
   * backslashes or control characters still produce valid JSON.
   */
  private def escapeJson(raw: String): String =
    raw.map {
      case '"' => "\\\""
      case '\\' => "\\\\"
      case '\n' => "\\n"
      case '\r' => "\\r"
      case '\t' => "\\t"
      case c if c < ' ' => "\\" + "u%04x".format(c.toInt)
      case c => c.toString
    }.mkString

  /** JSON document with the fields `name` and `image` (Base64 of `data`). */
  def toJson: String = {
    s"""
       |{
       |"name" : "${escapeJson(name)}",
       |"image" :"$base64Data"
       |}""".stripMargin
  }

  /** JSON image query for this image: CEDD feature, BIT_SAMPLING hash, limit 10. */
  def toQuery: String = {
    s"""
       |{
       |"image": {
       |"image": {
       |"feature": "CEDD",
       |"image": "$base64Data",
       |"hash": "BIT_SAMPLING",
       |"limit": 10
       |}
       |}
       |}""".stripMargin
  }
}
import java.nio.file.Files
import java.nio.file.Paths
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.ExecutionContext.Implicits.global
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import Reindexer.StringSource
import dispatch.Future
import dispatch.enrichFuture
import scala.concurrent.ExecutionContext
/**
 * Bulk-indexes local JPEG images into `images5/image`.
 *
 * Args: </path/to/jpg/images> <Number of images to index>
 */
object Indexer extends App {
  val bulkSize = 3
  val path = Paths.get(args(0))
  val max = Integer.parseInt(args(1))

  /**
   * Returns the `.jpg` files directly inside `path` as a lazy stream of documents.
   *
   * The directory listing is read eagerly so the underlying `DirectoryStream` can be closed
   * right away; file contents are only read when the corresponding element is forced.
   */
  def getData(): Stream[ImageDocument] = {
    val directory = Files.newDirectoryStream(path)
    val jpgPaths =
      try directory.asScala.filter(_.toFile().getName().endsWith(".jpg")).toList
      finally directory.close()
    jpgPaths.toStream.map(childPath => new ImageDocument(childPath))
  }

  /**
   * Indexes `images` in bulk requests of `bulkSize` documents, waits for all of them, prints
   * a line per failed item (or a success line per clean bulk) plus a summary, and returns
   * the number of items that failed to index.
   *
   * The client is closed before returning, also when the health check or a bulk request
   * fails; in that case the exception is rethrown to the caller.
   */
  def push(images: Stream[ImageDocument]): Int = {
    val settings = ImmutableSettings.settingsBuilder()
      .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
      .put("transport.found.api-key", "api-key")
      .put("cluster.name", "cluster_id")
      .put("client.transport.ignore_cluster_name", false)
      // .put("client.transport.nodes_sampler_interval", "30s")
      // .put("client.transport.ping_timeout", "30s")
      .build()
    val client = {
      val address = new InetSocketTransportAddress("cluster_id-eu-west-1.foundcluster.com", 9343)
      ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
    }
    try {
      println("Waiting for clusterstatus yellow.")
      val clusterStatus = client.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
      println(s"Got clusterstatus $clusterStatus")
      import ExecutionContext.Implicits.global
      val bulkLoads = images.grouped(bulkSize).map {
        batch =>
          client.bulk(
            batch.map(image => index into "images5/image" source new StringSource(image.toJson)): _*)
      }.toStream
      // Sequencing forces the stream, so every bulk request is submitted before we block.
      val allBulks = Future.sequence(bulkLoads)
      val bulkResults = allBulks()
      // Report on this thread rather than from future callbacks: callbacks run on daemon
      // threads and could be cut off once the client is closed and main returns.
      for (bulkResult <- bulkResults) {
        if (bulkResult.hasFailures()) {
          for (result <- bulkResult.getItems() if result.isFailed()) {
            println(s"Failed response: [${result.getFailure()}][${result.getFailureMessage()}]")
          }
        } else {
          println("Bulkresult successful!")
        }
      }
      val count = bulkResults.map(_.asScala.count(_.isFailed())).sum
      println(s"Found ${images.size} images, $count failed to index")
      count
    } finally {
      println("Closing client")
      client.close()
    }
  }

  push(getData().take(max))
}
import java.nio.file.Files
import java.nio.file.Paths
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.concurrent.ExecutionContext.Implicits.global
import org.elasticsearch.client.transport.TransportClient
import org.elasticsearch.common.settings.ImmutableSettings
import org.elasticsearch.common.transport.InetSocketTransportAddress
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import Reindexer.StringSource
import dispatch.Future
import dispatch.enrichFuture
import scala.concurrent.ExecutionContext
import scala.util.Random
/**
 * Picks one random image among the first N local JPEGs and prints the hits of a similarity
 * search for it against the `images3` index.
 *
 * Args: </path/to/jpg/images> <Number of images to consider>
 */
object Searcher extends App {
  val bulkSize = 50
  val path = Paths.get(args(0))
  val max = Integer.parseInt(args(1))

  /** Lists the `.jpg` files directly inside `path`, closing the directory handle afterwards. */
  private def jpgPaths(): List[java.nio.file.Path] = {
    val directory = Files.newDirectoryStream(path)
    try directory.asScala.filter(_.toFile().getName().endsWith(".jpg")).toList
    finally directory.close()
  }

  /**
   * Returns the `.jpg` files directly inside `path` as a lazy stream of documents; file
   * contents are only read when the corresponding element is forced.
   */
  def getData(): Stream[ImageDocument] =
    jpgPaths().toStream.map(childPath => new ImageDocument(childPath))

  /**
   * Connects to the cluster, waits for at least yellow health, runs the image query for
   * `image` and prints the name and score of every hit. The client is closed afterwards,
   * also when the search throws.
   */
  def search(image: ImageDocument): Unit = {
    val settings = ImmutableSettings.settingsBuilder()
      .put("transport.type", "no.found.elasticsearch.transport.netty.FoundNettyTransportModule")
      .put("transport.found.api-key", "api-key")
      .put("cluster.name", "cluster_id")
      .put("client.transport.ignore_cluster_name", false)
      .build()
    val client = {
      val address = new InetSocketTransportAddress("cluster_id-eu-west-1.foundcluster.com", 9343)
      ElasticClient.fromClient(new TransportClient(settings).addTransportAddress(address))
    }
    try {
      println("Waiting for clusterstatus yellow.")
      val clusterStatus = client.client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet().getStatus().name()
      println(s"Got clusterstatus $clusterStatus")
      println(s"Searching for images similar to: ${image.name}")
      val response = client.client.prepareSearch("images3").setQuery(image.toQuery).addField("name").get()
      for (hit <- response.getHits().getHits()) {
        println(s"Found: [${hit.getFields().get("name").getValue()}] with score: [${hit.getScore()}]")
      }
    } finally {
      client.close()
    }
  }

  // Choose among paths rather than loaded documents: indexing into getData() would read every
  // image before the chosen one into memory. Bounding the index by the number of files
  // actually present avoids an out-of-range pick when max exceeds it.
  private val candidates = jpgPaths().take(max)
  require(candidates.nonEmpty, s"No .jpg images to choose from in $path (max = $max)")
  val index = Random.nextInt(candidates.size)
  val image = new ImageDocument(candidates(index))
  search(image)
  System.exit(0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment