Skip to content

Instantly share code, notes, and snippets.

@hrchu
Last active October 4, 2016 13:09
Show Gist options
  • Save hrchu/462398bd0bc8428bd0d03c18cf45c2a8 to your computer and use it in GitHub Desktop.
Save hrchu/462398bd0bc8428bd0d03c18cf45c2a8 to your computer and use it in GitHub Desktop.
// import org.apache.commons.io.output.CountingOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Benchmark different strategies to upload file to multiple destinations
*
* Created by developer on 9/29/16.
*/
public class TestMultiWrite {
private static final int CHUNK_LENGTH = 4096;
static byte[] bytes;
static int testTimes = 1;
static String dataNodePath1 = "http://ip1:7500/dev107/";
static String dataNodePath2 = "http://ip2:7500/dev207/";
static int testKB = 128;
public static void main(String[] args) throws IOException {
try {
testKB = Integer.parseInt(System.getProperty("test.kb"));
} catch (Exception e) {
}
bytes = new byte[testKB * 1024];
try {
testTimes = Integer.parseInt(System.getProperty("test.times"));
} catch (Exception e) {
}
try {
String tmp = System.getProperty("test.dataNode1");
if (tmp != null && tmp.length() > 0) {
dataNodePath1 = tmp;
}
} catch (Exception e) {
}
try {
String tmp = System.getProperty("test.dataNode2");
if (tmp != null && tmp.length() > 0) {
dataNodePath2 = tmp;
}
} catch (Exception e) {
}
System.out.printf(
"Test config: testKB: %d KB, testTimes: %s, dataNodePath1: %s, dataNodePath2: %s\n", testKB,
testTimes, dataNodePath1, dataNodePath2);
TestMultiWrite testMultiWrite = new TestMultiWrite();
Arrays.fill(bytes, (byte) 1);
testMultiWrite.benchmarkTest("One Write", testMultiWrite::testOneWrite);
testMultiWrite.benchmarkTest("One thread", testMultiWrite::testOneThread);
testMultiWrite.benchmarkTest("Two thread", testMultiWrite::testTwoThread);
testMultiWrite.benchmarkTest("One Write", testMultiWrite::testOneWrite);
}
private void testOneWrite() {
try {
InputStream is = new ByteArrayInputStream(bytes);
OutputStream os1 = genPutOutputStream(dataNodePath1);
writeByOneThread(is, os1);
} catch (IOException e) {
e.printStackTrace();
}
}
public void benchmarkTest(String name, Runnable runnable) {
long start = System.nanoTime();
int runs = testTimes;
for (int i = 0; i < runs; i++) {
try {
Thread.sleep(5000); // prevent side effect of disk buffer flushing
} catch (InterruptedException e) {
e.printStackTrace();
}
runnable.run();
}
long time = System.nanoTime() - start;
System.out.printf("Time for single task to complete in {%s} %.1f ms%n", name,
(time / 1000.0 / 1000.0) / testTimes - 5000);
}
public void testOneThread() {
try {
InputStream is = new ByteArrayInputStream(bytes);
OutputStream os1 = genPutOutputStream(dataNodePath1);
OutputStream os2 = genPutOutputStream(dataNodePath2);
writeByOneThread(is, os1, os2);
} catch (IOException e) {
e.printStackTrace();
}
}
public void testTwoThread() {
try {
InputStream is = new ByteArrayInputStream(bytes);
OutputStream os1 = genPutOutputStream(dataNodePath1);
OutputStream os2 = genPutOutputStream(dataNodePath2);
writeByTwoThread(is, os1, os2);
} catch (IOException e) {
e.printStackTrace();
}
}
static String filePrefix = "qq";
private OutputStream genPutOutputStream(String dataNodePath) throws IOException {
HttpURLConnection httpConnection =
newConnection(new URL(dataNodePath + filePrefix + UUID.randomUUID().toString()));
httpConnection.setRequestMethod("PUT");
httpConnection.setChunkedStreamingMode(CHUNK_LENGTH);
httpConnection.setDoOutput(true);
httpConnection.setRequestProperty("Connection", "close");
// return new CountingOutputStream(httpConnection.getOutputStream());
return httpConnection.getOutputStream();
}
HttpURLConnection newConnection(URL url) throws IOException {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
return connection;
}
public void writeByOneThread(InputStream is, OutputStream os1, OutputStream os2)
throws IOException {
int read;
byte[] bytes = new byte[8192];
try (OutputStream _os1 = os1; OutputStream _os2 = os2) {
while ((read = is.read(bytes)) != -1) {
_os1.write(bytes, 0, read);
_os2.write(bytes, 0, read);
}
_os1.flush();
_os2.flush();
_os1.close();
_os2.close();
} catch (IOException e) {
throw e;
}
}
public void writeByOneThread(InputStream is, OutputStream os1) throws IOException {
int read;
byte[] bytes = new byte[8192];
try (OutputStream _os1 = os1) {
while ((read = is.read(bytes)) != -1) {
_os1.write(bytes, 0, read);
}
_os1.flush();
_os1.close();
} catch (IOException e) {
throw e;
}
}
public void writeByTwoThread(InputStream is, OutputStream os1, OutputStream os2)
throws IOException {
BlockingQueue<Chunk> queue1 = new LinkedBlockingQueue<>(10);
BlockingQueue<Chunk> queue2 = new LinkedBlockingQueue<>(10);
new Task(queue1, os1).start();
new Task(queue2, os2).start();
int read;
byte[] bytes = new byte[8192];
try {
while (true) {
read = is.read(bytes);
Chunk chunk = new Chunk(bytes, read);
queue1.put(chunk);
queue2.put(chunk);
if (read == -1) {
break;
}
}
} catch (IOException e) {
throw e;
} catch (InterruptedException e) {
e.printStackTrace();
}
// FIXME: should return after child thread done
}
class Chunk {
final byte[] data;
final int length;
public Chunk(byte[] data, int length) {
this.data = data;
this.length = length;
}
}
class Task extends Thread {
BlockingQueue<Chunk> queue;
OutputStream os;
public Task(BlockingQueue queue, OutputStream os) {
this.queue = queue;
this.os = os;
}
@Override
public void run() {
while (true) {
try {
Chunk chunk = queue.take();
if (chunk.length == -1) {
os.flush();
os.close();
break;
}
os.write(chunk.data, 0, chunk.length);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
@hrchu
Copy link
Author

hrchu commented Oct 4, 2016

Usage: java -Dtest.kb=1024000 -Dtest.times=3 -Dtest.dataNode1=http://203.64.138.8:7500/dev107/ TestMultiWrite
Sample result:

Time for tasks to complete in {One Write} 33062.5 ms
Time for tasks to complete in {One thread} 70832.8 ms
Time for tasks to complete in {Two thread} 67078.0 ms

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment