Skip to content

Instantly share code, notes, and snippets.

@mwiewior
Forked from agaszmurlo/carbon.scala
Created July 31, 2019 17:12
Show Gist options
  • Save mwiewior/231bef07c18b37037eb16642a4be9bda to your computer and use it in GitHub Desktop.
Save mwiewior/231bef07c18b37037eb16642a4be9bda to your computer and use it in GitHub Desktop.
Carbon data varia
// ./spark-shell -v --master yarn-client --driver-memory 1G --executor-memory 2G --executor-cores 2 \
// --jars /tmp/apache-carbondata-1.6.0-SNAPSHOT-bin-spark2.3.2-hadoop2.7.2.jar \
// --conf spark.hadoop.hive.metastore.uris=thrift://cdh01.cl.ii.pw.edu.pl:9083 \
// --conf spark.hadoop.yarn.timeline-service.enabled=false \
// --conf spark.driver.extraJavaOptions=-Dhdp.version=3.1.0.0-78 \
// --conf spark.yarn.am.extraJavaOptions=-Dhdp.version=3.1.0.0-78 \
// --conf spark.hadoop.metastore.catalog.default=hive
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
// CarbonSession built from the spark-shell SparkContext configuration (sc.getConf),
// with its Carbon store path set to the HDFS location below.
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://cdh01.cl.ii.pw.edu.pl:8020/opt/carbonstore")
carbon.sql("use igap_dev")

// Copy the external genotype table into a CarbonData-backed table (CREATE TABLE ... AS SELECT).
carbon.sql(
  """
    | CREATE TABLE etl_genotypes_ac_carbon
    | STORED AS carbondata
    | AS SELECT * FROM etg_genotypes_ac_ext
  """.stripMargin)

// Pre-aggregate the summed allele count per variant.
// `alt` is listed in the SELECT as well as in the GROUP BY, so the datamap's projection
// matches the per-variant queries run later (they select and group by
// chr, pos, posend, ref, alt).
carbon.sql(
  """
    | CREATE DATAMAP agg_alleles
    | ON TABLE etl_genotypes_ac_carbon
    | USING "preaggregate"
    | AS
    | SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al
    | FROM etl_genotypes_ac_carbon
    | GROUP BY chr, pos, posend, ref, alt
  """.stripMargin)
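// Reported for the statement above: "opTime":"272850 ms" (~4.5 min)
// COUNT(DISTINCT ...) is not supported in preaggregate datamaps, so agg_samples below stays disabled: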
// carbon.sql(
// s"""
// | CREATE DATAMAP agg_samples
// | ON TABLE etl_genotypes_ac_carbon
// | USING "preaggregate"
// | AS
// | SELECT count (distinct sample_id) as sample_count
// | FROM etl_genotypes_ac_carbon
// """.stripMargin)
// Builds the per-variant allele "frequency" query for `table`:
//   - the allele count (al_cnt) is summed per (chr, pos, posend, ref, alt);
//   - that sum is divided by the number of distinct sample_ids in the same table.
// `where` is appended after the derived table, e.g. to look up a single variant.
// NOTE(review): al_cnt is 0/1/2 for diploid genotypes (see the CASE expression in the
// trailing comment of this file). A conventional allele frequency would therefore divide
// by 2 * sample count at diploid sites. As written, this is the mean allele count per
// sample — confirm the intended definition.
def alleleFreqSql(table: String, where: String = ""): String =
  s"""
     | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from $table) as af
     | FROM
     | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from $table group by chr, pos, posend, ref, alt) alleles
     | $where
  """.stripMargin

val carbonAlleleFreqSql = alleleFreqSql("etl_genotypes_ac_carbon")
val carbonSingleVariantSql =
  alleleFreqSql("etl_genotypes_ac_carbon", "WHERE CHR='1' AND POS=13483 AND POSEND=13483")

// In the plan, the inner aggregation reads the agg_alleles datamap table:
carbon.sql(carbonAlleleFreqSql).explain()
// +- *(1) FileScan carbondata igap_dev.etl_genotypes_ac_carbon_agg_alleles[etl_genotypes_ac_carbon_chr#2161,etl_genotypes_ac_carbon_pos#2162,etl_genotypes_ac_carbon_posend#2163,etl_genotypes_ac_carbon_ref#2164,etl_genotypes_ac_carbon_al_cnt_sum#2165L,etl_genotypes_ac_carbon_alt#2166] ReadSchema: struct<etl_genotypes_ac_carbon_al_cnt_sum:bigint,etl_genotypes_ac_carbon_alt:string,etl_genotypes...

// Untimed run before the timed one below (presumably a warm-up).
carbon.sql(carbonAlleleFreqSql).count()

spark.time(carbon.sql(carbonAlleleFreqSql).count())
// Time taken: 14620 ms (~14.6 s)
// res25: Long = 22064623

// Single-variant lookup.
spark.time(carbon.sql(carbonSingleVariantSql).show())
// +---+-----+------+---+---+--------------------+
// |chr| pos|posend|ref|alt| af|
// +---+-----+------+---+---+--------------------+
// | 1|13483| 13483| G| C|0.006329113924050633|
// +---+-----+------+---+---+--------------------+
// Time taken: 14046 ms

// Plan for the single-variant lookup. The allele sums come from the agg_alleles datamap,
// while the COUNT(DISTINCT sample_id) subquery still scans the base Carbon table.
carbon.sql(carbonSingleVariantSql).explain()
// == Physical Plan ==
// *(2) HashAggregate(keys=[etl_genotypes_ac_carbon_chr#2161, etl_genotypes_ac_carbon_pos#2162, etl_genotypes_ac_carbon_posend#2163, etl_genotypes_ac_carbon_ref#2164, etl_genotypes_ac_carbon_alt#2166], functions=[sum(etl_genotypes_ac_carbon_al_cnt_sum#2165L)])
// : +- Subquery subquery2685
// : +- *(3) HashAggregate(keys=[], functions=[count(distinct sample_id#2039)])
// : +- Exchange SinglePartition
// : +- *(2) HashAggregate(keys=[], functions=[partial_count(distinct sample_id#2039)])
// : +- *(2) HashAggregate(keys=[sample_id#2039], functions=[])
// : +- Exchange hashpartitioning(sample_id#2039, 200)
// : +- *(1) HashAggregate(keys=[sample_id#2039], functions=[])
// : +- *(1) FileScan carbondata igap_dev.etl_genotypes_ac_carbon[sample_id#2039] ReadSchema: struct<sample_id:string>
// +- Exchange hashpartitioning(etl_genotypes_ac_carbon_chr#2161, etl_genotypes_ac_carbon_pos#2162, etl_genotypes_ac_carbon_posend#2163, etl_genotypes_ac_carbon_ref#2164, etl_genotypes_ac_carbon_alt#2166, 200)
// +- *(1) HashAggregate(keys=[etl_genotypes_ac_carbon_chr#2161, etl_genotypes_ac_carbon_pos#2162, etl_genotypes_ac_carbon_posend#2163, etl_genotypes_ac_carbon_ref#2164, etl_genotypes_ac_carbon_alt#2166], functions=[partial_sum(etl_genotypes_ac_carbon_al_cnt_sum#2165L)])
// +- *(1) Filter (((((isnotnull(etl_genotypes_ac_carbon_posend#2163) && isnotnull(etl_genotypes_ac_carbon_pos#2162)) && isnotnull(etl_genotypes_ac_carbon_chr#2161)) && (etl_genotypes_ac_carbon_chr#2161 = 1)) && (etl_genotypes_ac_carbon_pos#2162 = 13483)) && (etl_genotypes_ac_carbon_posend#2163 = 13483))
// +- *(1) FileScan carbondata igap_dev.etl_genotypes_ac_carbon_agg_alleles[etl_genotypes_ac_carbon_chr#2161,etl_genotypes_ac_carbon_pos#2162,etl_genotypes_ac_carbon_posend#2163,etl_genotypes_ac_carbon_ref#2164,etl_genotypes_ac_carbon_al_cnt_sum#2165L,etl_genotypes_ac_carbon_alt#2166] PushedFilters: [IsNotNull(etl_genotypes_ac_carbon_posend), IsNotNull(etl_genotypes_ac_carbon_pos), IsNotNull(etl..., ReadSchema: struct<etl_genotypes_ac_carbon_al_cnt_sum:bigint,etl_genotypes_ac_carbon_alt:string,etl_genotypes...
// Baseline for comparison: the same allele-frequency query, run against the external source
// table through the plain spark-shell session (no CarbonData table or datamap involved).
spark.sql("use igap_dev")
val baselineAlleleFreqSql =
  """
    | SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etg_genotypes_ac_ext) as af
    | FROM
    | (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etg_genotypes_ac_ext group by chr, pos, posend, ref, alt) alleles
  """.stripMargin
spark.time {
  spark.sql(baselineAlleleFreqSql).count()
}
// Time taken: 337621 ms (~5.6 min)
// res26: Long = 22064623
// Same query in beeline (Hive), against etg_genotypes_ac:
// select count (*) from (SELECT chr, pos, posend, ref, alt, sum_al/(SELECT count (distinct sample_id) as s_cnt from etg_genotypes_ac) as af
// FROM
// (SELECT chr, pos, posend, ref, alt, sum(al_cnt) as sum_al from etg_genotypes_ac group by chr, pos, posend, ref, alt) alleles);
// Time taken: > 5.5 min
// create external table etg_genotypes_ac_ext as select *, case gt
// when ref||'|'||ref then 0
// when ref||'|'||alt then 1
// when alt||'|'||ref then 1
// when alt||'|'||alt then 2
// when ref then 0
// when alt then 1
// else 0
// end as al_cnt from etl_genotypes;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment