Skip to content

Instantly share code, notes, and snippets.

@bigsquirrel
Created April 15, 2015 13:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bigsquirrel/748b50daf15d5b9cb640 to your computer and use it in GitHub Desktop.
Save bigsquirrel/748b50daf15d5b9cb640 to your computer and use it in GitHub Desktop.
big data hw1
package com.ivanchou;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

import java.io.*;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
/**
 * Group-by aggregation over a '|'-delimited relation stored on HDFS.
 *
 * Reads the file named by the {@code R=} argument, groups rows by the column
 * named in {@code groupby:R[X]}, computes the aggregates requested in
 * {@code res:...} (count, avg(R[X]), max), and writes one row per group into
 * the HBase table "Result" under column family "res". Any pre-existing
 * "Result" table is dropped and recreated.
 *
 * Created by ivanchou on 4/10/15.
 */
public class Hw1Grp2 {

    private static final String TABLE_NAME = "Result";
    private static final String COLUMN_FAMILY = "res";
    private static final String COLUMN_COUNT = "count";
    private static final String COLUMN_AVG = "avg";
    private static final String COLUMN_MAX = "max";
    private static final String USAGE =
            "usage: java Hw1Grp2 R=[FilePath] groupby:R[X] res:[count | avg(R[X]) | max]";

    /** HDFS path of the input relation (value of the R= argument). */
    private String filePath;
    /** Zero-based index of the group-by column (the X in groupby:RX). */
    private int groupByColumn;
    private boolean countFlag;
    private boolean maxFlag;
    private boolean avgFlag;
    /**
     * Column whose values are averaged. NOTE(review): max is also computed on
     * this column, matching the original behavior.
     */
    private int avgColumn;
    /** Aggregation state keyed by the group-by column's string value. */
    private Map<String, DataStructure> hashTable;
    private BufferedReader bufferedReader;
    private FileSystem fileSystem;

    /**
     * Parses the three command-line arguments and populates the fields above.
     * Column references now accept multi-digit indices (e.g. groupby:R12);
     * the original only read the final character.
     *
     * @param args exactly three arguments: R=..., groupby:R[X], res:...
     * @throws UsageNotCorrectException if any argument is missing or malformed
     */
    private void handleArgs(String[] args) throws UsageNotCorrectException {
        if (args.length != 3) {
            throw new UsageNotCorrectException();
        }
        int flag = 0;
        for (String arg : args) {
            if (arg.contains("R=")) {
                flag++;
                filePath = arg.split("\\=")[1];
            }
            if (arg.contains("groupby")) {
                flag++;
                groupByColumn = parseColumnAfter(arg, arg.lastIndexOf('R'));
            }
            if (arg.contains("res")) {
                flag++;
                String[] parts = arg.split(":");
                if (parts.length < 2) {
                    throw new UsageNotCorrectException();
                }
                for (String resType : parts[1].split(",")) {
                    if (resType.equals("count")) {
                        countFlag = true;
                    } else if (resType.equals("max")) {
                        maxFlag = true;
                    } else if (resType.contains("avg")) {
                        avgFlag = true;
                        // Expect the form avg(R<n>); reject anything else
                        // instead of silently mis-parsing (the original's
                        // Character.getNumericValue('x') yields 33).
                        int open = resType.indexOf("(R");
                        int close = resType.indexOf(')', open + 2);
                        if (open < 0 || close <= open + 2) {
                            throw new UsageNotCorrectException();
                        }
                        try {
                            avgColumn = Integer.parseInt(resType.substring(open + 2, close));
                        } catch (NumberFormatException e) {
                            throw new UsageNotCorrectException();
                        }
                    }
                }
            }
        }
        if (flag != 3) {
            throw new UsageNotCorrectException();
        }
    }

    /** Parses the digits following position {@code rPos} as a column index. */
    private static int parseColumnAfter(String arg, int rPos) throws UsageNotCorrectException {
        if (rPos < 0 || rPos + 1 >= arg.length()) {
            throw new UsageNotCorrectException();
        }
        try {
            return Integer.parseInt(arg.substring(rPos + 1));
        } catch (NumberFormatException e) {
            throw new UsageNotCorrectException();
        }
    }

    /**
     * Opens {@link #filePath} on HDFS and prepares a buffered line reader.
     *
     * @throws IOException if the file system or file cannot be opened
     */
    private void readFileFromHadoop() throws IOException {
        Configuration conf = new Configuration();
        fileSystem = FileSystem.get(URI.create(filePath), conf);
        FSDataInputStream is = fileSystem.open(new Path(filePath));
        bufferedReader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
    }

    /**
     * Streams the input line by line, accumulating per-group count/sum/avg/max.
     *
     * The aggregate column is only parsed when avg or max was actually
     * requested, so a count-only query works on non-numeric columns (the
     * original always parsed column {@code avgColumn} and could crash).
     * The reader and file system are closed even if parsing fails.
     *
     * @throws IOException if reading fails
     * @throws NumberFormatException if the aggregate column is not an integer
     */
    private void handleFile() throws IOException, NumberFormatException {
        hashTable = new HashMap<String, DataStructure>();
        boolean needValue = avgFlag || maxFlag;
        try {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                String[] fields = line.split("\\|");
                long value = needValue ? Long.parseLong(fields[avgColumn]) : 0L;
                String key = fields[groupByColumn];
                DataStructure ds = hashTable.get(key);
                if (ds == null) {
                    ds = new DataStructure();
                    ds.sum = value;
                    ds.max = value;
                    ds.avg = value;
                    ds.count = 1;
                    hashTable.put(key, ds);
                } else {
                    ds.count++;
                    ds.sum += value;
                    ds.avg = (float) ds.sum / ds.count;
                    if (ds.max < value) {
                        ds.max = value;
                    }
                }
            }
        } finally {
            bufferedReader.close();
            fileSystem.close();
        }
    }

    /**
     * (Re)creates the HBase "Result" table and writes one row per group,
     * emitting only the aggregate columns that were requested. All string
     * encoding uses UTF-8 explicitly instead of the platform default.
     *
     * @throws IOException if any HBase operation fails
     */
    private void writeToHBase() throws IOException {
        HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
        htd.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin hAdmin = new HBaseAdmin(conf);
        try {
            if (hAdmin.tableExists(TABLE_NAME)) {
                // Drop any stale result table so reruns start clean.
                hAdmin.disableTable(TABLE_NAME);
                hAdmin.deleteTable(TABLE_NAME);
            }
            hAdmin.createTable(htd);
        } finally {
            hAdmin.close();
        }
        HTable hTable = new HTable(conf, TABLE_NAME);
        try {
            byte[] family = COLUMN_FAMILY.getBytes(StandardCharsets.UTF_8);
            for (Map.Entry<String, DataStructure> entry : hashTable.entrySet()) {
                DataStructure val = entry.getValue();
                Put put = new Put(entry.getKey().getBytes(StandardCharsets.UTF_8));
                if (countFlag) {
                    put.add(family, COLUMN_COUNT.getBytes(StandardCharsets.UTF_8),
                            String.valueOf(val.count).getBytes(StandardCharsets.UTF_8));
                }
                if (maxFlag) {
                    put.add(family, COLUMN_MAX.getBytes(StandardCharsets.UTF_8),
                            String.valueOf(val.max).getBytes(StandardCharsets.UTF_8));
                }
                if (avgFlag) {
                    put.add(family,
                            (COLUMN_AVG + "(R" + avgColumn + ")").getBytes(StandardCharsets.UTF_8),
                            String.valueOf(val.avg).getBytes(StandardCharsets.UTF_8));
                }
                hTable.put(put);
            }
        } finally {
            hTable.close();
        }
    }

    public static void main(String[] args) {
        Hw1Grp2 hw1Grp2 = new Hw1Grp2();
        try {
            // Parse command-line arguments.
            hw1Grp2.handleArgs(args);
            // Open the input file on HDFS.
            hw1Grp2.readFileFromHadoop();
            // Aggregate the file contents.
            hw1Grp2.handleFile();
            // Persist the results to HBase.
            hw1Grp2.writeToHBase();
        } catch (NumberFormatException e) {
            System.out.println("The avg column can't be string!");
        } catch (UsageNotCorrectException e) {
            System.out.println(USAGE);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Per-group aggregation state. Static nested class: it needs no reference
     * to the enclosing instance. Fields default to zero.
     */
    public static class DataStructure {
        public long count;
        public float avg;
        public long max;
        public long sum;
    }

    /** Thrown when the command-line arguments do not match {@link #USAGE}. */
    public static class UsageNotCorrectException extends Exception {
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment