@ctslater
Created April 27, 2018 00:08
Gaia ingest into spark
{
"cells": [
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"from pyspark.sql.types import *"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"def spark_start(project_path, metastore=None):\n",
"    \"\"\"Create a local SparkSession with Hive support; `metastore` points the\n",
"    embedded Derby database (derby.system.home) at a writable directory.\"\"\"\n",
"    from pyspark.sql import SparkSession\n",
"\n",
"    warehouse_location = os.path.join(project_path, 'spark-warehouse')\n",
"\n",
"    local_dir = os.path.join(project_path, 'spark-tmp')\n",
"\n",
"    spark = (\n",
"        SparkSession.builder\n",
"        .appName(\"LSD2\")\n",
"        .config(\"spark.sql.warehouse.dir\", warehouse_location)\n",
"        .config('spark.master', \"local[12]\")\n",
"        .config('spark.driver.memory', '16G')  # 128\n",
"        .config('spark.local.dir', local_dir)\n",
"        .config('spark.memory.offHeap.enabled', 'true')\n",
"        .config('spark.memory.offHeap.size', '16G')  # 256\n",
"        .config(\"spark.sql.execution.arrow.enabled\", \"true\")\n",
"        .config(\"spark.driver.maxResultSize\", \"16G\")\n",
"        .config(\"spark.driver.extraJavaOptions\", f\"-Dderby.system.home={metastore}\")\n",
"        .enableHiveSupport()\n",
"        .getOrCreate()\n",
"    )\n",
"\n",
"    return spark"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"root_dir = \"/epyc/users/ctslater\"\n"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"spark = spark_start(root_dir, metastore=os.path.join(root_dir, 'metastore_db'))"
]
},
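{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check, not part of the original notebook: confirm the session came up and picked up the configuration set in `spark_start`. The keys queried below are the ones set above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sanity check: report the Spark version and a couple of the settings\n",
"# configured in spark_start() to confirm they took effect.\n",
"print(spark.version)\n",
"print(spark.conf.get(\"spark.sql.warehouse.dir\"))\n",
"print(spark.sparkContext.getConf().get(\"spark.memory.offHeap.size\"))"
]
},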
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#\n",
"# These types are not authoritative at all, just reasonable judgement.\n",
"#\n",
"col_names = [\"solution_id\", \"designation\", \"source_id\", \"random_index\", \"ref_epoch\", \"ra\",\n",
" \"ra_error\", \"dec\", \"dec_error\", \"parallax\", \"parallax_error\",\n",
" \"parallax_over_error\", \"pmra\", \"pmra_error\", \"pmdec\", \"pmdec_error\",\n",
" \"ra_dec_corr\", \"ra_parallax_corr\", \"ra_pmra_corr\", \"ra_pmdec_corr\",\n",
" \"dec_parallax_corr\", \"dec_pmra_corr\", \"dec_pmdec_corr\", \"parallax_pmra_corr\",\n",
" \"parallax_pmdec_corr\", \"pmra_pmdec_corr\", \"astrometric_n_obs_al\",\n",
" \"astrometric_n_obs_ac\", \"astrometric_n_good_obs_al\", \"astrometric_n_bad_obs_al\",\n",
" \"astrometric_gof_al\", \"astrometric_chi2_al\", \"astrometric_excess_noise\",\n",
" \"astrometric_excess_noise_sig\", \"astrometric_params_solved\",\n",
" \"astrometric_primary_flag\", \"astrometric_weight_al\",\n",
" \"astrometric_pseudo_colour\", \"astrometric_pseudo_colour_error\",\n",
" \"mean_varpi_factor_al\", \"astrometric_matched_observations\",\n",
" \"visibility_periods_used\", \"astrometric_sigma5d_max\",\n",
" \"frame_rotator_object_type\", \"matched_observations\", \"duplicated_source\",\n",
" \"phot_g_n_obs\", \"phot_g_mean_flux\", \"phot_g_mean_flux_error\",\n",
" \"phot_g_mean_flux_over_error\", \"phot_g_mean_mag\", \"phot_bp_n_obs\",\n",
" \"phot_bp_mean_flux\", \"phot_bp_mean_flux_error\", \"phot_bp_mean_flux_over_error\",\n",
" \"phot_bp_mean_mag\", \"phot_rp_n_obs\", \"phot_rp_mean_flux\",\n",
" \"phot_rp_mean_flux_error\", \"phot_rp_mean_flux_over_error\", \"phot_rp_mean_mag\",\n",
" \"phot_bp_rp_excess_factor\", \"phot_proc_mode\", \"bp_rp\", \"bp_g\", \"g_rp\",\n",
" \"radial_velocity\", \"radial_velocity_error\", \"rv_nb_transits\",\n",
" \"rv_template_teff\", \"rv_template_logg\", \"rv_template_fe_h\",\n",
" \"phot_variable_flag\", \"l\", \"b\", \"ecl_lon\", \"ecl_lat\", \"priam_flags\", \"teff_val\",\n",
" \"teff_percentile_lower\", \"teff_percentile_upper\", \"a_g_val\",\n",
" \"a_g_percentile_lower\", \"a_g_percentile_upper\", \"e_bp_min_rp_val\",\n",
" \"e_bp_min_rp_percentile_lower\", \"e_bp_min_rp_percentile_upper\", \"flame_flags\",\n",
" \"radius_val\", \"radius_percentile_lower\", \"radius_percentile_upper\", \"lum_val\",\n",
" \"lum_percentile_lower\", \"lum_percentile_upper\"]\n",
"\n",
"col_types = [LongType(), StringType(), LongType(), LongType(), FloatType(), DoubleType(),\n",
" FloatType(), DoubleType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" FloatType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" FloatType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" FloatType(), FloatType(), IntegerType(), IntegerType(), IntegerType(),\n",
" IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" IntegerType(), StringType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" IntegerType(), IntegerType(), FloatType(), IntegerType(), IntegerType(),\n",
" StringType(), IntegerType(), DoubleType(), FloatType(), FloatType(),\n",
" FloatType(), IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" IntegerType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" IntegerType(), FloatType(), FloatType(), FloatType(), StringType(),\n",
" DoubleType(), DoubleType(), DoubleType(), DoubleType(), IntegerType(),\n",
" FloatType(), FloatType(), FloatType(), FloatType(), FloatType(), FloatType(),\n",
" FloatType(), FloatType(), FloatType(), IntegerType(), FloatType(), FloatType(),\n",
" FloatType(), FloatType(), FloatType(), FloatType() ]"
]
},
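{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small guard added for this writeup: `zip()` silently truncates to the shorter list, which would drop trailing columns from the schema without warning, so check that the two lists stay the same length."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Guard against silent schema truncation: zip() stops at the shorter list.\n",
"assert len(col_names) == len(col_types), (len(col_names), len(col_types))"
]
},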
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"fields = [StructField(field_name, type_class, True) for field_name, type_class in zip(col_names, col_types)]\n",
"schema = StructType(fields)\n",
"\n",
"single_filename = \"/data/epyc/data/gaia_dr2_csv/gaia_source/GaiaSource_1763645856731413248_1763758698407549184.csv.gz\"\n",
"df = spark.read.load(single_filename, format=\"csv\", schema=schema, inferSchema=False, header=\"true\")"
]
},
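{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged quick look at the single-file load, added here rather than taken from the original run: select a few columns from the hand-written schema to confirm they line up with the CSV contents."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Peek at a handful of columns to check the schema lines up with the data.\n",
"df.select(\"source_id\", \"ra\", \"dec\", \"parallax\", \"phot_g_mean_mag\").show(5)"
]
},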
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 15.7 ms, sys: 2.36 ms, total: 18 ms\n",
"Wall time: 34.9 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"gaia_wildcard = \"/data/epyc/data/gaia_dr2_csv/gaia_source/GaiaSource_*.csv.gz\"\n",
"df_all = spark.read.load(gaia_wildcard, format=\"csv\", schema=schema, inferSchema=False, header=\"true\")"
]
},
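{
"cell_type": "markdown",
"metadata": {},
"source": [
"The wildcard load above is lazy, so the ~35 s wall time is mostly file listing and planning; no CSV data is read until an action runs. As a hedged check, the partition count below shows how much work the write will fan out into (gzip inputs are not splittable, so each file is read in one piece)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# How many partitions Spark planned for the full Gaia DR2 source list;\n",
"# this does not read any CSV data yet.\n",
"print(df_all.rdd.getNumPartitions())"
]
},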
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.79 s, sys: 308 ms, total: 2.1 s\n",
"Wall time: 2h 26min 8s\n"
]
}
],
"source": [
"%%time \n",
"\n",
"df_all.write.format(\"parquet\").save(os.path.join(root_dir, \"gaia_dr2.parquet\"))"
]
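},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged verification step, not in the original run: read the freshly written Parquet dataset back and check that the column count matches the schema defined above. `df_check` is a name introduced here."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Reload the Parquet output written above and make sure the column count and\n",
"# row count look sensible (the count triggers a full scan of the Parquet files).\n",
"df_check = spark.read.parquet(os.path.join(root_dir, \"gaia_dr2.parquet\"))\n",
"print(len(df_check.columns), df_check.count())"
]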
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}