Skip to content

Instantly share code, notes, and snippets.

@smallufo
Last active August 29, 2015 14:15
Show Gist options
  • Save smallufo/efd39c8997db8f43dca9 to your computer and use it in GitHub Desktop.
Save smallufo/efd39c8997db8f43dca9 to your computer and use it in GitHub Desktop.
Listening for a Twitter keyword, implemented with OkHttp and RxJava Observables
/**
* Created by smallufo on 2015-02-22.
*/
package twitter;
import com.squareup.okhttp.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * RxJava adapters around OkHttp calls: a single response, a stream of raw body
 * chunks, a stream of decoded text chunks, and a stream of text lines.
 *
 * <p>Courtesy of @paulcbetts paulcbetts@github
 * https://gist.github.com/paulcbetts/2274581f24ded7502011
 */
public class RxOkHttp {
    private static final Logger logger = LoggerFactory.getLogger(RxOkHttp.class);

    /** Size, in bytes, of the buffer used when reading a response body. */
    private static final int READ_BUFFER_SIZE = 65536;

    /** Lowest HTTP status code that is reported to subscribers as an error. */
    private static final int FIRST_ERROR_STATUS = 400;

    /**
     * Executes {@code request} asynchronously and emits the response.
     *
     * <p>Unsubscribing cancels the underlying call. A response whose status is 400 or
     * above has its body closed and is delivered as an {@code onError} instead.
     *
     * @param client  client used to create the call
     * @param request request to execute
     * @return an observable emitting exactly one response, or an error
     */
    public static Observable<Response> request(OkHttpClient client, Request request) {
        return Observable.create((Subscriber<? super Response> subj) -> {
            final Call call = client.newCall(request);
            subj.add(Subscriptions.create(call::cancel));
            call.enqueue(new Callback() {
                @Override
                public void onFailure(Request request, IOException e) {
                    subj.onError(e);
                }

                @Override
                public void onResponse(Response response) throws IOException {
                    Throwable error = getFailureExceptionOnBadStatus(response);
                    if (error != null) {
                        // Nobody downstream will ever see this response, so release
                        // its body here instead of leaking the connection.
                        closeQuietly(response.body());
                        subj.onError(error);
                        return;
                    }
                    subj.onNext(response);
                    subj.onCompleted();
                }
            });
        });
    }

    /**
     * Executes {@code request} and emits the response body as it arrives, one array
     * per successful read (never an empty array).
     *
     * <p>The body is read on a dedicated single-thread executor which is shut down
     * when reading ends or the subscriber unsubscribes. The body stream is always
     * closed, whether reading ends normally, fails, or is abandoned.
     *
     * @param client  client used to create the call
     * @param request request to execute
     * @return an observable of body chunks
     */
    public static Observable<byte[]> streamBytes(OkHttpClient client, Request request) {
        return request(client, request)
            .flatMap(response -> Observable.create(
                (Subscriber<? super byte[]> subj) -> {
                    ExecutorService executorService = Executors.newFixedThreadPool(1);
                    subj.add(Subscriptions.create(executorService::shutdown));
                    Runnable reader = () -> {
                        try {
                            readBody(response, subj);
                        } finally {
                            // Do not keep the worker thread alive once reading is over.
                            executorService.shutdown();
                        }
                    };
                    executorService.submit(reader);
                }
            ));
    }

    /**
     * Executes {@code request} and emits the response body decoded as UTF-8 text.
     *
     * <p>Decoding is stateful per subscription: a multi-byte character that is split
     * across two body chunks is emitted intact with the later chunk rather than being
     * corrupted. Malformed input is replaced with U+FFFD. Empty strings are not emitted.
     *
     * @param client  client used to create the call
     * @param request request to execute
     * @return an observable of non-empty text chunks
     */
    public static Observable<String> streamStrings(OkHttpClient client, Request request) {
        return Observable.defer(() -> {
            Utf8ChunkDecoder decoder = new Utf8ChunkDecoder();
            Observable<String> decoded = streamBytes(client, request).map(decoder::decodeChunk);
            // Evaluated only after the body completes: flushes any dangling bytes.
            Observable<String> tail = Observable.defer(() -> Observable.just(decoder.finish()));
            return decoded.concatWith(tail).filter(text -> !text.isEmpty());
        });
    }

    /**
     * Executes {@code request} and emits the response body line by line.
     *
     * <p>Lines are delimited by {@code \n}; a {@code \r} directly before the delimiter
     * is dropped so CRLF-delimited streams yield clean lines. Empty lines are skipped.
     * A final line without a trailing delimiter is emitted when the body completes.
     *
     * @param client  client used to create the call
     * @param request request to execute
     * @return an observable of non-empty lines without line terminators
     */
    public static Observable<String> streamLines(OkHttpClient client, Request request) {
        return Observable.defer(() -> {
            // Text after the last newline seen so far; private to this subscription.
            StringBuilder carry = new StringBuilder();
            Observable<String> chunks =
                streamStrings(client, request).concatWith(Observable.just("\n"));
            return chunks
                // concatMap keeps chunk order, which the carry-over logic relies on.
                .concatMap(chunk -> Observable.from(splitLines(carry, chunk)))
                .filter(line -> !line.isEmpty());
        });
    }

    /**
     * Reads the body of {@code response} until end of stream, error or unsubscription,
     * forwarding every non-empty read to {@code subj}.
     */
    private static void readBody(Response response, Subscriber<? super byte[]> subj) {
        try {
            InputStream stream = response.body().byteStream();
            try {
                byte[] buffer = new byte[READ_BUFFER_SIZE];
                int bytesRead = 0;
                while (bytesRead > -1 && !subj.isUnsubscribed()) {
                    bytesRead = stream.read(buffer, 0, buffer.length);
                    if (bytesRead > 0) {
                        subj.onNext(Arrays.copyOf(buffer, bytesRead));
                    }
                }
            } finally {
                closeQuietly(stream);
            }
            if (!subj.isUnsubscribed()) {
                subj.onCompleted();
            }
        } catch (IOException ex) {
            if (!subj.isUnsubscribed()) {
                subj.onError(ex);
            }
        }
    }

    /**
     * Appends {@code chunk} to {@code carry} and removes and returns every complete
     * line now buffered; the text after the last newline stays in {@code carry}.
     */
    private static String[] splitLines(StringBuilder carry, String chunk) {
        carry.append(chunk);
        int lastNewline = carry.lastIndexOf("\n");
        if (lastNewline < 0) {
            return new String[0];
        }
        String[] lines = carry.substring(0, lastNewline).split("\n");
        carry.delete(0, lastNewline + 1);
        for (int i = 0; i < lines.length; i++) {
            if (lines[i].endsWith("\r")) {
                lines[i] = lines[i].substring(0, lines[i].length() - 1);
            }
        }
        return lines;
    }

    /** Closes {@code resource}, logging instead of propagating a failure to close. */
    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException ex) {
            logger.debug("Failed to close {}", resource, ex);
        }
    }

    /**
     * Returns the error to report for {@code resp}, or {@code null} when its status
     * code is below 400.
     */
    private static Throwable getFailureExceptionOnBadStatus(Response resp) {
        if (resp.code() < FIRST_ERROR_STATUS) {
            return null;
        }
        return new RuntimeException("HTTP " + resp.code() + " " + resp.message());
    }

    /**
     * Incremental UTF-8 decoder that carries the bytes of an incomplete trailing
     * character over to the next chunk. Not thread-safe; use one per stream.
     */
    private static final class Utf8ChunkDecoder {
        private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder()
            .onMalformedInput(CodingErrorAction.REPLACE)
            .onUnmappableCharacter(CodingErrorAction.REPLACE);
        private byte[] pending = new byte[0];

        /** Decodes {@code chunk} together with any bytes held back from earlier chunks. */
        String decodeChunk(byte[] chunk) {
            return decode(chunk, false);
        }

        /** Signals end of input and returns whatever the held-back bytes decode to. */
        String finish() {
            return decode(new byte[0], true);
        }

        private String decode(byte[] chunk, boolean endOfInput) {
            ByteBuffer in = ByteBuffer.allocate(pending.length + chunk.length);
            in.put(pending).put(chunk);
            in.flip();
            // UTF-8 never decodes to more chars than it has bytes; +1 is headroom.
            CharBuffer out = CharBuffer.allocate(in.remaining() + 1);
            decoder.decode(in, out, endOfInput);
            if (endOfInput) {
                decoder.flush(out);
            }
            // Whatever the decoder left unread is the start of an incomplete character.
            pending = new byte[in.remaining()];
            in.get(pending);
            out.flip();
            return out.toString();
        }
    }
}
/**
* Created by smallufo on 2015-02-22.
*/
package twitter;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.RequestBody;
import net.oauth.OAuth;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.util.Base64;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Builds an OAuth 1.0 (HMAC-SHA1) signed POST to Twitter's
 * {@code statuses/filter} streaming endpoint for a single track keyword and exposes
 * the response as a stream of lines.
 */
public class Tracker {
    private static final String STREAM_URL = "https://stream.twitter.com/1.1/statuses/filter.json";
    private static final String SIGNATURE_METHOD = "HMAC-SHA1";

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // NOTE(review): the "xxx" values look like placeholders to be replaced with real credentials.
    private final String consumerKey = "xxx";
    /** consumer secret */
    private final String secretKey = "xxx";
    private final String accessToken = "xxx";
    private final String accessSecret = "xxx";
    private final String track;

    /**
     * @param track keyword to filter the stream by, in raw (un-encoded) form
     */
    public Tracker(String track) {
        this.track = track;
    }

    /**
     * Signs and issues the streaming request.
     *
     * @return an observable of the lines of the streaming response
     * @throws Exception if the signature cannot be computed
     */
    public Observable<String> streamRequest() throws Exception {
        OkHttpClient client = new OkHttpClient();
        long timestamp = System.currentTimeMillis() / 1000;
        String nonce = RandomStringUtils.randomAlphanumeric(32);
        logger.info("timestamp = {} , nonce = {}", timestamp , nonce);
        String sigBaseString = getSigBaseString(nonce , timestamp);
        logger.info("sigBaseString = {}", sigBaseString);
        // The signing key is built from both secrets, so it is deliberately never logged.
        String signingKey = OAuth.percentEncode(secretKey) + "&" + OAuth.percentEncode(accessSecret);
        String signature = getSignature(signingKey , sigBaseString);
        logger.info("sign = {}" , signature);
        String dst = getDst(nonce , signature , timestamp);
        logger.info("dst = {}" , dst);
        // Encode the keyword so that what is sent matches what was signed even when it
        // contains spaces, commas, '&', '#', '+' or non-ASCII characters.
        String encodedTrack = OAuth.percentEncode(track);
        Request request = new Request.Builder()
            .url(STREAM_URL + "?track=" + encodedTrack)
            .header("Authorization" , dst)
            .post(RequestBody.create(null, "track=" + encodedTrack))
            .build();
        return RxOkHttp.streamLines(client, request);
    }

    /**
     * Builds the OAuth signature base string: method, encoded URL and the encoded,
     * sorted, {@code &}-joined request parameters.
     */
    private String getSigBaseString(String oauthNonce , long timestamp) {
        Map<String , String> parameters = new HashMap<>();
        parameters.put("oauth_consumer_key" , consumerKey);
        parameters.put("oauth_nonce" , oauthNonce);
        parameters.put("oauth_signature_method" , SIGNATURE_METHOD);
        parameters.put("oauth_timestamp" , String.valueOf(timestamp));
        parameters.put("oauth_token" , accessToken);
        parameters.put("oauth_version" , "1.0");
        parameters.put("track" , track);

        String join = parameters.entrySet().stream()
            .map(kv -> OAuth.percentEncode(kv.getKey()) + "=" + OAuth.percentEncode(kv.getValue()))
            .sorted()
            .collect(Collectors.joining("&"));
        logger.debug("joined parameters = {}" , join);

        return "POST&" + OAuth.percentEncode(STREAM_URL) + "&" + OAuth.percentEncode(join);
    } // base string

    /**
     * Builds the value of the {@code Authorization} header from the OAuth parameters,
     * in insertion order.
     */
    private String getDst(String nonce , String signature , long timestamp) {
        Map<String , String> headers7 = new LinkedHashMap<>();
        headers7.put("oauth_consumer_key" , consumerKey);
        headers7.put("oauth_nonce" , nonce);
        headers7.put("oauth_signature" , signature);
        headers7.put("oauth_signature_method" , SIGNATURE_METHOD);
        headers7.put("oauth_timestamp" , String.valueOf(timestamp));
        headers7.put("oauth_token" , accessToken);
        headers7.put("oauth_version" , "1.0");

        String dst = "OAuth " + headers7.entrySet().stream()
            .map(kv -> OAuth.percentEncode(kv.getKey()) + "=\"" + OAuth.percentEncode(kv.getValue()) + "\"")
            .collect(Collectors.joining(", "));
        logger.debug("dst = {}", dst);
        return dst;
    } // dst

    /**
     * Computes the Base64-encoded HMAC-SHA1 of {@code sigBaseString} keyed with
     * {@code signingKey}.
     *
     * @throws Exception if the MAC algorithm or the UTF-8 charset is unavailable, or
     *                   the key is rejected
     */
    private String getSignature(String signingKey , String sigBaseString) throws Exception {
        String sha1 = "HmacSHA1";
        SecretKeySpec keySpec = new SecretKeySpec(signingKey.getBytes("UTF-8"), sha1);
        Mac mac = Mac.getInstance(sha1);
        mac.init(keySpec);
        byte[] digest = mac.doFinal(sigBaseString.getBytes("UTF-8"));
        // encodeToString avoids the platform-default charset of new String(byte[]).
        return Base64.getEncoder().encodeToString(digest);
    }
}
package twitter;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Exercises {@link Tracker} end to end: subscribes to the stream for a fixed keyword
 * and logs every line received. The call blocks until the stream terminates.
 */
public class TrackerTest {
    private Logger logger = LoggerFactory.getLogger(getClass());

    /** Streams lines for the keyword "taiwan" and logs each one at INFO level. */
    @Test
    public void testStream() throws Exception {
        Tracker tracker = new Tracker("taiwan");
        tracker.streamRequest()
            .toBlocking()
            .forEach(line -> logger.info("{}" , line));
    }
}
@smallufo
Copy link
Author

This is much simpler (with twitter4j):
https://gist.github.com/smallufo/d157b146e14dc017c461

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment