Skip to content

Instantly share code, notes, and snippets.

@arkadijs
Last active August 29, 2015 14:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arkadijs/18de74c3b8d46ea9865b to your computer and use it in GitHub Desktop.
Save arkadijs/18de74c3b8d46ea9865b to your computer and use it in GitHub Desktop.
Instagram Tags media feed based on Observable / ReactiveX in Scala, Groovy, and Java. https://bitbucket.org/arkadi/instarx
@Grapes([
@Grab('io.reactivex:rxgroovy:1.0.0'),
@Grab('org.codehaus.gpars:gpars:1.2.1'),
@Grab('com.github.groovy-wslite:groovy-wslite:1.1.0')])
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import rx.Observable
import rx.Subscriber
import rx.schedulers.Schedulers
//import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import groovy.json.JsonBuilder
import groovy.json.JsonSlurper
import jsr166y.ForkJoinPool
import static groovyx.gpars.GParsPool.executeAsync
import static groovyx.gpars.GParsPool.withExistingPool
import static groovyx.gpars.GParsPool.withPool
import wslite.rest.RESTClient
// --- Script configuration; the closures further down capture these variables ---
// Name stamped on every Media object before it is posted to the UI.
String participant = "change-me"
String clientId = "" // obtain your own at https://instagram.com/developer
// Base URL given to the REST client used for the tag queries.
String api = "https://api.instagram.com/v1"
// Endpoint that receives each media item as a JSON POST.
String ui = "http://localhost:5050/ui"
int interval = 5 // seconds
int count = 10 // count of tagged media to return
// Candidate tags; only the first `limit` of them are queried on each timer tick.
def tags = "love instagood me tbt cute follow followme photooftheday happy tagforlikes beautiful girl like selfie picoftheday summer fun smile friends like4like instadaily fashion igers instalike food".split(" ")
int limit = 3 // number of tags per observable stream
def instaRest = new RESTClient(api)
def uiRest = new RESTClient(ui)
// Pool used by the multi-threaded fetch; its parallelism equals the number of tags queried.
def fjp = new ForkJoinPool(limit)
// Scheduler on which the replayed stream is observed.
def scheduler = Schedulers.newThread()
/** Geo-tag of a post; equality, hashing and toString() are generated over all four properties. */
@ToString(includeNames = true)
@EqualsAndHashCode
class Location {
    Double latitude
    Double longitude
    Integer id
    String name
}
/** One tagged image. Equality and hashing consider only `url`, so two items with the same URL are equal. */
@ToString
@EqualsAndHashCode(includes = "url")
class Media {
    String tag
    String url
    Location location
    String participant
}
/**
 * Turns a parsed media response into a list of Media objects.
 *
 * Reads `json.data`; for each entry takes `images.thumbnail.url` and, when the entry
 * has a truthy `location`, copies its latitude, longitude, id and name into a Location.
 * The `tag` and `participant` properties are left unset here.
 */
def unjson(json) {
    json.data.collect { entry ->
        def geo = entry.location
        def place = null
        if (geo) {
            place = new Location(latitude: geo.latitude, longitude: geo.longitude, id: geo.id, name: geo.name)
        }
        new Media(url: entry.images.thumbnail.url, location: place)
    }
}
// Fetches the recent media for one tag through the REST client, converts the JSON
// response with unjson() and stamps every resulting item with the tag that was queried.
def media = { String tag ->
    def response = instaRest.get(path: "/tags/$tag/media/recent", query: [ client_id: clientId, count: count ])
    def items = unjson(response.json)
    items.each { item -> item.tag = tag }
    items
}
println 'Starting...'
// single-threaded HTTP fetch
// Observable<Observable<List<Media>>> => Observable<List<Media>>
// Every timer tick maps to an inner Observable that calls media(tag) synchronously for
// each of the first `limit` tags. A tag whose fetch throws has its stack trace printed
// and contributes an empty Observable, so one failure does not end the stream.
def instagram /*Observable<List<Media>>*/ = Observable.merge(Observable.interval(interval, TimeUnit.SECONDS).map { _seq ->
Observable.from(tags.take(limit)).flatMap { tag -> // Observable<List<Media>>
try {
Observable.just(media(tag)) // .from()
} catch (Exception e) {
e.printStackTrace()
Observable.empty()
}
}
})
// multi-threaded HTTP fetch
// Every timer tick builds one fetch closure per tag, submits them to the existing
// ForkJoinPool through GPars executeAsync, and wraps each returned Future in an
// Observable. A fetch that throws prints its stack trace and yields an empty list.
def instagram2 /*Observable<List<Media>>*/ = Observable.interval(interval, TimeUnit.SECONDS).flatMap { _seq ->
Observable.merge(withExistingPool(fjp) { _pool -> // List<Observable<List<Media>>> => Observable<List<Media>>
executeAsync( /*List<Closure<List<Media>>>*/ tags.take(limit).collect { tag -> { ->
try {
media(tag)
} catch (Exception e) {
e.printStackTrace()
[]
}
}}).collect { future -> Observable.from(future) } // List<Future<List<Media>>> => List<Observable<List<Media>>>
})
}
// flatten: Observable<List<Media>> => Observable<Media>
// Bridges instagram2 into a stream of single Media items by re-emitting every element
// of each received list to the downstream subscriber.
def instagram3 = Observable.create { Subscriber<Media> observer ->
    // Register the upstream subscription with the downstream subscriber: when downstream
    // unsubscribes, the interval timer and the HTTP polling behind instagram2 are
    // cancelled too instead of running on with nobody listening.
    observer.add(instagram2.subscribe(
        { images ->
            images.each { image ->
                // Do not push into a subscriber that has already gone away.
                if (!observer.isUnsubscribed()) observer.onNext(image)
            }
        },
        { ex -> observer.onError(ex) },
        { -> observer.onCompleted() }
    ))
}
//instagram3.subscribe { images -> println(images) }
// de-duplicate
// The stream is cut off by the first item that arrives 20 seconds or more after `start`.
// replay() returns a connectable stream, so nothing is subscribed upstream until
// limited.connect() below; the subscribers registered here share that one connection.
long start = System.currentTimeMillis()
def limited = instagram3.takeWhile { _ -> (System.currentTimeMillis() - start) < 20000 } .replay()
def threaded = limited.observeOn(scheduler)
// Print every item; on completion wait one second and then terminate the JVM.
threaded.subscribe(
{ image -> println(image) },
{ ex -> println("===== error: " + ex) },
{ -> println("===== completed"); Thread.sleep(1000); System.exit(0) }
)
threaded.count().subscribe { sz -> println("===== unfiltered stream size is $sz") }
// distinct() compares items with Media equality, which is declared over `url` only.
threaded.distinct().count().subscribe { sz -> println("===== deduplicated stream size is $sz") }
limited.connect()
// send to UI
// Subscribed after connect(); the stream was created with replay(), so this subscriber
// still receives the items emitted before it subscribed. Each image is stamped with
// the participant name and POSTed to the UI endpoint as UTF-8 JSON.
threaded.subscribe { image ->
image.participant = participant
uiRest.post() {
type "application/json"
charset "UTF-8"
text new JsonBuilder(image).toString()
}
}
// Presumably keeps the script alive while the pipeline runs on other threads; the
// completion handler of the printing subscriber calls System.exit(0) well before this elapses.
Thread.sleep(100000)
package instarx;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.observables.ConnectableObservable;
import rx.schedulers.Schedulers;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import org.json.JSONArray;
import org.json.JSONObject;
public class JavaMain {
// Name stamped on every Media object before it is posted to the UI.
String participant = "change-me";
String clientId = ""; // obtain your own at https://instagram.com/developer
// Base URL used to build the tag query in media().
String api = "https://api.instagram.com/v1";
// Endpoint that receives each media item as a JSON POST.
String ui = "http://localhost:5050/ui";
int interval = 5; // seconds
int count = 10; // count of tagged media to return
// Tags queried on every timer tick; the placeholder text is meant to be replaced.
String[] tags = "invent your own".split(" ");
// Scheduler on which the replayed stream is observed in run().
Scheduler scheduler = Schedulers.newThread();
/** Program entry point: creates the demo object and runs its pipeline. Arguments are ignored. */
public static void main(String[] args) {
    JavaMain app = new JavaMain();
    app.run();
}
// http://unirest.io/java.html
/**
 * Requests the recent media for one tag and returns the parsed items, each stamped
 * with that tag.
 *
 * @param tag tag name inserted into the request path
 * @return a new list of the media parsed from the response body
 * @throws UnirestException if the HTTP call fails
 */
List<Media> media(String tag) throws UnirestException {
    String endpoint = String.format("%s/tags/%s/media/recent", api, tag);
    HttpResponse<JsonNode> resp = Unirest.get(endpoint)
            .queryString("client_id", clientId)
            .queryString("count", count)
            .asJson();
    List<Media> parsed = unjson(resp.getBody().getObject());
    List<Media> tagged = new ArrayList<>();
    for (Media image : parsed) {
        tagged.add(image.setTag(tag));
    }
    return tagged;
}
/**
 * Builds and runs the demo pipeline in four steps: poll Instagram on a timer, flatten
 * the per-tag lists into single items, POST every item to the UI, and finally print
 * the items of a roughly 20-second window together with its raw and de-duplicated sizes.
 *
 * The method blocks the calling thread in sleep(); the completion handler of the
 * printing subscriber terminates the JVM with System.exit(0).
 *
 * NOTE(review): `list` and `instagram` are each subscribed to directly below as well
 * as through `limited`; if they are cold, every subscription runs its own timer and
 * its own HTTP requests - confirm this is intended for the demo.
 */
void run() {
    // step one: single-threaded HTTP fetch
    Observable<Long> ticker = Observable.interval(interval, TimeUnit.SECONDS);
    Observable<String> obstags = Observable.from(tags);
    Observable<Observable<List<Media>>> nested = ticker.map(_seq ->
        /*Observable<List<Media>>*/ obstags.flatMap(tag -> {
            try {
                return Observable.just(media(tag));
            } catch (Exception e) {
                // A failing tag is logged and skipped so it does not end the stream.
                e.printStackTrace();
                return Observable.empty();
            }
        }));
    Observable<List<Media>> list = Observable.merge(nested);
    list.subscribe(images -> images.forEach(image -> System.out.println("list: " + image)));

    // step two: flatten
    Observable<Media> instagram = Observable.create(/*Subscriber<Media>*/ observer -> {
        // Register the upstream subscription with the downstream subscriber so that
        // unsubscribing downstream (for example when takeWhile ends the stream) also
        // cancels the timer and the HTTP polling behind `list`.
        observer.add(list.subscribe(
            images -> {
                for (Media image : images) {
                    // Stop pushing as soon as the subscriber has gone away.
                    if (observer.isUnsubscribed()) {
                        break;
                    }
                    observer.onNext(image);
                }
            },
            observer::onError,
            observer::onCompleted
        ));
    });

    // step three: send to UI
    instagram.subscribe(image -> {
        try {
            Unirest.post(ui).body(new JSONObject(image.setParticipant(participant)).toString()).asBinary();
        } catch (UnirestException e) {
            e.printStackTrace();
        }
    });

    // step four: deduplicate
    long start = System.currentTimeMillis();
    // replay() returns a connectable stream: nothing flows until limited.connect().
    ConnectableObservable<Media> limited = instagram.takeWhile( _image -> (System.currentTimeMillis() - start) < 20000) .replay();
    Observable<Media> threaded = limited.observeOn(scheduler);
    threaded.subscribe(
        System.out::println,
        ex -> System.out.println("===== error: " + ex),
        () -> {
            System.out.println("===== completed");
            sleep(1000);
            System.exit(0);
        }
    );
    threaded.count().forEach(sz -> System.out.println("===== unfiltered stream size is " + sz));
    threaded.distinct(image -> image.getUrl()).count().forEach(sz -> System.out.println("===== deduplicated stream size is " + sz));
    limited.connect();
    sleep(100000);
}
/**
 * Blocks the calling thread for the given number of milliseconds.
 *
 * @param msec time to sleep, in milliseconds
 * @throws RuntimeException wrapping the InterruptedException if the thread is
 *         interrupted while sleeping; the interrupt flag is set again before throwing
 */
void sleep(long msec) {
    try {
        Thread.sleep(msec);
    } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt flag when it throws; restore it so that
        // callers further up the stack can still see that an interrupt was requested.
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
/**
 * Converts a media response into Media objects.
 *
 * Reads the "data" array; for each post takes images.thumbnail.url and, when the post
 * has a "location" object, copies its latitude, longitude, id and name. A location
 * field that is missing or JSON null becomes a Java null for all four fields.
 *
 * @param json parsed response body
 * @return one Media per entry of "data", in the same order, without tag or participant
 */
List<Media> unjson(JSONObject json) {
    JSONArray posts = json.getJSONArray("data");
    List<Media> media = new ArrayList<>(posts.length());
    for (int i = 0; i < posts.length(); ++i) {
        JSONObject post = posts.getJSONObject(i);
        JSONObject l = post.optJSONObject("location");
        Location location = null;
        if (l != null) {
            // isNull() is true both for an absent key and for an explicit JSON null,
            // so neither reaches getInt() or turns into the string "null".
            Integer id = l.isNull("id") ? null : Integer.valueOf(l.getInt("id"));
            String name = l.isNull("name") ? null : l.optString("name");
            location = new Location(
                    coordinateOrNull(l.optDouble("latitude")),
                    coordinateOrNull(l.optDouble("longitude")),
                    id,
                    name);
        }
        media.add(new Media(post.getJSONObject("images").getJSONObject("thumbnail").getString("url"), location));
    }
    return media;
}

/**
 * Maps the NaN that optDouble() returns for a missing coordinate to null. NaN and the
 * infinities have no JSON representation, so they must not reach the UI payload.
 */
private static Double coordinateOrNull(double value) {
    if (Double.isNaN(value) || Double.isInfinite(value)) {
        return null;
    }
    return Double.valueOf(value);
}
/** Geo-tag of a post: coordinates plus an id and a name; any of them may be null. */
public class Location {
    Double latitude;
    Double longitude;
    Integer id;
    String name;

    /** Stores the four values as given; no validation is performed. */
    public Location(Double latitude, Double longitude, Integer id, String name) {
        this.latitude = latitude;
        this.longitude = longitude;
        this.id = id;
        this.name = name;
    }

    public Double getLatitude() {
        return latitude;
    }

    public Double getLongitude() {
        return longitude;
    }

    public Integer getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    /** Renders all fields as {@code Location{latitude=..., longitude=..., id=..., name='...'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Location{");
        text.append("latitude=").append(latitude);
        text.append(", longitude=").append(longitude);
        text.append(", id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append('}');
        return text.toString();
    }
}
/**
 * One tagged image: thumbnail URL and optional location, plus the tag and participant
 * that are filled in later through the chainable setters.
 */
public class Media {
    String tag;
    String url;
    Location location;
    String participant;

    /** Creates an item with a URL and location; tag and participant start out null. */
    public Media(String url, Location location) {
        this.url = url;
        this.location = location;
    }

    public String getTag() {
        return tag;
    }

    public String getUrl() {
        return url;
    }

    public Location getLocation() {
        return location;
    }

    public String getParticipant() {
        return participant;
    }

    /** Sets the tag and returns this same instance so calls can be chained. */
    public Media setTag(String tag) {
        this.tag = tag;
        return this;
    }

    /** Sets the participant and returns this same instance so calls can be chained. */
    public Media setParticipant(String participant) {
        this.participant = participant;
        return this;
    }

    /** Renders all fields as {@code Media{tag='...', url='...', location=..., participant='...'}}. */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Media{");
        text.append("tag='").append(tag).append('\'');
        text.append(", url='").append(url).append('\'');
        text.append(", location=").append(location);
        text.append(", participant='").append(participant).append('\'');
        text.append('}');
        return text.toString();
    }
}
}
package instarx
import scala.collection.JavaConversions._
import scala.util.control.Breaks._
import scala.concurrent.ExecutionContext.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{read, write}
import rx.lang.scala.{Observable, Observer, Subscriber, Subscription}
import rx.lang.scala.subjects.ReplaySubject
import rx.lang.scala.schedulers.NewThreadScheduler
import dispatch._, Defaults._
object Main {
// slf4j logger obtained for this object's class.
val log = org.slf4j.LoggerFactory.getLogger(getClass)
// Participant name copied into every Media sent to the UI; an Option to match Media.participant.
val participant = Some("change-me")
val clientId = "" // obtain your own at https://instagram.com/developer
// Base URL used to build the tag queries in main.
val api = "https://api.instagram.com/v1"
// Endpoint that receives each media item as a JSON POST.
val ui = "http://localhost:5050/ui"
// Period of the timer that drives the polling.
val interval = 5.seconds
val count = 10 // count of tagged media to return
// Candidate tags; main queries them in slices of `limit`.
val tags = "love instagood me tbt cute follow followme photooftheday happy tagforlikes beautiful girl like selfie picoftheday summer fun smile friends like4like instadaily fashion igers instalike food".split(" ").toSeq
val limit = 3 // number of tags per observable stream
// Scheduler on which the replayed stream is observed.
val scheduler = NewThreadScheduler()
/** Geo-tag of a post; every field is optional. */
case class Location(
  latitude: Option[Double],
  longitude: Option[Double],
  id: Option[BigInt],
  name: Option[String]
)

/** One tagged image; `participant` defaults to None and is filled in with `copy` before sending to the UI. */
case class Media(
  tag: String,
  url: String,
  location: Option[Location],
  participant: Option[String] = None
)
// Parse an Instagram media response into Media values.
// Walks the arrays found under "data" and keeps every element that has a string at
// images.thumbnail.url; elements without one are skipped. The element's "location"
// node, if toOption yields one, is extracted into a Location. The tag comes from the
// implicit parameter.
private def unjson(text: String)(implicit tag: String): Seq[Media] = {
  implicit val formats = DefaultFormats
  val root = parse(text)
  for {
    JArray(entries) <- root \ "data"
    entry <- entries
    JString(thumbnail) <- entry \ "images" \ "thumbnail" \ "url"
  } yield Media(tag, thumbnail, (entry \ "location").toOption.map(_.extract[Location]))
}
/* The life of the stream starts as a framework-provided stream of timer events created
   by Observable.interval(). For every Instagram tag an HTTP call is made to gather the list
   of media URLs. The Future returned by the HTTP client is converted by Observable.from()
   into an Observable. Then the Observables are flattened to un-nest them into
   Observable[Media].

   The method blocks the main thread while the pipeline runs on other threads; the
   completion handler of the printing subscriber ends the JVM with System.exit(0).
*/
def main(args: Array[String]): Unit = {
  // Instagram limit is 5000 API requests per hour per client id or access token
  val instagram/*: Observable[Seq[Media]]*/ = Observable.interval(interval).map { _ =>
    Observable.from(tags.take(limit)).flatMap { implicit tag =>
      val req = url(s"$api/tags/$tag/media/recent?client_id=$clientId&count=$count")
      val future/*: Future[Seq[Media]]*/ = Http(req OK as.String).map { resp =>
        unjson(resp)
      } .recover { case e: Exception =>
        // A failing tag contributes nothing, but the failure is logged, not hidden.
        log.warn(s"fetching media for tag '$tag' failed", e)
        Nil
      }
      Observable.from(future)
    }
  } /*Observable[Observable[Seq[Media]]]*/ .flatten /*=> Observable[Seq[Media]]*/
  //instagram.subscribe(images => println(images))

  // How to get Observable[Media]?
  // Manual solution using a (Replay)Subject, which is both an Observer and an Observable:
  // the events are bridged from `instagram: Observable[Seq[Media]]` - for each Media in
  // the received sequence Subject.onNext() is called.
  val instagram2 = Observable((subscriber: Subscriber[Media]) => {
    val rsubj = ReplaySubject[Media]
    instagram.subscribe(images => images.foreach(rsubj.onNext), ex => rsubj.onError(ex), () => rsubj.onCompleted())
    rsubj.subscribe(subscriber)
  })
  val instagram3 = Observable.create((observer: Observer[Media]) => {
    val rsubj = ReplaySubject[Media]
    instagram.subscribe(images => images.foreach(rsubj.onNext), ex => rsubj.onError(ex), () => rsubj.onCompleted())
    rsubj.subscribe(observer)
  })

  // What if there is no Observable to subscribe to? How to build an Observable from scratch?
  // https://github.com/ReactiveX/RxScala/blob/0.x/examples/src/main/scala/SyncObservable.scala
  // Here instagram.subscribe() - a subscription to an Observable - is used, but it could be
  // anything else: Future.onComplete(), an Actor body, an imperative piece of code that
  // decides the event must be pushed to the subscriber, etc.
  // Follow the protocol:
  // (1) check for Subscriber.isUnsubscribed, (2) call onNext to push the data, (3) finish the
  // stream with onCompleted, (4) signal the error _and_ finish the stream with onError.
  val instagram4 = Observable((subscriber: Subscriber[Media]) => {
    val upstream = instagram.subscribe(
      images => if (!subscriber.isUnsubscribed) images.foreach(subscriber.onNext),
      ex => subscriber.onError(ex),
      () => subscriber.onCompleted())
    // Tie the upstream subscription to the subscriber: unsubscribing downstream cancels
    // the polling at once, without waiting for the next event and without a mutable,
    // nullable self-reference.
    subscriber.add(upstream)
  })

  // A simpler solution by using nested Observables
  val instagram5/*: Observable[Media]*/ = Observable.interval(interval).map { _ =>
    Observable.from(tags.drop(limit).take(limit)).flatMap { implicit tag =>
      val req = url(s"$api/tags/$tag/media/recent?client_id=$clientId&count=$count")
      val future/*: Future[Observable[Media]]*/ = Http(req OK as.String).map { resp =>
        Observable.from(unjson(resp)) // notice the difference: Seq[Media] => Observable[Media]
      } .recover { case e: Exception =>
        log.warn(s"fetching media for tag '$tag' failed", e)
        Observable.empty
      }
      Observable.from(future)
    }
  } /*Observable[Observable[Observable[Media]]]*/ .flatten.flatten /*=> Observable[Media]*/

  // Instagram's media/recent query may return images already pulled in a previous run.
  // Apply Observable.distinct() to filter out duplicates.
  val start = System.currentTimeMillis
  // The stream is cut off by the first item arriving 20 seconds or more after `start`;
  // replay makes it connectable, so nothing flows until limited.connect below.
  val limited = instagram5.takeWhile(_ => (System.currentTimeMillis - start) < 20000).replay
  val threaded = limited.observeOn(scheduler) // introduce some concurrency
  threaded.subscribe(images => println(images), ex => println("===== error: " + ex), () => { println("===== completed"); Thread.sleep(1000); System.exit(0) })
  threaded.size.foreach(sz => println(s"===== unfiltered stream size is $sz"))
  threaded.distinct(_.url).size.foreach(sz => println(s"===== deduplicated stream size is $sz"))
  limited.connect

  // smooth the delivery of images (advanced task) - create a Smooth (Average) operator, like
  // Sample and Debounce, but with (1) no event loss and (2) an internal adaptive trigger that
  // tracks the incoming rate and smoothly adapts the outgoing rate to keep it steady
  // 123........45......6790123.......4 => 1...2...3...4..5.....6.7.8.9.0.1.2.3.4

  // Send media URL and location to UI
  // {"tag":"tbt","url":"https://scontent.cdninstagram.com/....jpg","location":{"latitude":51.504976275,"longitude":-0.087847965,"id":225481160,"name":"The Shard London"},"participant":"change-me"}
  threaded.subscribe { image =>
    implicit val formats = Serialization.formats(NoTypeHints)
    val req = url(ui).setContentType("application/json", "UTF-8").setBody(write(image.copy(participant = participant)))
    Await.ready(Http(req.POST), 2.seconds)
  }

  // Keep the main thread alive for longer than the 20-second window above, matching
  // the Groovy and Java variants; the completion handler exits the JVM earlier.
  // NOTE(review): the previous 10-second sleep could let main return before the
  // window closed if all worker threads are daemons - confirm.
  Thread.sleep(100000)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment