Skip to content

Instantly share code, notes, and snippets.

@wush978
Created July 24, 2018 01:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wush978/f8823a6aef81a7bc08b5a29caf5236ff to your computer and use it in GitHub Desktop.
Save wush978/f8823a6aef81a7bc08b5a29caf5236ff to your computer and use it in GitHub Desktop.
#include <Rcpp.h>
using namespace Rcpp;
//[[Rcpp::export(".time_clustering")]]
SEXP time_clustering(DataFrame x, int time_threshold, double ratio_threshold) {
std::vector<int> result_i, result_k;
IntegerVector tunit(wrap(x["tunit"]));
std::vector< LogicalVector > targets;
for(int i = 1;i < x.ncol();i++) {
targets.push_back(LogicalVector(wrap(x[i])));
}
double threshold = time_threshold * ratio_threshold;
std::vector<double> count(targets.size(), 0);
for(int i = 0;i < tunit.size();i++) {
const int current_t(tunit[i]);
std::fill(count.begin(), count.end(), 0.0);
for(int j = i;j < tunit.size();j++) {
if (tunit[j] - current_t >= time_threshold) break;
for(int k = 0;k < targets.size();k++) {
if (targets[k][j]) count[k] += 1;
}
}
for(int k = 0;k < count.size();k++) {
if (count[k] > threshold) {
result_i.push_back(i + 1);
result_k.push_back(k + 1);
}
}
}
return List::create(
wrap(result_i),
wrap(result_k)
);
}
library(magrittr)
library(RSQLite)
library(parallel)
library(dplyr)
library(data.table)
.tbs <- dbListTables(db) %>%
grep(pattern = "^cem_", value = TRUE)
cl <- makeCluster(length(.tbs))
init.cluster <- function(cl) {
parallel:::checkCluster(cl)
clusterEvalQ(cl, {
library(magrittr)
library(RSQLite)
library(parallel)
library(dplyr)
library(data.table)
NULL
})
invisible(NULL)
}
init.cluster(cl)
clusterExport(cl, "db")
clusterEvalQ(cl, {
db <- dbConnect(db)
NULL
})
# parLapply(cl, .tbs, function(.tb) {
# assign("tb.name", .tb, envir = globalenv())
# . <- strsplit(tb.name, "_")[[1]][2]
# year <- substring(., 1, 4)
# month <- substring(., 5, 6)
# . <- dbReadTable(db, .tb)
# . <- mutate(., time = sprintf("%s-%s-%02d %02d:00:00", year, month, day, hour) %>% as.POSIXct(tz = "Asia/Taipei"))
# . <- mutate(., weekday = format(time, "%w") %>% as.integer())
# . <- mutate(., key = sprintf("%d-%d-%d-%d", long, lat, weekday, hour))
# . <- data.table(.)
# setkeyv(., "key")
# assign("tb", ., envir = globalenv())
# NULL
# })
tb <- lapply(.tbs, function(.tb) {
. <- strsplit(.tb, "_")[[1]][2]
year <- substring(., 1, 4)
month <- substring(., 5, 6)
. <- dbReadTable(db, .tb)
. <- mutate(., time = sprintf("%s-%s-%02d %02d:00:00", year, month, day, hour) %>% as.POSIXct(tz = "Asia/Taipei"))
. <- mutate(., weekday = format(time, "%w") %>% as.integer())
. <- mutate(., key = sprintf("%d-%d-%d-%d", long, lat, weekday, hour))
}) %>%
rbindlist()
tb.key <- clusterEvalQ(cl, {
unique(tb$key)
}) %>%
do.call(what = c) %>%
unique()
tb.area <- strsplit(tb.key, "-") %>%
parLapply(cl = cl, head, 2) %>%
parSapply(cl = cl, paste, collapse = "-")
tb.ka <- data.frame(key = tb.key, area = tb.area)
tb.ka <- data.table(tb.ka, key = "area")
n <- unique(tb.area) %>% length()
gr.count <- 8
tb.area.list <- rep(1:gr.count, floor(n / gr.count)) %>%
`length<-`(n) %>%
sample() %>%
split(x = unique(tb.area))
tb.ka.list <- lapply(tb.area.list, function(.a) {
tb.ka[J(.a),nomatch = 0]
}) %>%
lapply(as.data.frame)
clusterExport(cl, "tb.ka.list")
clusterEvalQ(cl, {
tb.gr <- lapply(tb.ka.list, function(.k) {
tb[J(.k),nomatch=0]
})
NULL
})
dir.create(".cem.gr")
lapply(1:gr.count, function(.) dir.create(file.path(".cem.gr", .)))
clusterExport(cl, "gr.count")
clusterEvalQ(cl, {
for(i in 1:gr.count) {
saveRDS(tb.gr[[i]], file.path(".cem.gr", i, sprintf("%s.Rds", tb.name)))
}
NULL
})
stopCluster(cl)
rm(tb.ka, tb.gr, tb.lnglat, tb.area, tb.area.list, tb.gr.list, tb.ka.list, tb.key, tb.key.list);gc()
cl <- makeCluster(gr.count)
init.cluster(cl)
parLapply(cl, seq_len(gr.count), function(.i) {
.path <- dir(file.path(".cem.gr", .i), full.names = TRUE)
tb <- lapply(.path, readRDS) %>%
rbindlist()
assign("tb", tb, envir = globalenv())
gc()
NULL
}) %>%
invisible()
# generate input
input.threshold <- as.POSIXct("2018-05-20 00:00:00")
clusterExport(cl, "input.threshold")
clusterEvalQ(cl, {
tb.input <- filter(tb, time >= input.threshold) %>%
data.table(key = "key")
tb.history <- filter(tb, time < input.threshold) %>%
data.table(key = "key")
rm(tb)
gc()
NULL
}) %>%
invisible()
target <- colnames(tb) %>%
grep(pattern = "count|traffic", value = TRUE)
clusterExport(cl, "target")
clusterEvalQ(cl, {
tb.history.avg <- tb.history[,lapply(.SD, mean), by = key, .SDcols = target]
colnames(tb.history.avg) <- c("key", sprintf("%s.avg", colnames(tb.history.avg) %>% tail(-1)))
tb.history.sd <- tb.history[,lapply(.SD, sd), by = key, .SDcols = target]
colnames(tb.history.sd) <- c("key", sprintf("%s.sd", colnames(tb.history.sd) %>% tail(-1)))
rm(tb.history);gc()
NULL
}) %>%
invisible()
clusterEvalQ(cl, {
tb.input.avg <- tb.history.avg[tb.input]
tb.input.avg.sd <- tb.history.sd[tb.input.avg]
# rm(tb.input.avg);gc()
NULL
}) %>%
invisible()
clusterEvalQ(cl, {
result <- data.frame(
key = tb.input.avg.sd$key,
time = tb.input.avg.sd$time,
long = tb.input.avg.sd$long,
lat = tb.input.avg.sd$lat,
weekday = tb.input.avg.sd$weekday,
hour = tb.input.avg.sd$hour,
stringsAsFactors = FALSE
) %>%
data.table(key = "key")
for(name in target) {
name.avg <- sprintf("%s.avg", name)
name.sd <- sprintf("%s.sd", name)
.value <- tb.input.avg.sd[[name]]
.upper <- tb.input.avg.sd[[name.avg]] + 3 * tb.input.avg.sd[[name.sd]]
.lower <- tb.input.avg.sd[[name.avg]] - 3 * tb.input.avg.sd[[name.sd]]
.is_alert <- sign((.value - .upper) * (.value - .lower)) != -1
name.result <- sprintf("%s.is_alert", name)
result[[name.result]] <- .is_alert
}
gc()
}) %>%
invisible()
parLapply(cl, 1:gr.count, function(.i) {
saveRDS(result, file.path(".cem.gr", .i, "result.Rds"))
NULL
}) %>%
invisible()
stopCluster(cl)
name.is_alert <- sprintf("%s.is_alert", target)
result$alert_ratio <- lapply(name.is_alert, function(x) result[[x]]) %>%
Reduce(f = `+`) %>%
`/`(length(name.is_alert))
alert_ratio.threshold <- .1
result.filtered <- filter(result, alert_ratio > alert_ratio.threshold)
# time based clustering
time.threshold <- 24
time_ratio.threshold <- .8
result.time <- group_by(result.filtered, long, lat) %>%
do({
.df <- .
if (nrow(.df) < time.threshold) data.frame() else {
.df <- arrange(.df, time)
.x <- cbind(
tunit = difftime(.df$time , .df$time[1], units = "hour") %>% as.integer(),
.df[,name.is_alert]
)
.i <- .time_clustering(.x, time.threshold, time_ratio.threshold)
if (length(.i[[1]]) == 0) data.frame() else {
.k <- .i[[2]]
.i <- .i[[1]]
.r <- .df[unique(.i),]
.i <- match(.i, unique(.i))
.r[,name.is_alert][] <- FALSE
for(.l in seq_along(.i)) {
.r[[name.is_alert[.k[.l]]]][.i[.l]] <- TRUE
}
.r$alert_ratio <- NULL
.r
}
}
})
# space clustering
library(dbscan)
result.time$tunit <- difftime(result.time$time, min(result.time$time), units = "hour") %>%
as.numeric()
g <- select(result.time, long, lat, tunit) %>%
as.matrix() %>%
dbscan(eps = 10, minPts = 5)
## -- dev
function() {
tb <- clusterEvalQ(cl, head(tb)) %>% do.call(what = rbind)
clusterEvalQ(cl, head(tb.input))
clusterEvalQ(cl, rm(tb))
clusterEvalQ(cl, gc());gc()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment