Skip to content

Instantly share code, notes, and snippets.

@Miha-x64
Created May 6, 2020 11:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save Miha-x64/e2fe6b7a0e7810c65d04bfa347b859d1 to your computer and use it in GitHub Desktop.
Save Miha-x64/e2fe6b7a0e7810c65d04bfa347b859d1 to your computer and use it in GitHub Desktop.
import android.os.Handler;
import android.os.Looper;
import androidx.annotation.WorkerThread;
import java.io.File;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* original: https://gist.github.com/techyourchance/6602815188294c1c58779d3e8d16f12b
*/
abstract class BaseObservable<LISTENER_CLASS> {
private final Object MONITOR = new Object();
private final Set<LISTENER_CLASS> mListeners = new HashSet<>();
public void registerListener(LISTENER_CLASS listener) {
synchronized (MONITOR) {
boolean hadNoListeners = mListeners.size() == 0;
mListeners.add(listener);
if (hadNoListeners && mListeners.size() == 1) {
onFirstListenerRegistered();
}
}
}
public void unregisterListener(LISTENER_CLASS listener) {
synchronized (MONITOR) {
boolean hadOneListener = mListeners.size() == 1;
mListeners.remove(listener);
if (hadOneListener && mListeners.size() == 0) {
onLastListenerUnregistered();
}
}
}
protected Set<LISTENER_CLASS> getListeners() {
synchronized (MONITOR) {
return Collections.unmodifiableSet(new HashSet<>(mListeners));
}
}
protected void onFirstListenerRegistered() {
}
protected void onLastListenerUnregistered() {
}
}
/**
* original: https://gist.github.com/techyourchance/44670734917d4ce085224a62cb9edf81
*/
abstract class BaseBusyObservable<LISTENER_CLASS> extends BaseObservable<LISTENER_CLASS> {
private final AtomicBoolean mIsBusy = new AtomicBoolean(false);
public final boolean isBusy() {
return mIsBusy.get();
}
protected final void assertNotBusyAndBecomeBusy() {
if (!mIsBusy.compareAndSet(false, true)) {
throw new IllegalStateException("assertion violation: mustn't be busy");
}
}
protected final boolean isFreeAndBecomeBusy() {
return mIsBusy.compareAndSet(false, true);
}
protected final void becomeNotBusy() {
mIsBusy.set(false);
}
}
class OperationFailedException extends Exception {
public OperationFailedException() { super(); }
public OperationFailedException(String message) { super(message); }
}
class HttpManager {
public static final HttpManager getInstance = new HttpManager();
private static final Executor HTTP_EXECUTOR = new ThreadPoolExecutor(0, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>());
private static final Executor IN_PLACE_EXECUTOR = Runnable::run;
// hello, I'm both callback-based and blocking API.
public Future<Response> uploadFiles(File file, Executor listenerExecutor, HttpRequestListener listener) {
FutureTask<Response> task = new FutureTask<Response>(() -> {
// TODO use file
return new Response(200, new byte[0]);
}) {
@Override protected void done() {
if (listener != null) {
(listenerExecutor == null ? IN_PLACE_EXECUTOR : listenerExecutor).execute(() -> {
try {
Response response = get();
listener.onDone(response.code, response.body);
} catch (ExecutionException | InterruptedException e) {
listener.onFailure();
}
});
}
}
};
HTTP_EXECUTOR.execute(task);
return task;
}
public static final class Response {
public final int code;
public final byte[] body;
public Response(int code, byte[] body) {
this.code = code;
this.body = body;
}
}
}
interface HttpRequestListener {
void onDone(int code, byte[] body);
void onFailure();
}
class UploadFilesUseCase extends BaseBusyObservable<UploadFilesUseCase.Listener> {
public interface Listener {
void onFilesUploaded();
void onFilesUploadFailed();
}
private final static int MAX_RETRIES = 3;
private final Handler uiHandler = new Handler(Looper.getMainLooper());
public void uploadFiles() {
if (!isFreeAndBecomeBusy()) {
// log concurrent invocation attempt
return;
}
new Thread(() -> uploadFilesSync(0)).start();
}
@WorkerThread
private void uploadFilesSync(int retryCount) {
// you'd better create executors for different load types: network IO, file IO, etc
ExecutorService executor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
final Future<File> mergedA = executor.submit(this::processAndMergeFilesOfTypeA);
final Future<File> mergedB = executor.submit(this::processAndMergeFilesOfTypeA);
executor.shutdown();
try {
uploadFileToServer(compressMergedFiles(mergedA.get(), mergedB.get()));
deleteTempDir();
notifySuccess();
} catch (ExecutionException|InterruptedException|OperationFailedException e) {
retryOrFail(retryCount);
}
}
@WorkerThread
private void uploadFileToServer(File archive) throws ExecutionException {
try {
HttpManager.Response response = HttpManager.getInstance.uploadFiles(archive, null, null).get();
if (response.code / 100 != 2) {
throw new ExecutionException("HTTP " + response.code, null);
}
} catch (InterruptedException e) {
throw new RuntimeException("unexpected interrupt");
}
}
@WorkerThread
private void retryOrFail(int currentRetryCount) {
deleteTempDir();
if (currentRetryCount >= MAX_RETRIES - 1) {
notifyFailure();
} else {
uploadFilesSync(currentRetryCount + 1);
}
}
@WorkerThread
private File processAndMergeFilesOfTypeA() throws OperationFailedException {
throw new UnsupportedOperationException();
}
@WorkerThread
private File processAndMergeFilesOfTypeB() throws OperationFailedException {
throw new UnsupportedOperationException();
}
@WorkerThread
private File compressMergedFiles(File fileA, File fileB) throws OperationFailedException {
throw new UnsupportedOperationException();
}
@WorkerThread
private void deleteTempDir() {
throw new UnsupportedOperationException();
}
private void notifySuccess() {
uiHandler.post(() -> {
for (Listener listener : getListeners()) {
listener.onFilesUploaded();
}
becomeNotBusy();
});
}
private void notifyFailure() {
uiHandler.post(() -> {
for (Listener listener : getListeners()) {
listener.onFilesUploadFailed();
}
becomeNotBusy();
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment