Hive InputSplit related code
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  // scan every partition directory
  for (Path dir : dirs) {
    PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
    // get the partition's input format
    Class inputFormatClass = part.getInputFileFormatClass();
    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    // compute splits with that format's own split algorithm
    // Note: the InputFormat here is the old-API org.apache.hadoop.mapred one, not
    // org.apache.hadoop.mapreduce; the new API cannot be used here, otherwise the query
    // fails with "Input format must implement InputFormat". The difference is that the
    // new API computes the input split size as Math.max(minSize, Math.min(maxSize, blockSize)),
    // while the old API uses Math.max(minSize, Math.min(goalSize, blockSize)).
    InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
    for (InputSplit is : iss) {
      // wrap each split as a HiveInputSplit and collect it
      result.add(new HiveInputSplit(is, inputFormatClass.getName()));
    }
  }
  return result.toArray(new HiveInputSplit[result.size()]);
}
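The note above contrasts the split-size formulas of the old mapred API and the new mapreduce API. The following is a minimal, self-contained sketch of just those two formulas; the block size, minimum size, goal size, and max size below are made-up example values, not Hadoop defaults.

public class SplitSizeDemo {
  // old API (org.apache.hadoop.mapred.FileInputFormat): goalSize = totalSize / numSplits
  static long oldApiSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

  // new API (org.apache.hadoop.mapreduce.lib.input.FileInputFormat): maxSize comes from
  // mapreduce.input.fileinputformat.split.maxsize
  static long newApiSplitSize(long maxSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;        // 128 MB block (example)
    long minSize = 1L;                          // example minimum split size
    long goalSize = (1024L * 1024 * 1024) / 4;  // 1 GB of input divided by 4 requested splits
    long maxSize = 256L * 1024 * 1024;          // example split.maxsize
    System.out.println("old API split size: " + oldApiSplitSize(goalSize, minSize, blockSize));
    System.out.println("new API split size: " + newApiSplitSize(maxSize, minSize, blockSize));
  }
}

With these example numbers both formulas are clamped by the block size and yield 128 MB; the difference only shows up when goalSize and split.maxsize diverge.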
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
  // load CombineFileInputFormatShim, which extends org.apache.hadoop.mapred.lib.CombineFileInputFormat
  CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
      .getCombineFileInputFormat();
  if (combine == null) {
    // if no shim is available, fall back to the HiveInputFormat approach (same below)
    return super.getSplits(job, numSplits);
  }
  Path[] paths = combine.getInputPathsShim(job);
  for (Path path : paths) {
    // non-native tables (e.g. storage-handler backed) are split with HiveInputFormat
    if ((tableDesc != null) && tableDesc.isNonNative()) {
      return super.getSplits(job, numSplits);
    }
    Class inputFormatClass = part.getInputFileFormatClass();
    String inputFormatClassName = inputFormatClass.getName();
    InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
    if (this.mrwork != null && !this.mrwork.getHadoopSupportsSplittable()) {
      if (inputFormat instanceof TextInputFormat) {
        if ((new CompressionCodecFactory(job)).getCodec(path) != null)
          // when hive.hadoop.supports.splittable.combineinputformat (MAPREDUCE-1597) is not
          // enabled, compressed TextInputFormat files use the HiveInputFormat split algorithm
          return super.getSplits(job, numSplits);
      }
    }
    // the same fallback applies to symlink text input
    if (inputFormat instanceof SymlinkTextInputFormat) {
      return super.getSplits(job, numSplits);
    }
    CombineFilter f = null;
    boolean done = false;
    Path filterPath = path;
    // controlled by hive.mapper.cannot.span.multiple.partitions, default false; when true,
    // a separate pool is created per partition (that branch is omitted below); otherwise one
    // pool is created per (operator list, input format) key, so splits of the same table with
    // the same file format share a pool in preparation for combining
    if (!mrwork.isMapperCannotSpanPartns()) {
      opList = HiveFileFormatUtils.doGetWorksFromPath(
          pathToAliases, aliasToWork, filterPath);
      f = poolMap.get(new CombinePathInputFormat(opList, inputFormatClassName));
    }
    if (!done) {
      if (f == null) {
        f = new CombineFilter(filterPath);
        combine.createPool(job, f);
      } else {
        f.addPath(filterPath);
      }
    }
  }
  if (!mrwork.isMapperCannotSpanPartns()) {
    // only here is the combine split algorithm invoked; the shim extends
    // org.apache.hadoop.mapred.lib.CombineFileInputFormat, which in turn extends the
    // new-API CombineFileInputFormat
    iss = Arrays.asList(combine.getSplits(job, 1));
  }
  // special handling for sample queries
  if (mrwork.getNameToSplitSample() != null && !mrwork.getNameToSplitSample().isEmpty()) {
    iss = sampleSplits(iss);
  }
  // wrap each shim split as a CombineHiveInputSplit and return
  for (InputSplitShim is : iss) {
    CombineHiveInputSplit csplit = new CombineHiveInputSplit(job, is);
    result.add(csplit);
  }
  return result.toArray(new CombineHiveInputSplit[result.size()]);
}
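The two Hive settings mentioned in the comments above decide whether combining happens at all and how pools are created. Below is a minimal sketch of reading them from the job configuration; using a plain Hadoop JobConf and false as the fallback default is an assumption for illustration, not Hive's own code path.

import org.apache.hadoop.mapred.JobConf;

public class CombineFlagsDemo {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // MAPREDUCE-1597: when disabled, compressed TextInputFormat files are not combined
    // and fall back to the HiveInputFormat split algorithm
    boolean supportsSplittable =
        job.getBoolean("hive.hadoop.supports.splittable.combineinputformat", false);
    // when enabled, one mapper never spans multiple partitions, so a pool is created
    // per partition instead of per (operator list, input format) key
    boolean cannotSpanPartns =
        job.getBoolean("hive.mapper.cannot.span.multiple.partitions", false);
    System.out.println("supports splittable combine: " + supportsSplittable);
    System.out.println("mapper cannot span partitions: " + cannotSpanPartns);
  }
}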
public List<InputSplit> getSplits(JobContext job)
    throws IOException {
  // the parameters that control how files are combined into splits
  if (minSplitSizeNode != 0) {
    minSizeNode = minSplitSizeNode;
  } else {
    minSizeNode = conf.getLong(SPLIT_MINSIZE_PERNODE, 0);
  }
  if (minSplitSizeRack != 0) {
    minSizeRack = minSplitSizeRack;
  } else {
    minSizeRack = conf.getLong(SPLIT_MINSIZE_PERRACK, 0);
  }
  if (maxSplitSize != 0) {
    maxSize = maxSplitSize;
  } else {
    maxSize = conf.getLong("mapreduce.input.fileinputformat.split.maxsize", 0);
  }
  for (MultiPathFilter onepool : pools) {
    ArrayList<Path> myPaths = new ArrayList<Path>();
    // (collection of this pool's paths into myPaths is elided in this excerpt)
    // create splits for all files in this pool
    getMoreSplits(job, myPaths.toArray(new Path[myPaths.size()]),
        maxSize, minSizeNode, minSizeRack, splits);
  }
}
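The three sizes read at the top of this method come from the job configuration. A minimal sketch of setting them before submitting a job follows; the per-node and per-rack property names are assumed to be the values behind SPLIT_MINSIZE_PERNODE and SPLIT_MINSIZE_PERRACK, and the byte values are only illustrative.

import org.apache.hadoop.mapred.JobConf;

public class CombineSplitSizeConfigDemo {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    long mb = 1024L * 1024;
    // upper bound of a combined split; 0 means "no limit" in the code above
    job.setLong("mapreduce.input.fileinputformat.split.maxsize", 256 * mb);
    // minimum bytes to accumulate on one node before emitting a node-local split (assumed key)
    job.setLong("mapreduce.input.fileinputformat.split.minsize.per.node", 64 * mb);
    // minimum bytes to accumulate on one rack before emitting a rack-local split (assumed key)
    job.setLong("mapreduce.input.fileinputformat.split.minsize.per.rack", 128 * mb);
    System.out.println("maxSize = "
        + job.getLong("mapreduce.input.fileinputformat.split.maxsize", 0));
  }
}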