Skip to content

Instantly share code, notes, and snippets.

@megafarad
Created October 15, 2022 03:44
Show Gist options
  • Save megafarad/dd08c43c5bf84e6ab25c10a6abbd820a to your computer and use it in GitHub Desktop.
Save megafarad/dd08c43c5bf84e6ab25c10a6abbd820a to your computer and use it in GitHub Desktop.
package com.megafarad.jmdictapi.importer
import akka.Done
import akka.actor.typed._
import akka.actor.typed.scaladsl._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.stream.{IOResult, Materializer}
import akka.stream.scaladsl.{FileIO, Sink, Source}
import com.megafarad.jmdictapi.model.Entry
import com.megafarad.jmdictapi.model.EntrySerialization._
import com.megafarad.jmdictapi.mongo.MongoDb
import com.mongodb.client.model.ReplaceOptions
import org.json.XMLParserConfiguration
import org.mongodb.scala.model.Filters
import io.circe.parser.decode
import java.io.{File, FileInputStream}
import java.util.Properties
import java.util.zip.GZIPInputStream
import java.util
import javax.xml.stream.XMLInputFactory
import javax.xml.stream.events.XMLEvent
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
object Importer {

  /** Messages understood by the JMdict importer actor. */
  sealed trait Import

  /** Download the gzipped JMdict file at the URI `source`, then parse it. */
  final case class DownloadFile(source: String) extends Import

  /** Parse the gzipped XML in `tempFile`; the file is deleted once parsing ends. */
  final case class ParseFile(tempFile: File) extends Import

  /** One StAX event read from the file being parsed. */
  final case class ParseEvent(xmlEvent: XMLEvent) extends Import

  /** Upsert one decoded entry into MongoDB, filtered on its `ent_seq`. */
  final case class ImportEntry(entry: Entry) extends Import

  /** Sent to self after `entry` was written successfully. */
  final case class SuccessfulImport(entry: Entry) extends Import

  /** JMdict element names that org.json is told to always emit as JSON arrays, even when an
    * entry contains only one of them (presumably so `Entry` decodes against a consistent shape).
    */
  private val ForceListElements: util.Set[String] = new util.HashSet[String](
    util.Arrays.asList(
      "k_ele", "r_ele", "sense", "ke_inf", "ke_pri", "re_restr", "re_inf", "re_pri",
      "links", "bibl", "etym", "audit", "stagk", "stagr", "pos", "xref", "ant", "field",
      "misc", "s_inf", "lsource", "dial", "gloss", "example"
    )
  )

  /** org.json settings used to convert each `<entry>`; built once instead of once per entry. */
  private val EntryParserConfiguration: XMLParserConfiguration =
    new XMLParserConfiguration().withForceList(ForceListElements).withKeepStrings(true)

  /** Creates the importer behavior that writes entries into `mongoDb`. */
  def apply(mongoDb: MongoDb): Behavior[Import] = importer(mongoDb, new mutable.StringBuilder)

  /** Importer behavior.
    *
    * @param mongoDb        target database for decoded entries
    * @param elementBuilder accumulates the XML text of the `<entry>` currently being parsed;
    *                       mutated in place, so the behavior never needs to be rebuilt
    */
  private def importer(mongoDb: MongoDb, elementBuilder: mutable.StringBuilder): Behavior[Import] =
    Behaviors.setup[Import] { context =>
      // The ActorSystem logger is thread safe, so it may be used inside pipeToSelf callbacks.
      val systemLog = context.system.log

      Behaviors.receiveMessage {
        case DownloadFile(source) =>
          implicit val system: ActorSystem[Nothing] = context.system
          context.log.info("Begin import")
          val file = File.createTempFile("jmdict", ".gz")
          file.deleteOnExit()
          context.log.info("downloading to... {}", file.toString)
          context.pipeToSelf(downloadViaFlow(Uri(source), file)) {
            case Failure(error) =>
              // Drop the partial download; the retry writes to a fresh temp file.
              // NOTE(review): retries immediately and without limit - consider a backoff.
              file.delete()
              systemLog.warn(s"Download of $source failed, retrying", error)
              DownloadFile(source)
            case Success(_) => ParseFile(tempFile = file)
          }
          Behaviors.same

        case ParseFile(tempFile) =>
          parseFile(context, tempFile)
          Behaviors.same

        case ParseEvent(xmlEvent) =>
          handleEvent(context, elementBuilder, xmlEvent)
          Behaviors.same

        case ImportEntry(entry) =>
          val upsert = mongoDb.collection
            .replaceOne(Filters.eq("ent_seq", entry.ent_seq), entry, new ReplaceOptions().upsert(true))
            .toFuture()
          context.pipeToSelf(upsert) {
            case Failure(error) =>
              // NOTE(review): retries immediately and without limit - consider a backoff.
              systemLog.warn(s"Upserting entry ${entry.ent_seq} failed, retrying", error)
              ImportEntry(entry)
            case Success(_) => SuccessfulImport(entry)
          }
          Behaviors.same

        case SuccessfulImport(entry) =>
          context.log.debug("Imported entry: {}", entry)
          Behaviors.same
      }
    }

  /** Reads every StAX event of the gzipped XML `tempFile` and sends each one to self as a
    * [[ParseEvent]]. All streams are closed and the file is deleted even if parsing fails.
    *
    * NOTE(review): all events are enqueued before any is processed (this runs inside a single
    * message), so the mailbox holds the whole parsed document in memory.
    */
  private def parseFile(context: ActorContext[Import], tempFile: File): Unit = {
    context.log.info("parsing file... {}", tempFile)
    // 0 lifts the JDK entity-expansion limit; presumably needed because JMdict's internal DTD
    // entities exceed the default. Set before the factory is created.
    val props: Properties = System.getProperties
    props.setProperty("jdk.xml.entityExpansionLimit", "0")
    val fileStream = new FileInputStream(tempFile)
    try {
      val gzipStream = new GZIPInputStream(fileStream)
      try {
        val reader = XMLInputFactory.newInstance().createXMLEventReader(gzipStream)
        try {
          while (reader.hasNext) context.self ! ParseEvent(xmlEvent = reader.nextEvent())
        } finally reader.close() // does not close the underlying stream
      } finally gzipStream.close()
    } finally {
      fileStream.close() // no-op when gzipStream.close() already closed it
      tempFile.delete()
    }
  }

  /** Appends one StAX event to `elementBuilder`. When an `</entry>` closes, the buffered XML is
    * converted to JSON, decoded as an [[Entry]] and sent to self as [[ImportEntry]]. Entries that
    * fail conversion or decoding are logged and skipped.
    */
  private def handleEvent(
      context: ActorContext[Import],
      elementBuilder: mutable.StringBuilder,
      xmlEvent: XMLEvent
  ): Unit =
    if (xmlEvent.isStartElement) {
      val startElement = xmlEvent.asStartElement()
      startElement.getName.getLocalPart match {
        case "JMdict" => context.log.info("Start Import")
        case _        => elementBuilder.append(startElement.toString)
      }
    } else if (xmlEvent.isCharacters) {
      // The parser has already unescaped the text, so it is escaped again before re-embedding.
      elementBuilder.append(escapeXmlText(xmlEvent.asCharacters().getData))
    } else if (xmlEvent.isEndElement) {
      val endElement = xmlEvent.asEndElement()
      endElement.getName.getLocalPart match {
        case "JMdict" => context.log.info("End Parsing Input")
        case "entry" =>
          elementBuilder.append(endElement.toString)
          //TODO: don't like this. Would rather parse out XML and rework that way...
          val cleanXML = elementBuilder.toString()
            .replace("xml:lang=", "lang=")
            .replace("<re_nokanji></re_nokanji>", "<re_nokanji>true</re_nokanji>")
          // Clear first, so a bad entry cannot leak into the next one.
          elementBuilder.clear()
          Try(org.json.XML.toJSONObject(cleanXML, EntryParserConfiguration).getJSONObject("entry")) match {
            case Failure(error) =>
              context.log.error("Could not convert entry XML to JSON: {}\n{}", error.getMessage, cleanXML)
            case Success(entryJson) =>
              decode[Entry](entryJson.toString) match {
                case Left(error) =>
                  context.log.error("Could not decode entry: {}\n{}", error.getMessage, entryJson.toString(4))
                case Right(value) => context.self ! ImportEntry(value)
              }
          }
        case _ => elementBuilder.append(endElement.toString)
      }
    }

  /** Escapes XML special characters in text content. `&` is replaced first, so the entities
    * introduced by the later replacements are not escaped twice.
    */
  private def escapeXmlText(text: String): String =
    text
      .replace("&", "&amp;")
      .replace("<", "&lt;")
      .replace(">", "&gt;")
      .replace("\"", "&quot;")
      .replace("'", "&apos;")

  /** Streams a successful response body into `file`. For a non-2xx status, the response entity is
    * discarded and the future fails, instead of an error page being saved where the gzipped
    * dictionary is expected.
    */
  private def writeFile(file: File)(httpResponse: HttpResponse)(implicit materializer: Materializer): Future[IOResult] =
    if (httpResponse.status.isSuccess) httpResponse.entity.dataBytes.runWith(FileIO.toPath(file.toPath))
    else {
      // Drain the unused entity so the pooled connection is released.
      httpResponse.discardEntityBytes()
      Future.failed(
        new IllegalStateException(s"Unexpected HTTP status ${httpResponse.status} while downloading to $file")
      )
    }

  /** Unwraps a pooled-request result. A failed `Try` is rethrown here, which fails the stream. */
  private def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
    case (responseTry, context) => (responseTry.get, context)
  }

  /** Downloads `uri` into `downloadFile`. The returned future fails on a transport error, a
    * non-2xx status, or a file write error.
    */
  private def downloadViaFlow(uri: Uri, downloadFile: File)(implicit system: ActorSystem[Nothing]): Future[Done] = {
    val request = HttpRequest(uri = uri)
    val source = Source.single((request, ()))
    val requestResponseFlow = Http().superPool[Unit]()
    val parallelism = 10
    source.via(requestResponseFlow)
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadFile))
      .runWith(Sink.ignore)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment