Skip to content

Instantly share code, notes, and snippets.

@kevinherron
Created September 12, 2010 19:59
Show Gist options
  • Save kevinherron/576380 to your computer and use it in GitHub Desktop.
Save kevinherron/576380 to your computer and use it in GitHub Desktop.
package com.kevinherron.campfire.streaming;
import java.util.concurrent.Future;
import org.json.JSONTokener;
import com.kevinherron.campfire.api.CampfireException;
import com.kevinherron.campfire.api.LoginCredentials;
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.Realm;
import com.ning.http.client.Request;
/**
 * Streams live events from a single Campfire room.
 *
 * <p>Issues a long-lived request to
 * {@code GET https://streaming.campfirenow.com/room/#{id}/live.json} and prints every JSON
 * value it receives to standard output. Streaming begins in the constructor and continues
 * until {@link #stop()} is called.
 *
 * @author Kevin Herron <kevinherron@gmail.com>
 */
public class RoomStreamer {

    /** Checked by every handler callback; once {@code false} the in-flight request is aborted. */
    private volatile boolean keepStreaming = false;

    /** Handle on the in-flight streaming request, used by {@link #stop()} to cancel it. */
    private Future<?> requestFuture;

    /** Client that owns the connection; set to {@code null} once it has been closed. */
    private AsyncHttpClient httpClient;

    private final Realm realm;
    private final int roomId;

    /**
     * Creates the streamer and immediately starts streaming the given room.
     *
     * @param credentials supplies the API key used as the authentication principal
     * @param roomId id of the room to stream
     * @throws CampfireException if the streaming request could not be built or submitted; the
     *     HTTP client is closed before the exception propagates
     */
    public RoomStreamer(LoginCredentials credentials, int roomId) throws CampfireException {
        this.roomId = roomId;

        // Build the realm before the client: if the credentials are unusable this fails
        // before any client resources have been allocated.
        // The API key is sent as the principal together with the fixed password "X".
        realm = new Realm.RealmBuilder()
                .setPrincipal(credentials.getApiKey())
                .setPassword("X")
                .build();

        // Largest possible timeouts so the long-lived streaming request is not timed out.
        AsyncHttpClientConfig config = new AsyncHttpClientConfig.Builder()
                .setRequestTimeoutInMs(Integer.MAX_VALUE)
                .setIdleConnectionTimeoutInMs(Integer.MAX_VALUE)
                .build();
        httpClient = new AsyncHttpClient(config);

        try {
            start();
        } catch (CampfireException e) {
            // The caller never receives an instance to call stop() on, so release the
            // client here instead of leaking it.
            keepStreaming = false;
            try {
                closeClient();
            } catch (RuntimeException closeFailure) {
                // Report but do not mask the original failure.
                System.err.println(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Builds the authenticated streaming request and submits it to the client.
     *
     * @throws CampfireException wrapping any failure raised while building or executing the request
     */
    private void start() throws CampfireException {
        keepStreaming = true;
        try {
            String url = "https://streaming.campfirenow.com/room/" + roomId + "/live.json";
            Request request = httpClient.prepareGet(url).setRealm(realm).build();
            requestFuture = httpClient.executeRequest(request, new Handler());
        } catch (Exception e) {
            // TODO log error
            throw new CampfireException(e);
        }
    }

    /**
     * Stops streaming: tells the handler to abort, cancels the in-flight request and closes
     * the HTTP client. Calling it again after the client has been closed does not close it twice.
     */
    public synchronized void stop() {
        keepStreaming = false;
        if (requestFuture != null) {
            requestFuture.cancel(true);
        }
        closeClient();
    }

    /** Closes the HTTP client if it is still open and drops the reference to it. */
    private void closeClient() {
        if (httpClient != null) {
            System.out.println("Closing RoomStreamer client...");
            try {
                httpClient.close();
            } finally {
                // Drop the reference even when close() fails so it is never closed twice.
                httpClient = null;
            }
            System.out.println("Done.");
        }
    }

    /**
     * Receives the streamed response piece by piece and prints each JSON value it contains.
     * Every callback aborts the request once {@link RoomStreamer#stop()} has been called.
     */
    private class Handler implements AsyncHandler<String> {

        @Override
        public void onThrowable(Throwable t) {
            // TODO log error
            System.err.println(t);
            // TODO if keepStreaming is still true, reschedule ourselves?
        }

        /**
         * Parses one body chunk and prints the simple class name and value of every JSON
         * value found in it.
         *
         * @return {@code CONTINUE} while streaming, {@code ABORT} once stopped
         */
        @Override
        public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
            if (!keepStreaming) {
                return abort("onBodyPartReceived");
            }

            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            // NOTE(review): a multi-byte character or JSON value split across two chunks is
            // still not reassembled here — confirm whether buffering is needed.
            String json = new String(bodyPart.getBodyPartBytes(), "UTF-8").trim();
            if (json.length() == 0) {
                // Whitespace-only chunk, e.g. the " " sent to keep the connection alive.
                return STATE.CONTINUE;
            }

            JSONTokener tokener = new JSONTokener(json);
            try {
                while (tokener.more()) {
                    Object o = tokener.nextValue();
                    System.out.println(o.getClass().getSimpleName());
                    System.out.println(o);
                }
            } catch (Exception e) {
                // Best effort: report the bad chunk and keep the stream alive.
                System.err.println("Unable to parse streamed chunk: " + e);
            }
            return STATE.CONTINUE;
        }

        @Override
        public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
            return continueOrAbort("onStatusReceived");
        }

        @Override
        public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
            return continueOrAbort("onHeadersReceived");
        }

        @Override
        public String onCompleted() throws Exception {
            // Not really interested in the end result... just the stuff we get as it's completing.
            return null;
        }

        /** Returns {@code CONTINUE} while streaming is enabled, otherwise logs and aborts. */
        private STATE continueOrAbort(String callbackName) {
            return keepStreaming ? STATE.CONTINUE : abort(callbackName);
        }

        /** Logs that the named callback is aborting the request and returns {@code ABORT}. */
        private STATE abort(String callbackName) {
            System.out.println(callbackName + " aborting...");
            return STATE.ABORT;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment