Created
October 15, 2022 03:44
-
-
Save megafarad/dd08c43c5bf84e6ab25c10a6abbd820a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.megafarad.jmdictapi.importer | |
import akka.Done | |
import akka.actor.typed._ | |
import akka.actor.typed.scaladsl._ | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} | |
import akka.stream.{IOResult, Materializer} | |
import akka.stream.scaladsl.{FileIO, Sink, Source} | |
import com.megafarad.jmdictapi.model.Entry | |
import com.megafarad.jmdictapi.model.EntrySerialization._ | |
import com.megafarad.jmdictapi.mongo.MongoDb | |
import com.mongodb.client.model.ReplaceOptions | |
import org.json.XMLParserConfiguration | |
import org.mongodb.scala.model.Filters | |
import io.circe.parser.decode | |
import java.io.{File, FileInputStream} | |
import java.util.Properties | |
import java.util.zip.GZIPInputStream | |
import java.util | |
import javax.xml.stream.XMLInputFactory | |
import javax.xml.stream.events.XMLEvent | |
import scala.collection.mutable | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.{Failure, Success, Try} | |
/**
 * Typed actor that imports a gzipped JMdict XML dump into MongoDB.
 *
 * The import is driven by messages the actor sends to itself:
 * `DownloadFile` -> `ParseFile` -> one `ParseEvent` per StAX event ->
 * one `ImportEntry` per successfully decoded `<entry>` -> `SuccessfulImport`.
 */
object Importer {

  /** Protocol of the importer actor. */
  sealed trait Import

  /** Download the gzipped dictionary found at `source` into a temporary file. */
  final case class DownloadFile(source: String) extends Import

  /** Parse the gzipped XML in `tempFile`; the file is deleted once it has been read. */
  final case class ParseFile(tempFile: File) extends Import

  /** A single StAX event read from the dictionary file. */
  final case class ParseEvent(xmlEvent: XMLEvent) extends Import

  /** Upsert one decoded dictionary entry into MongoDB. */
  final case class ImportEntry(entry: Entry) extends Import

  /** Confirmation that `entry` has been written to MongoDB. */
  final case class SuccessfulImport(entry: Entry) extends Import

  /** Element names that are always converted to JSON arrays, even when they occur only once. */
  private val ForceListTags: util.Set[String] = new util.HashSet[String](util.Arrays.asList(
    "k_ele", "r_ele", "sense", "ke_inf", "ke_pri", "re_restr", "re_inf", "re_pri", "links", "bibl",
    "etym", "audit", "stagk", "stagr", "pos", "xref", "ant", "field", "misc", "s_inf", "lsource",
    "dial", "gloss", "example"))

  /** XML-to-JSON settings shared by all entries; built once instead of once per entry. */
  private val ParserConfiguration: XMLParserConfiguration =
    new XMLParserConfiguration().withForceList(ForceListTags).withKeepStrings(true)

  /**
   * Creates the importer behavior.
   *
   * @param mongoDb database wrapper whose `collection` receives the upserted entries
   * @return a behavior that starts working when it receives a `DownloadFile` or `ParseFile`
   */
  def apply(mongoDb: MongoDb): Behavior[Import] = Behaviors.setup[Import] { context =>
    // Created inside `setup` so that every actor instance owns its own buffer.
    val elementBuilder = new mutable.StringBuilder

    Behaviors.receiveMessage[Import] {
      case DownloadFile(source) =>
        implicit val system: ActorSystem[Nothing] = context.system
        context.log.info("Begin import")
        val file = File.createTempFile("jmdict", ".gz")
        file.deleteOnExit()
        context.log.info("downloading to... {}", file.toString)
        context.pipeToSelf(downloadViaFlow(Uri(source), file)) {
          case Failure(cause) =>
            // NOTE(review): the retry is immediate and unbounded; consider a back-off or a retry limit.
            context.log.error(s"Download of $source failed; retrying", cause)
            // Each attempt creates a new temp file, so drop the partial one instead of leaking it.
            file.delete()
            DownloadFile(source)
          case Success(_) => ParseFile(tempFile = file)
        }
        Behaviors.same

      case ParseFile(tempFile) =>
        context.log.info("parsing file... {}", tempFile)
        // Lift the JDK's entity-expansion limit (0 = no limit) before the parser factory is created.
        System.setProperty("jdk.xml.entityExpansionLimit", "0")
        // NOTE(review): the whole document is pushed into this actor's mailbox before any event is
        // handled; a streaming design would bound memory use.
        try forEachXmlEvent(tempFile)(event => context.self ! ParseEvent(xmlEvent = event))
        finally tempFile.delete()
        Behaviors.same

      case ParseEvent(xmlEvent) =>
        onParseEvent(context, elementBuilder, xmlEvent)
        Behaviors.same

      case ImportEntry(entry) =>
        val entSeq: String = entry.ent_seq
        val upsert = mongoDb.collection
          .replaceOne(Filters.eq("ent_seq", entSeq), entry, (new ReplaceOptions).upsert(true))
          .toFuture()
        context.pipeToSelf(upsert) {
          case Failure(cause) =>
            // NOTE(review): retried without limit, as before; the cause is now at least visible.
            context.log.error(s"Upsert of entry $entSeq failed; retrying", cause)
            ImportEntry(entry)
          case Success(_) => SuccessfulImport(entry)
        }
        Behaviors.same

      case SuccessfulImport(entry) =>
        context.log.debug("Imported entry: {}", entry)
        Behaviors.same
    }
  }

  /**
   * Feeds every StAX event of a gzipped XML file to `handle`, closing the reader and both
   * streams afterwards, whether or not reading succeeded.
   */
  private def forEachXmlEvent(gzippedXml: File)(handle: XMLEvent => Unit): Unit = {
    val fileStream = new FileInputStream(gzippedXml)
    try {
      val gzipStream = new GZIPInputStream(fileStream)
      try {
        val reader = XMLInputFactory.newInstance().createXMLEventReader(gzipStream)
        try {
          while (reader.hasNext) handle(reader.nextEvent())
        } finally reader.close()
      } finally gzipStream.close()
    } finally fileStream.close()
  }

  /**
   * Accumulates the textual form of one `<entry>` element in `elementBuilder` and hands the
   * element over for conversion when its end tag arrives. Events that are neither start
   * elements, character data nor end elements are ignored.
   */
  private def onParseEvent(
      context: ActorContext[Import],
      elementBuilder: mutable.StringBuilder,
      xmlEvent: XMLEvent): Unit = {
    if (xmlEvent.isStartElement) {
      val startElement = xmlEvent.asStartElement()
      startElement.getName.getLocalPart match {
        case "JMdict" => context.log.info("Start Import")
        case _        => elementBuilder.append(startElement.toString)
      }
    } else if (xmlEvent.isCharacters) {
      // The buffered text is re-parsed as XML later, so markup characters must be escaped again.
      elementBuilder.append(escapeXml(xmlEvent.asCharacters().toString))
    } else if (xmlEvent.isEndElement) {
      val endElement = xmlEvent.asEndElement()
      endElement.getName.getLocalPart match {
        case "JMdict" => context.log.info("End Parsing Input")
        case "entry" =>
          elementBuilder.append(endElement.toString)
          onEntryComplete(context, elementBuilder)
        case _ => elementBuilder.append(endElement.toString)
      }
    }
  }

  /**
   * Converts the buffered `<entry>` XML to JSON, decodes it into an `Entry` and queues it for
   * import. Entries that cannot be converted or decoded are logged and skipped.
   */
  private def onEntryComplete(
      context: ActorContext[Import],
      elementBuilder: mutable.StringBuilder): Unit = {
    //TODO: don't like this. Would rather parse out XML and rework that way...
    val cleanXml = elementBuilder.toString()
      .replace("xml:lang=", "lang=")
      .replace("<re_nokanji></re_nokanji>", "<re_nokanji>true</re_nokanji>")
    // Reset first: a failing conversion must not leave this entry's text in front of the next one.
    elementBuilder.clear()

    Try(org.json.XML.toJSONObject(cleanXml, ParserConfiguration).getJSONObject("entry")) match {
      case Failure(cause) =>
        context.log.error(s"Could not convert entry XML to JSON: $cleanXml", cause)
      case Success(entryJson) =>
        decode[Entry](entryJson.toString) match {
          case Left(error)  => context.log.error(entryJson.toString(4), error)
          case Right(entry) => context.self ! ImportEntry(entry)
        }
    }
  }

  /** Escapes the five XML predefined entities; `&` is handled first so it is not escaped twice. */
  private def escapeXml(text: String): String =
    text
      .replace("&", "&amp;")
      .replace("<", "&lt;")
      .replace(">", "&gt;")
      .replace("\"", "&quot;")
      .replace("'", "&apos;")

  /** Streams the body of `httpResponse` into `file`. */
  private def writeFile(file: File)(httpResponse: HttpResponse)(implicit materializer: Materializer): Future[IOResult] =
    httpResponse.entity.dataBytes.runWith(FileIO.toPath(file.toPath))

  /** Unwraps a pooled response, rethrowing the request failure so that the stream fails. */
  private def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
    case (Success(response), context) => (response, context)
    case (Failure(cause), _)          => throw cause
  }

  /**
   * Downloads `uri` into `downloadFile`.
   *
   * NOTE(review): the HTTP status is not inspected, so the body of an error or redirect response
   * is written to the file as well.
   *
   * @return a future that completes once the response body has been written
   */
  private def downloadViaFlow(uri: Uri, downloadFile: File)(implicit system: ActorSystem[Nothing]): Future[Done] = {
    val parallelism = 10
    Source.single((HttpRequest(uri = uri), ()))
      .via(Http().superPool[Unit]())
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadFile))
      .runWith(Sink.ignore)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment