Last active
April 10, 2019 02:06
-
-
Save zhaoawd/33b98cf7c0f4be4a9a49d2fc6e078d3c to your computer and use it in GitHub Desktop.
[HBase 数据持久化] #flink #hbase
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.collect.Lists; | |
import org.apache.flink.api.common.io.RichOutputFormat; | |
import org.apache.flink.configuration.Configuration; | |
import org.apache.hadoop.hbase.TableName; | |
import org.apache.hadoop.hbase.client.Connection; | |
import org.apache.hadoop.hbase.client.ConnectionFactory; | |
import org.apache.hadoop.hbase.client.Put; | |
import org.apache.hadoop.hbase.client.Table; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Map; | |
/** | |
* HBase 数据持久化 | |
* | |
* @author yuanyifan | |
*/ | |
public class HBaseRichOutputFormat extends RichOutputFormat<PutCollection> { | |
private static final Logger LOGGER = LoggerFactory.getLogger(HBaseRichOutputFormat.class); | |
private Connection conn; | |
@Override | |
public void configure(Configuration parameters) { | |
// 初始化连接 | |
try { | |
conn = ConnectionFactory.createConnection( | |
// TODO 这里根据你的需要加载配置 | |
); | |
LOGGER.info("HBase connection established."); | |
} catch (Exception ex) { | |
LOGGER.error("Error while creating HBase connection from configure.", ex); | |
} | |
} | |
@Override | |
public void open(int taskNumber, int numTasks) throws IOException { | |
// 打开连接,由于不涉及数据分片之类的问题,所以这里不需要做什么操作 | |
LOGGER.info("HBaseRichOutputFormat opened which taskNumber={}, numTasks={}", taskNumber, numTasks); | |
} | |
@Override | |
public void writeRecord(PutCollection data) throws IOException { | |
// TODO 批量写入数据,略 | |
} | |
@Override | |
public void close() throws IOException { | |
// 结束的时候记得关闭连接(其实永远不会结束) | |
if (!conn.isClosed()) { | |
conn.close(); | |
} | |
LOGGER.info("HBaseRichOutputFormat closed."); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment