Skip to content

Instantly share code, notes, and snippets.

@RoryKelly
Last active February 7, 2018 07:58
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save RoryKelly/e34d07b1eb34b289f0f6d00f628a7c47 to your computer and use it in GitHub Desktop.
Save RoryKelly/e34d07b1eb34b289f0f6d00f628a7c47 to your computer and use it in GitHub Desktop.
Mirrored Source for OKIO.
/**
* Takes a bytes source and streams it to parallel sources. I.E Streaming bytes to two server in parallel.
*/
public final class MirroredSource {
private final Buffer buffer = new Buffer();
private final Source source;
private final AtomicBoolean sourceExhausted = new AtomicBoolean();
private boolean closed = false;
/**
* Takes a bytes source and streams it to parallel sources, I.E Streaming bytes to two server in parallel.
*
* @param source The Bytes source you want to stream in parallel.
*/
public MirroredSource(final Source source) {
this.source = source;
}
/**
* As soon as you read from the returned source, its output is copied and buffered. This buffer can then be read from
* mirror().
*
* @return a byte source.
*/
public final Source original() {
return new okio.Source() {
@Override public long read(final Buffer sink, final long byteCount) throws IOException {
final long bytesRead = source.read(sink, byteCount);
if (bytesRead > 0) {
synchronized (buffer) {
sink.copyTo(buffer, sink.size() - bytesRead, bytesRead);
// Notfiy the mirror to continue
buffer.notifyAll();
}
} else {
synchronized (buffer) {
buffer.notifyAll();
}
sourceExhausted.set(true);
}
return bytesRead;
}
@Override public Timeout timeout() {
return source.timeout();
}
@Override public void close() throws IOException {
source.close();
sourceExhausted.set(true);
synchronized (buffer) {
buffer.notifyAll();
}
}
};
}
/**
* A byte source. Will emit all bytes emitted from original(). Will end when original() is exhausted.
*
* @return A bytes source.
*/
public final Source mirror() {
return new okio.Source() {
@Override public long read(final Buffer sink, final long byteCount) throws IOException {
if (closed) new IllegalStateException("reading closed source");
while (!sourceExhausted.get()) {
// only need to synchronise on reads when the source is not exhausted.
synchronized (buffer) {
if (buffer.request(byteCount)) {
return buffer.read(sink, byteCount);
} else {
try {
buffer.wait(200);
} catch (final InterruptedException e) {
return -1;
}
}
}
}
return buffer.read(sink, byteCount);
}
@Override public Timeout timeout() {
return new Timeout();
}
@Override public void close() throws IOException {
buffer.clear();
closed = true;
}
};
}
}
public final class OkioTools {
public static Source tee(final Source source, final BufferedSink cacheBody) {
return new Source() {
@Override public long read(final Buffer sink, final long byteCount) throws IOException {
final long bytesRead;
bytesRead = source.read(sink, byteCount);
if (bytesRead == -1) {
return -1;
}
sink.copyTo(cacheBody.buffer(), sink.size() - bytesRead, bytesRead);
cacheBody.emitCompleteSegments();
cacheBody.flush();
return bytesRead;
}
@Override public Timeout timeout() {
return source.timeout();
}
@Override public void close() throws IOException {
cacheBody.flush();
cacheBody.close();
source.close();
}
};
}
public static MirroredSource mirror(final Source source) {
return new MirroredSource(source);
}
/**
* Takes a source and emits the cumulative bytes at I.E byteInterval, byteInterval* 2, byteInterval * N.
*
* @param source Recording Source.
* @param byteInterval The byte interval to emit each byte array at.
* @return Emits a byte array at byteInterval length, i.e byteInterval, byteInterval* 2, byteInterval * N.
*/
public static Observable<byte[]> cumulativeEmit(final Source source, final int byteInterval) {
Preconditions.checkArgument(byteInterval > 0, "byte interval must be greater that zero.");
final Buffer allAudioData = new Buffer();
return Observable.create(subscriber -> {
try (BufferedSource bufferedSource = Okio.buffer(source)) {
int nextEmission = byteInterval;
while (!subscriber.isUnsubscribed() && !bufferedSource.exhausted()) {
// read into allAudioData
bufferedSource.read(allAudioData, byteInterval);
if (nextEmission < allAudioData.size()){
// copy the allAudioData to a temp allAudioData so i can get a byte array.
final Buffer tempBuffer = new Buffer();
allAudioData.copyTo(tempBuffer, 0, nextEmission);
subscriber.onNext(tempBuffer.readByteArray());
tempBuffer.clear();
nextEmission += byteInterval;
}
}
allAudioData.clear();
} catch (final IOException e) {
subscriber.onError(e);
}
subscriber.onCompleted();
});
}
}
@RoryKelly
Copy link
Author

Updated with Documentation and a few useful static helpers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment