Gist rajkrrsingh/c021153cb2ee3637e29b7d7e6e36a173, last active May 19, 2020 11:17
A quick start guide to querying a Kafka topic from a Hive table.
#### Environment: HDP 3.1
#### Data setup
The sample input file, `sample-data.json`, holds one JSON object per line:
```
cat sample-data.json
{"name": "Raj","address": {"a": "b","c": "d","e": "f"}}
{"name": "Raj1","address": {"a": "bb","c": "dd","e": "ff"}}
```
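If you need to recreate the file, a heredoc is enough. This sketch assumes a POSIX-style shell and writes `sample-data.json` into the current directory. Keeping exactly one JSON object per line matters, because the console producer used in the next step sends each line as a separate Kafka message.

```shell
#!/usr/bin/env bash
# Write the two sample records, one JSON object per line.
# The quoted 'EOF' stops the shell from expanding anything inside the heredoc.
cat > sample-data.json <<'EOF'
{"name": "Raj","address": {"a": "b","c": "d","e": "f"}}
{"name": "Raj1","address": {"a": "bb","c": "dd","e": "ff"}}
EOF
```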
#### Create a Kafka topic and ingest the data
Create a single-partition topic, then pipe the file into the console producer, which sends each input line as one message (6667 is the broker port used in this setup):
```
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper `hostname`:2181 --replication-factor 1 --partitions 1 --topic dummytopic1
cat sample-data.json | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list `hostname -f`:6667 --topic dummytopic1
```
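With a single-partition topic that receives only this one producer run, the records land in partition 0 at consecutive offsets starting from 0, in file order. That is what the `__partition` and `__offset` columns report when the table is queried. The sketch below prints the expected position of each line; it assumes a freshly created `dummytopic1` with no earlier data, and `predict_offsets` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print the (partition, offset) each input line should get,
# ASSUMING a freshly created single-partition topic fed by one producer run.
# awk's NR is 1-based, while Kafka offsets start at 0, hence NR - 1.
predict_offsets() {
  awk '{ printf "partition=0 offset=%d %s\n", NR - 1, $0 }' "$@"
}

# Usage: predict_offsets sample-data.json
```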
#### Create the Hive table
```
CREATE EXTERNAL TABLE kafka_test_table (
  name string,
  address struct<a:string,c:string,e:string>)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "dummytopic1",
  "kafka.bootstrap.servers" = "hdp31a:6667",
  "kafka.consumer.security.protocol" = "PLAINTEXT");
-- SHOW CREATE TABLE kafka_test_table;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE EXTERNAL TABLE `kafka_test_table`( |
| `name` string COMMENT 'from deserializer', |
| `address` struct<a:string,c:string,e:string> COMMENT 'from deserializer', |
| `__key` binary COMMENT 'from deserializer', |
| `__partition` int COMMENT 'from deserializer', |
| `__offset` bigint COMMENT 'from deserializer', |
| `__timestamp` bigint COMMENT 'from deserializer') |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.kafka.KafkaSerDe' |
| STORED BY |
| 'org.apache.hadoop.hive.kafka.KafkaStorageHandler' |
| WITH SERDEPROPERTIES ( |
| 'serialization.format'='1') |
| LOCATION |
| 'hdfs://hdp31a.hdp.local:8020/warehouse/tablespace/external/hive/kafka_test_table' |
| TBLPROPERTIES ( |
| 'bucketing_version'='2', |
| 'hive.kafka.max.retries'='6', |
| 'hive.kafka.metadata.poll.timeout.ms'='30000', |
| 'hive.kafka.optimistic.commit'='false', |
| 'hive.kafka.poll.timeout.ms'='5000', |
| 'kafka.bootstrap.servers'='hdp31a:6667', |
| 'kafka.consumer.security.protocol'='PLAINTEXT', |
| 'kafka.serde.class'='org.apache.hadoop.hive.serde2.JsonSerDe', |
| 'kafka.topic'='dummytopic1', |
| 'kafka.write.semantic'='AT_LEAST_ONCE', |
| 'transient_lastDdlTime'='1553536265') |
+----------------------------------------------------+
```
Besides the declared `name` and `address` columns, the storage handler adds the Kafka metadata columns `__key`, `__partition`, `__offset` and `__timestamp`. `kafka.serde.class` was not set in the DDL, so the default `org.apache.hadoop.hive.serde2.JsonSerDe` is used to parse each message value into the declared columns.
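The DDL above hard-codes the broker `hdp31a:6667`. One way to reuse it on another cluster is to generate the statement from shell variables and run the resulting file with `beeline -f`. This is a sketch: `TOPIC`, `BROKERS` and `create_kafka_table.hql` are illustrative names, and the JDBC URL in the last comment is a placeholder for your own HiveServer2 connection string.

```shell
#!/usr/bin/env bash
# Illustrative variables (assumptions): override them to match your cluster.
TOPIC="${TOPIC:-dummytopic1}"
BROKERS="${BROKERS:-$(hostname -f):6667}"

# Unquoted EOF so that ${TOPIC} and ${BROKERS} are expanded into the DDL.
cat > create_kafka_table.hql <<EOF
CREATE EXTERNAL TABLE kafka_test_table (
  name string,
  address struct<a:string,c:string,e:string>)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "${TOPIC}",
  "kafka.bootstrap.servers" = "${BROKERS}",
  "kafka.consumer.security.protocol" = "PLAINTEXT");
EOF

# Then run it against HiveServer2 (placeholder URL):
# beeline -u "<your HiveServer2 JDBC URL>" -f create_kafka_table.hql
```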
#### Query the table
```
-- select * from kafka_test_table;
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| kafka_test_table.name | kafka_test_table.address | kafka_test_table.__key | kafka_test_table.__partition | kafka_test_table.__offset | kafka_test_table.__timestamp |
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
| Raj | {"a":"b","c":"d","e":"f"} | NULL | 0 | 0 | 1553536022213 |
| Raj1 | {"a":"bb","c":"dd","e":"ff"} | NULL | 0 | 1 | 1553536464796 |
+------------------------+-------------------------------+-------------------------+-------------------------------+----------------------------+-------------------------------+
2 rows selected (0.549 seconds)
-- select name,address.a,address.c,address.e from kafka_test_table;
+-------+-----+-----+-----+
| name | a | c | e |
+-------+-----+-----+-----+
| Raj | b | d | f |
| Raj1 | bb | dd | ff |
+-------+-----+-----+-----+
```
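`__timestamp` holds the Kafka record timestamp as epoch milliseconds. To read it as a date from the shell, divide by 1000 and pass the seconds to `date`. The sketch uses GNU `date -d` syntax (BSD and macOS `date` take `-r <seconds>` instead), and `kafka_ts_to_utc` is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: convert an epoch-millisecond Kafka timestamp to UTC.
# Integer division drops the milliseconds; requires GNU date for -d "@<seconds>".
kafka_ts_to_utc() {
  local ms="$1"
  date -u -d "@$(( ms / 1000 ))" '+%Y-%m-%d %H:%M:%S'
}

kafka_ts_to_utc 1553536022213   # 2019-03-25 17:47:02 (row "Raj")
kafka_ts_to_utc 1553536464796   # 2019-03-25 17:54:24 (row "Raj1")
```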