Created
April 27, 2018 00:08
-
-
Save ctslater/453e4486f136ea4c80b5ca36cd01d8c6 to your computer and use it in GitHub Desktop.
Gaia ingest into spark
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import os\n", | |
"\n", | |
"from pyspark.sql.types import *" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def spark_start(project_path, metastore=None):\n", | |
" from pyspark.sql import SparkSession\n", | |
"\n", | |
" warehouse_location = os.path.join(project_path, 'spark-warehouse')\n", | |
"\n", | |
" local_dir = os.path.join(project_path, 'spark-tmp')\n", | |
"\n", | |
" spark = ( \n", | |
" SparkSession.builder\n", | |
" .appName(\"LSD2\")\n", | |
" .config(\"spark.sql.warehouse.dir\", warehouse_location)\n", | |
" .config('spark.master', \"local[12]\")\n", | |
" .config('spark.driver.memory', '16G') # 128\n", | |
" .config('spark.local.dir', local_dir)\n", | |
" .config('spark.memory.offHeap.enabled', 'true')\n", | |
" .config('spark.memory.offHeap.size', '16G') # 256\n", | |
" .config(\"spark.sql.execution.arrow.enabled\", \"true\")\n", | |
" .config(\"spark.driver.maxResultSize\", \"16G\")\n", | |
" .config(\"spark.driver.extraJavaOptions\", f\"-Dderby.system.home={metastore}\")\n", | |
" .enableHiveSupport()\n", | |
" .getOrCreate()\n", | |
" ) \n", | |
"\n", | |
" return spark" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"root_dir = \"/epyc/users/ctslater\"\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"spark = spark_start(root_dir, metastore=os.path.join(root_dir, 'metastore_db')" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": null, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"#\n", | |
"# These types are not authoritative at all, just reasonable judgement.\n", | |
"#\n", | |
"col_names = [\"solution_id\", \"designation\", \"source_id\", \"random_index\", \"ref_epoch\", \"ra\",\n", | |
" \"ra_error\", \"dec\", \"dec_error\", \"parallax\", \"parallax_error\",\n", | |
" \"parallax_over_error\", \"pmra\", \"pmra_error\", \"pmdec\", \"pmdec_error\",\n", | |
" \"ra_dec_corr\", \"ra_parallax_corr\", \"ra_pmra_corr\", \"ra_pmdec_corr\",\n", | |
" \"dec_parallax_corr\", \"dec_pmra_corr\", \"dec_pmdec_corr\", \"parallax_pmra_corr\",\n", | |
" \"parallax_pmdec_corr\", \"pmra_pmdec_corr\", \"astrometric_n_obs_al\",\n", | |
" \"astrometric_n_obs_ac\", \"astrometric_n_good_obs_al\", \"astrometric_n_bad_obs_al\",\n", | |
" \"astrometric_gof_al\", \"astrometric_chi2_al\", \"astrometric_excess_noise\",\n", | |
" \"astrometric_excess_noise_sig\", \"astrometric_params_solved\",\n", | |
" \"astrometric_primary_flag\", \"astrometric_weight_al\",\n", | |
" \"astrometric_pseudo_colour\", \"astrometric_pseudo_colour_error\",\n", | |
" \"mean_varpi_factor_al\", \"astrometric_matched_observations\",\n", | |
" \"visibility_periods_used\", \"astrometric_sigma5d_max\",\n", | |
" \"frame_rotator_object_type\", \"matched_observations\", \"duplicated_source\",\n", | |
" \"phot_g_n_obs\", \"phot_g_mean_flux\", \"phot_g_mean_flux_error\",\n", | |
" \"phot_g_mean_flux_over_error\", \"phot_g_mean_mag\", \"phot_bp_n_obs\",\n", | |
" \"phot_bp_mean_flux\", \"phot_bp_mean_flux_error\", \"phot_bp_mean_flux_over_error\",\n", | |
" \"phot_bp_mean_mag\", \"phot_rp_n_obs\", \"phot_rp_mean_flux\",\n", | |
" \"phot_rp_mean_flux_error\", \"phot_rp_mean_flux_over_error\", \"phot_rp_mean_mag\",\n", | |
" \"phot_bp_rp_excess_factor\", \"phot_proc_mode\", \"bp_rp\", \"bp_g\", \"g_rp\",\n", | |
" \"radial_velocity\", \"radial_velocity_error\", \"rv_nb_transits\",\n", | |
" \"rv_template_teff\", \"rv_template_logg\", \"rv_template_fe_h\",\n", | |
" \"phot_variable_flag\", \"l\", \"b\", \"ecl_lon\", \"ecl_lat\", \"priam_flags\", \"teff_val\",\n", | |
" \"teff_percentile_lower\", \"teff_percentile_upper\", \"a_g_val\",\n", | |
" \"a_g_percentile_lower\", \"a_g_percentile_upper\", \"e_bp_min_rp_val\",\n", | |
" \"e_bp_min_rp_percentile_lower\", \"e_bp_min_rp_percentile_upper\", \"flame_flags\",\n", | |
" \"radius_val\", \"radius_percentile_lower\", \"radius_percentile_upper\", \"lum_val\",\n", | |
" \"lum_percentile_lower\", \"lum_percentile_upper\"]\n", | |
"\n", | |
"col_types = [LongType(), StringType(), LongType(), LongType(), FloatType(), DoubleType(),\n", | |
" FloatType(), DoubleType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" FloatType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" FloatType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" FloatType(), FloatType(), IntegerType(), IntegerType(), IntegerType(),\n", | |
" IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" IntegerType(), StringType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" IntegerType(), IntegerType(), FloatType(), IntegerType(), IntegerType(),\n", | |
" StringType(), IntegerType(), DoubleType(), FloatType(), FloatType(),\n", | |
" FloatType(), IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" IntegerType(), FloatType(), FloatType(), FloatType(), StringType(),\n", | |
" DoubleType(), DoubleType(), DoubleType(), DoubleType(), IntegerType(),\n", | |
" FloatType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n", | |
" FloatType(), FloatType(), FloatType(), IntegerType(), FloatType(), FloatType(),\n", | |
" FloatType(), FloatType(), FloatType(), FloatType() ]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"fields = [StructField(field_name, type_class, True) for field_name, type_class in zip(col_names, col_types)]\n", | |
"schema = StructType(fields)\n", | |
"\n", | |
"single_filename = \"/data/epyc/data/gaia_dr2_csv/gaia_source/GaiaSource_1763645856731413248_1763758698407549184.csv.gz\"\n", | |
"df = spark.read.load(single_filename, format=\"csv\", schema=schema, infer_schema=False, header=\"true\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 25, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 15.7 ms, sys: 2.36 ms, total: 18 ms\n", | |
"Wall time: 34.9 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"\n", | |
"gaia_wildcard = \"/data/epyc/data/gaia_dr2_csv/gaia_source/GaiaSource_*.csv.gz\"\n", | |
"df_all = spark.read.load(gaia_wildcard, format=\"csv\", schema=schema, infer_schema=False, header=\"true\")" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 26, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 1.79 s, sys: 308 ms, total: 2.1 s\n", | |
"Wall time: 2h 26min 8s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time \n", | |
"\n", | |
"df_all.write.format(\"parquet\").save(os.path.join(root_dir, \"gaia_dr2.parquet\"))" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.6.4" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment