Last active
August 29, 2015 14:15
-
-
Save smallufo/efd39c8997db8f43dca9 to your computer and use it in GitHub Desktop.
Listening for a Twitter keyword, implemented with OkHttp + RxJava Observables
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Created by smallufo on 2015-02-22. | |
*/ | |
package twitter; | |
import com.squareup.okhttp.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
import rx.subscriptions.Subscriptions;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** | |
* Courtesy of @paulcbetts paulcbetts@github | |
* https://gist.github.com/paulcbetts/2274581f24ded7502011 | |
*/ | |
public class RxOkHttp { | |
private static Logger logger = LoggerFactory.getLogger(RxOkHttp.class); | |
public static Observable<Response> request(OkHttpClient client, Request request) { | |
return Observable.create((Subscriber<? super Response> subj) -> { | |
final Call call = client.newCall(request); | |
subj.add(Subscriptions.create(call::cancel)); | |
call.enqueue(new Callback() { | |
@Override | |
public void onFailure(Request request, IOException e) { | |
subj.onError(e); | |
} | |
@Override | |
public void onResponse(Response response) throws IOException { | |
Throwable error = getFailureExceptionOnBadStatus(response); | |
if (error != null) { | |
subj.onError(error); | |
return; | |
} | |
subj.onNext(response); | |
subj.onCompleted(); | |
} | |
}); | |
}); | |
} | |
public static Observable<byte[]> streamBytes(OkHttpClient client, Request request) { | |
return request(client, request) | |
.flatMap(response -> Observable.create( | |
(Subscriber<? super byte[]> subj) -> | |
{ | |
Callable<Void> callable = () -> { | |
InputStream stream; | |
byte[] buffer = new byte[65536]; | |
int bytesRead = 0; | |
stream = response.body().byteStream(); | |
try { | |
while (bytesRead > -1 && !subj.isUnsubscribed()) { | |
bytesRead = stream.read(buffer, 0, 65536); | |
if (bytesRead < 1) | |
continue; | |
subj.onNext(Arrays.copyOfRange(buffer, 0, bytesRead)); | |
} | |
if (!subj.isUnsubscribed()) | |
subj.onCompleted(); | |
stream.close(); | |
} catch (IOException ex) { | |
subj.onError(ex); | |
} | |
return null; | |
}; | |
ExecutorService executorService = Executors.newFixedThreadPool(1); | |
subj.add(Subscriptions.create(() -> executorService.shutdown() )); | |
executorService.submit(callable); | |
} | |
) | |
); | |
} | |
public static Observable<String> streamStrings(OkHttpClient client, Request request) { | |
return streamBytes(client, request).map(bytes -> { | |
try { | |
return new String(bytes, "UTF-8"); | |
} catch (UnsupportedEncodingException e) { | |
throw new RuntimeException("UTF8 isn't supported this will never happen"); | |
} | |
}); | |
} | |
public static Observable<String> streamLines(OkHttpClient client, Request request) { | |
return streamStrings(client, request) | |
.concatWith(Observable.just("\n")) | |
.flatMap(new Func1<String, Observable<? extends String>>() { | |
String remainingString = ""; | |
@Override | |
public Observable<? extends String> call(String s) { | |
String[] lines = (remainingString + s).split("\n"); | |
if (s.charAt(s.length() - 1) != '\n') { | |
remainingString = lines[lines.length - 1]; | |
return Observable.from(Arrays.copyOfRange(lines, 0, lines.length - 1)); | |
} | |
remainingString = ""; | |
return Observable.from(lines); | |
} | |
}).filter(x -> x.length() > 0); | |
} | |
private static Throwable getFailureExceptionOnBadStatus(Response resp) { | |
if (resp.code() < 399) | |
return null; | |
return new RuntimeException(resp.message()); | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Created by smallufo on 2015-02-22. | |
*/ | |
package twitter; | |
import com.squareup.okhttp.OkHttpClient; | |
import com.squareup.okhttp.Request; | |
import com.squareup.okhttp.RequestBody; | |
import net.oauth.OAuth; | |
import org.apache.commons.lang3.RandomStringUtils; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import rx.Observable; | |
import javax.crypto.Mac; | |
import javax.crypto.spec.SecretKeySpec; | |
import java.util.Base64; | |
import java.util.HashMap; | |
import java.util.LinkedHashMap; | |
import java.util.Map; | |
import java.util.stream.Collectors; | |
public class Tracker { | |
private Logger logger = LoggerFactory.getLogger(getClass()); | |
private final String consumerKey = "xxx"; | |
/** consumer secret */ | |
private final String secretKey = "xxx"; | |
private final String accessToken = "xxx"; | |
private final String accessSecret = "xxx"; | |
private final String track; | |
public Tracker(String track) {this.track = track;} | |
public Observable<String> streamRequest() throws Exception { | |
OkHttpClient client = new OkHttpClient(); | |
long timestamp = System.currentTimeMillis() / 1000; | |
String nonce = RandomStringUtils.randomAlphanumeric(32); | |
logger.info("timestamp = {} , nonce = {}", timestamp , nonce); | |
String sigBaseString = getSigBaseString(nonce , timestamp); | |
logger.info("sigBaseString = {}", sigBaseString); | |
StringBuilder sb = new StringBuilder(); | |
sb.append(OAuth.percentEncode(secretKey)); | |
sb.append("&"); | |
sb.append(OAuth.percentEncode(accessSecret)); | |
String signingKey = sb.toString(); | |
logger.info("signingKey = {}" , signingKey); | |
String signature = getSignature(signingKey , sigBaseString); | |
logger.info("sign = {}" , signature); | |
String dst = getDst(nonce , signature , timestamp); | |
logger.info("dst = {}" , dst); | |
Request request = new Request.Builder() | |
.url("https://stream.twitter.com/1.1/statuses/filter.json?track="+track) | |
.header("Authorization" , dst) | |
.post(RequestBody.create(null, "track="+track)) | |
.build(); | |
return RxOkHttp.streamLines(client, request); | |
} | |
private String getSigBaseString(String oauthNonce , long timestamp) { | |
Map<String , String> parameters = new HashMap<String , String>() {{ | |
put("oauth_consumer_key" , consumerKey); | |
put("oauth_nonce" , oauthNonce); | |
put("oauth_signature_method" , "HMAC-SHA1"); | |
put("oauth_timestamp" , String.valueOf(timestamp)); | |
put("oauth_token" , accessToken); | |
put("oauth_version" , "1.0"); | |
put("track" , track); | |
}}; | |
String join = parameters.entrySet().stream() | |
.map(kv -> OAuth.percentEncode(kv.getKey()) + "=" + OAuth.percentEncode(kv.getValue())) | |
.sorted() | |
.collect(Collectors.joining("&")); | |
logger.debug("joined parameters = {}" , join); | |
StringBuilder sb = new StringBuilder(); | |
sb.append("POST&"); | |
sb.append(OAuth.percentEncode("https://stream.twitter.com/1.1/statuses/filter.json")); | |
sb.append("&"); | |
sb.append(OAuth.percentEncode(join)); | |
String sigBaseString = sb.toString(); | |
return sigBaseString; | |
} // base string | |
private String getDst(String nonce , String signature , long timestamp) { | |
Map<String , String> headers7 = new LinkedHashMap<String , String>() {{ | |
put("oauth_consumer_key" , consumerKey); | |
put("oauth_nonce" , nonce); | |
put("oauth_signature" , signature); | |
put("oauth_signature_method" , "HMAC-SHA1"); | |
put("oauth_timestamp" , String.valueOf(timestamp)); | |
put("oauth_token" , accessToken); | |
put("oauth_version" , "1.0"); | |
}}; | |
StringBuilder sb = new StringBuilder(); | |
sb.append("OAuth "); | |
sb.append(headers7.entrySet().stream() | |
.map(kv -> OAuth.percentEncode(kv.getKey()) + "=\"" + OAuth.percentEncode(kv.getValue()) + "\"") | |
.collect(Collectors.joining(", ")) | |
); | |
String dst = sb.toString(); | |
logger.debug("dst = {}", dst); | |
return dst; | |
} // dst | |
private String getSignature(String signingKey , String sigBaseString) throws Exception { | |
String sha1 = "HmacSHA1"; | |
SecretKeySpec keySpec = new SecretKeySpec(signingKey.getBytes("UTF-8"), sha1); | |
Mac mac = Mac.getInstance(sha1); | |
mac.init(keySpec); | |
byte[] digest = mac.doFinal(sigBaseString.getBytes("UTF-8")); | |
String sign = new String(Base64.getEncoder().encode(digest)); | |
return sign; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package twitter; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
public class TrackerTest { | |
private Logger logger = LoggerFactory.getLogger(getClass()); | |
@Test | |
public void testStream() throws Exception { | |
Tracker t = new Tracker("taiwan"); | |
t.streamRequest().toBlocking().forEach( | |
s -> logger.info("{}" , s) | |
); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is much simpler (with twitter4j):
https://gist.github.com/smallufo/d157b146e14dc017c461