jmhodges (owner)

Revisions

gist: 150217 Download_button fork
public
Public Clone URL: git://gist.github.com/150217.git
Embed All Files: show embed
TableInputFormatBase.java #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
  /**
* Calculates the splits that will serve as input for the map tasks.
* <ul>
* Splits are created in number equal to the smallest between numSplits and
* the number of {@link HRegion}s in the table. If the number of splits is
* smaller than the number of {@link HRegion}s then splits are spanned across
* multiple {@link HRegion}s and are grouped the most evenly possible. In the
* case splits are uneven the bigger splits are placed first in the
* {@link InputSplit} array.
*
* @param job the map task {@link JobConf}
* @param numSplits a hint to calculate the number of splits (mapred.map.tasks).
*
* @return the input splits
*
* @see org.apache.hadoop.mapred.InputFormat#getSplits(org.apache.hadoop.mapred.JobConf, int)
*/
  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
    byte [][] startKeys = this.table.getStartKeys();
    if (startKeys == null || startKeys.length == 0) {
      throw new IOException("Expecting at least one region");
    }
    if (this.table == null) {
      throw new IOException("No table was provided");
    }
    if (this.inputColumns == null || this.inputColumns.length == 0) {
      throw new IOException("Expecting at least one column");
    }
    int realNumSplits = numSplits > startKeys.length? startKeys.length:
      numSplits;
    InputSplit[] splits = new InputSplit[realNumSplits];
    int middle = startKeys.length / realNumSplits;
    int startPos = 0;
    for (int i = 0; i < realNumSplits; i++) {
      int lastPos = startPos + middle;
      lastPos = startKeys.length % realNumSplits > i ? lastPos + 1 : lastPos;
      String regionLocation = table.getRegionLocation(startKeys[startPos]).
        getServerAddress().getHostname();
      splits[i] = new TableSplit(this.table.getTableName(),
        startKeys[startPos], ((i + 1) < realNumSplits) ? startKeys[lastPos]:
          HConstants.EMPTY_START_ROW, regionLocation);
      LOG.info("split: " + i + "->" + splits[i]);
      startPos = lastPos;
    }
    return splits;
  }