Last active
October 4, 2016 13:09
-
-
Save hrchu/462398bd0bc8428bd0d03c18cf45c2a8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// import org.apache.commons.io.output.CountingOutputStream; | |
import java.io.ByteArrayInputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.net.HttpURLConnection; | |
import java.net.URL; | |
import java.util.Arrays; | |
import java.util.UUID; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.LinkedBlockingQueue; | |
/**
 * Benchmarks different strategies for uploading the same payload to multiple destinations.
 *
 * <p>The payload is an in-memory byte array that is sent with HTTP {@code PUT} requests. Three
 * strategies are timed: one destination written by the calling thread, two destinations written
 * alternately by the calling thread, and two destinations each written by a dedicated thread.
 *
 * <p>Configuration is read from system properties: {@code test.kb} (payload size in KB),
 * {@code test.times} (runs per strategy), {@code test.dataNode1} and {@code test.dataNode2}
 * (destination URL prefixes).
 *
 * <p>Created by developer on 9/29/16.
 */
public class TestMultiWrite {
  /** Chunk size, in bytes, passed to {@link HttpURLConnection#setChunkedStreamingMode(int)}. */
  private static final int CHUNK_LENGTH = 4096;

  /** Size, in bytes, of the buffers used to read the source stream. */
  private static final int BUFFER_SIZE = 8192;

  /** Maximum number of chunks queued per writer thread before the reading thread blocks. */
  private static final int QUEUE_CAPACITY = 10;

  /** Pause before every run, to prevent side effects of disk buffer flushing. */
  private static final long SETTLE_MILLIS = 5000;

  /** Payload uploaded by every benchmark run. */
  static byte[] bytes;

  /** Number of runs per strategy. */
  static int testTimes = 1;

  static String dataNodePath1 = "http://ip1:7500/dev107/";
  static String dataNodePath2 = "http://ip2:7500/dev207/";

  /** Payload size in KB. */
  static int testKB = 128;

  /** Prefix of the random object name appended to a data node path. */
  static String filePrefix = "qq";

  /**
   * Reads the configuration from system properties and runs every benchmark.
   *
   * @param args unused
   * @throws IOException declared for compatibility; upload failures are reported on stderr
   * @throws IllegalArgumentException if {@code test.kb} does not fit into a byte array
   */
  public static void main(String[] args) throws IOException {
    testKB = intProperty("test.kb", testKB);
    testTimes = intProperty("test.times", testTimes);
    dataNodePath1 = stringProperty("test.dataNode1", dataNodePath1);
    dataNodePath2 = stringProperty("test.dataNode2", dataNodePath2);

    // Guard the multiplication below: it would otherwise overflow or yield a negative size.
    int maxKb = Integer.MAX_VALUE / 1024;
    if (testKB < 0 || testKB > maxKb) {
      throw new IllegalArgumentException(
          "test.kb must be between 0 and " + maxKb + ", got " + testKB);
    }
    bytes = new byte[testKB * 1024];
    Arrays.fill(bytes, (byte) 1);

    System.out.printf(
        "Test config: testKB: %d KB, testTimes: %s, dataNodePath1: %s, dataNodePath2: %s\n",
        testKB, testTimes, dataNodePath1, dataNodePath2);

    TestMultiWrite testMultiWrite = new TestMultiWrite();
    testMultiWrite.benchmarkTest("One Write", testMultiWrite::testOneWrite);
    testMultiWrite.benchmarkTest("One thread", testMultiWrite::testOneThread);
    testMultiWrite.benchmarkTest("Two thread", testMultiWrite::testTwoThread);
    // The single-destination case is repeated last on purpose, as in the original run order.
    testMultiWrite.benchmarkTest("One Write", testMultiWrite::testOneWrite);
  }

  /** Returns the integer system property {@code key}, or {@code fallback} if unset or invalid. */
  private static int intProperty(String key, int fallback) {
    String raw = System.getProperty(key);
    if (raw == null || raw.isEmpty()) {
      return fallback;
    }
    try {
      return Integer.parseInt(raw.trim());
    } catch (NumberFormatException e) {
      System.err.printf("Ignoring invalid value '%s' for -D%s; using %d%n", raw, key, fallback);
      return fallback;
    }
  }

  /** Returns the system property {@code key}, or {@code fallback} if unset or empty. */
  private static String stringProperty(String key, String fallback) {
    String raw = System.getProperty(key);
    return (raw == null || raw.isEmpty()) ? fallback : raw;
  }

  /** Uploads the payload to the first data node from the calling thread. */
  private void testOneWrite() {
    try {
      InputStream is = new ByteArrayInputStream(bytes);
      writeByOneThread(is, genPutOutputStream(dataNodePath1));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Runs {@code runnable} {@link #testTimes} times and prints the mean duration of one run.
   *
   * <p>Only the time spent inside {@code runnable} is measured; the settle pause before each run
   * is excluded instead of being subtracted afterwards.
   *
   * @param name label used in the printed result
   * @param runnable the task to time
   */
  public void benchmarkTest(String name, Runnable runnable) {
    int runs = testTimes;
    if (runs <= 0) {
      System.out.printf("Skipping {%s}: no runs requested (testTimes = %d)%n", name, runs);
      return;
    }
    long busyNanos = 0;
    for (int i = 0; i < runs; i++) {
      try {
        Thread.sleep(SETTLE_MILLIS);
      } catch (InterruptedException e) {
        // Keep the interrupt visible to the caller and stop: a partial mean would be misleading.
        Thread.currentThread().interrupt();
        System.err.printf("Benchmark {%s} interrupted after %d of %d runs%n", name, i, runs);
        return;
      }
      long start = System.nanoTime();
      runnable.run();
      busyNanos += System.nanoTime() - start;
    }
    System.out.printf("Time for single task to complete in {%s} %.1f ms%n", name,
        busyNanos / 1_000_000.0 / runs);
  }

  /** Uploads the payload to both data nodes, alternating writes on the calling thread. */
  public void testOneThread() {
    try {
      InputStream is = new ByteArrayInputStream(bytes);
      OutputStream os1 = genPutOutputStream(dataNodePath1);
      OutputStream os2 = genSecondPutOutputStream(os1);
      writeByOneThread(is, os1, os2);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /** Uploads the payload to both data nodes, using one writer thread per destination. */
  public void testTwoThread() {
    try {
      InputStream is = new ByteArrayInputStream(bytes);
      OutputStream os1 = genPutOutputStream(dataNodePath1);
      OutputStream os2 = genSecondPutOutputStream(os1);
      writeByTwoThread(is, os1, os2);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /** Opens the PUT stream for the second data node, closing {@code first} if that fails. */
  private OutputStream genSecondPutOutputStream(OutputStream first) throws IOException {
    try {
      return genPutOutputStream(dataNodePath2);
    } catch (IOException | RuntimeException e) {
      try {
        first.close();
      } catch (IOException closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * Opens a chunked HTTP {@code PUT} to a new, randomly named object below {@code dataNodePath}.
   *
   * <p>NOTE(review): the HTTP response is never read, so a rejected upload goes unnoticed and
   * the measured time may not include the server finishing the request - confirm this is intended.
   *
   * @param dataNodePath URL prefix of the destination
   * @return the request body stream; closing it ends the request body
   * @throws IOException if the connection cannot be opened
   */
  private OutputStream genPutOutputStream(String dataNodePath) throws IOException {
    HttpURLConnection httpConnection =
        newConnection(new URL(dataNodePath + filePrefix + UUID.randomUUID().toString()));
    httpConnection.setRequestMethod("PUT");
    httpConnection.setChunkedStreamingMode(CHUNK_LENGTH);
    httpConnection.setDoOutput(true);
    httpConnection.setRequestProperty("Connection", "close");
    return httpConnection.getOutputStream();
  }

  /** Opens a connection to {@code url}, which must be an HTTP(S) URL. */
  HttpURLConnection newConnection(URL url) throws IOException {
    return (HttpURLConnection) url.openConnection();
  }

  /**
   * Copies {@code is} to both streams from the calling thread, then closes both streams.
   *
   * @param is source; read until end of stream, not closed
   * @param os1 first destination; always closed
   * @param os2 second destination; always closed
   * @throws IOException if reading or writing fails
   */
  public void writeByOneThread(InputStream is, OutputStream os1, OutputStream os2)
      throws IOException {
    byte[] buffer = new byte[BUFFER_SIZE];
    // try-with-resources closes both streams exactly once, on success and on failure.
    try (OutputStream first = os1; OutputStream second = os2) {
      int read;
      while ((read = is.read(buffer)) != -1) {
        first.write(buffer, 0, read);
        second.write(buffer, 0, read);
      }
      first.flush();
      second.flush();
    }
  }

  /**
   * Copies {@code is} to {@code os1} from the calling thread, then closes {@code os1}.
   *
   * @param is source; read until end of stream, not closed
   * @param os1 destination; always closed
   * @throws IOException if reading or writing fails
   */
  public void writeByOneThread(InputStream is, OutputStream os1) throws IOException {
    byte[] buffer = new byte[BUFFER_SIZE];
    try (OutputStream out = os1) {
      int read;
      while ((read = is.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
      out.flush();
    }
  }

  /**
   * Copies {@code is} to both streams, each written by its own thread, and returns once both
   * writer threads have finished and closed their streams.
   *
   * @param is source; read until end of stream, not closed
   * @param os1 first destination; closed by its writer thread
   * @param os2 second destination; closed by its writer thread
   * @throws IOException if reading fails, if either writer fails, or if the calling thread is
   *     interrupted (the interrupt flag is restored)
   */
  public void writeByTwoThread(InputStream is, OutputStream os1, OutputStream os2)
      throws IOException {
    BlockingQueue<Chunk> queue1 = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
    BlockingQueue<Chunk> queue2 = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
    Task writer1 = new Task(queue1, os1);
    Task writer2 = new Task(queue2, os2);
    writer1.start();
    writer2.start();

    IOException readFailure = null;
    try {
      try {
        while (true) {
          // A fresh buffer per chunk: the writers consume it asynchronously, so the reader must
          // never overwrite bytes that are still queued.
          byte[] buffer = new byte[BUFFER_SIZE];
          int read = is.read(buffer);
          if (read == -1) {
            break;
          }
          Chunk chunk = new Chunk(buffer, read);
          queue1.put(chunk);
          queue2.put(chunk);
        }
      } catch (IOException e) {
        readFailure = e;
      }
      // Always deliver the end marker, even after a read failure, so the writers close their
      // streams and terminate.
      Chunk end = new Chunk(new byte[0], -1);
      queue1.put(end);
      queue2.put(end);
      writer1.join();
      writer2.join();
    } catch (InterruptedException e) {
      writer1.interrupt();
      writer2.interrupt();
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while uploading to two destinations", e);
    } catch (RuntimeException e) {
      // Do not leave the writers blocked on their queues forever.
      writer1.interrupt();
      writer2.interrupt();
      throw e;
    }

    // join() makes the writers' failure fields safely visible here.
    Exception failure = chain(chain(readFailure, writer1.failure), writer2.failure);
    if (failure instanceof IOException) {
      throw (IOException) failure;
    }
    if (failure != null) {
      throw new IOException("Writer thread failed", failure);
    }
  }

  /** Returns the first non-null exception, attaching the other one to it as suppressed. */
  private static Exception chain(Exception first, Exception next) {
    if (first == null) {
      return next;
    }
    if (next != null) {
      first.addSuppressed(next);
    }
    return first;
  }

  /** A slice of the payload handed to a writer thread; {@code length == -1} marks the end. */
  class Chunk {
    final byte[] data;
    final int length;

    public Chunk(byte[] data, int length) {
      this.data = data;
      this.length = length;
    }
  }

  /**
   * Writer thread: drains a queue of chunks into one output stream until the end marker arrives,
   * then flushes and closes the stream.
   */
  class Task extends Thread {
    BlockingQueue<Chunk> queue;
    OutputStream os;

    /** First failure seen by this thread, or {@code null}; read it only after {@code join()}. */
    Exception failure;

    public Task(BlockingQueue<Chunk> queue, OutputStream os) {
      this.queue = queue;
      this.os = os;
    }

    @Override
    public void run() {
      try {
        while (true) {
          Chunk chunk = queue.take();
          if (chunk.length == -1) {
            break;
          }
          if (failure != null) {
            // Keep draining after a failed write so the producer never blocks on a full queue.
            continue;
          }
          try {
            os.write(chunk.data, 0, chunk.length);
          } catch (IOException | RuntimeException e) {
            failure = e;
          }
        }
        if (failure == null) {
          os.flush();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        record(e);
      } catch (IOException | RuntimeException e) {
        record(e);
      } finally {
        try {
          os.close();
        } catch (IOException | RuntimeException e) {
          record(e);
        }
      }
    }

    /** Remembers the first failure and attaches later ones to it as suppressed. */
    private void record(Exception e) {
      if (failure == null) {
        failure = e;
      } else {
        failure.addSuppressed(e);
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Usage: java -Dtest.kb=1024000 -Dtest.times=3 -Dtest.dataNode1=http://203.64.138.8:7500/dev107/ TestMultiWrite
Sample result: