# CEDEN Arrow Files Example
# This example shows how to work with CEDEN data in the Apache parquet file format, using the arrow package in R
# The data (in parquet file format) for each type of CEDEN data is available on the California Open Data Portal at the
# following links:
# Water Chemistry:
# Habitat:
# Tissue:
# Toxicity:
# Benthic Macroinvertebrates:
# load packages
library(arrow) # open_dataset()
library(dplyr) # %>%, filter(), select(), group_by(), summarize(), count(), collect()
library(tictoc) # tic(), toc()
library(tools) # file_path_sans_ext()
# enter the URL to the zip file containing the parquet files for one of the CEDEN data types
# this example uses the water chemistry dataset -- replace this URL with one of the URLs above to access a different CEDEN data type
# NOTE: be sure to replace the file name at the end of this link with the current version of the file name. You can also use the ckanr
# package's resource_show() function to retrieve the current filename -- e.g., for this dataset use:
# ckanr::resource_show(id = 'f4aa224d-4a59-403d-aad8-187955aa2e38', url = '')$url
data_url <- ''
# fail early with a clear message if the URL placeholder above was not filled in
# (otherwise the failure only surfaces later, inside download.file())
if (!nzchar(data_url)) {
  stop("`data_url` is empty -- set it to the URL of the zip file containing the parquet files",
       call. = FALSE)
}
# download the zip file to a temporary directory and unzip ----
zip_file_name <- basename(data_url)
# folder to unzip into: the zip file name without its extension
# (namespace-qualified so it works whether or not the tools package is attached)
directory_name <- tools::file_path_sans_ext(zip_file_name)
# path of the R session's temporary directory
temp_dir <- tempdir()
# full path of the downloaded zip file (used for both the download and the unzip)
zip_file_path <- file.path(temp_dir, zip_file_name)
# download to temporary directory ('wb' = write the archive in binary mode)
download.file(url = data_url,
              destfile = zip_file_path,
              mode = 'wb')
# unzip (to a sub-directory of the temporary directory named after the zip file)
zip::unzip(zipfile = zip_file_path,
           exdir = file.path(temp_dir, directory_name))
# open the unzipped parquet directory as an Arrow dataset ----
dataset_dir <- file.path(temp_dir, directory_name) # folder the zip file was extracted to
ds_con <- open_dataset(sources = dataset_dir)
# ds_con # print the connection to list all fields and their associated data types
# (NOTE: these queries use the CEDEN water chemistry dataset, but the same general process can be used for any dataset)
# Example query: pull E. coli data from 2010 to present, remove samples with certain station codes (which indicate QA data), and just select a few fields ----
qa_station_codes <- c('LABQA_SWAMP', '0000', '000NONPJ') # station codes that indicate QA data
tic() # start timer
ds_con %>%
  filter(Analyte %in% c("E. coli")) %>% # keep only the E. coli records
  filter(Year >= 2010) %>% # keep data from year 2010 through present
  filter(!(StationCode %in% qa_station_codes)) %>% # drop the QA data
  select(StationCode, SampleDate, Analyte, Unit, Result, MDL, RL, Year) %>% # keep only a few fields
  collect() # run the query and return the result (printed, not assigned)
toc() # stop timer
# 0.56 sec elapsed
# # A tibble: 144,222 x 8
# StationCode SampleDate Analyte Unit Result MDL RL Year
# <chr> <dttm> <chr> <chr> <dbl> <dbl> <dbl> <int>
# 1 HSC-GHS 2010-06-12 00:00:00 E. coli MPN/100 mL 3.1 -88 1 2010
# 2 MVC-LIB 2010-06-12 00:00:00 E. coli MPN/100 mL 14 -88 1 2010
# 3 MVC-USFS 2010-06-12 00:00:00 E. coli MPN/100 mL 15 -88 1 2010
# 4 BAKER NE 2010-08-04 00:00:00 E. coli MPN/100 mL 10 -88 -88 2010
# 5 BAKER NE 2010-08-18 00:00:00 E. coli MPN/100 mL 10 -88 -88 2010
# 6 BAKER NE 2010-09-15 00:00:00 E. coli MPN/100 mL 20 -88 -88 2010
# 7 BAKER NE 2010-10-20 00:00:00 E. coli MPN/100 mL 10 -88 -88 2010
# 8 BAKER NW 2010-04-21 00:00:00 E. coli MPN/100 mL 63 -88 -88 2010
# 9 BAKER NW 2010-05-12 00:00:00 E. coli MPN/100 mL 10 -88 -88 2010
# 10 BAKER NW 2010-05-19 00:00:00 E. coli MPN/100 mL 41 -88 -88 2010
# # ... with 144,212 more rows
# Example query: pull E. coli data for all years, remove QA data, and calculate average result for each station, unit, and year ----
tic() # start timer
ds_con %>%
  filter(Analyte %in% c("E. coli")) %>% # keep only the E. coli records
  filter(!(StationCode %in% c('LABQA_SWAMP', '0000', '000NONPJ'))) %>% # drop the QA data
  select(StationCode, Analyte, Unit, Result, Year) %>% # only the fields needed for the summary
  group_by(StationCode, Analyte, Unit, Year) %>% # one group per station, analyte, unit, and year
  collect() %>% # get the (grouped) data; the summary below is computed on the collected data
  summarise(avg_result = mean(Result, na.rm = TRUE), # average result within each group, ignoring NAs
            n = n()) # number of records within each group
toc() # stop timer
# 1.03 sec elapsed
# # A tibble: 15,688 x 6
# # Groups: StationCode, Analyte, Unit [4,171]
# StationCode Analyte Unit Year avg_result n
# <chr> <chr> <chr> <int> <dbl> <int>
# 1 01T_ODD3_EDI E. coli MPN/100 mL 2017 464. 5
# 2 01T_ODD3_EDI E. coli MPN/100 mL 2018 44569. 5
# 3 01T_ODD3_EDI E. coli MPN/100 mL 2019 548 6
# 4 01T_ODD3_EDI E. coli MPN/100 mL 2020 645 2
# 5 05T_HONDO E. coli MPN/100 mL 2017 3410 1
# 6 05T_HONDO E. coli MPN/100 mL 2018 13340 1
# 7 05T_HONDO E. coli MPN/100 mL 2019 9565 2
# 8 06T_LONG2 E. coli MPN/100 mL 2017 14670 1
# 9 06T_LONG2 E. coli MPN/100 mL 2018 12230 1
# 10 06T_LONG2 E. coli MPN/100 mL 2019 6570 1
# # ... with 15,678 more rows
# Example query: get all E. coli data (including all fields) within a selected date range for a (randomly chosen) station ----
tic() # start timer
df_query_data <- ds_con %>%
  filter(SampleDate >= as.Date('2016-07-01'), # start of the date range (inclusive)
         SampleDate <= as.Date('2018-06-30'), # end of the date range (inclusive)
         Analyte == "E. coli",
         StationName == 'BAY#301.1_SL-Candlestick Point, San Francisco') %>%
  collect() # get the data (all fields) -- without this step the query would be piped into toc()
toc() # stop timer
# 1.72 sec elapsed
# verify the correct dates/analyte/station was returned
range(df_query_data$SampleDate) # earliest and latest sample dates returned
# [1] "2016-07-05 UTC" "2018-06-25 UTC"
df_query_data %>% count(Analyte, StationName) # should show a single analyte/station combination
# # A tibble: 1 x 3
# Analyte StationName n
# <chr> <chr> <int>
# 1 E. coli BAY#301.1_SL-Candlestick Point, San Francisco 161
