Skip to content

Instantly share code, notes, and snippets.

@paramsen
Last active January 11, 2018 12:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save paramsen/d779e56c034af395051951aabbbb968c to your computer and use it in GitHub Desktop.
Save paramsen/d779e56c034af395051951aabbbb968c to your computer and use it in GitHub Desktop.
Concurrent queue workers (download stuff synchronized in parallel)
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
// MIT License
//
// Copyright (c) 2017 Pär Amsen
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
/**
 * Concurrently downloads a predefined list of urls on a predefined amount of worker threads.
 *
 * <p>Workers take urls from a shared {@link Stack} until it is depleted. The "is it empty?"
 * check and the pop that follows it are performed under one lock, so two workers can never
 * compete for the last remaining url.
 *
 * <p>Each result is reported through {@link Callback#onEach(File)}. Once the last running
 * worker has finished, {@link Callback#onDone(File)} is invoked, at most once per instance.
 * Both callbacks are invoked on worker threads, so implementations must be thread-safe.
 *
 * <p>Does not care about IO errors and/or pause/resume for now.
 *
 * @author Pär Amsen 05/2017
 */
public class ConcurrentRequestSource {
    /** Number of worker threads started by each {@link #download(Callback)} call. */
    public static final int WORKERS = 4;

    /** Ensures {@link Callback#onDone(File)} is reported at most once for this instance. */
    private final AtomicBoolean done;
    /** Number of registered workers that have not finished yet. */
    private final AtomicInteger activeWorkers;
    private final File imagesDir;
    private final Stack<String> urls;

    /**
     * @param imagesDir directory handed back to {@link Callback#onDone(File)}; may be null
     * @param urls      urls to download; copied, so later changes to the list have no effect
     */
    public ConcurrentRequestSource(File imagesDir, List<String> urls) {
        this.imagesDir = imagesDir;
        this.urls = new Stack<>();
        this.urls.addAll(urls);
        done = new AtomicBoolean(false);
        activeWorkers = new AtomicInteger(0);
    }

    /**
     * Starts {@link #WORKERS} threads that drain the url stack and returns immediately.
     *
     * @param callback is a callback that will be called with each work result when available
     * @throws NullPointerException if {@code callback} is null
     */
    public void download(Callback callback) {
        // Fail fast on the calling thread instead of crashing every worker on first use.
        if (callback == null) {
            throw new NullPointerException("callback");
        }
        List<Thread> workers = newWorkers(callback);
        // Register all workers before any of them starts; otherwise a fast worker could
        // bring the count to zero and report completion while its siblings are not yet running.
        activeWorkers.addAndGet(workers.size());
        for (Thread worker : workers) {
            worker.start();
        }
    }

    /** Creates (but does not start) the worker threads for one download run. */
    private List<Thread> newWorkers(Callback callback) {
        List<Thread> workers = new ArrayList<>(WORKERS);
        for (int i = 0; i < WORKERS; i++) {
            workers.add(new Thread(newWorker(callback), "ConcurrentRequestSourceWorker-" + i));
        }
        return workers;
    }

    /** Wraps the worker loop in a {@link Runnable} bound to the given callback. */
    private Runnable newWorker(Callback callback) {
        return () -> runWorker(callback);
    }

    /**
     * Worker loop: downloads urls until the stack is empty, then reports completion if this
     * is the last worker still running.
     */
    private void runWorker(Callback callback) {
        try {
            while (true) {
                String url;
                // Check-and-pop must be one atomic step; separately synchronized empty()
                // and pop() calls let another worker steal the last url in between.
                synchronized (urls) {
                    if (urls.empty()) {
                        break;
                    }
                    url = urls.pop();
                }
                callback.onEach(doDownload(url));
            }
        } finally {
            // In finally so a worker killed by a throwing callback is still counted;
            // otherwise the count would never reach zero and onDone would never fire.
            // Only the worker that brings the count to zero reports completion, which
            // guarantees every onEach call has returned before onDone is invoked.
            if (activeWorkers.decrementAndGet() == 0 && done.compareAndSet(false, true)) {
                callback.onDone(imagesDir);
            }
            System.out.println(String.format("Thread %s done", Thread.currentThread().getName()));
        }
    }

    /**
     * Download and persist file on disk
     *
     * @return File pointing to downloaded file on disk (currently always null, the request is mocked)
     */
    private File doDownload(String url) {
        try {
            Thread.sleep(10); //mock request time
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the interruption is not lost; the mocked
            // download is still reported as completed.
            Thread.currentThread().interrupt();
        }
        System.out.printf("Downloaded %s on thread: %s%n", url, Thread.currentThread().getName());
        return null; // file pointing to downloaded image on disk
    }

    /** Receives download results; all methods are invoked on worker threads. */
    interface Callback {
        /** Called once per url with the result of its download. */
        void onEach(File image);

        /** Called after the last running worker has finished, with the images directory. */
        void onDone(File imagesDir);
    }
}
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.*;
/**
 * Checks that {@link ConcurrentRequestSource} reports one result per url and signals
 * completion exactly once.
 *
 * <p>The callbacks are invoked on the source's worker threads, so all state shared with the
 * test thread is guarded by a single lock.
 *
 * @author Pär Amsen 05/2017
 */
public class ConcurrentRequestSourceTest {
    private static final int URL_COUNT = 2000;
    /** Upper bound for the whole run, so a lost callback fails the test instead of hanging it. */
    private static final long TIMEOUT_NANOS = 60L * 1_000_000_000L;

    @Test
    public void download() throws Exception {
        final List<File> results = new ArrayList<>();
        final int[] done = {0};
        // Guards both results and done: ArrayList and int[] are not thread-safe on their own.
        final Object lock = new Object();

        List<String> urls = new ArrayList<>();
        for (int i = 0; i < URL_COUNT; i++) {
            urls.add(String.valueOf(URL_COUNT - i - 1));
        }

        new ConcurrentRequestSource(null, urls).download(new ConcurrentRequestSource.Callback() {
            @Override
            public void onEach(File image) {
                synchronized (lock) {
                    results.add(image);
                }
            }

            @Override
            public void onDone(File imagesDir) {
                synchronized (lock) {
                    done[0]++;
                }
            }
        });

        // Wait for every result AND the completion signal; the two are reported by
        // different callbacks, so observing one does not imply the other has happened.
        long deadline = System.nanoTime() + TIMEOUT_NANOS;
        boolean finished = false;
        while (!finished) {
            synchronized (lock) {
                finished = results.size() >= URL_COUNT && done[0] > 0;
            }
            if (!finished) {
                if (System.nanoTime() - deadline > 0) {
                    fail("Timed out waiting for downloads to complete");
                }
                Thread.sleep(10);
            }
        }

        synchronized (lock) {
            assertEquals(URL_COUNT, results.size());
            assertEquals(1, done[0]);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment