Skip to content

Instantly share code, notes, and snippets.

@gigq
Created September 6, 2010 19:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gigq/567406 to your computer and use it in GitHub Desktop.
Save gigq/567406 to your computer and use it in GitHub Desktop.
Index: test/org/apache/pig/test/TestSplitCombine.java
===================================================================
--- test/org/apache/pig/test/TestSplitCombine.java (revision 0)
+++ test/org/apache/pig/test/TestSplitCombine.java (revision 0)
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.impl.plan.OperatorKey;
+
+import junit.framework.Assert;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the combination of small input splits done by {@link PigInputFormat}.
+ * Every test uses a maximum combined split size of 1000: it is set in
+ * {@link #setUp()}, and {@link TestPigInputFormat} passes 1000 as the block
+ * size.
+ */
+public class TestSplitCombine {
+    private Configuration conf;
+    private TestPigInputFormat pigInputFormat;
+    private ArrayList<OperatorKey> ok;
+
+    /**
+     * Exposes PigInputFormat's split-combination method to the tests. The
+     * {@code path} argument is ignored; a fixed block size of 1000 is used.
+     */
+    static class TestPigInputFormat extends PigInputFormat {
+        public List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
+                int inputIndex, ArrayList<OperatorKey> targetOps,
+                Path path, boolean combinable, Configuration conf)
+                throws IOException, InterruptedException {
+            return super.getPigSplits(oneInputSplits, inputIndex, targetOps,
+                    1000, combinable, conf);
+        }
+    }
+
+    /** In-memory InputSplit with a fixed length and fixed locations. */
+    static class DummyInputSplit extends InputSplit {
+        private final long length;
+        private final String[] locations;
+
+        public DummyInputSplit(long len, String[] locs) {
+            length = len;
+            locations = locs;
+        }
+
+        @Override
+        public long getLength() {
+            return length;
+        }
+
+        @Override
+        public String[] getLocations() {
+            return locations;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        conf = new Configuration();
+        conf.setLong("pig.maxCombinedSplitSize", 1000);
+        pigInputFormat = new TestPigInputFormat();
+        ok = new ArrayList<OperatorKey>();
+        ok.add(new OperatorKey());
+    }
+
+    /** 500 + 400 on l1-l3 are combined; the 400 on l1,l4,l5 stays alone. */
+    @Test
+    public void test1() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(400, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(400, hosts("l1", "l4", "l5")));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(2, result.size());
+        checkSplit(result.get(0), hosts("l1", "l2", "l3"), 500, 400);
+        checkSplit(result.get(1), hosts("l1", "l4", "l5"), 400);
+    }
+
+    /** 600, 700 and 800 cannot be paired under the limit: one split each, largest first. */
+    @Test
+    public void test2() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(600, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(700, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(800, hosts("l1", "l4", "l5")));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(3, result.size());
+        checkSplit(result.get(0), hosts("l1", "l4", "l5"), 800);
+        checkSplit(result.get(1), hosts("l1", "l2", "l3"), 700);
+        checkSplit(result.get(2), hosts("l1", "l2", "l3"), 600);
+    }
+
+    /** 500 + 200 + 100 fit under the limit and end up in a single split. */
+    @Test
+    public void test3() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(200, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(100, hosts("l1", "l4", "l5")));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(1, result.size());
+        checkSplit(result.get(0), hosts("l1", "l2", "l3", "l4", "l5"), 500, 200, 100);
+    }
+
+    /** Splits are grouped by host: 500 + 100 on l1,l4,l5 and 500 + 200 + 200 + 100 on l1-l3. */
+    @Test
+    public void test4() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(200, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(100, hosts("l1", "l4", "l5")));
+        rawSplits.add(new DummyInputSplit(100, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(200, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(500, hosts("l1", "l4", "l5")));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(2, result.size());
+        checkSplit(result.get(0), hosts("l1", "l4", "l5"), 500, 100);
+        checkSplit(result.get(1), hosts("l1", "l2", "l3"), 500, 200, 200, 100);
+    }
+
+    /** With combination disabled, each raw split becomes its own PigSplit, in input order. */
+    @Test
+    public void test5() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(500, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(400, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(400, hosts("l1", "l4", "l5")));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, false, conf);
+        Assert.assertEquals(3, result.size());
+        checkSplit(result.get(0), hosts("l1", "l2", "l3"), 500);
+        checkSplit(result.get(1), hosts("l1", "l2", "l3"), 400);
+        checkSplit(result.get(2), hosts("l1", "l4", "l5"), 400);
+    }
+
+    /** Sizes 600 down to 100 on the same hosts are packed as 600+400, 500+300+200 and 100. */
+    @Test
+    public void test6() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        for (long size = 600; size >= 100; size -= 100) {
+            rawSplits.add(new DummyInputSplit(size, hosts("l1", "l2", "l3")));
+        }
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(3, result.size());
+        checkSplit(result.get(0), hosts("l1", "l2", "l3"), 600, 400);
+        checkSplit(result.get(1), hosts("l1", "l2", "l3"), 500, 300, 200);
+        checkSplit(result.get(2), hosts("l1", "l2", "l3"), 100);
+    }
+
+    /** Same sizes as test6 given in ascending order; the packing is the same. */
+    @Test
+    public void test7() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        for (long size = 100; size <= 600; size += 100) {
+            rawSplits.add(new DummyInputSplit(size, hosts("l1", "l2", "l3")));
+        }
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(3, result.size());
+        checkSplit(result.get(0), hosts("l1", "l2", "l3"), 600, 400);
+        checkSplit(result.get(1), hosts("l1", "l2", "l3"), 500, 300, 200);
+        checkSplit(result.get(2), hosts("l1", "l2", "l3"), 100);
+    }
+
+    /** 100 + 100 + 200 are combined into one split, largest first. */
+    @Test
+    public void test8() throws IOException, InterruptedException {
+        ArrayList<InputSplit> rawSplits = new ArrayList<InputSplit>();
+        rawSplits.add(new DummyInputSplit(100, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(100, hosts("l1", "l2", "l3")));
+        rawSplits.add(new DummyInputSplit(200, hosts("l1", "l4", "l5")));
+        List<InputSplit> result = pigInputFormat.getPigSplits(rawSplits, 0, ok,
+                null, true, conf);
+        Assert.assertEquals(1, result.size());
+        checkSplit(result.get(0), hosts("l1", "l2", "l3", "l4", "l5"), 200, 100, 100);
+    }
+
+    /** Returns its arguments as a location array. */
+    private static String[] hosts(String... names) {
+        return names;
+    }
+
+    /**
+     * Asserts that {@code split} is a PigSplit wrapping splits with exactly
+     * the given lengths, in order, and that its locations are exactly
+     * {@code expectedLocations}.
+     */
+    private void checkSplit(InputSplit split, String[] expectedLocations,
+            long... expectedLengths) throws IOException, InterruptedException {
+        PigSplit pigSplit = (PigSplit) split;
+        Assert.assertEquals(expectedLengths.length, pigSplit.getNumPaths());
+        checkLocations(pigSplit.getLocations(), expectedLocations);
+        for (int i = 0; i < expectedLengths.length; i++) {
+            Assert.assertEquals(expectedLengths[i], pigSplit.getLength(i));
+        }
+    }
+
+    /**
+     * Asserts that {@code actual} holds exactly the locations in
+     * {@code expected}, in any order and without extra entries.
+     */
+    private void checkLocations(String[] actual, String[] expected) {
+        HashSet<String> expectedSet = new HashSet<String>();
+        for (String str : expected) {
+            expectedSet.add(str);
+        }
+        HashSet<String> actualSet = new HashSet<String>();
+        for (String str : actual) {
+            actualSet.add(str);
+        }
+        Assert.assertEquals(expected.length, actual.length);
+        Assert.assertEquals(expectedSet, actualSet);
+    }
+
+}
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (working copy)
@@ -1364,6 +1364,9 @@
MRPlan.connect(rightMROpr, curMROp);
}
phyToMROpMap.put(joinOp, curMROp);
+ // No combination of small splits for this join: there is currently no way to guarantee
+ // the sortedness of combined splits, and the join depends on sorted input.
+ curMROp.noCombineSmallSplits();
}
catch(PlanException e){
int errCode = 2034;
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (working copy)
@@ -132,7 +132,13 @@
// Name of the partition file generated by sampling process,
// Used by Skewed Join
private String skewedJoinPartitionFile;
+
+ // Whether small input splits may be combined into larger ones to reduce the number
+ // of mappers (default true). For merge join, both tables are NOT combinable, since
+ // combined splits are not guaranteed to keep the sort order merge join needs.
+ private boolean combineSmallSplits = true;
+
public MapReduceOper(OperatorKey k) {
super(k);
mapPlan = new PhysicalPlan();
@@ -383,4 +389,12 @@
public void setUseSecondaryKey(boolean useSecondaryKey) {
this.useSecondaryKey = useSecondaryKey;
}
+ /** Disables combination of small input splits for this operator. */
+ protected void noCombineSmallSplits() {
+ combineSmallSplits = false;
+ }
+ /** @return false once noCombineSmallSplits() has been called, true otherwise. */
+ public boolean combineSmallSplits() {
+ return combineSmallSplits;
+ }
}
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (working copy)
@@ -51,13 +51,26 @@
*/
Tuple curValue = null;
- // the underlying RecordReader used by the loader
+ // the RecordReader for the wrapped split currently being read (null when none is open)
@SuppressWarnings("unchecked")
- private RecordReader wrappedReader;
+ private RecordReader curReader;
// the loader object
private LoadFunc loadfunc;
+ // the InputFormat used to create a RecordReader for each wrapped split
+ private InputFormat inputformat;
+
+ // the PigSplit whose wrapped splits are read one after another
+ private PigSplit pigSplit;
+
+ // index of the next wrapped split to open, i.e. one past the split currently being read
+ private int idx;
+ // bytes of the wrapped splits that have been read completely; used by getProgress()
+ private long progress;
+ // task context used to create and initialize the wrapped RecordReaders
+ private TaskAttemptContext context;
+
/**
* the Configuration object with data specific to the input the underlying
* RecordReader will process (this is obtained after a
@@ -70,11 +83,17 @@
* @param conf
*
*/
- public PigRecordReader(RecordReader wrappedReader,
- LoadFunc loadFunc, Configuration conf) {
- this.wrappedReader = wrappedReader;
- this.loadfunc = loadFunc;
- this.inputSpecificConf = conf;
+ public PigRecordReader(InputFormat inputformat, PigSplit pigSplit,
+ LoadFunc loadFunc, TaskAttemptContext context) throws IOException, InterruptedException {
+ this.inputformat = inputformat;
+ this.pigSplit = pigSplit;
+ this.loadfunc = loadFunc;
+ this.context = context;
+ this.inputSpecificConf = context.getConfiguration();
+ curReader = null;
+ progress = 0;
+ idx = 0;
+ initNextRecordReader(); // opens a reader for the first wrapped split, if any; initialize() sets it up later
}
/* (non-Javadoc)
@@ -82,7 +101,10 @@
*/
@Override
public void close() throws IOException {
- wrappedReader.close();
+ if (curReader != null) { // earlier wrapped readers were already closed by initNextRecordReader()
+ curReader.close();
+ curReader = null; // a second close() is then a no-op
+ }
}
/* (non-Javadoc)
@@ -108,7 +130,12 @@
*/
@Override
public float getProgress() throws IOException, InterruptedException {
- return wrappedReader.getProgress();
+        long total = pigSplit.getLength(); // combined length of all wrapped splits
+        // with nothing to read the ratio below would be 0/0 = NaN; report no progress instead
+        if (total <= 0) return 0.0f;
+        // idx is always one past the current subsplit's true index.
+        long subprogress = (null == curReader) ? 0 : (long)(curReader.getProgress() * pigSplit.getLength(idx - 1));
+        return Math.min(1.0f, (progress + subprogress)/(float)total);
}
/* (non-Javadoc)
@@ -121,7 +148,8 @@
// object - this is achieved by merging the Context corresponding to
// the input split this Reader is supposed to process with the context
// passed in.
- PigSplit pigSplit = (PigSplit)split;
+ this.pigSplit = (PigSplit)split;
+ this.context = context; // later wrapped readers are created and initialized with this context
ConfigurationUtil.mergeConf(context.getConfiguration(),
inputSpecificConf);
// Pass loader signature to LoadFunc and to InputFormat through
@@ -130,8 +158,10 @@
context.getConfiguration());
// now invoke initialize() on underlying RecordReader with
// the "adjusted" conf
- wrappedReader.initialize(pigSplit.getWrappedSplit(), context);
- loadfunc.prepareToRead(wrappedReader, pigSplit);
+ if (null != curReader) { // reader for the first wrapped split, opened by the constructor
+ curReader.initialize(pigSplit.getWrappedSplit(), context);
+ loadfunc.prepareToRead(curReader, pigSplit);
+ }
}
/* (non-Javadoc)
@@ -139,8 +169,48 @@
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
- curValue = loadfunc.getNext();
- return curValue != null;
+        // move on to the next wrapped split whenever the current one is exhausted
+        while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
+            if (!initNextRecordReader()) {
+                return false;
+            }
+        }
+        return true;
}
+    /**
+     * Closes the current wrapped RecordReader, if any, and opens one for the
+     * next wrapped split of this PigSplit.
+     *
+     * @return false if every wrapped split has already been opened, true otherwise
+     * @throws IOException if closing the current reader or creating/preparing the next one fails
+     * @throws InterruptedException if interrupted while doing so
+     */
+    protected boolean initNextRecordReader() throws IOException, InterruptedException {
+        if (curReader != null) {
+            curReader.close();
+            curReader = null;
+            if (idx > 0) {
+                progress += pigSplit.getLength(idx-1); // done processing so far
+            }
+        }
+
+        // if all chunks have been processed, nothing more to do.
+        if (idx == pigSplit.getNumPaths()) {
+            return false;
+        }
+
+        // get a record reader for the idx-th chunk; IOException and
+        // InterruptedException propagate to the caller unchanged.
+        curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
+
+        if (idx > 0) {
+            // initialize() for the first RecordReader will be called by MapTask;
+            // we're responsible for initializing subsequent RecordReaders.
+            curReader.initialize(pigSplit.getWrappedSplit(idx), context);
+            loadfunc.prepareToRead(curReader, pigSplit);
+        }
+        idx++;
+        return true;
+    }
}
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (working copy)
@@ -608,7 +608,22 @@
conf.set(PIG_MAP_STORES, ObjectSerializer.serialize(mapStores));
conf.set(PIG_REDUCE_STORES, ObjectSerializer.serialize(reduceStores));
-
+            // Disable split combination when the operator forbids it (e.g. merge join) or when
+            // pig.splitCombination is "false" (any case); otherwise forward a positive pig.maxCombinedSplitSize.
+            String tmp;
+            long maxCombinedSplitSize = 0;
+            if (!mro.combineSmallSplits() || "false".equalsIgnoreCase(pigContext.getProperties().getProperty("pig.splitCombination", "true")))
+                conf.setBoolean("pig.noSplitCombination", true);
+            else if ((tmp = pigContext.getProperties().getProperty("pig.maxCombinedSplitSize", null)) != null) {
+                try {
+                    maxCombinedSplitSize = Long.parseLong(tmp.trim());
+                } catch (NumberFormatException e) {
+                    log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+                }
+            }
+            if (maxCombinedSplitSize > 0)
+                conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
+
// Serialize the UDF specific context info.
UDFContext.getUDFContext().serialize(conf);
Job cjob = new Job(new JobConf(nwJob.getConfiguration()), new ArrayList());
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java (working copy)
@@ -19,7 +19,11 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,8 +40,11 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.ExecType;
import org.apache.pig.FuncSpec;
+import org.apache.pig.IndexableLoadFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
+import org.apache.pig.CollectableLoadFunc;
+import org.apache.pig.OrderedLoadFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -107,11 +114,8 @@
PigInputFormat.sJob = conf;
InputFormat inputFormat = loadFunc.getInputFormat();
- // now invoke the createRecordReader() with this "adjusted" conf
- RecordReader reader = inputFormat.createRecordReader(
- pigSplit.getWrappedSplit(), context);
- return new PigRecordReader(reader, loadFunc, conf);
+ return new PigRecordReader(inputFormat, pigSplit, loadFunc, context); // PigRecordReader itself creates one wrapped RecordReader per wrapped split of pigSplit
}
@@ -243,6 +247,12 @@
FuncSpec loadFuncSpec = inputs.get(i).getFuncSpec();
LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
loadFuncSpec);
+                // Never combine splits for merge-join indexing, indexable loaders, or loaders that are
+                // both collectable and ordered; pig.noSplitCombination turns combination off entirely.
+                boolean combinable = !(loadFunc instanceof MergeJoinIndexer)
+                        && !(loadFunc instanceof IndexableLoadFunc)
+                        && !(loadFunc instanceof CollectableLoadFunc && loadFunc instanceof OrderedLoadFunc)
+                        && !conf.getBoolean("pig.noSplitCombination", false);
Configuration confClone = new Configuration(conf);
Job inputSpecificJob = new Job(confClone);
// Pass loader signature to LoadFunc and to InputFormat through
@@ -258,8 +268,8 @@
List<InputSplit> oneInputSplits = inpFormat.getSplits(
new JobContext(inputSpecificJob.getConfiguration(),
jobcontext.getJobID()));
- List<PigSplit> oneInputPigSplits = getPigSplits(
- oneInputSplits, i, inpTargets.get(i), conf);
+ List<InputSplit> oneInputPigSplits = getPigSplits(
+ oneInputSplits, i, inpTargets.get(i), fs.getDefaultBlockSize(), combinable, confClone); // the default block size is the fallback maximum combined split size
splits.addAll(oneInputPigSplits);
} catch (ExecException ee) {
throw ee;
@@ -281,19 +291,60 @@
return splits;
}
- private List<PigSplit> getPigSplits(List<InputSplit> oneInputSplits,
- int inputIndex, ArrayList<OperatorKey> targetOps, Configuration conf) {
- int splitIndex = 0;
- ArrayList<PigSplit> pigSplits = new ArrayList<PigSplit>();
- for (InputSplit inputSplit : oneInputSplits) {
- PigSplit pigSplit = new PigSplit(inputSplit, inputIndex, targetOps,
- splitIndex++);
- pigSplit.setConf(conf);
- pigSplits.add(pigSplit);
+    /**
+     * Wraps the splits of one input into PigSplits, combining small splits when allowed.
+     *
+     * @param oneInputSplits the splits returned by the input's InputFormat
+     * @param inputIndex index of the input
+     * @param targetOps target operators of the input, copied into every PigSplit
+     * @param blockSize maximum combined split size used when pig.maxCombinedSplitSize
+     *        is not set to a positive value
+     * @param combinable whether small splits may be combined
+     * @param conf configuration set on every created PigSplit
+     * @return one PigSplit per raw split when not combinable, otherwise one PigSplit
+     *         per group returned by MapRedUtil.getCombinePigSplits
+     * @throws IOException propagated from MapRedUtil.getCombinePigSplits
+     * @throws InterruptedException propagated from MapRedUtil.getCombinePigSplits
+     */
+    protected List<InputSplit> getPigSplits(List<InputSplit> oneInputSplits,
+            int inputIndex, ArrayList<OperatorKey> targetOps, long blockSize, boolean combinable, Configuration conf)
+            throws IOException, InterruptedException {
+        ArrayList<InputSplit> pigSplits = new ArrayList<InputSplit>();
+        if (!combinable) {
+            int splitIndex = 0;
+            for (InputSplit inputSplit : oneInputSplits) {
+                PigSplit pigSplit = new PigSplit(new InputSplit[] {inputSplit}, inputIndex, targetOps,
+                        splitIndex++);
+                pigSplit.setConf(conf);
+                pigSplits.add(pigSplit);
+            }
+            return pigSplits;
+        } else {
+            long maxCombinedSplitSize = conf.getLong("pig.maxCombinedSplitSize", 0);
+            if (maxCombinedSplitSize <= 0)
+                // unset, or not a usable (positive) size: default to the block size
+                maxCombinedSplitSize = blockSize;
+            List<List<InputSplit>> combinedSplits =
+                MapRedUtil.getCombinePigSplits(oneInputSplits, maxCombinedSplitSize, conf);
+            for (int i = 0; i < combinedSplits.size(); i++)
+                pigSplits.add(createPigSplit(combinedSplits.get(i), inputIndex, targetOps, i, conf));
+            return pigSplits;
}
- return pigSplits;
}
+    /**
+     * Creates a PigSplit wrapping all splits of one combined group.
+     *
+     * @param combinedSplits the splits to wrap, in the order they will be read
+     * @param splitIndex index of the new PigSplit among this input's PigSplits
+     */
+    private InputSplit createPigSplit(List<InputSplit> combinedSplits,
+            int inputIndex, ArrayList<OperatorKey> targetOps, int splitIndex, Configuration conf) {
+        PigSplit pigSplit = new PigSplit(combinedSplits.toArray(new InputSplit[0]), inputIndex, targetOps, splitIndex);
+        pigSplit.setConf(conf);
+        return pigSplit;
+    }
+
public static PigSplit getActiveSplit() {
return activeSplit;
}
Index: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (working copy)
@@ -29,6 +29,8 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
+import java.util.HashSet;
+import java.lang.StringBuilder;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
@@ -65,7 +67,7 @@
private int inputIndex;
// The real InputSplit this split is wrapping
- private InputSplit wrappedSplit;
+ private InputSplit[] wrappedSplits; // one or more splits, read in order; all are serialized as the class of wrappedSplits[0]
// index of the wrappedSplit in the list of splits returned by
// InputFormat.getSplits()
@@ -82,20 +84,29 @@
* total number of splits - required by skew join
*/
private int totalSplits;
+
+ /**
+ * total length of all wrapped splits; -1 until computed by getLength() (not serialized)
+ */
+ private long length = -1;
+
+ /**
+ * union of the wrapped splits' locations; null until computed by getLocations() (not serialized)
+ */
+ String[] locations = null;
// this seems necessary for Hadoop to instatiate this split on the
// backend
public PigSplit() {}
- public PigSplit(InputSplit wrappedSplit, int inputIndex,
+ public PigSplit(InputSplit[] wrappedSplits, int inputIndex,
List<OperatorKey> targetOps, int splitIndex) {
- this.wrappedSplit = wrappedSplit;
+ this.wrappedSplits = wrappedSplits; // the array is kept as-is, not copied
this.inputIndex = inputIndex;
this.targetOps = new ArrayList<OperatorKey>(targetOps);
this.splitIndex = splitIndex;
}
-
-
+
public List<OperatorKey> getTargetOps() {
return new ArrayList<OperatorKey>(targetOps);
}
@@ -107,12 +118,34 @@
* @return the wrappedSplit
*/
public InputSplit getWrappedSplit() {
- return wrappedSplit;
+        return wrappedSplits[0]; // the first wrapped split
}
+
+    /**
+     * Returns the wrapped split at the given position.
+     * @param idx zero-based position among the wrapped splits
+     * @return the wrapped split at position idx
+     */
+    public InputSplit getWrappedSplit(int idx) {
+        return wrappedSplits[idx];
+    }
@Override
public String[] getLocations() throws IOException, InterruptedException {
- return wrappedSplit.getLocations();
+        if (locations == null) {
+            // union of the locations of all wrapped splits, without duplicates
+            HashSet<String> locSet = new HashSet<String>();
+            for (InputSplit split : wrappedSplits) {
+                String[] splitLocations = split.getLocations();
+                for (String loc : splitLocations) {
+                    locSet.add(loc);
+                }
+            }
+            // computed once and cached; the same array is returned on every call
+            String[] union = new String[locSet.size()];
+            locations = locSet.toArray(union);
+        }
+        return locations;
}
/* (non-Javadoc)
@@ -120,23 +153,42 @@
*/
@Override
public long getLength() throws IOException, InterruptedException {
- return wrappedSplit.getLength();
+        if (length == -1) {
+            long total = 0; // summed locally so a failing getLength() cannot leave a partial value cached
+            for (InputSplit split : wrappedSplits) total += split.getLength();
+            length = total;
+        }
+        return length;
}
+
+    /**
+     * Returns the length of one wrapped split.
+     * @param idx the index into the wrapped splits
+     * @return the length of the wrapped split at position idx
+     */
+    public long getLength(int idx) throws IOException, InterruptedException {
+        return wrappedSplits[idx].getLength();
+    }
@SuppressWarnings("unchecked")
public void readFields(DataInput is) throws IOException {
splitIndex = is.readInt();
inputIndex = is.readInt();
targetOps = (ArrayList<OperatorKey>) readObject(is);
+        int splitLen = is.readInt(); // number of wrapped splits, as written by write()
String splitClassName = is.readUTF();
try {
Class splitClass = conf.getClassByName(splitClassName);
- wrappedSplit = (InputSplit)ReflectionUtils.newInstance(splitClass, conf);
SerializationFactory sf = new SerializationFactory(conf);
// The correct call sequence for Deserializer is, we shall open, then deserialize, but we shall not close
Deserializer d = sf.getDeserializer(splitClass);
d.open((InputStream) is);
- d.deserialize(wrappedSplit);
+            wrappedSplits = new InputSplit[splitLen];
+            for (int i = 0; i < splitLen; i++) {
+                InputSplit split = (InputSplit) ReflectionUtils.newInstance(splitClass, conf);
+                wrappedSplits[i] = (InputSplit) d.deserialize(split); // keep the returned object: a Deserializer need not fill in its argument
+            }
+            length = -1; locations = null; // drop values cached for previously held splits
} catch (ClassNotFoundException e) {
throw new IOException(e);
}
@@ -148,13 +200,17 @@
os.writeInt(splitIndex);
os.writeInt(inputIndex);
writeObject(targetOps, os);
- os.writeUTF(wrappedSplit.getClass().getName());
+        os.writeInt(wrappedSplits.length);
+        os.writeUTF(wrappedSplits[0].getClass().getName()); // readFields() recreates every wrapped split as this class
SerializationFactory sf = new SerializationFactory(conf);
Serializer s =
- sf.getSerializer(wrappedSplit.getClass());
- // The correct call sequence for Serializer is, we shall open, then serialize, but we shall not close
+            sf.getSerializer(wrappedSplits[0].getClass());
s.open((OutputStream) os);
- s.serialize(wrappedSplit);
+        for (int i = 0; i < wrappedSplits.length; i++) {
+            if (wrappedSplits[i].getClass() != wrappedSplits[0].getClass()) // mixed classes could not be read back
+                throw new IOException("Cannot serialize wrapped splits of different classes: " + wrappedSplits[0].getClass().getName() + " and " + wrappedSplits[i].getClass().getName());
+            s.serialize(wrappedSplits[i]); // open, then serialize, but do not close the serializer
+        }
}
@@ -223,6 +279,14 @@
int getInputIndex() {
return inputIndex;
}
+
+ /**
+ * Returns how many input splits this PigSplit wraps (more than one when small splits were combined).
+ * @return the number of wrapped splits
+ */
+ public int getNumPaths() {
+ return wrappedSplits.length;
+ }
/**
* @return the totalSplits
@@ -244,4 +308,29 @@
this.totalSplits = totalSplits;
}
+    /**
+     * Describes the wrapped splits: their number, total length, and each
+     * split's length and locations. If that information cannot be obtained,
+     * the error is appended to the description instead of returning null.
+     */
+    @Override
+    public String toString() {
+        StringBuilder st = new StringBuilder();
+        st.append("Number of splits :" + wrappedSplits.length+"\n");
+        try {
+            st.append("Total Length = "+ getLength()+"\n");
+            for (int i = 0; i < wrappedSplits.length; i++) {
+                st.append("Input split["+i+"]:\n Length = "+ wrappedSplits[i].getLength()+"\n Locations:\n");
+                for (String location : wrappedSplits[i].getLocations())
+                    st.append(" "+location+"\n");
+                st.append("\n-----------------------\n");
+            }
+        } catch (IOException e) {
+            st.append("Unable to describe the wrapped splits: " + e + "\n");
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
+            st.append("Unable to describe the wrapped splits: " + e + "\n");
+        }
+        return st.toString();
+    }
}
Index: src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
===================================================================
--- src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (revision 993080)
+++ src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (working copy)
@@ -21,9 +21,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Comparator;
+import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,7 +35,9 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTupleFactory;
@@ -168,4 +173,397 @@
return !name.startsWith("_") && !name.startsWith(".");
}
};
+
+    /* The following code implements split combination (see PIG-1518): small input splits
+     * are grouped into combined splits, preferring splits that share a location.
+     */
+    // Orders nodes by ascending number of held splits (Node.length), so sparser nodes come first.
+    private static final Comparator<Node> nodeComparator = new Comparator<Node>() {
+        @Override
+        public int compare(Node a, Node b) {
+            return a.length < b.length ? -1 : (a.length > b.length ? 1 : 0);
+        }
+    };
+
+    /** Wraps an InputSplit so splits sort by descending length, ties broken by ascending id. */
+    private static final class ComparableSplit implements Comparable<ComparableSplit> {
+        private final InputSplit rawInputSplit;
+        private final HashSet<Node> nodes; // every Node this split has been attached to
+        private final long id; // id used as a tie-breaker when two splits are of equal size
+        ComparableSplit(InputSplit split, long id) {
+            rawInputSplit = split;
+            nodes = new HashSet<Node>();
+            this.id = id;
+        }
+
+        void add(Node node) {
+            nodes.add(node);
+        }
+
+        /** Removes this split from every Node it was attached to, so it cannot be grouped twice. */
+        void removeFromNodes() {
+            for (Node node : nodes)
+                node.remove(this);
+        }
+
+        public InputSplit getSplit() {
+            return rawInputSplit;
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return other instanceof ComparableSplit && compareTo((ComparableSplit) other) == 0; // same length and same id
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) (id ^ (id >>> 32)); // equal splits share an id, so this is consistent with equals
+        }
+
+        @Override
+        public int compareTo(ComparableSplit other) {
+            try {
+                long cmp = rawInputSplit.getLength() - other.rawInputSplit.getLength();
+                // in descending order of length; equal lengths fall back to ascending id
+                return cmp == 0 ? (id == other.id ? 0 : id < other.id ? -1 : 1) : cmp < 0 ? 1 : -1;
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** Length-only InputSplit used as a binary-search probe; it describes no real data. */
+    private static class DummySplit extends InputSplit {
+        private long length;
+        @Override
+        public String[] getLocations() {
+            return new String[0]; // no real locations; empty (not null) keeps iterating callers NPE-free
+        }
+
+        @Override
+        public long getLength() {
+            return length;
+        }
+
+        public void setLength(long length) {
+            this.length = length;
+        }
+    }
+
+    /** One location reported by InputSplit#getLocations() and the not-yet-grouped splits listing it. */
+    private static class Node {
+        private long length = 0; // number of splits held; kept equal to splits.size()
+        private final ArrayList<ComparableSplit> splits = new ArrayList<ComparableSplit>();
+        private boolean sorted = false; // true while splits is in ComparableSplit order
+        Node() {
+            // fields are initialised by their declarations; no checked exceptions are thrown
+        }
+
+        void add(ComparableSplit split) {
+            splits.add(split);
+            length++;
+            sorted = false; // an append may break the order that remove()'s binary search needs
+        }
+
+        /** Removes the split if present, sorting first because binary search needs sorted input. */
+        void remove(ComparableSplit split) {
+            sort();
+            int index = Collections.binarySearch(splits, split);
+            if (index >= 0) {
+                splits.remove(index);
+                length--;
+            }
+        }
+
+        /** Sorts the splits (descending length, then ascending id) unless already sorted. */
+        void sort() {
+            if (!sorted) {
+                Collections.sort(splits);
+                sorted = true;
+            }
+        }
+
+        ArrayList<ComparableSplit> getSplits() {
+            return splits;
+        }
+
+        public long getLength() {
+            return length;
+        }
+    }
+
+    /**
+     * Groups the splits of one input into lists of splits to be combined (see PIG-1518).
+     * <p>
+     * Zero-length splits are dropped, except that one empty split is kept when every split is
+     * empty. A split whose length is at least {@code maxCombinedSplitSize} forms a list on its
+     * own. The remaining splits are packed in three steps: (1) each is attached to one Node per
+     * distinct location; (2) nodes holding fewer splits are visited first and their splits are
+     * packed greedily, largest first, keeping only groups that are more than half full; (3) the
+     * splits still unpacked are packed in the order found, ignoring location, and the final group
+     * of these may be appended to an earlier group that still has room. A packed group's summed
+     * {@link InputSplit#getLength()} never exceeds {@code maxCombinedSplitSize}.
+     *
+     * @param oneInputSplits the splits of a single input
+     * @param maxCombinedSplitSize upper bound on the summed length of a packed group
+     * @param conf currently unused
+     * @return one list of raw splits per combined split
+     * @throws IOException if the length or locations of a split cannot be obtained
+     * @throws InterruptedException if interrupted while querying a split
+     */
+    public static List<List<InputSplit>> getCombinePigSplits(List<InputSplit>
+            oneInputSplits, long maxCombinedSplitSize, Configuration conf)
+            throws IOException, InterruptedException {
+        ArrayList<Node> nodes = new ArrayList<Node>();
+        HashMap<String, Node> nodeMap = new HashMap<String, Node>();
+        List<List<InputSplit>> result = new ArrayList<List<InputSplit>>();
+        // parallel to result (summed length per group) on the size > 1 path; read by the final squeeze
+        List<Long> resultLengths = new ArrayList<Long>();
+        long comparableSplitId = 0;
+
+        int size = 0, nSplits = oneInputSplits.size();
+        InputSplit lastSplit = null;
+        int emptyCnt = 0;
+        // Step 1: drop empty splits, emit large ones alone, attach small ones to one Node per distinct location
+        for (InputSplit split : oneInputSplits) {
+            if (split.getLength() == 0) {
+                emptyCnt++;
+                continue;
+            }
+            if (split.getLength() >= maxCombinedSplitSize) {
+                comparableSplitId++;
+                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                combinedSplits.add(split);
+                result.add(combinedSplits);
+                resultLengths.add(split.getLength());
+            } else {
+                ComparableSplit csplit = new ComparableSplit(split, comparableSplitId++);
+                String[] locations = split.getLocations();
+                // a split with no (or null) locations would join no Node and be silently dropped; group them under ""
+                if (locations == null || locations.length == 0)
+                    locations = new String[] { "" };
+                HashSet<String> locationSeen = new HashSet<String>();
+                for (String location : locations)
+                {
+                    if (!locationSeen.contains(location))
+                    {
+                        Node node = nodeMap.get(location);
+                        if (node == null) {
+                            node = new Node();
+                            nodes.add(node);
+                            nodeMap.put(location, node);
+                        }
+                        node.add(csplit);
+                        csplit.add(node);
+                        locationSeen.add(location);
+                    }
+                }
+                lastSplit = split;
+                size++;
+            }
+        }
+
+        if (nSplits > 0 && emptyCnt == nSplits)
+        {
+            // Every split is empty: keep a single empty split, because (per the original note) an
+            // empty input directory is otherwise not handled properly somewhere downstream.
+            ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+            combinedSplits.add(oneInputSplits.get(0));
+            result.add(combinedSplits);
+        }
+        else if (size == 1) {
+            // a single small split has nothing to be combined with
+            ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+            combinedSplits.add(lastSplit);
+            result.add(combinedSplits);
+        } else if (size > 1) {
+            // Step 2: pack per location, visiting nodes that hold fewer splits first.
+            Collections.sort(nodes, nodeComparator);
+            // dummy is a length-only probe for binary-searching the next split that still fits. Its id (-1)
+            // orders it before every real split of the same length (real ids start at 0), so the insertion
+            // point is the first split whose length is <= the probe's length.
+            DummySplit dummy = new DummySplit();
+            ComparableSplit dummyComparableSplit = new ComparableSplit(dummy, -1);
+            for (Node node : nodes) {
+                // sort the splits on this node in descending order
+                node.sort();
+                long totalSize = 0;
+                ArrayList<ComparableSplit> splits = node.getSplits();
+                int idx;
+                int lenSplits;
+                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
+                // start a group with the largest remaining split, then keep adding the largest split that still fits
+                while (!splits.isEmpty()) {
+                    combinedSplits.add(splits.get(0).getSplit());
+                    combinedComparableSplits.add(splits.get(0));
+                    int startIdx = 1;
+                    lenSplits = splits.size();
+                    totalSize += splits.get(0).getSplit().getLength();
+                    long spaceLeft = maxCombinedSplitSize - totalSize;
+                    dummy.setLength(spaceLeft);
+                    idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
+                    idx = -idx-1+startIdx; // insertion point, rebased from the sub-list to splits
+                    while (idx < lenSplits)
+                    {
+                        long thisLen = splits.get(idx).getSplit().getLength();
+                        combinedSplits.add(splits.get(idx).getSplit());
+                        combinedComparableSplits.add(splits.get(idx));
+                        totalSize += thisLen;
+                        spaceLeft -= thisLen;
+                        if (spaceLeft <= 0)
+                            break;
+                        // find next combinable chunk
+                        startIdx = idx + 1;
+                        if (startIdx >= lenSplits)
+                            break;
+                        dummy.setLength(spaceLeft);
+                        idx = Collections.binarySearch(node.getSplits().subList(startIdx, lenSplits), dummyComparableSplit);
+                        idx = -idx-1+startIdx;
+                    }
+                    // keep the group only if it is more than half full; otherwise its splits stay on the nodes for step 3
+                    if (totalSize > maxCombinedSplitSize/2) {
+                        result.add(combinedSplits);
+                        resultLengths.add(totalSize);
+                        // detach the grouped splits from every node so they cannot be grouped again
+                        removeSplits(combinedComparableSplits);
+                        totalSize = 0;
+                        combinedSplits = new ArrayList<InputSplit>();
+                        combinedComparableSplits.clear();
+                        splits = node.getSplits();
+                    } else {
+                        // a group at most half full must already hold every split left on this node
+                        if (combinedSplits.size() != lenSplits)
+                            throw new AssertionError("Combined split logic error!");
+                        break;
+                    }
+                }
+            }
+            // Step 3: gather the splits still held by some node (each once) and pack them ignoring location.
+            ArrayList<ComparableSplit> leftoverSplits = new ArrayList<ComparableSplit>();
+            HashSet<InputSplit> seen = new HashSet<InputSplit>();
+            for (Node node : nodes) {
+                for (ComparableSplit split : node.getSplits()) {
+                    if (!seen.contains(split.getSplit())) {
+                        // remove duplicates. The set has to be on the raw input split, not the
+                        // comparable input split, as the latter overrides equals (via compareTo),
+                        // so its equality semantics differ from what we want here
+                        seen.add(split.getSplit());
+                        leftoverSplits.add(split);
+                    }
+                }
+            }
+
+            if (!leftoverSplits.isEmpty())
+            {
+                long totalSize = 0;
+                ArrayList<InputSplit> combinedSplits = new ArrayList<InputSplit>();
+                ArrayList<ComparableSplit> combinedComparableSplits = new ArrayList<ComparableSplit>();
+
+                int splitLen = leftoverSplits.size();
+                for (int i = 0; i < splitLen; i++)
+                {
+                    ComparableSplit split = leftoverSplits.get(i);
+                    long thisLen = split.getSplit().getLength();
+                    // close the current group rather than let it reach maxCombinedSplitSize
+                    if (totalSize + thisLen >= maxCombinedSplitSize) {
+                        // the closed group is final, so detach its splits from their nodes
+                        removeSplits(combinedComparableSplits);
+                        result.add(combinedSplits);
+                        resultLengths.add(totalSize);
+                        combinedSplits = new ArrayList<InputSplit>();
+                        combinedComparableSplits.clear();
+                        totalSize = 0;
+                    }
+                    combinedSplits.add(split.getSplit());
+                    combinedComparableSplits.add(split);
+                    totalSize += split.getSplit().getLength();
+                    if (i == splitLen - 1) {
+                        // last piece: it could be very small; try to squeeze it into an existing combined split
+                        for (int j =0; j < result.size(); j++)
+                        {
+                            if (resultLengths.get(j) + totalSize <= maxCombinedSplitSize)
+                            {
+                                List<InputSplit> isList = result.get(j);
+                                for (InputSplit csplit : combinedSplits) {
+                                    isList.add(csplit);
+                                }
+                                removeSplits(combinedComparableSplits);
+                                combinedSplits.clear();
+                                break;
+                            }
+                        }
+                        if (!combinedSplits.isEmpty()) {
+                            // last piece can not be squeezed in, create a new combined split for them.
+                            removeSplits(combinedComparableSplits);
+                            result.add(combinedSplits);
+                        }
+                    }
+                }
+            }
+        }
+        /* Debug-only sanity check (disabled): every non-empty input split must appear exactly once
+         * in the result and the total length must be preserved. Note that the count check does not
+         * hold when every split is empty, since one empty split is then returned.
+        int combinedSplitLen = 0;
+        long totalLen = 0;
+        for (List<InputSplit> combined : result) {
+            combinedSplitLen += combined.size();
+            for (InputSplit split : combined)
+                totalLen += split.getLength();
+        }
+        if (combinedSplitLen != nSplits-emptyCnt)
+            throw new AssertionError("number of combined splits ["+combinedSplitLen+"] does not match the number of original splits ["+nSplits+"].");
+        long origTotalLen = 0;
+        for (InputSplit split : oneInputSplits)
+            origTotalLen += split.getLength();
+        if (totalLen != origTotalLen)
+            throw new AssertionError("The total length ["+totalLen+"] does not match the original ["+origTotalLen+"]");
+        */
+        log.info("Total input paths (combined) to process : " + result.size());
+        return result;
+    }
+
+    /** Detaches every given split from all the nodes that hold it. */
+    private static void removeSplits(List<ComparableSplit> splits) {
+        for (ComparableSplit each : splits) { each.removeFromNodes(); }
+    }
+
+    /** Debugging aid: describes the split count, total length, and each split's length and locations. */
+    public String inputSplitToString(InputSplit[] splits) throws IOException, InterruptedException {
+        StringBuilder st = new StringBuilder("Number of splits :").append(splits.length).append('\n');
+        long len = 0;
+        for (InputSplit split : splits) { len += split.getLength(); }
+        st.append("Total Length = ").append(len).append('\n');
+        for (int i = 0; i < splits.length; i++) {
+            st.append("Input split[").append(i).append("]:\n Length = ").append(splits[i].getLength()).append("\n Locations:\n");
+            String[] locations = splits[i].getLocations();
+            if (locations != null) { // guard against implementations that return null here
+                for (String location : locations) { st.append(" ").append(location).append('\n'); }
+            }
+            st.append("\n-----------------------\n");
+        }
+        return st.toString();
+    }
+
+ /* verification code: debug purpose only
+ public String inputSplitToString(ArrayList<ComparableSplit> splits) throws IOException, InterruptedException {
+ StringBuilder st = new StringBuilder();
+ st.append("Number of splits :" + splits.size()+"\n");
+ long len = 0;
+ for (ComparableSplit split: splits)
+ len += split.getSplit().getLength();
+ st.append("Total Length = "+ len +"\n");
+ for (int i = 0; i < splits.size(); i++) {
+ st.append("Input split["+i+"]:\n Length = "+ splits.get(i).getSplit().getLength()+"\n Locations:\n");
+ for (String location : splits.get(i).getSplit().getLocations())
+ st.append(" "+location+"\n");
+ st.append("\n-----------------------\n");
+ }
+ return st.toString();
+ }
+ */
}
Index: src/org/apache/pig/impl/io/ReadToEndLoader.java
===================================================================
--- src/org/apache/pig/impl/io/ReadToEndLoader.java (revision 993080)
+++ src/org/apache/pig/impl/io/ReadToEndLoader.java (working copy)
@@ -174,7 +174,7 @@
// create a dummy pigsplit - other than the actual split, the other
// params are really not needed here where we are just reading the
// input completely
- PigSplit pigSplit = new PigSplit(curSplit, -1,
+ PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
new ArrayList<OperatorKey>(), -1);
wrappedLoadFunc.prepareToRead(reader, pigSplit);
return true;
@gigq
Copy link
Author

gigq commented Sep 6, 2010

Backport of Yan Zhou's multi file input format to load small files with an optimal number of mappers for Pig 0.7.0

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment