Skip to content

Instantly share code, notes, and snippets.

@mutolisp
Created November 29, 2014 15:17
Show Gist options
  • Save mutolisp/99f6ba3be9f19140623b to your computer and use it in GitHub Desktop.
Save mutolisp/99f6ba3be9f19140623b to your computer and use it in GitHub Desktop.
##### Preprocessing Air Quality
## Lin, Cheng-Tao (mutolisp)
#####
# load saved Rdata
if (length(ls()) == 0) {
load(".RData")
}
#### TimeSeries
require(timeSeries)
require(xts)
require(PerformanceAnalytics)
require(fTrading)
##### Functions #####
airquality_parser = function(raw, stn='台西', msmt='PM2.5') {
require(reshape2)
stn_list = levels(raw$station)
msmt_list = levels(raw$measurement)
if ( stn %in% stn_list & msmt %in% msmt_list ) {
melted = melt(raw, id=c("date", "station", "measurement"), na.rm=FALSE)
colnames(melted) = c('date', 'station', 'measurement', 'hour', 'value')
# convert non-numeric value to NA
melted$hour = as.numeric(melted$hour)
melted$value = as.numeric(melted$value)
#
msmt_val = melted[melted$station==stn & melted$measurement==msmt, ]
# sort the dataframe
idx = with(msmt_val, order(date, hour))
output = msmt_val[idx,]
output$date = strftime(paste(output$date, " ", output$hour-1,
":00:00", sep=""), format = "%Y-%m-%d %H:%M:%S")
output = subset(output, select = -c(measurement, station, hour))
return(output)
} else {
#
print("Target station or monitoring measurement not exist in list")
print("Please select from following items")
print("Stations:")
print(stn_list)
print("Monitoring items:")
print(msmt_list)
}
}
hdfs_read.csv = function(data, header=TRUE) {
Sys.setenv(HADOOP_CMD="/usr/local/bin/hadoop")
library(rhdfs)
library(rmr2)
hdfs.init();
# import data into hdfs via hdfs.file()
HDFS_r = hdfs.file(data, "r",buffersize = 104857600)
HDFS_f = hdfs.read(HDFS_r)
rawdata = read.csv(textConnection(rawToChar(HDFS_f)), sep=",", header=header)
return(rawdata)
}
parserHrData = function(dataset, threshold) {
gt95 = dataset[dataset$value>threshold,]
gt95_m = as.data.frame(cbind(strftime(gt95$date, format="%Y"),
strftime(gt95$date, format="%m"), gt95$value))
gt95_m = as.data.frame(cbind(paste(strftime(gt95$date, format="%Y-%m"),'-01',sep=""),
gt95$value))
colnames(gt95_m) = c('date','hours')
gt95_hrs = aggregate(gt95_m$hours, by = list(gt95_m[,1]), FUN = length)
colnames(gt95_hrs) = c('date','hours')
row.names(gt95_hrs) = gt95_hrs$date
# create Time Series period
len = length(sort(levels(gt95_hrs$date)))
st = as.Date(sort(levels(gt95_hrs$date))[1])
en = as.Date(levels(gt95_hrs$date)[len])
pd = seq(st, en, by = "1 month")
period = as.data.frame(cbind(strftime(pd, format="%Y-%m-%d")))
colnames(period) = 'date'
period_outerjoin = merge(x=period, y=gt95_hrs, all.x=TRUE)
period_outerjoin$hours[is.na(period_outerjoin$hours)]=0
row.names(period_outerjoin) = period_outerjoin$date
period_outerjoin = subset(period_outerjoin, select=-date)
return(period_outerjoin)
}
# Method A
# 0. execute external encoding_conv.sh to convert big5 to utf8 first!
# 1. load reshape2 library
library(reshape2)
# 2. data input
douliu_raw = hdfs_read.csv('data/douliu.csv')
lunbei_raw = hdfs_read.csv('data/lunbei.csv')
taishi_raw = hdfs_read.csv('data/taishi.csv')
colnames(douliu_raw) = c('date', 'station', 'measurement', 1:24)
colnames(lunbei_raw) = c('date', 'station', 'measurement', 1:24)
colnames(taishi_raw) = c('date', 'station', 'measurement', 1:24)
# filtering data by stations and monitoring items
#stn_list = levels(rawdata$station)
msmt_obj = levels(taishi_raw$measurement)
# target monitoring items
target = c('PM2.5', 'PM10', 'WIND_DIREC', 'WIND_SPEED')
taishi = list(); lunbei = list(); douliu = list()
for ( i in 1:length(target) ) {
taishi[[target[i]]] =
airquality_parser(taishi_raw, stn='台西', msmt=target[i])
lunbei[[target[i]]] =
airquality_parser(lunbei_raw, stn='崙背', msmt=target[i])
douliu[[target[i]]] =
airquality_parser(douliu_raw, stn='斗六', msmt=target[i])
}
# PM2.5
summary_report = list()
stns = c('douliu', 'lunbei', 'taishi')
pm2.5_q95 = list()
douliu_pm2.5 = douliu$PM2.5[!is.na(douliu$PM2.5$value),]
lunbei_pm2.5 = lunbei$PM2.5[!is.na(lunbei$PM2.5$value),]
taishi_pm2.5 = taishi$PM2.5[!is.na(taishi$PM2.5$value),]
pm2.5_q95['douliu'] = quantile(douliu_pm2.5$value, 0.95, na.rm=T)
summary(douliu_pm2.5)
pm2.5_q95['lunbei'] = quantile(lunbei_pm2.5$value, 0.95, na.rm=T)
summary(lunbei_pm2.5)
pm2.5_q95['taishi'] = quantile(taishi_pm2.5$value, 0.95, na.rm=T)
summary(taishi_pm2.5)
# PM10
douliu_pm10 = douliu$PM10[!is.na(douliu$PM10$value),]
lunbei_pm10 = lunbei$PM10[!is.na(lunbei$PM10$value),]
taishi_pm10 = taishi$PM10[!is.na(taishi$PM10$value),]
pm10_q95 = list()
pm10_q95['douliu'] = quantile(douliu_pm10$value, 0.95, na.rm=T)
summary(douliu_pm10)
pm10_q95['lunbei'] = quantile(lunbei_pm10$value, 0.95, na.rm=T)
summary(lunbei_pm10)
pm10_q95['taishi'] = quantile(taishi_pm10$value, 0.95, na.rm=T)
summary(taishi_pm10)
douliu_pm2.5_gt95 = parserHrData(douliu_pm2.5, pm2.5_q95['douliu'][[1]])
lunbei_pm2.5_gt95 = parserHrData(lunbei_pm2.5, pm2.5_q95['lunbei'][[1]])
taishi_pm2.5_gt95 = parserHrData(taishi_pm2.5, pm2.5_q95['taishi'][[1]])
douliu_pm2.5_validhours = parserHrData(douliu_pm2.5, -1)
lunbei_pm2.5_validhours = parserHrData(lunbei_pm2.5, -1)
taishi_pm2.5_validhours = parserHrData(taishi_pm2.5, -1)
douliu_pm2.5_p = douliu_pm2.5_gt95/douliu_pm2.5_validhours*100
lunbei_pm2.5_p = lunbei_pm2.5_gt95/douliu_pm2.5_validhours*100
taishi_pm2.5_p = taishi_pm2.5_gt95/douliu_pm2.5_validhours*100
ALL_pm2.5_p = cbind(douliu_pm2.5_p, lunbei_pm2.5_p, taishi_pm2.5_p)
colnames(ALL_pm2.5_p) = c('douliu', 'lunbei', 'taishi')
pdf(file="../pm2.5_gt95_percentage.pdf", width = 15)
PerformanceAnalytics::chart.TimeSeries(as.xts(ALL_pm2.5_p),
main="PM2.5 > 95% quantile percentage (2005-2013)", ylab="Percentage (%)", grid.color = "grey")
abline(v=(0:10)*12+1, lty=2)
dev.off()
nrownames = row.names(lunbei_pm2.5_gt95)[3:110]
lunbei_pm2.5_gt95 = as.data.frame(lunbei_pm2.5_gt95[3:110,])
row.names(lunbei_pm2.5_gt95) = nrownames
nrownames = row.names(taishi_pm2.5_gt95)[6:113]
taishi_pm2.5_gt95 = as.data.frame(taishi_pm2.5_gt95[6:113,])
row.names(taishi_pm2.5_gt95) = nrownames
ALL_pm2.5_gt95 = cbind(douliu_pm2.5_gt95, lunbei_pm2.5_gt95, taishi_pm2.5_gt95)
colnames(ALL_pm2.5_gt95) = c('douliu', 'lunbei', 'taishi')
pdf(file="../pm2.5_gt95_quantile.pdf", width = 15)
PerformanceAnalytics::chart.TimeSeries(as.xts(ALL_pm2.5_gt95),
main="PM2.5 > 95% quantile (2005-2013)", ylab="Hours", grid.color = "grey")
abline(v=(0:10)*12+1, lty=2)
dev.off()
# monthly average
douliu_pm2.5_gt95_avgm = cbind(as.numeric(strftime(row.names(douliu_pm2.5_gt95), format="%m")),
douliu_pm2.5_gt95)
colnames(douliu_pm2.5_gt95_avgm) = c('month', 'pm2.5')
lunbei_pm2.5_gt95_avgm = cbind(as.numeric(strftime(row.names(lunbei_pm2.5_gt95), format="%m")),
lunbei_pm2.5_gt95)
colnames(lunbei_pm2.5_gt95_avgm) = c('month', 'pm2.5')
taishi_pm2.5_gt95_avgm = cbind(as.numeric(strftime(row.names(taishi_pm2.5_gt95), format="%m")),
taishi_pm2.5_gt95)
colnames(taishi_pm2.5_gt95_avgm) = c('month', 'pm2.5')
st = as.Date('2013-01-01')
en = as.Date('2013-12-01')
pd = seq(st, en, by = "1 month")
douliu_pm2.5_gt95_avgm = as.data.frame(aggregate(douliu_pm2.5_gt95_avgm[,2],
by=list(douliu_pm2.5_gt95_avgm[,1]), FUN = mean)[,2])
colnames(douliu_pm2.5_gt95_avgm) = 'value'
rownames(douliu_pm2.5_gt95_avgm) = pd
lunbei_pm2.5_gt95_avgm = as.data.frame(aggregate(lunbei_pm2.5_gt95_avgm[,2],
by=list(lunbei_pm2.5_gt95_avgm[,1]), FUN = mean)[,2])
colnames(lunbei_pm2.5_gt95_avgm) = 'value'
rownames(lunbei_pm2.5_gt95_avgm) = pd
taishi_pm2.5_gt95_avgm = as.data.frame(aggregate(taishi_pm2.5_gt95_avgm[,2],
by=list(taishi_pm2.5_gt95_avgm[,1]), FUN = mean)[,2])
colnames(taishi_pm2.5_gt95_avgm) = 'value'
rownames(taishi_pm2.5_gt95_avgm) = pd
ALL_pm2.5_gt95_avgm = cbind(douliu_pm2.5_gt95_avgm, lunbei_pm2.5_gt95_avgm, taishi_pm2.5_gt95_avgm)
colnames(ALL_pm2.5_gt95_avgm) = c('douliu', 'lunbei', 'taishi')
pdf(file="../mean_pm2.5_gt95_quantile.pdf", width = 15)
PerformanceAnalytics::chart.TimeSeries(as.xts(ALL_pm2.5_gt95_avgm),
main="Average PM2.5 > 95% quantile (2005-2013)", ylab="Hours", grid.color = "grey")
dev.off()
####### PM10
douliu_pm10_gt95 = parserHrData(douliu_pm10, pm10_q95['douliu'][[1]])
lunbei_pm10_gt95 = parserHrData(lunbei_pm10, pm10_q95['lunbei'][[1]])
taishi_pm10_gt95 = parserHrData(taishi_pm10, pm10_q95['taishi'][[1]])
douliu_pm10_validhours = parserHrData(douliu_pm10, -1)
lunbei_pm10_validhours = parserHrData(lunbei_pm10, -1)
taishi_pm10_validhours = parserHrData(taishi_pm10, -1)
douliu_pm10_p = douliu_pm10_gt95/douliu_pm10_validhours*100
lunbei_pm10_p = lunbei_pm10_gt95/lunbei_pm10_validhours*100
taishi_pm10_p = taishi_pm10_gt95/taishi_pm10_validhours*100
ALL_pm10_p = cbind(douliu_pm10_p, lunbei_pm10_p, taishi_pm10_p)
colnames(ALL_pm10_p) = c('douliu', 'lunbei', 'taishi')
pdf(file="../pm10_gt95_percentage.pdf", width = 15)
PerformanceAnalytics::chart.TimeSeries(as.xts(ALL_pm10_p),
main="PM10 > 95% quantile percentage (1993-2013)", ylab="Percentage (%)", grid.color = "grey")
abline(v=(0:20)*12+5, lty=2)
dev.off()
ALL_pm10_gt95 = cbind(douliu_pm10_gt95, lunbei_pm10_gt95, taishi_pm10_gt95)
colnames(ALL_pm10_gt95) = c('douliu', 'lunbei', 'taishi')
pdf(file="../pm10_gt95_quantile.pdf", width = 15)
PerformanceAnalytics::chart.TimeSeries(as.xts(ALL_pm10_gt95),
main="PM10 > 95% quantile (1993/09-2013)", ylab="Hours", grid.color = "grey")
abline(v=(0:20)*12+5, lty=2)
dev.off()
douliu_pm10_gt95_avgm = cbind(as.numeric(strftime(row.names(douliu_pm10_gt95), format="%m")),
douliu_pm10_gt95)
colnames(douliu_pm10_gt95_avgm) = c('month', 'pm10')
lunbei_pm10_gt95_avgm = cbind(as.numeric(strftime(row.names(lunbei_pm10_gt95), format="%m")),
lunbei_pm10_gt95)
colnames(lunbei_pm10_gt95_avgm) = c('month', 'pm10')
taishi_pm10_gt95_avgm = cbind(as.numeric(strftime(row.names(taishi_pm10_gt95), format="%m")),
taishi_pm10_gt95)
colnames(taishi_pm10_gt95_avgm) = c('month', 'pm10')
st = as.Date('2013-01-01')
en = as.Date('2013-12-01')
pd = seq(st, en, by = "1 month")
douliu_pm10_gt95_avgm = as.data.frame(aggregate(douliu_pm10_gt95_avgm[,2],
by=list(douliu_pm10_gt95_avgm[,1]), FUN = mean)[,2])
colnames(douliu_pm10_gt95_avgm) = 'value'
rownames(douliu_pm10_gt95_avgm) = pd
lunbei_pm10_gt95_avgm = as.data.frame(aggregate(lunbei_pm10_gt95_avgm[,2],
by=list(lunbei_pm10_gt95_avgm[,1]), FUN = mean)[,2])
colnames(lunbei_pm10_gt95_avgm) = 'value'
rownames(lunbei_pm10_gt95_avgm) = pd
taishi_pm10_gt95_avgm = as.data.frame(aggregate(taishi_pm10_gt95_avgm[,2],
by=list(taishi_pm10_gt95_avgm[,1]), FUN = mean)[,2])
colnames(taishi_pm10_gt95_avgm) = 'value'
rownames(taishi_pm10_gt95_avgm) = pd
ALL_pm10_gt95_avgm = cbind(douliu_pm10_gt95_avgm, lunbei_pm10_gt95_avgm, taishi_pm10_gt95_avgm)
colnames(ALL_pm10_gt95_avgm) = c('douliu', 'lunbei', 'taishi')
pdf(file="../mean_pm10_gt95_quantile.pdf", width = 15)
PerformanceAnalytics::chart.TimeSeries(as.xts(ALL_pm10_gt95_avgm),
main="Average pm10 > 95% quantile (2005-2013)", ylab="Hours", grid.color = "grey")
dev.off()
PerformanceAnalytics::chart.TimeSeries(as.xts(douliu_pm10_gt95),
main="PM10 > 95% quantile (2005-2013) at Douliu", ylab="Hours", grid.color = "grey")
abline(v=(0:10)*12+1, lty=2)
PerformanceAnalytics::chart.TimeSeries(as.xts(lunbei_pm10_gt95),
main="PM10 > 95% quantile (2005-2013) at Lunbei", ylab="Hours", grid.color = "grey")
abline(v=(0:10)*12+1, lty=2)
PerformanceAnalytics::chart.TimeSeries(as.xts(taishi_pm10_gt95),
main="PM10 > 95% quantile (2005-2013) at Taishi", ylab="Hours", grid.color = "grey")
abline(v=(0:10)*12+1, lty=2)
dev.off()
pdf(file="../ALL_PM2.5_histogram.pdf")
par(mfrow = c(2,2))
hist(douliu$PM2.5$value, breaks = 100, main="",
xlab="PM2.5", ylab="Frequency (hour)")
abline(v=pm2.5_q95$douliu[[1]], lty=2, col="red")
hist(lunbei$PM2.5$value, breaks = 100, main="",
xlab="PM2.5", ylab="Frequency (hour)")
abline(v=pm2.5_q95$lunbei[[1]], lty=2, col="red")
hist(taishi$PM2.5$value, breaks = 100, main="",
xlab="PM2.5", ylab="Frequency (hour)")
abline(v=pm2.5_q95$taishi[[1]], lty=2, col="red")
dev.off()
pdf(file="../ALL_PM10_histogram.pdf")
par(mfrow = c(2,2))
hist(douliu$PM10$value, breaks = 100, main="",
xlab="PM10", ylab="Frequency (hour)")
abline(v=pm10_q95$douliu[[1]], lty=2, col="red")
hist(lunbei$PM10$value, breaks = 100, main="",
xlab="PM10", ylab="Frequency (hour)")
abline(v=pm10_q95$lunbei[[1]], lty=2, col="red")
hist(taishi$PM10$value, breaks = 100, main="",
xlab="PM10", ylab="Frequency (hour)")
abline(v=pm10_q95$taishi[[1]], lty=2, col="red")
dev.off()
# Test for normal distribution
pdf(file="../normal_qqplot.pdf")
par(mfrow = c(2,3))
qqnorm(douliu_pm10$value, main="Douliu PM10")
qqnorm(lunbei_pm10$value, main="Lunbei PM10")
qqnorm(taishi_pm10$value, main="Taishi PM10")
qqnorm(douliu_pm2.5$value, main="Douliu PM2.5")
qqnorm(lunbei_pm2.5$value, main="Lunbei PM2.5")
qqnorm(taishi_pm2.5$value, main="Taishi PM2.5")
dev.off()
# 極端值統計
dim(douliu_pm2.5[douliu_pm2.5$value>87,])[1]/dim(douliu_pm2.5)[1]*100
dim(lunbei_pm2.5[lunbei_pm2.5$value>82,])[1]/dim(lunbei_pm2.5)[1]*100
dim(taishi_pm2.5[taishi_pm2.5$value>70,])[1]/dim(taishi_pm2.5)[1]*100
dim(douliu_pm10[douliu_pm10$value>87,])[1]/dim(douliu_pm10)[1]*100
dim(lunbei_pm10[lunbei_pm10$value>82,])[1]/dim(lunbei_pm10)[1]*100
dim(taishi_pm10[taishi_pm10$value>70,])[1]/dim(taishi_pm10)[1]*100
# calculate rh
douliu_rh = airquality_parser(douliu_raw, stn='斗六', msmt='RH')
douliu_rh = douliu_rh[!is.na(douliu_rh$value),]
douliu_rh$date = paste(strftime(douliu_rh$date, format='%Y-%m'), "-01", sep="")
douliu_rh_mavg = aggregate(douliu_rh$value, by=list(douliu_rh$date), FUN=mean)
douliu_rh_mavg = douliu_rh_mavg[8:dim(douliu_rh_mavg)[1],]
colnames(douliu_rh_mavg) = c("date", "rh")
row.names(douliu_rh_mavg) = douliu_rh_mavg[,1]
douliu_rh_mavg = subset(douliu_rh_mavg, select=-date)
taishi_rh = airquality_parser(taishi_raw, stn='台西', msmt='RH')
taishi_rh = taishi_rh[!is.na(taishi_rh$value),]
taishi_rh$date = paste(strftime(taishi_rh$date, format='%Y-%m'), "-01", sep="")
taishi_rh_mavg = aggregate(taishi_rh$value, by=list(taishi_rh$date), FUN=mean)
taishi_rh_mavg = taishi_rh_mavg[8:dim(taishi_rh_mavg)[1],]
colnames(taishi_rh_mavg) = c("date", "rh")
row.names(taishi_rh_mavg) = taishi_rh_mavg[,1]
taishi_rh_mavg = subset(taishi_rh_mavg, select=-date)
taishi_precp_raw = taishi_raw$measurement=='RAINFALL'
taishi_melted = melt(taishi_precp_raw, id=c("date", "station", "measurement"), na.rm=FALSE)
taishi_melted$value = as.numeric(taishi_melted$value)
taishi_melted[is.na(taishi_melted$value),]$value=0
taishi_precp = aggregate(taishi_melted$value,
by=list(strftime(taishi_melted$date, format="%Y-%m")), FUN=sum)
taishi_precp = as.data.frame(cbind(paste(taishi_precp[,1], '-01', sep=""), taishi_precp[,2]))
row.names(taishi_precp) = taishi_precp[,1]
taishi_precp = subset(taishi_precp, select=-V1)
pdf(file="../taishi_precp_sum.pdf", width=15)
PerformanceAnalytics::chart.TimeSeries(as.xts(taishi_precp), ylim=c(0,1000),
main="Monthly precipitataion (1994-2013) at Taishi", ylab="precipitation (mm)",
grid.color = "grey", col="blue")
dev.off()
douliu_precp_raw = douliu_raw[douliu_raw$measurement=='RAINFALL',]
douliu_melted = melt(douliu_precp_raw, id=c("date", "station", "measurement"), na.rm=FALSE)
douliu_melted$value = as.numeric(douliu_melted$value)
douliu_melted[is.na(douliu_melted$value),]$value=0
douliu_precp = aggregate(douliu_melted$value,
by=list(strftime(douliu_melted$date, format="%Y-%m")), FUN=sum)
douliu_precp = as.data.frame(cbind(paste(douliu_precp[,1], '-01', sep=""), douliu_precp[,2]))
row.names(douliu_precp) = douliu_precp[,1]
douliu_precp = subset(douliu_precp, select=-V1)
pdf(file="../douliu_precp_sum.pdf", width=15)
PerformanceAnalytics::chart.TimeSeries(as.xts(douliu_precp), ylim=c(0,1000),
main="Monthly precipitataion (1994-2013) at Douliu", ylab="precipitation (mm)",
grid.color = "grey", col="blue")
dev.off()
lunbei_precp_raw = lunbei_raw[lunbei_raw$measurement=='RAINFALL',]
lunbei_melted = melt(lunbei_precp_raw, id=c("date", "station", "measurement"), na.rm=FALSE)
lunbei_melted$value = as.numeric(lunbei_melted$value)
lunbei_melted[is.na(lunbei_melted$value),]$value=0
lunbei_precp = aggregate(lunbei_melted$value,
by=list(strftime(lunbei_melted$date, format="%Y-%m")), FUN=sum)
lunbei_precp = as.data.frame(cbind(paste(lunbei_precp[,1], '-01', sep=""), lunbei_precp[,2]))
row.names(lunbei_precp) = lunbei_precp[,1]
lunbei_precp = subset(lunbei_precp, select=-V1)
pdf(file="../lunbei_precp_sum.pdf", width=15)
PerformanceAnalytics::chart.TimeSeries(as.xts(lunbei_precp), ylim=c(0,1000),
main="Monthly precipitataion (1994-2013) at Lunbei", ylab="precipitation (mm)",
grid.color = "grey", col="blue")
dev.off()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment