@erukiti
Created January 19, 2015 01:01
Fetching a document from S3 (RiakCS) with RxJava + Play2 + aws-java-sdk
package controllers;
import com.amazonaws.AmazonClientException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.S3Object;
import play.*;
import play.mvc.*;
import rx.Observable;
import rx.Subscriber;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class Application extends Controller {

    public static Result index() {
        // Connection settings are read from the Play application configuration.
        String s3_key = Play.application().configuration().getString("s3.key");
        String s3_secret = Play.application().configuration().getString("s3.secret");
        String s3_host = Play.application().configuration().getString("s3.host");
        Integer s3_port = Play.application().configuration().getInt("s3.port");

        BasicAWSCredentials credentials = new BasicAWSCredentials(s3_key, s3_secret);
        ClientConfiguration conf = new ClientConfiguration();
        conf.setProtocol(Protocol.HTTP);
        // Requests are routed through s3.host:s3.port as an HTTP proxy.
        conf.setProxyHost(s3_host);
        conf.setProxyPort(s3_port);
        AmazonS3Client s3 = new AmazonS3Client(credentials, conf);

        // Emits the S3 object once, fetched on a separate thread.
        Observable<S3Object> getS3object = Observable.create(observer -> {
            new Thread(() -> {
                try {
                    observer.onNext(s3.getObject("test", "/hoge.txt"));
                    // Complete only on success: onError is already a terminal event.
                    observer.onCompleted();
                } catch (AmazonClientException e) {
                    observer.onError(e);
                }
            }).start();
        });

        // Emits the content of the object line by line.
        Observable<String> getLines = getS3object.flatMap(obj -> {
            return Observable.create((Subscriber<? super String> observer) -> {
                new Thread(() -> {
                    // try-with-resources closes the reader and the underlying S3 stream.
                    try (BufferedReader reader = new BufferedReader(new InputStreamReader(obj.getObjectContent()))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            observer.onNext(line);
                        }
                        observer.onCompleted();
                    } catch (Throwable e) {
                        observer.onError(e);
                    }
                }).start();
            });
        });

        // The pipeline runs on other threads, so block until its single Result
        // arrives instead of returning a shared static field that may still be
        // unset (or hold the value from another request).
        return getLines.reduce((acc, x) -> acc + "\n" + x)
                .map(str -> ok(str))
                .onErrorReturn(err -> ok(err.toString()))
                .toBlocking()
                .single();
    }
}
daneko commented Jan 19, 2015

Rather than creating Threads by hand, using subscribeOn/observeOn might be easier to read.

// Requires: import rx.schedulers.Schedulers;
Observable<S3Object> getS3object = Observable.create(observer -> {
    try {
        observer.onNext(s3.getObject("test", "/hoge.txt"));
        observer.onCompleted();
    } catch (AmazonClientException e) {
        observer.onError(e);
    }
}).subscribeOn(Schedulers.io());

// Runs on the same io() thread that emitted the S3Object.
Observable<String> getLines = getS3object.flatMap(obj ->
    Observable.create((Subscriber<? super String> observer) -> {
        // try-with-resources closes the reader and the underlying S3 stream.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(obj.getObjectContent()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                observer.onNext(line);
            }
            observer.onCompleted();
        } catch (Throwable e) {
            observer.onError(e);
        }
    })
);
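
The same pattern (run the blocking read off the calling thread, turn a failure into a value, then wait for exactly one result) can be tried without RxJava or S3 on the classpath. The sketch below is a plain-JDK analogue, not RxJava: `CompletableFuture.supplyAsync` roughly plays the role of `subscribeOn(Schedulers.io())`, `exceptionally` that of `onErrorReturn`, and `join()` that of waiting for the result. The class `AsyncLines`, its methods, and the in-memory string standing in for the S3 object content are all hypothetical names chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// JDK-only analogue of the pipeline above; this is not RxJava itself.
final class AsyncLines {

    // Hypothetical stand-in for reading an S3 object's content line by line
    // and joining the lines with "\n" (the reduce step in the gist).
    static String readAndJoin(String content) {
        try (BufferedReader reader = new BufferedReader(new StringReader(content))) {
            return reader.lines().collect(Collectors.joining("\n"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the read on a pool thread, maps any failure to its string form,
    // and blocks the caller until the single result is available.
    static String fetchJoined(String content) {
        ExecutorService io = Executors.newCachedThreadPool();
        try {
            return CompletableFuture
                    .supplyAsync(() -> readAndJoin(content), io)
                    .exceptionally(err -> err.toString())
                    .join();
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchJoined("first\nsecond"));
    }
}
```

Because `join()` does not return until the pool thread has finished, the caller never sees a half-populated result, which is the guarantee a controller method needs before it returns its response.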
