Skip to content

Instantly share code, notes, and snippets.

@andrcmdr
Forked from shvalb/AsyncXMLParser.java
Created December 13, 2018 15:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andrcmdr/ba51571630cf4e0db938ecf1ba98da8b to your computer and use it in GitHub Desktop.
Save andrcmdr/ba51571630cf4e0db938ecf1ba98da8b to your computer and use it in GitHub Desktop.
Async XML Parser using Aalto and Vertx
/**********************************************
* LICENSE
* --------
* https://github.com/georocket/georocket#license
* http://www.apache.org/licenses/LICENSE-2.0
**********************************************/
package xml;
import javax.xml.stream.*;
import com.fasterxml.aalto.*;
import com.fasterxml.aalto.stax.*;
import io.vertx.core.buffer.*;
import rx.*;
/**
* An asynchronous XML parser that can be fed with data and emits
* {@link XMLStreamEvent}s
*
* @author Michel Kraemer
*/
public class AsyncXMLParser {
private AsyncXMLInputFactory xmlInputFactory = new InputFactoryImpl();
private AsyncXMLStreamReader<AsyncByteArrayFeeder> streamReader = xmlInputFactory.createAsyncForByteArray();
/**
* Feed the parser with data and create an observable that emits
* {@link XMLStreamEvent}s until all data has been consumed
*
* @param data
* the data
* @return the observable
*/
public rx.Observable<XMLStreamEvent> feed(Buffer data) {
try {
byte[] bytes = data.getBytes();
streamReader.getInputFeeder().feedInput(bytes, 0, bytes.length);
} catch (XMLStreamException e) {
return rx.Observable.error(e);
}
// consume all data
return rx.Observable.create(subscriber -> {
while (true) {
// read next token
int event;
try {
event = streamReader.next();
} catch (XMLStreamException e) {
subscriber.onError(e);
break;
}
if (event == AsyncXMLStreamReader.EVENT_INCOMPLETE) {
// wait for more input
subscriber.onCompleted();
break;
}
// create stream event
int pos = streamReader.getLocation().getCharacterOffset();
XMLStreamEvent e = new XMLStreamEvent(event, pos, streamReader);
subscriber.onNext(e);
}
});
}
/**
* Close the parser and release all resources
*
* @return an observable that emits when the parser has been closed
*/
public rx.Observable<Void> close() {
try {
streamReader.close();
return Observable.just(null);
} catch (XMLStreamException e) {
return Observable.error(e);
}
}
}
package bots;
import java.io.*;
import javax.xml.parsers.*;
import javax.xml.stream.events.*;
import javax.xml.transform.*;
import javax.xml.transform.dom.*;
import javax.xml.transform.stream.*;
import org.w3c.dom.*;
import io.vertx.core.*;
import io.vertx.core.net.*;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.buffer.*;
import io.vertx.rxjava.core.net.NetClient;
import io.vertx.rxjava.core.net.NetSocket;
import io.vertx.rxjava.core.streams.*;
import xml.*;
public class BotsManager extends AbstractVerticle {
NetSocket socket = null;
public BotsManager() {
super.vertx = io.vertx.rxjava.core.Vertx.vertx(new VertxOptions()
.setBlockedThreadCheckInterval(5*60*1000)
);
}
public static void main(String[] args) {
BotsManager bots = new BotsManager();
bots.connect("bgblitz.no-ip.org", 12344);
try {
Thread.currentThread().join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void connect(String host, int port) {
NetClientOptions options = new NetClientOptions()
.setConnectTimeout(10000)
.setReconnectAttempts(10)
.setReconnectInterval(500);
NetClient client = vertx.createNetClient(options);
client.connect(port, host, socketObs -> {
if(socketObs.succeeded()) {
System.out.println("Connected!");
this.socket = socketObs.result();
processMessages(socket);
// socket.handler(event -> {
// System.out.println("Handler triggered; Event=" + event);
// });
socket.closeHandler(event -> {
System.out.println("CloseHandler triggered; Event=" + event);
});
socket.exceptionHandler(e -> {
System.out.println("ExceptionHandler triggered; Event=" + e);
});
sendSetupRequest();
}
});
}
private void processMessages(ReadStream<Buffer> f) {
AsyncXMLParser parser = new AsyncXMLParser();
f.toObservable()
.map(buf -> (io.vertx.core.buffer.Buffer)buf.getDelegate())
.flatMap(parser::feed)
.doOnNext(e -> {
System.out.println(e);
if(e.getType() == XMLEvent.END_ELEMENT && e.getName().equals("TutorResult")) {
System.out.println("COMPLETE XML");
}
})
.last() // "wait" for last event (i.e. end of socket)
.doAfterTerminate(parser::close)
.subscribe(next -> {
}, e -> System.out.println(e)
, () -> {
System.out.println("FINISHED");
});
}
private void sendSetupRequest() {
try {
DocumentBuilderFactory fact = DocumentBuilderFactory.newInstance();
DocumentBuilder build = fact.newDocumentBuilder();
Document doc = build.newDocument();
Element setupReq = doc.createElement("SetupRequest");
doc.appendChild(setupReq);
setupReq.setAttribute("ply", "2");
setupReq.setAttribute("threads", "2");
setupReq.setAttribute("trace", "setup_log.txt");
TransformerFactory tFact = TransformerFactory.newInstance();
Transformer trans = tFact.newTransformer();
trans.setOutputProperty(OutputKeys.INDENT, "yes");
StringWriter writer = new StringWriter();
StreamResult result = new StreamResult(writer);
DOMSource source = new DOMSource(doc);
trans.transform(source, result);
socket.write(writer.toString());
} catch (TransformerException | ParserConfigurationException e) {
System.out.println("Error outputting document");
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment