## Initialization
# set the project
gcloud projects list
gcloud config set project <my-project>
# choose a zone for the cluster, e.g. europe-west3-a
gcloud compute regions list
gcloud compute zones list
# set environment variables
export REGION=europe-west3
export ZONE=europe-west3-a
export PROJECT=$(gcloud info --format='value(config.project)')
export BUCKET_NAME=${PROJECT}-warehouse
# create a bucket in Google Cloud Storage
gsutil mb -l ${REGION} gs://${BUCKET_NAME}
gsutil ls
# create a MySQL instance for the Hive Metastore
# this may take 3-5 minutes
gcloud sql instances create hive-metastore-mysql \
--database-version="MYSQL_5_7" \
--activation-policy=ALWAYS \
--zone $ZONE
gcloud sql instances list
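# (optional) inspect the new instance's status; a hedged sanity check using the standard describe command
gcloud sql instances describe hive-metastore-mysql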
## Create the Dataproc cluster
gcloud config set compute/zone $ZONE
# Make sure the --metadata line at the end references the correct name of the MySQL instance (previous step)
# The cluster can be created with one node or several workers:
# pass either '--single-node' or '--num-workers=2'
gcloud dataproc clusters create hive-cluster \
--region=$REGION \
--scopes cloud-platform \
--image-version 1.3 \
--bucket=$BUCKET_NAME \
--master-machine-type=n1-standard-1 \
--num-workers=2 \
--worker-machine-type=n1-standard-1 \
--optional-components=PRESTO \
--initialization-actions gs://goog-dataproc-initialization-actions-${REGION}/cloud-sql-proxy/cloud-sql-proxy.sh \
--properties hive:hive.metastore.warehouse.dir=gs://${PROJECT}-warehouse/datasets \
--metadata "hive-metastore-instance=${PROJECT}:${REGION}:hive-metastore-mysql"
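# (optional) check that the cluster came up; a hedged sanity check, describe prints the cluster status
gcloud dataproc clusters describe hive-cluster --region=$REGION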
# load the dataset
gsutil cp gs://hive-solution/part-00000.parquet \
gs://${PROJECT}-warehouse/datasets/transactions/part-00000.parquet
gsutil ls -lr gs://${BUCKET_NAME}/datasets/
# create an external Hive table
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--execute "
CREATE EXTERNAL TABLE transactions
(SubmissionDate DATE, TransactionAmount DOUBLE, TransactionType STRING)
STORED AS PARQUET
LOCATION 'gs://${PROJECT}-warehouse/datasets/transactions';" \
--region $REGION
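# (optional) confirm the table is registered in the metastore; a hedged check via the same jobs API
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--execute "SHOW TABLES;" \
--region $REGION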
## Run queries via the jobs API:
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--execute "
SELECT *
FROM transactions
LIMIT 10;" \
--region $REGION \
--quiet
## Interactive mode in the Hive CLI (beeline)
# connect to the master node
gcloud compute ssh hive-cluster-m
beeline -u "jdbc:hive2://hive-cluster-m:10000"
# run queries
!tables
!columns transactions
SELECT TransactionType, AVG(TransactionAmount) AS AverageAmount
FROM transactions
WHERE SubmissionDate = '2017-12-22'
GROUP BY TransactionType;
# exit beeline
!quit
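# beeline can also execute a single statement non-interactively via -e; a hedged example:
beeline -u "jdbc:hive2://hive-cluster-m:10000" -e "SELECT COUNT(*) FROM transactions;"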
## Run queries with pyspark
pyspark
# create the Hive context and run a query
from pyspark.sql import HiveContext
hc = HiveContext(sc)
hc.sql("""
SELECT SubmissionDate, AVG(TransactionAmount) as AvgDebit
FROM transactions
WHERE TransactionType = 'debit'
GROUP BY SubmissionDate
HAVING SubmissionDate >= '2017-10-01' AND SubmissionDate < '2017-10-06'
ORDER BY SubmissionDate
""").show()
exit()
# leave the ssh session and the master node
exit
## Export the second dataset from BigQuery - chicago_taxi_trips
bq --location=us extract --destination_format=CSV \
--field_delimiter=',' --print_header=false \
"bigquery-public-data:chicago_taxi_trips.taxi_trips" \
gs://${BUCKET_NAME}/chicago_taxi_trips/csv/shard-*.csv
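# (optional) the size of the source table can be checked beforehand; a hedged example with bq show
bq show "bigquery-public-data:chicago_taxi_trips.taxi_trips"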
# list the exported files (72 GiB in total)
gsutil ls -rl gs://${BUCKET_NAME}/chicago_taxi_trips/csv/
# delete some of the files to speed things up
gsutil ls gs://${BUCKET_NAME}/chicago_taxi_trips/csv/ | head -261 | xargs gsutil rm # for a single-node cluster use head -281
gsutil ls -rl gs://${BUCKET_NAME}/chicago_taxi_trips/csv/ # 7.71 GiB remaining
# Create a Hive external table for chicago_taxi_trips_csv
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--region=${REGION} \
--execute "
CREATE EXTERNAL TABLE chicago_taxi_trips_csv(
unique_key STRING,
taxi_id STRING,
trip_start_timestamp TIMESTAMP,
trip_end_timestamp TIMESTAMP,
trip_seconds INT,
trip_miles FLOAT,
pickup_census_tract INT,
dropoff_census_tract INT,
pickup_community_area INT,
dropoff_community_area INT,
fare FLOAT,
tips FLOAT,
tolls FLOAT,
extras FLOAT,
trip_total FLOAT,
payment_type STRING,
company STRING,
pickup_latitude FLOAT,
pickup_longitude FLOAT,
pickup_location STRING,
dropoff_latitude FLOAT,
dropoff_longitude FLOAT,
dropoff_location STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE
location 'gs://${BUCKET_NAME}/chicago_taxi_trips/csv/';"
# verify the table was created and works (about 20 million rows)
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--region=${REGION} \
--execute "SELECT COUNT(*) FROM chicago_taxi_trips_csv;"
# create a table in Parquet format
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--region=${REGION} \
--execute "
CREATE EXTERNAL TABLE chicago_taxi_trips_parquet(
unique_key STRING,
taxi_id STRING,
trip_start_timestamp TIMESTAMP,
trip_end_timestamp TIMESTAMP,
trip_seconds INT,
trip_miles FLOAT,
pickup_census_tract INT,
dropoff_census_tract INT,
pickup_community_area INT,
dropoff_community_area INT,
fare FLOAT,
tips FLOAT,
tolls FLOAT,
extras FLOAT,
trip_total FLOAT,
payment_type STRING,
company STRING,
pickup_latitude FLOAT,
pickup_longitude FLOAT,
pickup_location STRING,
dropoff_latitude FLOAT,
dropoff_longitude FLOAT,
dropoff_location STRING)
STORED AS PARQUET
location 'gs://${BUCKET_NAME}/chicago_taxi_trips/parquet/';"
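# (optional) inspect the new table's storage metadata; a hedged check using DESCRIBE FORMATTED
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--region=${REGION} \
--execute "DESCRIBE FORMATTED chicago_taxi_trips_parquet;"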
# load the data into the Parquet table (takes about 7 minutes)
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--region=${REGION} \
--execute "
INSERT OVERWRITE TABLE chicago_taxi_trips_parquet
SELECT * FROM chicago_taxi_trips_csv;"
# verify it contains the same number of rows
gcloud dataproc jobs submit hive \
--cluster hive-cluster \
--region=${REGION} \
--execute "SELECT COUNT(*) FROM chicago_taxi_trips_parquet;"
# ssh into the master node again
gcloud compute ssh hive-cluster-m
# this time use the Presto engine for SQL queries
presto --catalog hive --schema default
show tables;
# run queries; the same queries can also be run in the Hive CLI (beeline)
select count(*) from chicago_taxi_trips_csv where trip_miles > 50; # 0:42 [20.7M rows, 7.71GB] [490K rows/s, 187MB/s]
select count(*) from chicago_taxi_trips_parquet where trip_miles > 50; # 0:11 [20.7M rows, 52.2MB] [1.89M rows/s, 4.76MB/s]
select distinct company, payment_type from chicago_taxi_trips_csv limit 10; # 0:02 [228K rows, 87.5MB] [137K rows/s, 52.3MB/s]
select distinct company, payment_type from chicago_taxi_trips_parquet limit 10; # 0:01 [8.19K rows, 1.93MB] [6.21K rows/s, 1.46MB/s]
select company, payment_type, avg(fare), count(tips), sum(trip_total)
from chicago_taxi_trips_csv where trip_miles > 10
group by 1, 2 order by 5 desc limit 10; # 0:40 [20.7M rows, 7.71GB] [519K rows/s, 198MB/s]
select company, payment_type, avg(fare), count(tips), sum(trip_total)
from chicago_taxi_trips_parquet where trip_miles > 10
group by 1, 2 order by 5 desc limit 10; # 0:10 [20.7M rows, 154MB] [2M rows/s, 14.9MB/s]
# Parquet is a columnar, compressed format, so these queries scan far fewer bytes than the CSV table
# leave the presto session
exit
## Look inside the Hive Metastore
gcloud sql connect hive-metastore-mysql --user=root # no password, just press ENTER
SHOW DATABASES;
USE hive_metastore;
SHOW TABLES;
# warehouse bucket location stored in the metastore
SELECT DB_LOCATION_URI FROM DBS;
# list of tables
SELECT TBL_NAME, TBL_TYPE FROM TBLS;
# columns of a table
SELECT COLUMN_NAME, TYPE_NAME
FROM COLUMNS_V2 c, TBLS t
WHERE c.CD_ID = t.SD_ID AND t.TBL_NAME = 'chicago_taxi_trips_parquet';
# file format and location
SELECT INPUT_FORMAT, LOCATION
FROM SDS s, TBLS t
WHERE s.SD_ID = t.SD_ID;
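# serialization library of each table (a hedged example against the standard metastore schema)
SELECT t.TBL_NAME, s.SLIB
FROM SERDES s, SDS d, TBLS t
WHERE s.SERDE_ID = d.SERDE_ID AND d.SD_ID = t.SD_ID;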
# leave the mysql session
exit
## Free up resources: delete the cluster, the SQL instance, and the bucket
gcloud dataproc clusters delete hive-cluster --region=$REGION --quiet
gcloud sql instances delete hive-metastore-mysql --quiet
gsutil rm -r gs://${BUCKET_NAME}
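# (optional) verify that everything was removed; a hedged sanity check
gcloud dataproc clusters list --region=$REGION
gcloud sql instances list
gsutil ls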
In my case the trip_start_timestamp and trip_end_timestamp fields were not converted to TIMESTAMP automatically.
I solved the problem like this:
SELECT
unique_key,
taxi_id,
to_utc_timestamp(substr(trip_start_timestamp,1,19), 'UTC') as trip_start_timestamp,
to_utc_timestamp(substr(trip_end_timestamp,1,19), 'UTC') as trip_end_timestamp,
trip_seconds,
trip_miles,
pickup_census_tract,
dropoff_census_tract,
pickup_community_area,
dropoff_community_area,
fare,
tips,
tolls,
extras,
trip_total,
payment_type,
company,
pickup_latitude,
pickup_longitude,
pickup_location,
dropoff_latitude,
dropoff_longitude,
dropoff_location
FROM chicago_taxi_trips_csv
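(Presumably this SELECT replaces the plain SELECT * FROM chicago_taxi_trips_csv in the INSERT OVERWRITE step above.)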