Skip to content

Instantly share code, notes, and snippets.

@zhaoawd
Last active April 10, 2019 02:06
Show Gist options
  • Save zhaoawd/33b98cf7c0f4be4a9a49d2fc6e078d3c to your computer and use it in GitHub Desktop.
Save zhaoawd/33b98cf7c0f4be4a9a49d2fc6e078d3c to your computer and use it in GitHub Desktop.
[HBase 数据持久化] #flink #hbase
import com.google.common.collect.Lists;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Map;
/**
* HBase 数据持久化
*
* @author yuanyifan
*/
public class HBaseRichOutputFormat extends RichOutputFormat<PutCollection> {
private static final Logger LOGGER = LoggerFactory.getLogger(HBaseRichOutputFormat.class);
private Connection conn;
@Override
public void configure(Configuration parameters) {
// 初始化连接
try {
conn = ConnectionFactory.createConnection(
// TODO 这里根据你的需要加载配置
);
LOGGER.info("HBase connection established.");
} catch (Exception ex) {
LOGGER.error("Error while creating HBase connection from configure.", ex);
}
}
@Override
public void open(int taskNumber, int numTasks) throws IOException {
// 打开连接,由于不涉及数据分片之类的问题,所以这里不需要做什么操作
LOGGER.info("HBaseRichOutputFormat opened which taskNumber={}, numTasks={}", taskNumber, numTasks);
}
@Override
public void writeRecord(PutCollection data) throws IOException {
// TODO 批量写入数据,略
}
@Override
public void close() throws IOException {
// 结束的时候记得关闭连接(其实永远不会结束)
if (!conn.isClosed()) {
conn.close();
}
LOGGER.info("HBaseRichOutputFormat closed.");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment