Skip to content

Instantly share code, notes, and snippets.

@dholstius
Last active August 29, 2015 14:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dholstius/3993cc91d5552d634839 to your computer and use it in GitHub Desktop.
Save dholstius/3993cc91d5552d634839 to your computer and use it in GitHub Desktop.
Fetch, display, and download air quality data from the California Air Resources Board (ARB)
suppressPackageStartupMessages({
library(dplyr) # install.packages("dplyr")
library(tidyr) # install.packages("tidyr")
library(httr) # install.packages("httr")
library(lubridate) # install.packages("lubridate")
library(stringr) # install.packages("stringr")
library(ggvis) # install.packages("ggvis")
library(digest) # install.packages("digest")
})
# Time zone name; defined first because TODAY below is computed in it.
LST <- "America/Los_Angeles"

# Current calendar date in the LST zone. as.Date() on a POSIXct value uses
# tz = "UTC" unless told otherwise, which already yields tomorrow's date during
# the Pacific evening, so the zone is passed explicitly.
TODAY <- as.Date(Sys.time(), tz = LST)

# NOTE(review): parsed without an explicit tz, so this is midnight 1970-01-01
# in the session's time zone rather than the UTC epoch -- confirm intent
# before relying on it.
EPOCH <- as.POSIXct("1970-01-01")

# The lookup vectors below are named character vectors of the form
# c("display label" = "code"), the shape that categorical() expects for its
# `choices` argument (values become levels, names become labels).

# Quality flag labels and their codes.
QA_FLAGS <- c(
  "Valid" = "0",
  "Valid, flagged by data supplier" = "1",
  "Uncertain, flagged by QA routine" = "2",
  "Invalid, flagged by QA routine" = "3",
  "Invalid, flagged by data supplier" = "4",
  "Invalid, flagged manually" = "5")

# Parameter display labels and their codes.
PARAMETERS <- c(
  "Ozone (ppb)" = "OZONE",
  "PM2.5 (ug/m3)" = "PM25HR",
  "Black Carbon (ug/m3)" = "BC")

# Unit labels and their codes.
UNITS <- c(
  "ug/m3" = "001",
  "ppm" = "007",
  "ppb" = "008")

# Unit code for each parameter code. Keeping the entries in the same order as
# PARAMETERS is safest: indexing this vector with a factor selects by
# position, not by name.
PARAM_UNITS <- c(
  "OZONE" = "008",
  "PM25HR" = "001",
  "BC" = "001")
# Convert codes to a factor with human-readable labels.
#
# x:       vector of codes.
# choices: named vector; its values are the recognised codes (factor levels)
#          and its names are the labels shown for them.
# Returns a factor; codes not present in `choices` become NA.
categorical <- function (x, choices) {
  choice_labels <- names(choices)
  factor(x, levels = choices, labels = choice_labels)
}
# Evaluate `expr` at most once per cache file, storing the result with saveRDS().
#
# file:      path of the cache entry, relative to `cache_dir`.
# expr:      expression producing the value; only evaluated on a miss or when
#            `force` is TRUE (it is a lazily evaluated argument).
# cache_dir: directory holding the cache entries.
# compress:  passed through to saveRDS().
# verbose:   emit a "[cached] hit/miss/force" message?
# force:     re-evaluate `expr` and overwrite an existing entry?
# Returns the cached or freshly computed value.
cached <- function (file, expr, cache_dir = "cache", compress = "xz", verbose = TRUE, force = FALSE) {
  path <- normalizePath(file.path(cache_dir, file), mustWork = FALSE)

  # Hit: an entry exists and no refresh was requested.
  if (file.exists(path) && !force) {
    if (verbose) message("[cached] hit:", path)
    return(readRDS(path))
  }

  if (verbose) {
    message(if (force) "[cached] force:" else "[cached] miss:", path)
  }

  parent_dir <- dirname(path)
  if (!file.exists(parent_dir)) dir.create(parent_dir, recursive = TRUE)

  # base::force is spelled out because the `force` argument shares its name.
  result <- base::force(expr)
  saveRDS(result, file = path, compress = compress)
  result
}
# Build an lapply()-like function that reports progress while it iterates.
#
# session: session object handed to shinyIncubator::withProgress().
# message: progress message shown for every step.
# detail:  progress detail text shown for every step.
# Returns function (X, FUN, ...) that applies FUN to each element of X via
# lapply(), updating the progress value before each call to FUN.
#
# The shinyIncubator functions are called with an explicit namespace instead
# of require(): require() only returns FALSE when the package is missing, and
# unqualified calls can resolve to same-named functions from other attached
# packages.
lapply_with_progress <- function (session, message, detail) {
  function (X, FUN, ...) {
    # Mutable step counter shared with `step` below.
    state <- new.env(parent = emptyenv())
    state$done <- 0
    step <- function (...) {
      state$done <- state$done + 1
      shinyIncubator::setProgress(value = state$done, message = message, detail = detail)
      FUN(...)
    }
    # The lapply() call is written inline so that it is evaluated in this
    # function's frame, where X, step and ... are visible.
    shinyIncubator::withProgress(session, min = 1, max = length(X), lapply(X, step, ...))
  }
}
# This is the server logic for a Shiny web application.
# You can find out more about building applications with Shiny here:
#
# http://shiny.rstudio.com
#
library(shiny)
# Server logic: fetch per-day data, parse it into one table, let the user
# filter by site, plot the series with ggvis and offer a CSV download.
shinyServer(function (input, output, session) {

  # ---- Reactive views of the inputs ----------------------------------------

  # Selected parameter as a factor labelled with its display name.
  .selected_parameter <- reactive({
    categorical(input$parameter, choices = PARAMETERS)
  })

  # Units of the selected parameter as a factor labelled with the unit name.
  .units <- reactive({
    # Look up through character keys: indexing a named vector with a factor
    # selects by integer position, which only works while PARAM_UNITS and
    # PARAMETERS are kept in the same order.
    param_code <- PARAMETERS[as.character(.selected_parameter())]
    categorical(PARAM_UNITS[param_code], choices = UNITS)
  })

  .check_cache <- reactive({
    as.logical(input$checkCache)
  })

  # Every date in the selected range, one element per day.
  .selected_dates <- reactive({
    seq(from = input$dateRange[1], to = input$dateRange[2], by = "1 day")
  })

  # Sites picked in the filter box, as a factor labelled with site names.
  .selected_sites <- reactive({
    categorical(input$sites, choices = .SITES())
  })

  # ---- Data retrieval --------------------------------------------------------

  # Current date in the LST zone. as.Date() on a POSIXct defaults to UTC,
  # which is already "tomorrow" during the Pacific evening.
  today <- function () as.Date(Sys.time(), tz = LST)

  # Download and parse one day of data.
  #
  # date:      the day to fetch (a Date).
  # parameter: parameter label (factor or character) found in names(PARAMETERS).
  # units:     unit label (factor or character) found in names(UNITS).
  # cache:     reuse / store the HTTP response through cached()?
  # Returns a table with columns site_id, site_name, start, value, parameter
  # and quality. Stops with an error on a failed HTTP response.
  get_date <- function (date, parameter, units, cache = TRUE) {
    endpoint <- "http://www.arb.ca.gov/aqmis2/display.php"
    query <- list(
      download = "y",
      # as.character(): index by label rather than by factor position.
      param = PARAMETERS[as.character(parameter)],
      units = UNITS[as.character(units)],
      statistic = "DAVG",
      year = year(date), mon = month(date), day = mday(date), hours = "all",
      county_name = "--COUNTY--", basin = "SFB-San Francisco Bay",
      latitude = "--PART OF STATE--", report = "HVAL",
      order = "basin,county_name,s.name",
      submit = "Retrieve Data", ptype = "aqd")

    # Check the status inside the fetch so that a failed response raises
    # before cached() gets a chance to store it.
    fetch <- function () {
      resp <- GET(endpoint, query = query)
      stop_for_status(resp)
      resp
    }

    if (cache) {
      # Dates less than two days old are always refetched (presumably because
      # recent data may still change -- confirm).
      refresh <- as.numeric(today() - date, units = "days") < 2
      cache_key <- digest(query, "sha1", raw = FALSE)
      # Two levels of prefix directories keep any single directory small.
      cache_path <- file.path(substr(cache_key, 1, 3), substr(cache_key, 4, 6), cache_key)
      response <- cached(cache_path, fetch(), force = refresh)
    } else {
      response <- fetch()
    }
    # A cache hit bypasses fetch(), so validate whatever came back as well.
    stop_for_status(response)

    # Request the body as text explicitly and let read.csv() manage its own
    # connection; an explicit textConnection() here would never be closed.
    csv_text <- content(response, as = "text")
    read.csv(text = csv_text, colClasses = "character") %>%
      as.tbl() %>%
      filter(str_detect(value, "[0-9]")) %>%   # keep rows whose value contains a digit
      mutate_each(funs(stringr::str_trim)) %>%
      mutate(start = ymd(date, tz = LST) + 3600 * extract_numeric(start_hour),
             value = extract_numeric(value),
             parameter = categorical(variable, choices = PARAMETERS),
             #units = categorical(units, choices = UNITS),
             quality = categorical(quality, choices = QA_FLAGS)) %>%
      select(site_id = site, site_name = name, start, value, parameter, quality)
  }

  # All selected days, fetched one by one with a progress indicator and
  # stacked into a single table.
  .parsed_data <- reactive({
    msg <- "Updating, please wait"
    detail <- "Fetching data ..."
    process <- lapply_with_progress(session, msg, detail)
    chunks <- process(.selected_dates(), get_date, .selected_parameter(), .units(), .check_cache())
    do.call(rbind, chunks)
  })

  # Named vector of site ids (names are site names) found in the parsed data.
  # Also pushes the choices into the 'sites' selectize input.
  .SITES <- reactive({
    # Local binding only: nothing is written to the global environment.
    site_choices <- .parsed_data() %>%
      select(site_id, site_name) %>%
      unique() %>%
      with(setNames(site_id, site_name))
    updateSelectizeInput(session, 'sites', choices = site_choices)
    site_choices
  })

  # Parsed data with a single `site` factor replacing site_id / site_name.
  .data_tbl <- reactive({
    parsed <- .parsed_data()
    # Evaluate the reactive outside the dplyr verb so that only plain values
    # take part in its evaluation.
    site_choices <- .SITES()
    parsed %>%
      mutate(site = categorical(site_id, choices = site_choices)) %>%
      select(-site_id, -site_name)
  })

  # Full hour x parameter x site grid spanning the data.
  # (Not referenced elsewhere in this block.)
  .index_tbl <- reactive({
    start <- .data_tbl()$start
    combns <- expand.grid(start = seq(min(start), max(start), by = "1 hour"),
                          parameter = unique(.data_tbl()$parameter),
                          site = unique(.data_tbl()$site))
    as.tbl(combns)
  })

  # Data restricted to the selected sites (all sites when none is selected),
  # with a `series` id that changes whenever consecutive rows of a
  # site/parameter group are more than one hour apart.
  .filtered_data_tbl <- reactive({
    grouped <- .data_tbl() %>%
      group_by(site, parameter) %>%
      # Convert the gaps to hours explicitly: diff() on date-times picks its
      # units automatically, so the bare number is not always in hours.
      mutate(series = cumsum(c(0, as.integer(as.numeric(diff(start), units = "hours") - 1))),
             series = str_c(site, parameter, series),
             key = str_c(start, site, parameter, value))
    selected <- .selected_sites()
    if (length(selected) == 0) {
      grouped
    } else {
      grouped %>% filter(site %in% selected)
    }
  })

  # Axis title built from the selected parameter and units.
  # (Not referenced elsewhere in this block.)
  .y_axis_title <- reactive({
    str_c(.selected_parameter(), ", ", .units())
  })

  # ---- Outputs ---------------------------------------------------------------

  # The reactive itself (not its value) is piped into the plot specification.
  .filtered_data_tbl %>%
    group_by(series) %>%
    ggvis(x = ~start, y = ~value) %>%
    set_options(height = 300) %>%
    layer_lines(stroke = ~site) %>%
    add_axis("x", title = "Timestamp (hour beginning)") %>%
    add_axis("y", title = "Reported 1h value") %>%
    scale_datetime("x", expand = 0) %>%
    #scale_numeric("y", zero = TRUE) %>%
    bind_shiny("ggvis_plot")

  # Download name of the form SFAB-<parameter>-<first day>-<last day>.csv.
  .reactive_filename <- reactive({
    sprintf("SFAB-%s-%s-%s.csv",
            str_replace_all(.selected_parameter(), "[^A-Za-z0-9]", ""),
            format(min(.selected_dates()), "%Y%m%d"),
            format(max(.selected_dates()), "%Y%m%d"))
  })

  # Writes the unfiltered data table as CSV.
  output$downloadCSV <- downloadHandler(
    filename = function () {
      .reactive_filename()
    },
    content = function (file) {
      write.csv(.data_tbl(), file = file, row.names = FALSE)
    }
  )
})
# This is the user-interface definition of a Shiny web application.
# You can find out more about building applications with Shiny here:
#
# http://shiny.rstudio.com
#
library(shiny)
library(shinyIncubator)
# Page layout: the plot on top, then a row of three panels (query, site
# filter, download).
shinyUI(fluidPage(
  titlePanel("San Francisco Air Basin Data"),

  # Plot container; its id is the one the server binds the plot to.
  ggvisOutput("ggvis_plot"),
  hr(),

  # Progress bar (placeholder)
  progressInit(),

  fluidRow(
    # What to fetch: parameter, date range, and whether to use the cache.
    column(
      width = 4, offset = 1,
      h3("Query"),
      selectInput(inputId = "parameter", label = "Parameter:",
                  choices = PARAMETERS, selected = "BC"),
      dateRangeInput(inputId = "dateRange", label = "Date range:",
                     start = TODAY - ddays(7), end = TODAY, max = TODAY),
      checkboxInput(inputId = "checkCache", label = "Use cache", value = TRUE)
    ),

    # Which sites to show; starts with no choices.
    column(
      width = 4,
      h3("Filter"),
      selectizeInput(inputId = "sites", label = "Sites:", choices = NULL,
                     multiple = TRUE,
                     options = list(placeholder = "Click to select one or more ... "))
    ),

    # Export as CSV.
    column(
      width = 3,
      h3("Download"),
      downloadButton(outputId = "downloadCSV", label = "CSV")
    )
  )
))
@dholstius
Copy link
Author

Hmm. Updated to dplyr 0.3, and now getting

Error in eval(substitute(expr), envir, enclos) : 
unsupported type for column 'site' (CLOSXP)

with traceback():

> traceback()
4: Sys.sleep(0.001)
3: withCallingHandlers(expr, error = function(e) {
   handle <- getOption("shiny.error")
   if (is.function(handle)) 
       handle()
   })
2: shinyCallingHandlers(while (!.globals$stopped) {
   serviceApp()
   Sys.sleep(0.001)
   })
1: runApp()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment