@rajkrrsingh
Last active May 19, 2020 11:17
A quick-start guide to querying a Kafka topic from a Hive table.
#### ENV: HDP-3.1
#### Data setup:
```
cat sample-data.json
{"name": "Raj","address": {"a": "b","c": "d","e": "f"}}
{"name": "Raj1","address": {"a": "bb","c": "dd","e": "ff"}}
```
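A minimal sketch for recreating the sample file. `kafka-console-producer.sh` sends each input line as a separate message, and the table's JSON SerDe turns each message into one row, so every record must be a complete JSON object on a single line (JSON Lines). The file name matches the one used above.

```shell
#!/usr/bin/env bash
# Write the two sample records as JSON Lines: one complete JSON object per line.
# Each line becomes one Kafka message, and each message becomes one Hive row,
# so a pretty-printed object spread over several lines would break parsing.
cat > sample-data.json <<'EOF'
{"name": "Raj","address": {"a": "b","c": "d","e": "f"}}
{"name": "Raj1","address": {"a": "bb","c": "dd","e": "ff"}}
EOF

# Sanity check: two lines means two messages will be produced.
wc -l < sample-data.json   # expect 2
```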
#### Create a Kafka topic and ingest the data:
```
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic dummytopic1
cat sample-data.json | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list `hostname -f`:6667 --topic dummytopic1
```
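Before creating the Hive table, it can help to confirm that both messages actually reached the topic. The sketch below assumes the same HDP layout and broker port (6667) as the commands above; `--timeout-ms` keeps the consumer from waiting forever if the topic turns out to be empty. The script only prints a hint when it is not run on a Kafka node.

```shell
#!/usr/bin/env bash
# Read back what the producer wrote to dummytopic1.
# Assumptions: HDP layout under /usr/hdp/current, broker on this host at port 6667.
KAFKA_BIN=/usr/hdp/current/kafka-broker/bin
BROKER="$(hostname -f 2>/dev/null || echo localhost):6667"

if [ -x "$KAFKA_BIN/kafka-console-consumer.sh" ]; then
  # --from-beginning replays the topic from the earliest offset;
  # --max-messages exits after the two sample records;
  # --timeout-ms exits early if no message arrives within 10 seconds.
  "$KAFKA_BIN/kafka-console-consumer.sh" \
    --bootstrap-server "$BROKER" \
    --topic dummytopic1 \
    --from-beginning \
    --max-messages 2 \
    --timeout-ms 10000
else
  echo "kafka-console-consumer.sh not found under $KAFKA_BIN; run this on a Kafka/HDP node" >&2
fi
```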
#### Create the Hive table:
```
CREATE EXTERNAL TABLE kafka_test_table (
  name string,
  address struct<a:string,c:string,e:string>)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "dummytopic1",
  "kafka.bootstrap.servers" = "hdp31a:6667",
  "kafka.consumer.security.protocol" = "PLAINTEXT");
-- Output of: SHOW CREATE TABLE kafka_test_table;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `kafka_test_table`( |
| `name` string COMMENT 'from deserializer', |
| `address` struct<a:string,c:string,e:string> COMMENT 'from deserializer', |
| `__key` binary COMMENT 'from deserializer', |
| `__partition` int COMMENT 'from deserializer', |
| `__offset` bigint COMMENT 'from deserializer', |
| `__timestamp` bigint COMMENT 'from deserializer') |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.kafka.KafkaSerDe' |
| STORED BY |
| 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| WITH SERDEPROPERTIES ( |
| 'serialization.format'='1') |
| LOCATION |
| 'hdfs://hdp31a.hdp.local:8020/warehouse/tablespace/external/hive/kafka_test_table' |
| TBLPROPERTIES ( |
| 'bucketing_version'='2', |
| 'hive.kafka.max.retries'='6', |
| 'hive.kafka.metadata.poll.timeout.ms'='30000', |
| 'hive.kafka.optimistic.commit'='false', |
| 'hive.kafka.poll.timeout.ms'='5000', |
| 'kafka.bootstrap.servers'='hdp31a:6667', |
| 'kafka.consumer.security.protocol'='PLAINTEXT', |
| 'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe', |
| 'kafka.topic'='dummytopic1', |
| 'kafka.write.semantic'='AT_LEAST_ONCE', |
| 'transient_lastDdlTime'='1553536265') |
+----------------------------------------------------+
```
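The DDL hard-codes the broker as `hdp31a:6667`. A minimal sketch for running the same statement non-interactively, deriving the broker from the local host name as the Kafka commands do. It assumes HiveServer2 listens on port 10000 of the same host with no authentication; the JDBC URL will differ on clusters using ZooKeeper discovery or Kerberos. When `beeline` is not installed, the script just prints the generated DDL.

```shell
#!/usr/bin/env bash
# Create kafka_test_table with beeline, filling in the broker address at runtime.
# Assumptions: broker on this host at port 6667, HiveServer2 on this host at port 10000.
HOST="$(hostname -f 2>/dev/null || echo localhost)"
BROKER="${HOST}:6667"
HS2_URL="jdbc:hive2://${HOST}:10000/default"

# Unquoted EOF so that ${BROKER} is expanded into the DDL text.
DDL=$(cat <<EOF
CREATE EXTERNAL TABLE IF NOT EXISTS kafka_test_table (
  name string,
  address struct<a:string,c:string,e:string>)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "dummytopic1",
  "kafka.bootstrap.servers" = "${BROKER}",
  "kafka.consumer.security.protocol" = "PLAINTEXT");
SHOW CREATE TABLE kafka_test_table;
EOF
)

if command -v beeline >/dev/null 2>&1; then
  beeline -u "$HS2_URL" -e "$DDL"
else
  echo "beeline not found; printing the DDL instead:" >&2
  printf '%s\n' "$DDL"
fi
```

`IF NOT EXISTS` makes the script safe to re-run; the `SHOW CREATE TABLE` at the end reproduces the output above, including the defaults the handler fills in (such as `kafka.serde.class`).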
#### Query the table:
```
-- select * from kafka_test_table;
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| kafka_test_table.name | kafka_test_table.address | kafka_test_table.__key | kafka_test_table.__partition | kafka_test_table.__offset | kafka_test_table.__timestamp |
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| Raj | {"a":"b","c":"d","e":"f"} | NULL | 0 | 0 | 1553536022213 |
| Raj1 | {"a":"bb","c":"dd","e":"ff"} | NULL | 0 | 1 | 1553536464796 |
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
2 rows selected (0.549 seconds)
-- select name,address.a,address.c,address.e from kafka_test_table;
+-------+-----+-----+-----+
| name | a | c | e |
+-------+-----+-----+-----+
| Raj | b | d | f |
| Raj1 | bb | dd | ff |
+-------+-----+-----+-----+
```
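The `__timestamp` values in the result are Kafka record timestamps in epoch milliseconds, and `__partition` / `__offset` can be filtered like any other column. A minimal sketch under two assumptions: GNU `date` for the local conversion, and HiveServer2 at port 10000 on this host for the query. Note that Hive's `from_unixtime` renders the time in the session/server time zone, not necessarily UTC.

```shell
#!/usr/bin/env bash
# Convert the first row's __timestamp (epoch milliseconds) to UTC locally.
# GNU date syntax; BSD/macOS date uses different flags.
ts_ms=1553536022213
ts_utc="$(date -u -d "@$((ts_ms / 1000))" +%Y-%m-%dT%H:%M:%SZ)"
echo "$ts_utc"   # 2019-03-25T17:47:02Z

# The same conversion inside Hive, plus a filter on the metadata columns
# that returns only records from offset 1 onward in partition 0.
# Assumption: HiveServer2 on this host at port 10000 (adjust URL/auth as needed).
HS2_URL="jdbc:hive2://$(hostname -f 2>/dev/null || echo localhost):10000/default"
QUERY="SELECT name, __offset, from_unixtime(CAST(__timestamp / 1000 AS BIGINT)) AS ts
FROM kafka_test_table
WHERE __partition = 0 AND __offset >= 1;"

if command -v beeline >/dev/null 2>&1; then
  beeline -u "$HS2_URL" -e "$QUERY"
else
  echo "beeline not found; run the query from a node with the Hive client installed" >&2
fi
```

Against the two sample records, the filter should match only the `Raj1` row (offset 1).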