Skip to content

Instantly share code, notes, and snippets.

@johnjelinek
Last active December 20, 2015 02:29
Show Gist options
  • Save johnjelinek/6056904 to your computer and use it in GitHub Desktop.
Save johnjelinek/6056904 to your computer and use it in GitHub Desktop.
Snippets from https://github.com/tradestation/sample-webapi-desktop-java to accompany the blog post on processing streaming data with the TradeStation WebAPI.
/**
 * Prepares a streaming bar-chart request and wraps it in a thread.
 *
 * <p>Builds a GET request against
 * {@code BASEURL + "stream/barchart/{symbol}/{interval}/{intervalType}/{startDate}"},
 * authorised with the current access token, creates a {@code StreamSource} that
 * maps the stream to {@code IntradayBar} instances, and registers
 * {@code observer} on it. The returned thread is NOT started; the caller
 * decides when to call {@link Thread#start()}.
 *
 * <p>The path segments are inserted verbatim (no URL encoding is applied here).
 *
 * @param observer     receiver registered on the stream source
 * @param symbol       symbol placed in the first path segment
 * @param interval     interval placed in the second path segment
 * @param intervalType interval unit placed in the third path segment
 *                     (accepted values are defined by the remote API - not visible here)
 * @param startDate    start date placed in the fourth path segment
 *                     (expected format is defined by the remote API - not visible here)
 * @return a new, unstarted thread whose runnable is the stream source
 */
public Thread getBarchartStream(Observer observer, String symbol, int interval, String intervalType, String startDate) {
    String path = String.format("stream/barchart/%s/%s/%s/%s",
            symbol, interval, intervalType, startDate);

    Request request = new RequestBuilder("GET")
            .setUrl(BASEURL + path)
            .addHeader("Content-Type", "application/x-www-form-urlencoded")
            .addHeader("Authorization", "Bearer " + this.token.getAccess_token())
            .build();

    // create an event source (declared with its type argument rather than as a raw type)
    final StreamSource<IntradayBar> streamSource =
            new StreamSource<IntradayBar>(request, IntradayBar.class);

    // subscribe the observer to the event source
    streamSource.addObserver(observer);

    // return the event thread
    return new Thread(streamSource);
}
/**
 * Executes the streaming request and blocks until it finishes.
 *
 * <p>Response body parts are decoded as UTF-8 and appended to a text buffer.
 * Every time the buffer starts with a complete JSON document, that document is
 * removed from the buffer, mapped to the configured entity type and pushed to
 * the registered observers. If the buffered text contains {@code "ERROR"} the
 * request is aborted.
 *
 * <p>I/O and execution failures are printed to standard error. If the waiting
 * thread is interrupted, the interrupt status is restored before returning.
 */
@Override
public void run() {
    try {
        client.executeRequest(request, new AsyncHandler<Response>() {
            // Text received so far that has not yet been converted into an entity.
            private String remainingPartialJsonData = "";

            /**
             * Extracts the shortest prefix of {@code partialJsonData} that contains a
             * '{' and is valid JSON, and maps it to {@code type}.
             *
             * <p>On success the buffer is set to whatever follows that prefix. Only
             * the consumed prefix is removed; a later, textually identical message
             * stays in the buffer. The buffer is advanced before mapping, so a
             * message that fails to map is not retried.
             *
             * @param partialJsonData the current buffer contents
             * @param type            target class for the JSON mapper
             * @return the mapped entity, or {@code null} if no complete document is buffered yet
             * @throws IOException if the mapper cannot read the extracted document
             */
            private <T> T processJson(String partialJsonData, Class<T> type) throws IOException {
                int firstBrace = partialJsonData.indexOf('{');
                if (firstBrace < 0) {
                    // No object has started yet; nothing can be complete.
                    return null;
                }
                // Prefixes ending before the first '{' can never qualify, so start there.
                for (int end = firstBrace + 1; end <= partialJsonData.length(); end++) {
                    String json = partialJsonData.substring(0, end);
                    if (isValidJSON(json)) {
                        remainingPartialJsonData = partialJsonData.substring(end);
                        // Strip the \/Date( ... )\/ wrapper so only its inner value remains.
                        json = json.replace("\\/Date(", "").replace(")\\/", "");
                        return TradeStationWebApi.mapper.readValue(json, type);
                    }
                }
                return null;
            }

            @Override
            public void onThrowable(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public STATE onBodyPartReceived(HttpResponseBodyPart httpResponseBodyPart) throws Exception {
                // NOTE(review): each part is decoded on its own, so a multi-byte UTF-8
                // character split across two parts would be garbled - confirm whether
                // the stream can contain non-ASCII data.
                remainingPartialJsonData += new String(httpResponseBodyPart.getBodyPartBytes(), "UTF-8");

                if (remainingPartialJsonData.contains("ERROR")) {
                    return STATE.ABORT;
                }

                // A single body part may carry several documents; drain all complete ones.
                Object result;
                while ((result = processJson(remainingPartialJsonData, entity)) != null) {
                    setChanged();
                    notifyObservers(result);
                }
                return STATE.CONTINUE;
            }

            @Override
            public STATE onStatusReceived(HttpResponseStatus httpResponseStatus) throws Exception {
                return STATE.CONTINUE;
            }

            @Override
            public STATE onHeadersReceived(HttpResponseHeaders httpResponseHeaders) throws Exception {
                return STATE.CONTINUE;
            }

            @Override
            public Response onCompleted() throws Exception {
                // The stream is consumed incrementally; there is no aggregate response.
                return null;
            }
        }).get();
    } catch (IOException e) {
        e.printStackTrace();
    } catch (InterruptedException e) {
        // Restore the interrupt status so whoever owns this thread can observe it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
}
// Single HTTP client instance held in a static field and configured with a
// request timeout of -1.
// NOTE(review): -1 presumably disables the request timeout so that a long-lived
// streaming response is not cut off - confirm against the AsyncHttpClient docs.
static final AsyncHttpClient client =
new AsyncHttpClient(new AsyncHttpClientConfig.Builder().setRequestTimeoutInMs(-1).build());
/**
 * Observer callback: prints one line per received bar.
 *
 * <p>Arguments that are not {@code IntradayBar} instances are ignored. For a
 * bar, the symbol, the bar's down-tick count and its timestamp are written to
 * standard output, separated by tabs.
 *
 * @param obj the observable that sent the notification (unused)
 * @param arg the notification payload
 */
@Override
public void update(Observable obj, Object arg) {
    if (!(arg instanceof IntradayBar)) {
        return;
    }

    final IntradayBar bar = (IntradayBar) arg;

    // Assemble the three tab-separated segments in their original order.
    final String symbolPart = "Symbol: " + symbol;
    final String downTicksPart = "\tDownTicks: " + bar.getDownTicks();
    final String timeStampPart = "\tTimeStamp: " + bar.getTimeStamp();

    System.out.println(symbolPart + downTicksPart + timeStampPart);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment