Hoodie MicroBenchmark (@vinothchandar, created June 4, 2018)
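A standalone microbenchmark over a single Hudi (Hoodie) file group from the NYC taxi dataset. It compares four read paths against the same data: raw log bytes, log block and record iteration through HoodieLogFileReader, key-only Parquet reads, and full Avro record reads from Parquet. Uncomment the desired call in main() to run a given benchmark.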
import com.uber.hoodie.common.model.HoodieLogFile;
import com.uber.hoodie.common.table.log.HoodieLogFileReader;
import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
import com.uber.hoodie.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
import com.uber.hoodie.common.util.FSUtils;
import com.uber.hoodie.common.util.ParquetUtils;
import com.uber.hoodie.exception.HoodieIOException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
public class HoodieMicroBench {
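  // Log file and Parquet base file for the same file group (file ID 1f7286f4-...), on local disk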
  static final Path logPath = new Path(
      "file:///home/vinoth/hdd/cache/hoodie-bench/nyc-taxi-data-hoodie-gen/green/.1f7286f4-bad2-4db0-871e-358a777fb22d_001.log.1");
  static final Path parquetPath = new Path(
      "file:///home/vinoth/hdd/cache/hoodie-bench/nyc-taxi-data-hoodie-gen/green/1f7286f4-bad2-4db0-871e-358a777fb22d_0_001.parquet");
  public static void timeHoodieParquetRecordRead() throws Exception {
    long startMs = System.currentTimeMillis();
    ParquetReader reader = null;
    long totalRecords = 0;
    try {
      reader = AvroParquetReader.builder(parquetPath).withConf(new Configuration()).build();
      Object obj = reader.read();
      while (obj != null) {
        if (obj instanceof GenericRecord) {
          totalRecords++;
        }
        obj = reader.read();
      }
    } catch (IOException e) {
      throw new HoodieIOException("Failed to read avro records from Parquet " + parquetPath, e);
    } finally {
      if (reader != null) {
        try {
          reader.close();
        } catch (IOException e) {
          // ignore
        }
      }
    }
    System.out.println(
        "ParquetRead (Full record): Read " + totalRecords + " records in " + (
            System.currentTimeMillis() - startMs) + " ms");
  }
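  /**
   * Times a key-only read of the Parquet base file via ParquetUtils.filterParquetRowKeys.
   * The filter holds a single dummy key, which presumably matches nothing, so this mostly
   * measures the cost of scanning the record key column.
   */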
  public static void timeHoodieParquetKeyRead() throws Exception {
    long startMs = System.currentTimeMillis();
    Set<String> filter = new HashSet<>();
    filter.add("dummy_key");
    Set<String> rowKeys = ParquetUtils
        .filterParquetRowKeys(new Configuration(), parquetPath, filter);
    System.out.println(
        "ParquetRead (Key): Read " + rowKeys.size() + " keys in " + (System.currentTimeMillis()
            - startMs) + " ms");
  }
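  /** Times the log read path in two phases: block metadata scan, then Avro record read. */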
  public static void timeHoodieLogFileReader() throws Exception {
    Schema schema = ParquetUtils.readAvroSchema(new Configuration(), parquetPath);
    HoodieLogFile logFile = new HoodieLogFile(logPath);
    long startMs = System.currentTimeMillis();
    HoodieLogFileReader logFileReader = new HoodieLogFileReader(
        FSUtils.getFs(logPath.toString(), new Configuration()), logFile, schema, 1024 * 1024, true,
        false);
    // Phase 1: scan the log and collect the avro data blocks
    long totalBlocks = 0;
    List<HoodieAvroDataBlock> dataBlocks = new ArrayList<>();
    while (logFileReader.hasNext()) {
      HoodieLogBlock logBlock = logFileReader.next();
      totalBlocks += 1;
      if (logBlock.getBlockType().equals(HoodieLogBlockType.AVRO_DATA_BLOCK)) {
        dataBlocks.add((HoodieAvroDataBlock) logBlock);
      }
    }
    System.out.println(
        "HoodieLogFileReader: Read " + totalBlocks + " block metadata in " + (
            System.currentTimeMillis() - startMs) + " ms");
    // Phase 2: materialize the avro records in each collected data block
    startMs = System.currentTimeMillis();
    long totalRecords = 0;
    Iterator<HoodieAvroDataBlock> dataBlockIterator = dataBlocks.iterator();
    while (dataBlockIterator.hasNext()) {
      HoodieAvroDataBlock dataBlock = dataBlockIterator.next();
      long blockReadStartMs = System.currentTimeMillis();
      for (IndexedRecord record : dataBlock.getRecords()) {
        totalRecords += 1;
      }
      //System.out.println("Read block in " + (System.currentTimeMillis() - blockReadStartMs));
      // Drop the block once counted so its records can be garbage collected
      dataBlockIterator.remove();
    }
    System.out.println(
        "HoodieLogFileReader: Read " + totalRecords + " avro records in " + (
            System.currentTimeMillis() - startMs) + " ms");
  }
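  /** Baseline: times a raw buffered byte scan of the log file, with no block or Avro decoding. */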
  public static void timeLogBytesRead() throws Exception {
    long startMs = System.currentTimeMillis();
    FileSystem fs = FSUtils.getFs(logPath.toString(), new Configuration());
    final int bufferSize = 1 * 1024 * 1024;
    byte[] buffer = new byte[bufferSize];
    FSDataInputStream inputStream = new FSDataInputStream(
        new BufferedFSInputStream(
            (FSInputStream) fs.open(logPath, bufferSize).getWrappedStream(),
            bufferSize));
    long totalBytes = 0;
    try {
      while (true) {
        int read = inputStream.read(buffer);
        if (read == -1) {
          // EOF: break before accumulating, so -1 is never added to totalBytes
          break;
        }
        totalBytes += read;
        if (read < buffer.length) {
          // Short read; for a buffered local stream this only happens at EOF
          break;
        }
      }
    } finally {
      inputStream.close();
    }
    System.out.println(
        "RawLogBytesRead: Read " + totalBytes + " bytes in " + (System.currentTimeMillis()
            - startMs) + " ms");
  }
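  /** Runs one benchmark at a time; uncomment the other calls to compare read paths. */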
  public static void main(String[] args) throws Exception {
    //timeLogBytesRead();
    timeHoodieLogFileReader();
    //timeHoodieParquetKeyRead();
    //timeHoodieParquetRecordRead();
  }
}