Skip to content

Instantly share code, notes, and snippets.

@BCsl
Last active December 8, 2017 07:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save BCsl/824a705631a5be9fbd148fbb08b11288 to your computer and use it in GitHub Desktop.
Save BCsl/824a705631a5be9fbd148fbb08b11288 to your computer and use it in GitHub Desktop.
多线程断点下载
/**
 * Singleton coordinator for downloads driven by {@link FileDownloadObservable}.
 * <p>
 * Keeps one running {@link Disposable} per request tag, remembers the most recent progress value
 * of every running download, and fans every download event out to the registered
 * {@link IDownloadListener}s.
 * <p>
 * Created by chensuilun on 2017/12/08.
 */
public class DownloadManagerRx implements IDownloadListener {
    private static final String TAG = "DownloadManagerRx";

    // volatile is required so the double-checked locking in getInstance() never publishes a
    // partially constructed instance to another thread.
    private static volatile DownloadManagerRx mInstance;

    // tag -> subscription of the download currently running for that tag
    private final HashMap<Object, Disposable> mRunningRequest;
    // tag -> last progress value reported for that tag (0 right after start)
    private final Map<Object, Float> mProgress;
    private final List<IDownloadListener> mIDownloadListeners;

    public DownloadManagerRx() {
        mRunningRequest = new HashMap<Object, Disposable>();
        mProgress = new HashMap<Object, Float>();
        mIDownloadListeners = new LinkedList<IDownloadListener>();
    }

    /**
     * Registers a listener (once) and immediately replays the last known progress of every
     * running download to it, so a late subscriber can render the current state.
     *
     * @param listener listener to register; {@code null} is ignored
     */
    public void addListener(IDownloadListener listener) {
        if (listener == null) {
            return;
        }
        if (!mIDownloadListeners.contains(listener)) {
            mIDownloadListeners.add(listener);
        }
        // Replay only to the listener being added (already registered listeners have seen these
        // values), and iterate over a copy so the callback may start/cancel downloads safely.
        Map<Object, Float> snapshot = new HashMap<Object, Float>(mProgress);
        for (Map.Entry<Object, Float> entry : snapshot.entrySet()) {
            listener.onProgress(entry.getKey(), Math.max(0f, entry.getValue()));
        }
    }

    /**
     * Unregisters a listener; unknown listeners are ignored.
     */
    public void removeListener(IDownloadListener listener) {
        mIDownloadListeners.remove(listener);
    }

    /**
     * @return the lazily created shared instance
     */
    public static DownloadManagerRx getInstance() {
        DownloadManagerRx instance = mInstance;
        if (instance == null) {
            synchronized (DownloadManagerRx.class) {
                instance = mInstance;
                if (instance == null) {
                    instance = new DownloadManagerRx();
                    mInstance = instance;
                }
            }
        }
        return instance;
    }

    /**
     * Starts a download unless one with the same tag is already registered as running.
     *
     * @param downloadRequest what to download; its tag identifies the download in all callbacks
     */
    public synchronized void start(DownloadRequest downloadRequest) {
        final Object tag = downloadRequest.getTag();
        Disposable existCall = mRunningRequest.get(tag);
        if (existCall != null) {
            if (BuildConfig.DEBUG) {
                Log.d(TAG, "start: request is downloading");
            }
            return;
        }
        new FileDownloadObservable(downloadRequest)
                .compose(RxTransformer.<Float>async())
                .doOnSubscribe(new Consumer<Disposable>() {
                    @Override
                    public void accept(Disposable disposable) throws Exception {
                        if (BuildConfig.DEBUG) {
                            Log.i(TAG, "开始下载\n" + tag);
                        }
                        mRunningRequest.put(tag, disposable);
                        onStart(tag);
                    }
                })
                .subscribe(new Consumer<Float>() {
                    @Override
                    public void accept(Float progress) throws Exception {
                        if (BuildConfig.DEBUG) {
                            Log.v(TAG, tag + ",下载进度" + progress);
                        }
                        onProgress(tag, progress);
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        throwable.printStackTrace();
                        if (BuildConfig.DEBUG) {
                            Log.e(TAG, tag + "\n下载异常:" + throwable.getMessage());
                        }
                        // NOTE(review): errors are reported to listeners as a cancellation and
                        // onFailed(tag) is never invoked anywhere in this class. Kept as-is because
                        // existing listeners may rely on it; confirm whether onFailed was intended.
                        onCancel(tag);
                    }
                }, new Action() {
                    @Override
                    public void run() throws Exception {
                        if (BuildConfig.DEBUG) {
                            Log.i(TAG, "下载结束" + tag);
                        }
                        onSuccess(tag);
                    }
                });
    }

    /**
     * Cancels the running download registered under {@code tag}, if any, and notifies listeners.
     *
     * @param tag tag of the request to cancel
     */
    public synchronized void cancel(Object tag) {
        if (BuildConfig.DEBUG) {
            Log.d(TAG, "cancel() called with: " + "tag = [" + tag + "]");
        }
        Disposable existCall = mRunningRequest.get(tag);
        if (existCall != null && !existCall.isDisposed()) {
            existCall.dispose();
            onCancel(tag);
        }
    }

    @Override
    public void onStart(Object tag) {
        mProgress.put(tag, 0f);
        for (IDownloadListener listener : listenersSnapshot()) {
            listener.onStart(tag);
        }
    }

    @Override
    public void onCancel(Object tag) {
        forget(tag);
        for (IDownloadListener listener : listenersSnapshot()) {
            listener.onCancel(tag);
        }
    }

    public void onProgress(Object tag, float progress) {
        // Remember the latest value so addListener() can replay something meaningful. Only tags
        // that are still tracked are updated, so a late event cannot resurrect a finished download.
        if (mProgress.containsKey(tag)) {
            mProgress.put(tag, progress);
        }
        for (IDownloadListener listener : listenersSnapshot()) {
            listener.onProgress(tag, progress);
        }
    }

    @Override
    public void onFailed(Object tag) {
        forget(tag);
        for (IDownloadListener listener : listenersSnapshot()) {
            listener.onFailed(tag);
        }
    }

    @Override
    public void onSuccess(Object tag) {
        forget(tag);
        AppApplication.getGlobalEventBus().post(new CacheInvalidEvent());
        for (IDownloadListener listener : listenersSnapshot()) {
            listener.onSuccess(tag);
        }
    }

    /**
     * @param tag tag of the request to look up
     * @return {@code true} while a non-disposed download is registered under {@code tag}
     */
    public synchronized boolean isInDownloadQueue(Object tag) {
        Disposable existCall = mRunningRequest.get(tag);
        return existCall != null && !existCall.isDisposed();
    }

    /**
     * Drops all bookkeeping kept for a download that has ended.
     */
    private void forget(Object tag) {
        mProgress.remove(tag);
        mRunningRequest.remove(tag);
    }

    /**
     * Copies the listener list so a listener may add/remove listeners from inside a callback
     * without breaking the iteration that is delivering the event.
     */
    private IDownloadListener[] listenersSnapshot() {
        return mIDownloadListeners.toArray(new IDownloadListener[mIDownloadListeners.size()]);
    }
}
/**
 * Immutable description of a single download: an identifying tag, the remote URL, the directory
 * the file is saved into and the file name.
 * <p>
 * Created by chensuilun on 2017/4/19.
 */
public class DownloadRequest {
    private final Object mTag;
    // Directory the file is stored in, e.g. /storage/download/
    private final String mSavedPath;
    // Remote URL to download from
    private final String mRemoteUrl;
    // File name, e.g. file.png
    private final String mName;

    /**
     * @param tag       identifier reported back in download callbacks
     * @param remoteUrl URL to download from
     * @param savedPath directory the file is stored in
     * @param name      file name inside {@code savedPath}
     */
    public DownloadRequest(Object tag, String remoteUrl, String savedPath, String name) {
        mTag = tag;
        mSavedPath = savedPath;
        mRemoteUrl = remoteUrl;
        mName = name;
    }

    public Object getTag() {
        return mTag;
    }

    public String getSavedPath() {
        return mSavedPath;
    }

    public String getRemoteUrl() {
        return mRemoteUrl;
    }

    public String getName() {
        return mName;
    }

    /**
     * Returns the directory joined with the file name.
     * <p>
     * A separator is inserted when the directory does not already end with one, so
     * {@code "/storage/download"} and {@code "/storage/download/"} both yield
     * {@code "/storage/download/file.png"}. A {@code null} or empty directory is concatenated
     * unchanged, exactly as before.
     *
     * @return the full path of the target file
     */
    public String getFullFilePath() {
        if (mSavedPath == null
                || mSavedPath.length() == 0
                || mSavedPath.endsWith(java.io.File.separator)) {
            return mSavedPath + mName;
        }
        return mSavedPath + java.io.File.separator + mName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        DownloadRequest that = (DownloadRequest) o;
        return same(mTag, that.mTag)
                && same(mSavedPath, that.mSavedPath)
                && same(mRemoteUrl, that.mRemoteUrl)
                && same(mName, that.mName);
    }

    @Override
    public int hashCode() {
        int result = mTag != null ? mTag.hashCode() : 0;
        result = 31 * result + (mSavedPath != null ? mSavedPath.hashCode() : 0);
        result = 31 * result + (mRemoteUrl != null ? mRemoteUrl.hashCode() : 0);
        result = 31 * result + (mName != null ? mName.hashCode() : 0);
        return result;
    }

    @Override
    public String toString() {
        return "DownloadRequest{" +
                "mTag=" + mTag +
                ", mSavedPath='" + mSavedPath + '\'' +
                ", mRemoteUrl='" + mRemoteUrl + '\'' +
                ", mName='" + mName + '\'' +
                '}';
    }

    /**
     * Null-safe equality of two field values.
     */
    private static boolean same(Object a, Object b) {
        return a == null ? b == null : a.equals(b);
    }
}
/**
 * Multi-threaded, resumable download of one file, exposed as an {@code Observable<Float>}.
 * <p>
 * Subscribing starts the download. The file is split into {@code threadCount} byte ranges that are
 * fetched in parallel into a shared {@code .temp} file; the number of bytes finished per range is
 * persisted in SharedPreferences so an interrupted download continues where it stopped. Emitted
 * values are the downloaded fraction (finished bytes / total bytes). The stream completes once the
 * temp file has been renamed to its final name.
 * <p>
 * Created by chensuilun on 2017/12/7.
 */
public class FileDownloadObservable extends Observable<Float> {
    private static final String TAG = "FileDownloadObservable";
    public static final int DEFAULT_THREAD_COUNT = 3;

    private DownloadRequest mDownloadRequest;
    private int mThreadCount;
    private DownloadDispose mDownloadTask;

    public FileDownloadObservable(@NonNull DownloadRequest downloadRequest) {
        this(downloadRequest, DEFAULT_THREAD_COUNT);
    }

    /**
     * @param request     what to download and where to store it
     * @param threadCount number of parallel segments; values below 1 are raised to 1
     */
    public FileDownloadObservable(@NonNull DownloadRequest request, int threadCount) {
        mDownloadRequest = request;
        mThreadCount = Math.max(1, threadCount);
    }

    @Override
    protected void subscribeActual(Observer<? super Float> observer) {
        if (mDownloadTask != null) {
            // NOTE(review): a second subscriber is only logged; it never receives onSubscribe or a
            // terminal event. Confirm whether it should be failed explicitly instead.
            Log.e(TAG, "FileDownloadObservable has been subscribed !!!");
            return;
        }
        // Only one subscription is supported; the download starts as soon as it is made.
        mDownloadTask = new DownloadDispose(observer, mDownloadRequest, mThreadCount);
        mDownloadTask.startDownload();
    }

    /**
     * The running download; also the Disposable handed to the observer, so disposing it cancels
     * the length probe and every segment request.
     */
    static final class DownloadDispose extends ThreadSafeDispose {
        public static final String FIELD_PROGRESS = "thread_progress";
        public static final String FIELD_FILE_LENGTH = "fileLength";
        public static final String CONFIG_SUFFIX = ".download";
        public static final String TEMP_SUFFIX = ".temp";
        private static final int BUFFER_SIZE = 4096;

        private DownloadRequest mRequest;
        private int mThreadCount;
        private Observer<? super Float> mObserver;
        private FileDownloadService mDownloadService;
        // Persists the total length and the finished byte count of every segment.
        private SharedPreferences mPreferences;
        /**
         * Subscriptions of all requests that are in flight.
         */
        private CompositeDisposable mDisposable = new CompositeDisposable();
        // Receives one segment id per finished segment.
        private FlowableProcessor<Integer> mCompleteProcessor;
        // Segments run on several IO threads; an Observer must never be signalled concurrently,
        // so every onNext/onError issued from those threads goes through this lock.
        private final Object mEmitLock = new Object();

        public DownloadDispose(Observer<? super Float> observer, DownloadRequest downloadRequest, int threadCount) {
            mObserver = observer;
            mRequest = downloadRequest;
            mThreadCount = threadCount;
            mPreferences = SpUtils.getSharedPreferences(mRequest.getName() + CONFIG_SUFFIX);
            mDownloadService = RetrofitUtil.createService(FileDownloadService.class);
            mCompleteProcessor = PublishProcessor.<Integer>create().toSerialized();
        }

        /**
         * Hands this task to the observer as its Disposable and starts the download. When the
         * target file already exists, progress 1 and completion are signalled immediately.
         */
        public void startDownload() {
            mObserver.onSubscribe(this);
            File file = new File(mRequest.getSavedPath(), mRequest.getName());
            if (file.exists()) {
                if (BuildConfig.DEBUG) {
                    Log.d(TAG, "file already exist!");
                }
                mObserver.onNext(1.f);
                mObserver.onComplete();
                return;
            }
            onPreStart();
            mDisposable.add(mDownloadService
                    .getContentLengthRx(mRequest.getRemoteUrl())
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Consumer<ResponseBody>() {
                        @Override
                        public void accept(final ResponseBody response) throws Exception {
                            AppThread.assertWorkerThread("GET-FILE-LENGTH");
                            try {
                                final long fileLength = response.contentLength();
                                if (!isDisposed()) {
                                    startSegments(fileLength);
                                }
                            } finally {
                                // Release the probe response even when preparing the segments throws.
                                response.close();
                            }
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            throwable.printStackTrace();
                            fail(throwable);
                        }
                    }));
        }

        /**
         * Prepares the temp file, subscribes the completion handler and starts one request per
         * segment.
         *
         * @param fileLength total length reported by the server
         * @throws Exception when the length is unknown or the temp file cannot be prepared
         */
        private void startSegments(final long fileLength) throws Exception {
            if (fileLength < 0) {
                throw new DownloadException("Invalid content length for " + mRequest.getRemoteUrl());
            }
            final File temp = new File(mRequest.getSavedPath(), mRequest.getName() + TEMP_SUFFIX);
            final File dir = temp.getParentFile();
            if (!dir.exists() && !dir.mkdirs()) {
                throw new DownloadException("Cannot create temp file for " + mRequest.getRemoteUrl());
            }
            if (!temp.exists()) {
                if (BuildConfig.DEBUG) {
                    Log.d(TAG, temp.getName() + " 临时文件不存在,清除进度缓存");
                }
                // Stored progress without the temp file it refers to is meaningless.
                deleteCacheSp();
            }
            // Pre-size the temp file so every segment can write at its own offset.
            RandomAccessFile tmpDownloadFile = new RandomAccessFile(temp, "rw");
            try {
                tmpDownloadFile.setLength(fileLength);
            } finally {
                close(tmpDownloadFile);
            }
            mPreferences.edit().putLong(FIELD_FILE_LENGTH, fileLength).commit();

            // Subscribe before any segment can report, and tie the handler to this task so a
            // cancelled download never completes afterwards.
            mDisposable.add(mCompleteProcessor
                    .take(mThreadCount)
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnComplete(new Action() {
                        @Override
                        public void run() throws Exception {
                            if (BuildConfig.DEBUG) {
                                Log.d(TAG, "run:download completed:" + mRequest.getRemoteUrl());
                            }
                            File target = new File(mRequest.getSavedPath(), mRequest.getName());
                            if (!temp.exists()) {
                                mObserver.onError(new DownloadException("Unknown Error"));
                            } else if (temp.renameTo(target)) {
                                deleteCacheSp();
                                mObserver.onComplete();
                            } else {
                                // A failed rename must not be reported as a finished download.
                                mObserver.onError(new DownloadException("Cannot rename temp file for " + mRequest.getRemoteUrl()));
                            }
                        }
                    }).subscribe());

            final long blockSize = fileLength / mThreadCount;
            for (int threadId = 0; threadId < mThreadCount; threadId++) {
                final long startIndex = threadId * blockSize;
                // The last segment also takes the remainder of the division.
                final long endIndex = (threadId == mThreadCount - 1)
                        ? fileLength - 1
                        : (threadId + 1) * blockSize - 1;
                startSegment(temp, threadId, startIndex, endIndex, fileLength);
            }
        }

        /**
         * Downloads bytes {@code [startIndex, endIndex]} into {@code temp}, continuing after the
         * bytes a previous run already stored for this segment.
         */
        private void startSegment(final File temp, final int id, final long startIndex, final long endIndex, long fileLength) {
            final String threadKey = FIELD_PROGRESS + id;
            // First byte still missing = segment start + bytes finished by earlier runs.
            final long resumeIndex = startIndex + mPreferences.getLong(threadKey, 0);
            // The segment is finished only when the resume position lies past its last byte;
            // resumeIndex == endIndex still leaves exactly one byte to fetch.
            if (resumeIndex > endIndex) {
                if (!isDisposed()) {
                    Log.d(TAG, "download thread: " + id + " end(suc)!!!");
                    mCompleteProcessor.onNext(id);
                }
                return;
            }
            if (BuildConfig.DEBUG) {
                Log.d(TAG, "download start,id:" + id + ",startIndex:" + resumeIndex + ",fixEndIndex:" + endIndex + ",total:" + fileLength + ",url:" + mRequest.getRemoteUrl());
            }
            // Request from the resume position: the bytes are written at that same offset, so
            // asking for the segment start again would shift already finished data into place.
            mDisposable.add(mDownloadService.downloadWithRangeRx(mRequest.getRemoteUrl(), buildRange(resumeIndex, endIndex))
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Consumer<ResponseBody>() {
                        @Override
                        public void accept(ResponseBody responseBody) throws Exception {
                            AppThread.assertWorkerThread("DOWNLOAD_THREAD_SEG");
                            try {
                                if (!isDisposed()) {
                                    if (responseBody.contentLength() < 0) {
                                        throw new IllegalStateException("Invalid content length for " + mRequest.getRemoteUrl() + " of segment : " + id);
                                    }
                                    writeSegment(responseBody, temp, threadKey, startIndex, resumeIndex);
                                    if (BuildConfig.DEBUG) {
                                        Log.d(TAG, "download thread: " + id + " end(suc or not)!!!");
                                    }
                                }
                            } finally {
                                close(responseBody);
                            }
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(final Throwable throwable) throws Exception {
                            throwable.printStackTrace();
                            // One failed segment stops the whole download.
                            fail(throwable);
                        }
                    }, new Action() {
                        @Override
                        public void run() throws Exception {
                            if (!isDisposed()) {
                                mCompleteProcessor.onNext(id);
                            }
                        }
                    })
            );
        }

        /**
         * Copies the response body into {@code temp} starting at {@code resumeIndex}, persisting
         * the finished byte count of the segment after every chunk. Stops early when disposed.
         * The file handle and the stream are always released, including on cancel and on error.
         */
        private void writeSegment(ResponseBody responseBody, File temp, String threadKey, long startIndex, long resumeIndex) throws IOException {
            RandomAccessFile out = null;
            InputStream is = null;
            try {
                out = new RandomAccessFile(temp, "rw");
                out.seek(resumeIndex);
                is = responseBody.byteStream();
                byte[] buffer = new byte[BUFFER_SIZE];
                int length;
                long total = 0; // bytes written by this run; long so segments above 2 GB do not overflow
                SharedPreferences.Editor editor = mPreferences.edit();
                while (!isDisposed() && (length = is.read(buffer)) > 0) {
                    out.write(buffer, 0, length);
                    total += length;
                    // Stored value = bytes of this segment that are on disk, counted from its start.
                    editor.putLong(threadKey, resumeIndex + total - startIndex);
                    editor.commit();
                    updateProgress();
                }
            } finally {
                close(is, out);
            }
        }

        /**
         * Cancels everything still running and reports {@code error} downstream, unless the task
         * has already been disposed.
         */
        private void fail(Throwable error) {
            synchronized (mEmitLock) {
                if (!isDisposed()) {
                    dispose();
                    mObserver.onError(error);
                }
            }
        }

        /**
         * Closes every given resource. A failure to close one of them is logged and does not keep
         * the remaining ones open.
         *
         * @param closeables resources to close; {@code null} entries are skipped
         */
        private void close(Closeable... closeables) {
            if (closeables == null) {
                return;
            }
            for (Closeable closeable : closeables) {
                if (closeable == null) {
                    continue;
                }
                try {
                    closeable.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Deletes the given files; {@code null} entries are skipped.
         */
        private void cleanFile(File... files) {
            if (files == null) {
                return;
            }
            for (int i = 0, length = files.length; i < length; i++) {
                if (null != files[i]) {
                    files[i].delete();
                }
            }
        }

        /**
         * Clears the SharedPreferences holding the progress and deletes their backing file.
         */
        void deleteCacheSp() {
            // Clear the stored values.
            if (mPreferences != null) {
                mPreferences.edit().clear().apply();
            }
            // Delete the backing xml file.
            // NOTE(review): the preferences directory is hard-coded to /data/data/<applicationId>.
            File file = new File("/data/data/" + BuildConfig.APPLICATION_ID + "/shared_prefs", mRequest.getName() + CONFIG_SUFFIX + ".xml");
            if (file.exists()) {
                file.delete();
            }
        }

        /**
         * Reports the progress stored by a previous run before any request is made.
         */
        private void onPreStart() {
            updateProgress();
        }

        /**
         * Emits the downloaded fraction computed from the persisted per-segment byte counts.
         * Emits 0 while the total length is not known yet (nothing stored, or stored as 0) instead
         * of dividing by zero, which used to produce NaN or Infinity.
         */
        private void updateProgress() {
            synchronized (mEmitLock) {
                if (isDisposed()) {
                    return;
                }
                long sum = 0;
                for (int i = 0; i < mThreadCount; i++) {
                    sum += mPreferences.getLong(FIELD_PROGRESS + i, 0);
                }
                long fileLength = mPreferences.getLong(FIELD_FILE_LENGTH, 0);
                float fraction = fileLength > 0 ? sum * 1.0f / fileLength : 0f;
                mObserver.onNext(fraction);
            }
        }

        @Override
        protected void onDispose() {
            Log.e(TAG, "onDispose: 下载取消");
            mDisposable.dispose();
        }

        /**
         * @return the value of an HTTP Range header covering bytes {@code start}..{@code end}, both inclusive
         */
        private String buildRange(long start, long end) {
            return "bytes=" + start + "-" + end;
        }
    }
}
/**
 * Retrofit service for downloading files whose URL is supplied at call time.
 * Tutorial: https://futurestud.io/tutorials/retrofit-2-how-to-download-files-from-server
 * Created by chensuilun on 2017/9/26.
 */
public interface FileDownloadService {
/**
 * Issues a GET for {@code fileUrl} and delivers the response body as a {@code Flowable}.
 * Not called by the other classes visible in this file.
 *
 * @param fileUrl URL of the file, passed through the {@code @Url} parameter
 */
@Streaming
@GET
Flowable<ResponseBody> downloadFileWithRx(@Url String fileUrl);
/**
 * Issues a GET for {@code fileUrl}; FileDownloadObservable reads {@code contentLength()} of the
 * returned body to learn the total size before splitting the download into segments.
 * NOTE(review): this is a GET, not a HEAD — presumably {@code @Streaming} keeps the body from
 * being read into memory; confirm against the Retrofit documentation.
 *
 * @param fileUrl URL of the file, passed through the {@code @Url} parameter
 */
@Streaming
@GET
Observable<ResponseBody> getContentLengthRx(@Url String fileUrl);
/**
 * Issues a GET for {@code fileUrl} with {@code range} sent as the {@code Range} request header.
 *
 * @param fileUrl URL of the file, passed through the {@code @Url} parameter
 * @param range   header value; FileDownloadObservable passes {@code bytes=<start>-<end>}
 */
@Streaming
@GET
Observable<ResponseBody> downloadWithRangeRx(@Url String fileUrl, @Header("Range") String range);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment