Last active
December 8, 2017 07:51
-
-
Save BCsl/824a705631a5be9fbd148fbb08b11288 to your computer and use it in GitHub Desktop.
多线程断点下载
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Created by chensuilun on 2017/12/08. | |
*/ | |
public class DownloadManagerRx implements IDownloadListener { | |
private static final String TAG = "DownloadManagerRx"; | |
private static DownloadManagerRx mInstance; | |
private HashMap<Object, Disposable> mRunningRequest; | |
private Map<Object, Float> mProgress; | |
private List<IDownloadListener> mIDownloadListeners; | |
public DownloadManagerRx() { | |
mRunningRequest = new HashMap<Object, Disposable>(); | |
mProgress = new HashMap<Object, Float>(); | |
mIDownloadListeners = new LinkedList<IDownloadListener>(); | |
} | |
public void addListener(IDownloadListener listener) { | |
if (!mIDownloadListeners.contains(listener)) { | |
mIDownloadListeners.add(listener); | |
} | |
for (Map.Entry<Object, Float> entry : mProgress.entrySet()) { | |
onProgress(entry.getKey(), Math.max(0, entry.getValue())); | |
} | |
} | |
public void removeListener(IDownloadListener listener) { | |
mIDownloadListeners.remove(listener); | |
} | |
public static DownloadManagerRx getInstance() { | |
if (mInstance == null) { | |
synchronized (DownloadManagerRx.class) { | |
if (mInstance == null) { | |
mInstance = new DownloadManagerRx(); | |
} | |
} | |
} | |
return mInstance; | |
} | |
/** | |
* 开始下载 | |
* | |
* @param downloadRequest | |
*/ | |
public synchronized void start(DownloadRequest downloadRequest) { | |
final Object tag = downloadRequest.getTag(); | |
Disposable existCall = mRunningRequest.get(tag); | |
if (existCall != null) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "start: request is downloading"); | |
} | |
return; | |
} | |
new FileDownloadObservable(downloadRequest) | |
.compose(RxTransformer.<Float>async()) | |
.doOnSubscribe(new Consumer<Disposable>() { | |
@Override | |
public void accept(Disposable disposable) throws Exception { | |
if (BuildConfig.DEBUG) { | |
Log.i(TAG, "开始下载\n" + tag); | |
} | |
mRunningRequest.put(tag, disposable); | |
onStart(tag); | |
} | |
}) | |
.subscribe(new Consumer<Float>() { | |
@Override | |
public void accept(Float progress) throws Exception { | |
if (BuildConfig.DEBUG) { | |
Log.v(TAG, tag + ",下载进度" + progress); | |
} | |
onProgress(tag, progress); | |
} | |
}, new Consumer<Throwable>() { | |
@Override | |
public void accept(Throwable throwable) throws Exception { | |
throwable.printStackTrace(); | |
if (BuildConfig.DEBUG) { | |
Log.e(TAG, tag + "\n下载异常:" + throwable.getMessage()); | |
} | |
onCancel(tag); | |
} | |
}, new Action() { | |
@Override | |
public void run() throws Exception { | |
Log.e(TAG, "下载结束" + tag); | |
onSuccess(tag); | |
} | |
}); | |
} | |
/** | |
* 取消下载 | |
* | |
* @param tag | |
*/ | |
public synchronized void cancel(Object tag) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "cancel() called with: " + "tag = [" + tag + "]"); | |
} | |
Disposable existCall = mRunningRequest.get(tag); | |
if (existCall != null && !existCall.isDisposed()) { | |
existCall.dispose(); | |
onCancel(tag); | |
} | |
} | |
@Override | |
public void onStart(Object tag) { | |
mProgress.put(tag, 0f); | |
for (IDownloadListener iDownloadListener : mIDownloadListeners) { | |
iDownloadListener.onStart(tag); | |
} | |
} | |
@Override | |
public void onCancel(Object tag) { | |
mProgress.remove(tag); | |
mRunningRequest.remove(tag); | |
for (IDownloadListener iDownloadListener : mIDownloadListeners) { | |
iDownloadListener.onCancel(tag); | |
} | |
} | |
public void onProgress(Object tag, float progress) { | |
for (IDownloadListener iDownloadListener : mIDownloadListeners) { | |
iDownloadListener.onProgress(tag, progress); | |
} | |
} | |
@Override | |
public void onFailed(Object tag) { | |
mProgress.remove(tag); | |
mRunningRequest.remove(tag); | |
for (IDownloadListener iDownloadListener : mIDownloadListeners) { | |
iDownloadListener.onFailed(tag); | |
} | |
} | |
@Override | |
public void onSuccess(Object tag) { | |
mProgress.remove(tag); | |
mRunningRequest.remove(tag); | |
AppApplication.getGlobalEventBus().post(new CacheInvalidEvent()); | |
for (IDownloadListener iDownloadListener : mIDownloadListeners) { | |
iDownloadListener.onSuccess(tag); | |
} | |
} | |
/** | |
* @param tag 是否正在下载 | |
*/ | |
public synchronized boolean isInDownloadQueue(Object tag) { | |
Disposable existCall = mRunningRequest.get(tag); | |
if (existCall != null && !existCall.isDisposed()) { | |
return true; | |
} | |
return false; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Created by chensuilun on 2017/4/19. | |
*/ | |
public class DownloadRequest { | |
private final Object mTag; | |
//存储路径,例如:/storage/download/ | |
private final String mSavedPath; | |
//下载路径 | |
private final String mRemoteUrl; | |
//文件名,例如 file.png | |
private final String mName; | |
public DownloadRequest(Object tag, String remoteUrl, String savedPath, String name) { | |
mTag = tag; | |
mSavedPath = savedPath; | |
mRemoteUrl = remoteUrl; | |
mName = name; | |
} | |
public Object getTag() { | |
return mTag; | |
} | |
public String getSavedPath() { | |
return mSavedPath; | |
} | |
public String getRemoteUrl() { | |
return mRemoteUrl; | |
} | |
public String getName() { | |
return mName; | |
} | |
public String getFullFilePath() { | |
return mSavedPath + mName; | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
DownloadRequest that = (DownloadRequest) o; | |
if (mTag != null ? !mTag.equals(that.mTag) : that.mTag != null) return false; | |
if (mSavedPath != null ? !mSavedPath.equals(that.mSavedPath) : that.mSavedPath != null) | |
return false; | |
if (mRemoteUrl != null ? !mRemoteUrl.equals(that.mRemoteUrl) : that.mRemoteUrl != null) | |
return false; | |
return mName != null ? mName.equals(that.mName) : that.mName == null; | |
} | |
@Override | |
public int hashCode() { | |
int result = mTag != null ? mTag.hashCode() : 0; | |
result = 31 * result + (mSavedPath != null ? mSavedPath.hashCode() : 0); | |
result = 31 * result + (mRemoteUrl != null ? mRemoteUrl.hashCode() : 0); | |
result = 31 * result + (mName != null ? mName.hashCode() : 0); | |
return result; | |
} | |
@Override | |
public String toString() { | |
return "DownloadRequest{" + | |
"mTag=" + mTag + | |
", mSavedPath='" + mSavedPath + '\'' + | |
", mRemoteUrl='" + mRemoteUrl + '\'' + | |
", mName='" + mName + '\'' + | |
'}'; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* 多线程下载 | |
* Created by chensuilun on 2017/12/7. | |
*/ | |
public class FileDownloadObservable extends Observable<Float> { | |
private static final String TAG = "FileDownloadObservable"; | |
public static final int DEFAULT_THREAD_COUNT = 3; | |
private DownloadRequest mDownloadRequest; | |
private int mThreadCount; | |
private DownloadDispose mDownloadTask; | |
public FileDownloadObservable(@NonNull DownloadRequest downloadRequest) { | |
this(downloadRequest, DEFAULT_THREAD_COUNT); | |
} | |
public FileDownloadObservable(@NonNull DownloadRequest request, int threadCount) { | |
mDownloadRequest = request; | |
mThreadCount = Math.max(1, threadCount); | |
} | |
@Override | |
protected void subscribeActual(Observer<? super Float> observer) { | |
if (mDownloadTask != null) { | |
Log.e(TAG, "FileDownloadObservable has been subscribed !!!"); | |
return; | |
} | |
//只能注册一次,注册后开始下载 | |
mDownloadTask = new DownloadDispose(observer, mDownloadRequest, mThreadCount); | |
mDownloadTask.startDownload(); | |
} | |
/** | |
* 下载任务 | |
*/ | |
static final class DownloadDispose extends ThreadSafeDispose { | |
public static final String FIELD_PROGRESS = "thread_progress"; | |
public static final String FIELD_FILE_LENGTH = "fileLength"; | |
public static final String CONFIG_SUFFIX = ".download"; | |
public static final String TEMP_SUFFIX = ".temp"; | |
private DownloadRequest mRequest; | |
private int mThreadCount; | |
private Observer<? super Float> mObserver; | |
private FileDownloadService mDownloadService; | |
private SharedPreferences mPreferences;//存储下载进度 | |
/** | |
* 记录正在加载的任务 | |
*/ | |
private CompositeDisposable mDisposable = new CompositeDisposable(); | |
private FlowableProcessor<Integer> mCompleteProcessor; | |
public DownloadDispose(Observer<? super Float> observer, DownloadRequest downloadRequest, int threadCount) { | |
mObserver = observer; | |
mRequest = downloadRequest; | |
mThreadCount = threadCount; | |
mObserver = observer; | |
mPreferences = SpUtils.getSharedPreferences(mRequest.getName() + CONFIG_SUFFIX); | |
mDownloadService = RetrofitUtil.createService(FileDownloadService.class); | |
mCompleteProcessor = PublishProcessor.<Integer>create().toSerialized(); | |
} | |
public void startDownload() { | |
mObserver.onSubscribe(this); | |
File file = new File(mRequest.getSavedPath(), mRequest.getName()); | |
if (file.exists()) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "file already exist!"); | |
} | |
mObserver.onNext(1.f); | |
mObserver.onComplete(); | |
return; | |
} | |
onPreStart(); | |
mDisposable.add(mDownloadService | |
.getContentLengthRx(mRequest.getRemoteUrl()) | |
.subscribeOn(Schedulers.io()) | |
.subscribe(new Consumer<ResponseBody>() { | |
@Override | |
public void accept(final ResponseBody response) throws Exception { | |
AppThread.assertWorkerThread("GET-FILE-LENGTH"); | |
final long fileLength = response.contentLength(); | |
if (!isDisposed()) { | |
if (fileLength >= 0) { | |
final File temp = new File(mRequest.getSavedPath(), mRequest.getName() + TEMP_SUFFIX); | |
if (temp.getParentFile().exists() || (!temp.getParentFile().exists() && temp.getParentFile().mkdirs())) { | |
if (!temp.exists()) { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, temp.getName() + " 临时文件不存在,清除进度缓存"); | |
} | |
deleteCacheSp(); | |
} | |
final RandomAccessFile tmpDownloadFile = new RandomAccessFile(temp, "rw"); | |
tmpDownloadFile.setLength(fileLength); | |
close(tmpDownloadFile); | |
mPreferences.edit().putLong(FIELD_FILE_LENGTH, fileLength).commit(); | |
final long blockSize = fileLength / mThreadCount; | |
mCompleteProcessor | |
.take(mThreadCount) | |
.observeOn(AndroidSchedulers.mainThread()) | |
.doOnComplete(new Action() { | |
@Override | |
public void run() throws Exception { | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "run:download completed:" + mRequest.getRemoteUrl()); | |
} | |
if (temp.exists()) { | |
temp.renameTo(new File(mRequest.getSavedPath(), mRequest.getName())); | |
deleteCacheSp(); | |
mObserver.onComplete(); | |
} else { | |
mObserver.onError(new DownloadException("Unknown Error")); | |
} | |
} | |
}).subscribe(); | |
for (int threadId = 0; threadId < mThreadCount; threadId++) { | |
final long startIndex = threadId * blockSize; | |
long endIndex = -1; | |
if (threadId == (mThreadCount - 1)) { | |
endIndex = fileLength - 1; | |
} else { | |
endIndex = (threadId + 1) * blockSize - 1; | |
} | |
final long fixEndIndex = endIndex; | |
final int id = threadId; | |
final String threadKey = FIELD_PROGRESS + id; | |
//上次加载的进度 | |
final long fixStartIndex = startIndex + mPreferences.getLong(threadKey, 0); | |
if (fixStartIndex >= fixEndIndex) { | |
if (!isDisposed()) { | |
Log.d(TAG, "download thread: " + id + " end(suc)!!!"); | |
mCompleteProcessor.onNext(id); | |
} | |
continue; | |
} | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "download start,id:" + id + ",startIndex:" + fixStartIndex + ",fixEndIndex:" + fixEndIndex + ",total:" + fileLength + ",url:" + mRequest.getRemoteUrl()); | |
} | |
//线程下载文件 | |
final RandomAccessFile threadTmpDownloadFile = new RandomAccessFile(temp, "rw"); | |
mDisposable.add(mDownloadService.downloadWithRangeRx(mRequest.getRemoteUrl(), buildRange(startIndex, fixEndIndex)) | |
.subscribeOn(Schedulers.io()) | |
.subscribe(new Consumer<ResponseBody>() { | |
@Override | |
public void accept(ResponseBody responseBody) throws Exception { | |
AppThread.assertWorkerThread("DOWNLOAD_THREAD_SEG"); | |
if (!isDisposed()) { | |
if (responseBody.contentLength() >= 0) { | |
threadTmpDownloadFile.seek(fixStartIndex); | |
InputStream is = responseBody.byteStream();// 获取流 | |
byte[] buffer = new byte[4086]; | |
int length = -1; | |
int total = 0;// 记录本次下载文件的大小 | |
long progress = 0; | |
SharedPreferences.Editor editor = mPreferences.edit(); | |
while (!isDisposed() && (length = is.read(buffer)) > 0) { | |
threadTmpDownloadFile.write(buffer, 0, length); | |
total += length; | |
progress = fixStartIndex + total - startIndex; | |
//记录进度 | |
editor.putLong(threadKey, progress); | |
editor.commit(); | |
updateProgress(); | |
} | |
if (BuildConfig.DEBUG) { | |
Log.d(TAG, "download thread: " + id + " end(suc or not)!!!"); | |
} | |
close(is, response); | |
} else { | |
throw new IllegalStateException("Invalid content length for " + mRequest.getRemoteUrl() + " of segment : " + id); | |
} | |
} | |
} | |
}, new Consumer<Throwable>() { | |
@Override | |
public void accept(final Throwable throwable) throws Exception { | |
throwable.printStackTrace(); | |
close(threadTmpDownloadFile); | |
if (!isDisposed()) { | |
//一个线程出错,全部都要停止了 | |
dispose(); | |
mObserver.onError(throwable); | |
} | |
} | |
}, new Action() { | |
@Override | |
public void run() throws Exception { | |
close(threadTmpDownloadFile); | |
if (!isDisposed()) { | |
mCompleteProcessor.onNext(id); | |
} | |
} | |
}) | |
); | |
} | |
} else { | |
throw new DownloadException("Cannot create temp file for " + mRequest.getRemoteUrl()); | |
} | |
} else { | |
throw new DownloadException("Invalid content length for " + mRequest.getRemoteUrl()); | |
} | |
} | |
response.close(); | |
} | |
} | |
, new Consumer<Throwable>() { | |
@Override | |
public void accept(Throwable throwable) throws Exception { | |
throwable.printStackTrace(); | |
if (!isDisposed()) { | |
mObserver.onError(throwable); | |
} | |
} | |
})); | |
} | |
/** | |
* 关闭资源 | |
* | |
* @param closeables | |
*/ | |
private void close(Closeable... closeables) { | |
if (closeables == null) { | |
return; | |
} | |
int length = closeables.length; | |
try { | |
for (int i = 0; i < length; i++) { | |
Closeable closeable = closeables[i]; | |
if (null != closeable) | |
closeables[i].close(); | |
} | |
} catch (IOException e) { | |
e.printStackTrace(); | |
} finally { | |
for (int i = 0; i < length; i++) { | |
closeables[i] = null; | |
} | |
} | |
} | |
/** | |
* 删除临时文件 | |
*/ | |
private void cleanFile(File... files) { | |
if (files == null) { | |
return; | |
} | |
for (int i = 0, length = files.length; i < length; i++) { | |
if (null != files[i]) | |
files[i].delete(); | |
} | |
} | |
/** | |
* 删除保存进度用的SharedPreferences. | |
*/ | |
void deleteCacheSp() { | |
//清空文件 | |
if (mPreferences != null) { | |
mPreferences.edit().clear().apply(); | |
} | |
//删除文件 | |
File file = new File("/data/data/" + BuildConfig.APPLICATION_ID + "/shared_prefs", mRequest.getName() + CONFIG_SUFFIX + ".xml"); | |
if (file.exists()) { | |
file.delete(); | |
} | |
} | |
/** | |
* 准备开始之前先进行缓存进度的回调 | |
*/ | |
private void onPreStart() { | |
updateProgress(); | |
} | |
private void updateProgress() { | |
if (!isDisposed()) { | |
long sum = 0; | |
for (int i = 0; i < mThreadCount; i++) { | |
long progress = mPreferences.getLong(FIELD_PROGRESS + i, 0); | |
sum += progress; | |
} | |
long fileLength = mPreferences.getLong(FIELD_FILE_LENGTH, 0); | |
mObserver.onNext(sum * 1.0f / fileLength); | |
} | |
} | |
@Override | |
protected void onDispose() { | |
Log.e(TAG, "onDispose: 下载取消"); | |
mDisposable.dispose(); | |
} | |
private String buildRange(long start, long end) { | |
return "bytes=" + start + "-" + end; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* 动态文件下载服务 for retrofit | |
* tutorials:https://futurestud.io/tutorials/retrofit-2-how-to-download-files-from-server | |
* Created by chensuilun on 2017/9/26. | |
*/ | |
public interface FileDownloadService { | |
@Streaming | |
@GET | |
Flowable<ResponseBody> downloadFileWithRx(@Url String fileUrl); | |
@Streaming | |
@GET | |
Observable<ResponseBody> getContentLengthRx(@Url String fileUrl); | |
@Streaming | |
@GET | |
Observable<ResponseBody> downloadWithRangeRx(@Url String fileUrl, @Header("Range") String range); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment