Skip to content

Instantly share code, notes, and snippets.

@dungdm93
Last active February 11, 2020 02:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dungdm93/f108caa4e0701e33d4352446eef125e5 to your computer and use it in GitHub Desktop.
Save dungdm93/f108caa4e0701e33d4352446eef125e5 to your computer and use it in GitHub Desktop.
faster import nyc-taxi-data

PostgreSQL

Monitoring Progress of Index Construction

PostgreSQL 12+

-- Show the progress of every index build currently running (PostgreSQL 12+),
-- together with the statement text of the backend performing it.
SELECT
    now()::TIME(0),
    activity.query,
    progress.phase,
    progress.blocks_total,
    progress.blocks_done,
    progress.tuples_total,
    progress.tuples_done
FROM pg_stat_activity AS activity
INNER JOIN pg_stat_progress_create_index AS progress
    ON activity.pid = progress.pid;

Determine the size of databases and tables

-- Size of a whole database, formatted with a unit; replace 'dbname' with the database name.
SELECT pg_size_pretty( pg_database_size('dbname') );
-- Total size of one relation as reported by pg_total_relation_size; replace 'tablename' with the table name.
SELECT pg_size_pretty( pg_total_relation_size('tablename') );

Note:

  • Connect to PostgreSQL on the host machine from a Docker container
$ docker run -it --rm \
    --name=transfer \
    --net=host \
    -v /opt:/opt \
    -v /etc/passwd:/etc/passwd:ro \
    -v /var/run/postgresql/:/var/run/postgresql/ \
    postgres:12 su postgres
$ psql

ClickHouse

Install ClickHouse

# Register the ClickHouse apt repository as its own sources.list.d entry.
echo "deb http://repo.yandex.ru/clickhouse/deb/stable/ main/" > /etc/apt/sources.list.d/clickhouse.list
# Fetch key E0C56BD4 from the Ubuntu keyserver (presumably the repository's signing key).
apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
# Refresh the package index so the newly added repository is visible.
apt-get update
# Only the client package is installed here; no server package is requested.
apt-get install -y clickhouse-client

Transfer data from PostgreSQL to ClickHouse

# Stream the PostgreSQL export straight into the ClickHouse `trips` table, with no intermediate file.
# CSVWithNames means the first CSV row is treated as a header (presumably the one the export's COPY ... HEADER emits).
psql --dbname="nyc-taxi-data" --file="postgres-export-to-csv.sql" \
    | clickhouse-client --database="nyc-taxi" --query="INSERT INTO trips FORMAT CSVWithNames"

Dump data to Parquet

# Dump the entire `trips` table as Parquet into trips.parquet in the current directory.
clickhouse-client --database="nyc-taxi" --query="SELECT * FROM trips FORMAT Parquet" > trips.parquet
-- ClickHouse landing table for the denormalised trip export.
-- Column order mirrors the SELECT list of the PostgreSQL export query, so the
-- CSV can also be loaded positionally.
CREATE TABLE trips (
    trip_id                 UInt32,
    vendor_id               String,
    pickup_datetime         DateTime,
    dropoff_datetime        Nullable(DateTime),
    store_and_fwd_flag      Nullable(FixedString(1)),
    rate_code_id            Nullable(UInt8),
    pickup_longitude        Nullable(Float64),
    pickup_latitude         Nullable(Float64),
    dropoff_longitude       Nullable(Float64),
    dropoff_latitude        Nullable(Float64),
    passenger_count         Nullable(UInt8),
    trip_distance           Nullable(Float64),
    fare_amount             Nullable(Float32),
    extra                   Nullable(Float32),
    mta_tax                 Nullable(Float32),
    tip_amount              Nullable(Float32),
    tolls_amount            Nullable(Float32),
    ehail_fee               Nullable(Float32),
    improvement_surcharge   Nullable(Float32),
    total_amount            Nullable(Float32),
    payment_type            Nullable(String),
    trip_type               Nullable(UInt8),
    pickup                  Nullable(String),
    dropoff                 Nullable(String),
    cab_type                Nullable(String),
    precipitation           Nullable(Float32),
    snow_depth              Nullable(Float32),
    snowfall                Nullable(Float32),
    max_temperature         Nullable(Float32),
    min_temperature         Nullable(Float32),
    wind                    Nullable(Float32),
    -- nyct2010.gid identifies a census tract; NYC has more than 255 tracts,
    -- so UInt8 cannot hold every gid. UInt16 (0..65535) is wide enough.
    pickup_nyct2010_gid     Nullable(UInt16),
    pickup_ctlabel          Nullable(String),
    pickup_borocode         Nullable(UInt8),
    pickup_boroname         Nullable(String),
    pickup_ct2010           Nullable(String),
    pickup_boroct2010       Nullable(String),
    pickup_cdeligibil       Nullable(FixedString(1)),
    pickup_ntacode          Nullable(String),
    pickup_ntaname          Nullable(String),
    pickup_puma             Nullable(String),
    -- Same widening as pickup_nyct2010_gid, for the same reason.
    dropoff_nyct2010_gid    Nullable(UInt16),
    dropoff_ctlabel         Nullable(String),
    dropoff_borocode        Nullable(UInt8),
    dropoff_boroname        Nullable(String),
    dropoff_ct2010          Nullable(String),
    dropoff_boroct2010      Nullable(String),
    dropoff_cdeligibil      Nullable(FixedString(1)),
    dropoff_ntacode         Nullable(String),
    dropoff_ntaname         Nullable(String),
    dropoff_puma            Nullable(String)
) ENGINE = Log;
-- Export `trips` to one Parquet file per pickup year (2009-2019), followed by
-- a random sample. Each export is followed by a status line so progress is
-- visible while the script runs.
SELECT 'Exporting begin';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2009
INTO OUTFILE '/opt/nyc-taxi-2009'
FORMAT Parquet;
SELECT 'Done: export 2009 trips to Parquet file "/opt/nyc-taxi-2009"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2010
INTO OUTFILE '/opt/nyc-taxi-2010'
FORMAT Parquet;
SELECT 'Done: export 2010 trips to Parquet file "/opt/nyc-taxi-2010"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2011
INTO OUTFILE '/opt/nyc-taxi-2011'
FORMAT Parquet;
SELECT 'Done: export 2011 trips to Parquet file "/opt/nyc-taxi-2011"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2012
INTO OUTFILE '/opt/nyc-taxi-2012'
FORMAT Parquet;
SELECT 'Done: export 2012 trips to Parquet file "/opt/nyc-taxi-2012"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2013
INTO OUTFILE '/opt/nyc-taxi-2013'
FORMAT Parquet;
SELECT 'Done: export 2013 trips to Parquet file "/opt/nyc-taxi-2013"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2014
INTO OUTFILE '/opt/nyc-taxi-2014'
FORMAT Parquet;
SELECT 'Done: export 2014 trips to Parquet file "/opt/nyc-taxi-2014"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2015
INTO OUTFILE '/opt/nyc-taxi-2015'
FORMAT Parquet;
SELECT 'Done: export 2015 trips to Parquet file "/opt/nyc-taxi-2015"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2016
INTO OUTFILE '/opt/nyc-taxi-2016'
FORMAT Parquet;
SELECT 'Done: export 2016 trips to Parquet file "/opt/nyc-taxi-2016"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2017
INTO OUTFILE '/opt/nyc-taxi-2017'
FORMAT Parquet;
SELECT 'Done: export 2017 trips to Parquet file "/opt/nyc-taxi-2017"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2018
INTO OUTFILE '/opt/nyc-taxi-2018'
FORMAT Parquet;
SELECT 'Done: export 2018 trips to Parquet file "/opt/nyc-taxi-2018"';

SELECT *
FROM trips
WHERE toYear(pickup_datetime) = 2019
INTO OUTFILE '/opt/nyc-taxi-2019'
FORMAT Parquet;
SELECT 'Done: export 2019 trips to Parquet file "/opt/nyc-taxi-2019"';

-- Random sample: rand() yields a pseudo-random UInt32, so a row is kept with
-- probability of about 2770566 / 2^32, i.e. roughly 2^20 rows in expectation.
-- 2770566 = SELECT POWER(2, 32 + 20) / COUNT(1) FROM trips;
SELECT *
FROM trips
WHERE rand() <= 2770566
INTO OUTFILE '/opt/nyc-taxi-rand'
FORMAT Parquet;
SELECT 'Done: export random trips to Parquet file "/opt/nyc-taxi-rand"';
#!/bin/bash
# Loads every raw trip-data CSV under data/ into PostgreSQL in parallel, then
# runs the populate scripts and builds BRIN indexes on pickup_datetime.
#
# Environment:
#   IMPORT_JOBS  maximum number of concurrent ./import_file.sh processes
#                (default: 65, the value that used to be hard-coded).

# Stop at the first failing step so the populate scripts never run on top of
# a raw load that did not complete.
set -euo pipefail

# Let the shell expand the glob instead of parsing `ls` output, which breaks
# on file names containing whitespace. nullglob makes "no match" an empty
# array rather than the literal pattern.
shopt -s nullglob
csv_files=(data/*_tripdata_*.csv)
if [ "${#csv_files[@]}" -eq 0 ]; then
  echo >&2 "No files matching data/*_tripdata_*.csv"
  exit 1
fi

# NUL-delimited hand-off keeps each path intact as a single argument.
printf '%s\0' "${csv_files[@]}" | xargs -0 -n 1 -P "${IMPORT_JOBS:-65}" "./import_file.sh"

# NOTE(review): `psql -f` exits 0 even when statements in the file fail unless
# ON_ERROR_STOP is set; consider `-v ON_ERROR_STOP=1` once the populate
# scripts are confirmed to run cleanly.
echo "Populate 'green_trips'"
psql "nyc-taxi-data" -f "setup_files/populate_green_trips.sql"

echo "Populate 'yellow_trips'"
psql "nyc-taxi-data" -f "setup_files/populate_yellow_trips.sql"

echo "Create 'pickup_datetime' index on 'trips'"
psql "nyc-taxi-data" -c "CREATE INDEX ON trips USING BRIN (pickup_datetime) WITH (pages_per_range = 32);"

echo "Populate 'fhv_trips'"
psql "nyc-taxi-data" -f "setup_files/populate_fhv_trips.sql"

echo "Create 'pickup_datetime' index on 'fhv_trips'"
psql "nyc-taxi-data" -c "CREATE INDEX ON fhv_trips USING BRIN (pickup_datetime) WITH (pages_per_range = 32);"
#!/bin/bash
# Usage: import_file.sh <path-to-csv>
# Derives the cab type, year and month from the file name; the rest of the
# script uses them to choose the column list for the COPY statement.
filename=$1

# Expected name fragment: <type>_tripdata_<yyyy>-<mm>
regex="([[:alnum:]]+)_tripdata_([[:digit:]]{4})-([[:digit:]]{2})"

# Bail out early on names that do not follow the pattern.
if ! [[ $filename =~ $regex ]]; then
  echo >&2 "Pattern unrecognized file ${filename}"
  exit 1
fi

type=${BASH_REMATCH[1]}
year=${BASH_REMATCH[2]}
# Force base 10 so months such as "08" and "09" are not read as invalid octal.
month=$((10#${BASH_REMATCH[3]}))
# Column lists passed to COPY, one per CSV layout. The suffix of each variable
# names the period the layout applies to; the sql_* functions below choose
# among them from the year and month parsed out of the file name.
# NOTE(review): the junk* entries presumably absorb surplus trailing CSV
# fields — confirm against the staging table definitions.

# Green cab layouts.
green_schema_pre_2015="(vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,total_amount,payment_type,trip_type,junk1,junk2)"
green_schema_2015_h1="(vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2)"
green_schema_2015_h2_2016_h1="(vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type)"
green_schema_2016_h2="(vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_location_id,dropoff_location_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,junk1,junk2)"
green_schema_2017_h1="(vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_location_id,dropoff_location_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type)"
green_schema_2019_h1="(vendor_id,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_location_id,dropoff_location_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge)"
# Yellow cab layouts.
yellow_schema_pre_2015="(vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,total_amount)"
yellow_schema_2015_2016_h1="(vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,pickup_longitude,pickup_latitude,rate_code_id,store_and_fwd_flag,dropoff_longitude,dropoff_latitude,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount)"
yellow_schema_2016_h2="(vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,junk1,junk2)"
yellow_schema_2017_h1="(vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount)"
yellow_schema_2019_h1="(vendor_id,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge)"
# For-hire vehicle (fhv) layouts.
fhv_schema_pre_2017="(dispatching_base_num,pickup_datetime,pickup_location_id)"
fhv_schema_2017_h1="(dispatching_base_num,pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id)"
fhv_schema_2017_h2="(dispatching_base_num,pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id,shared_ride)"
fhv_schema_2018="(pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id,shared_ride,dispatching_base_num,junk)"
fhv_schema_2019="(dispatching_base_num,pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id,shared_ride)"
# Single layout used for fhvhv files (adds hvfhs_license_num).
fhvhv_schema="(hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,pickup_location_id,dropoff_location_id,shared_ride)"
#################################################
# Sets the global SQL to the COPY statement for a green-cab CSV.
# Reads the globals `year` and `month` and picks the matching
# green_schema_* column list.
function sql_green {
  local columns
  # Collapse year and month into one YYYYMM number so each layout change is a
  # single threshold. `10#` keeps a zero-padded year decimal.
  local -i period=$(( 10#${year} * 100 + month ))

  if (( period < 201500 )); then
    columns=$green_schema_pre_2015
  elif (( period < 201507 )); then
    columns=$green_schema_2015_h1
  elif (( period < 201607 )); then
    columns=$green_schema_2015_h2_2016_h1
  elif (( period < 201700 )); then
    columns=$green_schema_2016_h2
  elif (( period < 201900 )); then
    columns=$green_schema_2017_h1
  else
    columns=$green_schema_2019_h1
  fi

  SQL="COPY green_tripdata_staging ${columns} FROM stdin WITH CSV HEADER;"
}
# Sets the global SQL to the COPY statement for a yellow-cab CSV.
# Reads the globals `year` and `month` and picks the matching
# yellow_schema_* column list.
function sql_yellow {
  local columns
  # YYYYMM number: one threshold per layout change. `10#` keeps a zero-padded
  # year decimal.
  local -i period=$(( 10#${year} * 100 + month ))

  if (( period < 201500 )); then
    columns=$yellow_schema_pre_2015
  elif (( period < 201607 )); then
    columns=$yellow_schema_2015_2016_h1
  elif (( period < 201700 )); then
    columns=$yellow_schema_2016_h2
  elif (( period < 201900 )); then
    columns=$yellow_schema_2017_h1
  else
    columns=$yellow_schema_2019_h1
  fi

  SQL="COPY yellow_tripdata_staging ${columns} FROM stdin WITH CSV HEADER;"
}
# Sets the global SQL to the COPY statement for a for-hire-vehicle CSV.
# Reads the globals `year` and `month` and picks the matching fhv_schema_*
# column list.
function sql_fhv {
  local columns
  # YYYYMM number: one threshold per layout change. `10#` keeps a zero-padded
  # year decimal.
  local -i period=$(( 10#${year} * 100 + month ))

  if (( period < 201700 )); then
    columns=$fhv_schema_pre_2017
  elif (( period < 201707 )); then
    columns=$fhv_schema_2017_h1
  elif (( period < 201800 )); then
    columns=$fhv_schema_2017_h2
  elif (( period < 201900 )); then
    columns=$fhv_schema_2018
  else
    columns=$fhv_schema_2019
  fi

  SQL="COPY fhv_trips_staging ${columns} FROM stdin WITH CSV HEADER;"
}
# Sets the global SQL to the COPY statement for an fhvhv CSV. There is a single
# layout, and rows go to the same staging table that sql_fhv targets.
function sql_fhvhv {
  SQL="COPY fhv_trips_staging ${fhvhv_schema} FROM stdin WITH CSV HEADER;"
}
#################################################
# Build the COPY statement for this file's cab type, then stream the cleaned
# CSV into PostgreSQL.
SQL=""

# Reject cab types that have no sql_<type> builder. Without this check the
# call fails with "command not found", SQL stays empty and psql reports
# success without loading anything.
if ! declare -F "sql_${type}" > /dev/null; then
  echo >&2 "Unsupported trip type '${type}' in ${filename}"
  exit 1
fi
"sql_${type}"

echo "$(date -Isecond): beginning raw load for ${filename}"

# A failing sed (e.g. an unreadable file) must fail the pipeline, not only psql.
set -o pipefail

# Strip trailing carriage returns, drop blank lines, and feed the result to
# COPY ... FROM stdin. The file name is quoted so paths with spaces survive.
if ! sed -e $'s/\r$//' -e '/^\s*$/d' "${filename}" | psql "nyc-taxi-data" -c "${SQL}"; then
  echo >&2 "$(date -Isecond): raw load FAILED for ${filename}"
  # Non-zero exit lets the caller (e.g. xargs) see the failure.
  exit 1
fi

echo "$(date -Isecond): finished raw load for ${filename}"
-- Export every trip, denormalised with zone, cab type, weather and census
-- tract attributes, as CSV with a header row on stdout.
-- The header names must match the ClickHouse `trips` columns because the CSV is
-- loaded with FORMAT CSVWithNames; for that reason trips.id is exported as
-- trip_id. Column order is unchanged, so positional readers are unaffected.
COPY (
    SELECT
        trips.id                    AS trip_id,
        trips.vendor_id,
        trips.pickup_datetime,
        trips.dropoff_datetime,
        trips.store_and_fwd_flag,
        trips.rate_code_id,
        trips.pickup_longitude,
        trips.pickup_latitude,
        trips.dropoff_longitude,
        trips.dropoff_latitude,
        trips.passenger_count,
        trips.trip_distance,
        trips.fare_amount,
        trips.extra,
        trips.mta_tax,
        trips.tip_amount,
        trips.tolls_amount,
        trips.ehail_fee,
        trips.improvement_surcharge,
        trips.total_amount,
        trips.payment_type,
        trips.trip_type,
        zone_pick_up.zone           AS pickup,
        zone_drop_off.zone          AS dropoff,
        cab_types.type              AS cab_type,
        weather.precipitation,
        weather.snow_depth,
        weather.snowfall,
        weather.max_temperature,
        weather.min_temperature,
        weather.average_wind_speed  AS wind,
        pick_up.gid                 AS pickup_nyct2010_gid,
        pick_up.ctlabel             AS pickup_ctlabel,
        pick_up.borocode            AS pickup_borocode,
        pick_up.boroname            AS pickup_boroname,
        pick_up.ct2010              AS pickup_ct2010,
        pick_up.boroct2010          AS pickup_boroct2010,
        pick_up.cdeligibil          AS pickup_cdeligibil,
        pick_up.ntacode             AS pickup_ntacode,
        pick_up.ntaname             AS pickup_ntaname,
        pick_up.puma                AS pickup_puma,
        drop_off.gid                AS dropoff_nyct2010_gid,
        drop_off.ctlabel            AS dropoff_ctlabel,
        drop_off.borocode           AS dropoff_borocode,
        drop_off.boroname           AS dropoff_boroname,
        drop_off.ct2010             AS dropoff_ct2010,
        drop_off.boroct2010         AS dropoff_boroct2010,
        drop_off.cdeligibil         AS dropoff_cdeligibil,
        drop_off.ntacode            AS dropoff_ntacode,
        drop_off.ntaname            AS dropoff_ntaname,
        drop_off.puma               AS dropoff_puma
    FROM trips
    -- LEFT JOINs throughout: a trip is exported even when a lookup has no match.
    LEFT JOIN taxi_zones AS zone_pick_up
        ON trips.pickup_location_id = zone_pick_up.locationid
    LEFT JOIN taxi_zones AS zone_drop_off
        ON trips.dropoff_location_id = zone_drop_off.locationid
    LEFT JOIN cab_types
        ON trips.cab_type_id = cab_types.id
    -- Weather is matched on the calendar date of the pickup.
    LEFT JOIN central_park_weather_observations AS weather
        ON weather.date = trips.pickup_datetime::date
    LEFT JOIN nyct2010 AS pick_up
        ON pick_up.gid = trips.pickup_nyct2010_gid
    LEFT JOIN nyct2010 AS drop_off
        ON drop_off.gid = trips.dropoff_nyct2010_gid
) TO STDOUT WITH CSV HEADER;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment