Created
January 19, 2015 01:01
-
-
Save erukiti/a3cd0aab411aeab5a0f8 to your computer and use it in GitHub Desktop.
RxJava + Play2 + aws-java-sdk で S3(RiakCS) のドキュメントを取ってくる
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers; | |
import com.amazonaws.AmazonClientException; | |
import com.amazonaws.ClientConfiguration; | |
import com.amazonaws.Protocol; | |
import com.amazonaws.auth.BasicAWSCredentials; | |
import com.amazonaws.services.s3.AmazonS3Client; | |
import com.amazonaws.services.s3.model.S3Object; | |
import play.*; | |
import play.mvc.*; | |
import rx.Observable; | |
import rx.Subscriber; | |
import java.io.BufferedReader; | |
import java.io.InputStreamReader; | |
public class Application extends Controller { | |
static Result result; | |
public static Result index() { | |
String s3_key = Play.application().configuration().getString("s3.key"); | |
String s3_secret = Play.application().configuration().getString("s3.secret"); | |
String s3_host = Play.application().configuration().getString("s3.host"); | |
Integer s3_port = Play.application().configuration().getInt("s3.port"); | |
BasicAWSCredentials credentials = new BasicAWSCredentials(s3_key, s3_secret); | |
ClientConfiguration conf = new ClientConfiguration(); | |
conf.setProtocol(Protocol.HTTP); | |
conf.setProxyHost(s3_host); | |
conf.setProxyPort(s3_port); | |
AmazonS3Client s3 = new AmazonS3Client(credentials, conf); | |
Observable<S3Object> getS3object = Observable.create(observer -> { | |
new Thread(() -> { | |
try { | |
observer.onNext(s3.getObject("test", "/hoge.txt")); | |
} catch (AmazonClientException e) { | |
observer.onError(e); | |
} | |
observer.onCompleted(); | |
}).start(); | |
}); | |
Observable<String> getLines = getS3object.flatMap(obj -> { | |
return Observable.create((Subscriber<? super String> observer) -> { | |
new Thread(() -> { | |
try { | |
BufferedReader reader = new BufferedReader(new InputStreamReader(obj.getObjectContent())); | |
String line; | |
while ((line = reader.readLine()) != null) { | |
observer.onNext(line); | |
} | |
} catch (Throwable e) { | |
observer.onError(e); | |
} | |
observer.onCompleted(); | |
}).start(); | |
}); | |
}); | |
getLines.reduce((acc, x) -> acc + "\n" + x).map(str -> ok(str)).onErrorReturn(err -> ok(err.toString())).subscribe(res -> { | |
result = res; | |
}); | |
return result; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thread生成するより subscribeOn/observeOn を使ったほうが見やすいかも