Skip to content

Instantly share code, notes, and snippets.

@nschwermann
Created October 2, 2014 22:16
Show Gist options
  • Save nschwermann/194276523b8299da1cf6 to your computer and use it in GitHub Desktop.
Save nschwermann/194276523b8299da1cf6 to your computer and use it in GitHub Desktop.
Custom DataSource for ExoPlayer that uses ION/AndroidAsync
import android.content.Context;
import android.net.Uri;
import android.text.TextUtils;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.upstream.DataSource;
import com.google.android.exoplayer.upstream.DataSpec;
import com.google.android.exoplayer.upstream.HttpDataSource;
import com.google.android.exoplayer.upstream.TransferListener;
import com.google.android.exoplayer.upstream.UnexpectedLengthException;
import com.google.android.exoplayer.util.Predicate;
import com.koushikdutta.async.ByteBufferList;
import com.koushikdutta.async.DataEmitter;
import com.koushikdutta.async.DataSink;
import com.koushikdutta.async.callback.CompletedCallback;
import com.koushikdutta.async.callback.DataCallback;
import com.koushikdutta.async.future.Future;
import com.koushikdutta.async.future.SimpleFuture;
import com.koushikdutta.async.http.libcore.RawHeaders;
import com.koushikdutta.async.parser.AsyncParser;
import com.koushikdutta.ion.HeadersCallback;
import com.koushikdutta.ion.Ion;
import com.koushikdutta.ion.builder.Builders;
import com.pinsightmedia.starshop.util.Log;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class IonDataSource implements DataSource, AsyncParser<Void>{
private static final int MAX_CONNECTIONS = StarPlayer.RENDERER_COUNT;
private static final String TAG = "IonDataSource";
private static final Pattern CONTENT_RANGE_HEADER =
Pattern.compile("^bytes (\\d+)-(\\d+)/(\\d+)$");
/*package*/ static final String ION_INSTANCE = "exoplayer_instance";
private final Ion mIon;
private final TransferListener mTransferListener;
private final boolean mShouldCache;
private final ArrayBlockingQueue<RawHeaders> mHeaderContainer = new ArrayBlockingQueue<RawHeaders>(1);
private ByteBufferList mByteBufferList;
private DataSpec mDataSpec;
private Predicate<String> mContentTypePredicate = HttpDataSource.REJECT_PAYWALL_TYPES;
private long mBytesRead;
private long mDataLength;
private volatile boolean mOpened;
private volatile Exception mException;
private int mConnectTimeout = HttpDataSource.DEFAULT_CONNECT_TIMEOUT_MILLIS;
private int mReadTimeout = HttpDataSource.DEFAULT_READ_TIMEOUT_MILLIS;
private int mResponseCode;
public IonDataSource(final Context context, final boolean enableCache, final TransferListener xferListener) {
mIon = Ion.getInstance(context.getApplicationContext(), ION_INSTANCE);
// mIon.getHttpClient().getSocketMiddleware().setMaxConnectionCount(MAX_CONNECTIONS);
// final int logLevel = Log.getLogLevel();
// if(logLevel != Log.INFO) {
// mIon.configure().setLogging(TAG, logLevel);
// }
//Do this at data source level not ion level, other ion data sources in use at same time may want a different setting.
mShouldCache = enableCache;
mTransferListener = xferListener;
}
@Override
public long open(DataSpec dataSpec) throws HttpDataSource.HttpDataSourceException {
mDataSpec = dataSpec;
mBytesRead = 0;
mException = null;
mByteBufferList = new ByteBufferList();
final Uri uri = dataSpec.uri;
try {
Builders.Any.B b = mIon.build(mIon.getContext()).load(uri.toString());
b.setTimeout(mConnectTimeout);
b.addHeader("Range", buildRangeHeader(mDataSpec));
b.group(IonDataSource.this);
b.onHeaders(new HeadersCallback() {
@Override
public void onHeaders(RawHeaders headers) {
mResponseCode = headers.getResponseCode();
mHeaderContainer.add(headers);
}
});
if(!mShouldCache) b.noCache();
b.as(this);
final RawHeaders headers = mHeaderContainer.take();
if(mResponseCode < 200 || mResponseCode > 299) {
mIon.cancelAll(IonDataSource.this);
throw new HttpDataSource.InvalidResponseCodeException(mResponseCode, headers.toMultimap(), mDataSpec);
}
final String contentType = headers.get("Content-Type");
if(contentType != null && !mContentTypePredicate.evaluate(contentType)){
mIon.cancelAll(IonDataSource.this);
throw new HttpDataSource.InvalidContentTypeException(contentType, mDataSpec);
}
final long contentLength = getContentLength(headers);
mDataLength = mDataSpec.length == C.LENGTH_UNBOUNDED ? contentLength : mDataSpec.length;
if(mDataSpec.length != C.LENGTH_UNBOUNDED && contentLength != C.LENGTH_UNBOUNDED
&& contentLength != mDataLength){
//The DataSpec specified a length and we resolved a length from the response headers but they didnt match
mIon.cancelAll(IonDataSource.this);
throw new HttpDataSource.HttpDataSourceException(new UnexpectedLengthException(dataSpec.length, contentLength), dataSpec);
}
mOpened = true;
if (mTransferListener != null) {
mTransferListener.onTransferStart();
}
return mDataLength;
}
catch (InterruptedException e) {
throw new HttpDataSource.HttpDataSourceException("Interrupted:" + e.getMessage(), mDataSpec);
}
}
@Override
public void close() throws HttpDataSource.HttpDataSourceException {
mIon.cancelAll(IonDataSource.this);
mByteBufferList.recycle();
mByteBufferList = null;
mException = null;
mDataSpec = null;
mDataLength = 0;
mResponseCode = 0;
if (mOpened) {
mOpened = false;
if (mTransferListener != null) {
mTransferListener.onTransferEnd();
}
}
mHeaderContainer.clear();
}
@Override
public int read(byte[] buffer, int offset, int readLength) throws HttpDataSource.HttpDataSourceException {
if(mException != null) {
if(mException instanceof IOException){
throw new HttpDataSource.HttpDataSourceException((IOException)mException, mDataSpec);
} else {
throw new HttpDataSource.HttpDataSourceException(mException.getMessage(), mDataSpec);
}
}
int read;
try {
read = readHelper(buffer, offset, readLength);
} catch (IOException e) {
throw new HttpDataSource.HttpDataSourceException(e, mDataSpec);
}
if(read > 0){
mBytesRead += read;
if(mTransferListener != null){
mTransferListener.onBytesTransferred(read);
}
} else if(mDataLength != C.LENGTH_UNBOUNDED && mDataLength != mBytesRead) {
//server closed the connection having not sent the correct amount of data
throw new HttpDataSource.HttpDataSourceException(new UnexpectedLengthException(mDataLength, mBytesRead), mDataSpec);
}
return read;
}
public int readHelper(byte[] buffer, int offset, int length) throws IOException {
synchronized (mByteBufferList) {
if (mByteBufferList.remaining() <= 0) {
try {
mByteBufferList.wait(mReadTimeout);
} catch (InterruptedException e) {
//eat it
}
if (mByteBufferList.remaining() <= 0) return -1;
}
int toRead = Math.min(length, mByteBufferList.remaining());
mByteBufferList.get(buffer, offset, toRead);
return toRead;
}
}
@Override
public Future<Void> parse(final DataEmitter emitter) {
final SimpleFuture<Void> ret = new SimpleFuture<Void>(){
@Override
protected void cancelCleanup() {
Log.d(TAG, "cancel");
emitter.close();
}
};
emitter.setDataCallback(new DataCallback() {
@Override
public void onDataAvailable(DataEmitter emitter, ByteBufferList data) {
synchronized (mByteBufferList) {
try {
if (!ret.isCancelled()) {
data.get(mByteBufferList);
}
}finally {
mByteBufferList.notify();
}
}
}
});
emitter.setEndCallback(new CompletedCallback() {
@Override
public void onCompleted(Exception ex) {
if (ex != null) {
ret.setComplete(ex);
return;
}
ret.setComplete();
}
});
return ret;
}
@Override
public void write(DataSink sink, Void value, CompletedCallback completed) {
//do nothing
}
/**
* Copy/pasted from {@link com.google.android.exoplayer.upstream.HttpDataSource#buildRangeHeader(com.google.android.exoplayer.upstream.DataSpec)}
* @param dataSpec
* @return
*/
private String buildRangeHeader(DataSpec dataSpec) {
String rangeRequest = "bytes=" + dataSpec.position + "-";
if (dataSpec.length != C.LENGTH_UNBOUNDED) {
rangeRequest += (dataSpec.position + dataSpec.length - 1);
}
return rangeRequest;
}
/**
* Basically copy/pasted from {@link com.google.android.exoplayer.upstream.HttpDataSource#getContentLength(java.net.HttpURLConnection)}
* @param headers
* @return
*/
private long getContentLength(final RawHeaders headers) {
long contentLength = C.LENGTH_UNBOUNDED;
final String contentLengthHeader = headers.get("Content-Length");
if (!TextUtils.isEmpty(contentLengthHeader)) {
try {
contentLength = Long.parseLong(contentLengthHeader);
} catch (NumberFormatException e) {
Log.e(TAG, "Unexpected Content-Length [" + contentLengthHeader + "]");
}
}
final String contentRangeHeader = headers.get("Content-Range");
if (!TextUtils.isEmpty(contentRangeHeader)) {
Matcher matcher = CONTENT_RANGE_HEADER.matcher(contentRangeHeader);
if (matcher.find()) {
try {
long contentLengthFromRange =
Long.parseLong(matcher.group(2)) - Long.parseLong(matcher.group(1)) + 1;
if (contentLength < 0) {
// Some proxy servers strip the Content-Length header. Fall back to the length
// calculated here in this case.
contentLength = contentLengthFromRange;
} else if (contentLength != contentLengthFromRange) {
// If there is a discrepancy between the Content-Length and Content-Range headers,
// assume the one with the larger value is correct. We have seen cases where carrier
// change one of them to reduce the size of a request, but it is unlikely anybody would
// increase it.
Log.w(TAG, "Inconsistent headers [" + contentLengthHeader + "] [" + contentRangeHeader +
"]");
contentLength = Math.max(contentLength, contentLengthFromRange);
}
} catch (NumberFormatException e) {
Log.e(TAG, "Unexpected Content-Range [" + contentRangeHeader + "]");
}
}
}
return contentLength;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment