-
-
Save andrcmdr/ba51571630cf4e0db938ecf1ba98da8b to your computer and use it in GitHub Desktop.
Async XML Parser using Aalto and Vert.x
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/********************************************** | |
* LICENSE | |
* -------- | |
* https://github.com/georocket/georocket#license | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
**********************************************/ | |
package xml; | |
import javax.xml.stream.*; | |
import com.fasterxml.aalto.*; | |
import com.fasterxml.aalto.stax.*; | |
import io.vertx.core.buffer.*; | |
import rx.*; | |
/** | |
* An asynchronous XML parser that can be fed with data and emits | |
* {@link XMLStreamEvent}s | |
* | |
* @author Michel Kraemer | |
*/ | |
public class AsyncXMLParser { | |
private AsyncXMLInputFactory xmlInputFactory = new InputFactoryImpl(); | |
private AsyncXMLStreamReader<AsyncByteArrayFeeder> streamReader = xmlInputFactory.createAsyncForByteArray(); | |
/** | |
* Feed the parser with data and create an observable that emits | |
* {@link XMLStreamEvent}s until all data has been consumed | |
* | |
* @param data | |
* the data | |
* @return the observable | |
*/ | |
public rx.Observable<XMLStreamEvent> feed(Buffer data) { | |
try { | |
byte[] bytes = data.getBytes(); | |
streamReader.getInputFeeder().feedInput(bytes, 0, bytes.length); | |
} catch (XMLStreamException e) { | |
return rx.Observable.error(e); | |
} | |
// consume all data | |
return rx.Observable.create(subscriber -> { | |
while (true) { | |
// read next token | |
int event; | |
try { | |
event = streamReader.next(); | |
} catch (XMLStreamException e) { | |
subscriber.onError(e); | |
break; | |
} | |
if (event == AsyncXMLStreamReader.EVENT_INCOMPLETE) { | |
// wait for more input | |
subscriber.onCompleted(); | |
break; | |
} | |
// create stream event | |
int pos = streamReader.getLocation().getCharacterOffset(); | |
XMLStreamEvent e = new XMLStreamEvent(event, pos, streamReader); | |
subscriber.onNext(e); | |
} | |
}); | |
} | |
/** | |
* Close the parser and release all resources | |
* | |
* @return an observable that emits when the parser has been closed | |
*/ | |
public rx.Observable<Void> close() { | |
try { | |
streamReader.close(); | |
return Observable.just(null); | |
} catch (XMLStreamException e) { | |
return Observable.error(e); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package bots; | |
import java.io.*; | |
import javax.xml.parsers.*; | |
import javax.xml.stream.events.*; | |
import javax.xml.transform.*; | |
import javax.xml.transform.dom.*; | |
import javax.xml.transform.stream.*; | |
import org.w3c.dom.*; | |
import io.vertx.core.*; | |
import io.vertx.core.net.*; | |
import io.vertx.rxjava.core.AbstractVerticle; | |
import io.vertx.rxjava.core.buffer.*; | |
import io.vertx.rxjava.core.net.NetClient; | |
import io.vertx.rxjava.core.net.NetSocket; | |
import io.vertx.rxjava.core.streams.*; | |
import xml.*; | |
public class BotsManager extends AbstractVerticle { | |
NetSocket socket = null; | |
public BotsManager() { | |
super.vertx = io.vertx.rxjava.core.Vertx.vertx(new VertxOptions() | |
.setBlockedThreadCheckInterval(5*60*1000) | |
); | |
} | |
public static void main(String[] args) { | |
BotsManager bots = new BotsManager(); | |
bots.connect("bgblitz.no-ip.org", 12344); | |
try { | |
Thread.currentThread().join(); | |
} catch (InterruptedException e) { | |
// TODO Auto-generated catch block | |
e.printStackTrace(); | |
} | |
} | |
public void connect(String host, int port) { | |
NetClientOptions options = new NetClientOptions() | |
.setConnectTimeout(10000) | |
.setReconnectAttempts(10) | |
.setReconnectInterval(500); | |
NetClient client = vertx.createNetClient(options); | |
client.connect(port, host, socketObs -> { | |
if(socketObs.succeeded()) { | |
System.out.println("Connected!"); | |
this.socket = socketObs.result(); | |
processMessages(socket); | |
// socket.handler(event -> { | |
// System.out.println("Handler triggered; Event=" + event); | |
// }); | |
socket.closeHandler(event -> { | |
System.out.println("CloseHandler triggered; Event=" + event); | |
}); | |
socket.exceptionHandler(e -> { | |
System.out.println("ExceptionHandler triggered; Event=" + e); | |
}); | |
sendSetupRequest(); | |
} | |
}); | |
} | |
private void processMessages(ReadStream<Buffer> f) { | |
AsyncXMLParser parser = new AsyncXMLParser(); | |
f.toObservable() | |
.map(buf -> (io.vertx.core.buffer.Buffer)buf.getDelegate()) | |
.flatMap(parser::feed) | |
.doOnNext(e -> { | |
System.out.println(e); | |
if(e.getType() == XMLEvent.END_ELEMENT && e.getName().equals("TutorResult")) { | |
System.out.println("COMPLETE XML"); | |
} | |
}) | |
.last() // "wait" for last event (i.e. end of socket) | |
.doAfterTerminate(parser::close) | |
.subscribe(next -> { | |
}, e -> System.out.println(e) | |
, () -> { | |
System.out.println("FINISHED"); | |
}); | |
} | |
private void sendSetupRequest() { | |
try { | |
DocumentBuilderFactory fact = DocumentBuilderFactory.newInstance(); | |
DocumentBuilder build = fact.newDocumentBuilder(); | |
Document doc = build.newDocument(); | |
Element setupReq = doc.createElement("SetupRequest"); | |
doc.appendChild(setupReq); | |
setupReq.setAttribute("ply", "2"); | |
setupReq.setAttribute("threads", "2"); | |
setupReq.setAttribute("trace", "setup_log.txt"); | |
TransformerFactory tFact = TransformerFactory.newInstance(); | |
Transformer trans = tFact.newTransformer(); | |
trans.setOutputProperty(OutputKeys.INDENT, "yes"); | |
StringWriter writer = new StringWriter(); | |
StreamResult result = new StreamResult(writer); | |
DOMSource source = new DOMSource(doc); | |
trans.transform(source, result); | |
socket.write(writer.toString()); | |
} catch (TransformerException | ParserConfigurationException e) { | |
System.out.println("Error outputting document"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment