Last active
August 15, 2016 12:10
-
-
Save Brontojoris/8a7053b363857701151fb4d7106415c0 to your computer and use it in GitHub Desktop.
Adobe Analytics Clickstream Importer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Load required libraries ----
library(DBI)
library(RMySQL)

# Init settings ----
# NOTE(review): credentials are read from environment variables when set,
# falling back to the original hard-coded defaults. Avoid committing real
# credentials to source control.
conn <- dbConnect(
  RMySQL::MySQL(),
  host     = Sys.getenv("MYSQL_HOST", "localhost"),
  user     = Sys.getenv("MYSQL_USER", "rstudio"),
  password = Sys.getenv("MYSQL_PASSWORD", "rstudio"),
  dbname   = Sys.getenv("MYSQL_DBNAME", "rstudio")
)

# Connection details for the Adobe datafeed (S)FTP server.
ftp <- list(
  username = Sys.getenv("ADOBE_FTP_USER", "username"),
  password = Sys.getenv("ADOBE_FTP_PASSWORD", "password"),
  hostname = Sys.getenv("ADOBE_FTP_HOST", "ftp host"),
  path     = "/"
)
# 1. Mirror files from the Adobe FTP Server ----
# Todo:
# * Sync only files we can't find in the clickstream table
# * Download files in parallel.
# LFTP tips found at: http://unix.stackexchange.com/questions/98393/lftp-exclude-syntax-confusion
#   To exclude .svn directories:          mirror -e -x ^\.svn/$ /myhome/ /elsewhere
#   To exclude folders starting with dot: mirror -e -x /\..+/$ /myhome/ /elsewhere
#   To exclude all *.bin AND *.so files:  mirror -e -X *.bin -X *.so /myhome/ /elsewhere
baseFolder <- "~/Datafeeds"

# Build the lftp script, then shell-quote it as a whole. shQuote() protects
# against credentials or paths containing shell metacharacters; the original
# interpolated them into the command line unquoted.
lftpScript <- paste0(
  "open -u ", ftp$username, ",", ftp$password,
  " sftp://", ftp$hostname,
  "; mirror -c -e ", ftp$path, " ", baseFolder
)
lftpCommand <- paste("lftp -c", shQuote(lftpScript))
cat("Download files:", lftpCommand, "\n")
system(lftpCommand)
# 2. Set working directory to the FTP folder where files arrive from Adobe ----
setwd(baseFolder)

# 3. Sub-folders the tarball contents get sorted into ----
hitData          <- "./hit_data"
metaDataFolder   <- "./metadata"
columnDataFolder <- "./columns"

# 4. Optimisation: only work on files that aren't already in the DB ----
# Get the datafeed filenames already imported so we don't load duplicates.
# dbGetQuery() = dbSendQuery() + dbFetch() + dbClearResult() in one call,
# so no result set is left open on the connection.
previouslyImported <- dbGetQuery(conn, "SELECT DISTINCT file FROM clickstream")
# Compare the files on disk to those already in the DB. ----
# Escape and anchor the extension: the original pattern ".tar.gz" treated the
# dots as regex wildcards and matched anywhere in the name. List the files
# once instead of twice.
tarballs <- list.files(baseFolder, pattern = "\\.tar\\.gz$")
filesToImport <- data.frame(
  filesOnDisk    = tarballs,
  # hit_data files get renamed <archive>.tsv after extraction, so compare on
  # the .tsv name stored in the clickstream table's `file` column.
  filesToCompare = sub("\\.tar\\.gz$", ".tsv", tarballs),
  stringsAsFactors = FALSE
)
# Keep just the files we've not seen before.
filesToImport <- subset(
  filesToImport,
  !(filesToImport$filesToCompare %in% previouslyImported$file)
)
print("Importing the following files:")
print(filesToImport$filesOnDisk)
# 5. Extract metadata lookups: browsers, color_depth, country, languages, etc ----
metaDataFiles <- c(
  "browser_type.tsv", "browser.tsv", "color_depth.tsv", "connection_type.tsv",
  "country.tsv", "javascript_version.tsv", "languages.tsv",
  "operating_systems.tsv", "plugins.tsv", "referrer_type.tsv",
  "resolution.tsv", "search_engines.tsv"
)
# A plain for-loop: untar() is called purely for its side effect, so the
# list lapply() built was discarded anyway. fixed = TRUE keeps the dots in
# the pattern literal instead of regex wildcards.
for (archive in as.character(filesToImport$filesOnDisk)) {
  if (grepl("000000.tar.gz", archive, fixed = TRUE) || !dir.exists(metaDataFolder)) {
    print(paste('Extracting metadata from', archive))
    untar(archive, files = metaDataFiles, exdir = metaDataFolder)
  }
}
# 6. Importing metadata into MySQL ----
setwd(paste0(baseFolder, "/", metaDataFolder))

# Load one <tblname>.tsv lookup file into MySQL as table <tblname>.
# The raw file has two unnamed columns, V1 (id) and V2 (label), which are
# renamed and indexed after loading. Any existing table is replaced.
loadlookup <- function(tblname) {
  df <- read.csv2(paste0(tblname, ".tsv"), sep = "\t",
                  header = FALSE, stringsAsFactors = FALSE)
  # overwrite = TRUE already replaces an existing table; the original's
  # explicit dbExistsTable()/dbRemoveTable() dance was redundant.
  dbWriteTable(conn, name = tblname, value = df,
               row.names = FALSE, overwrite = TRUE, append = FALSE)
  # dbExecute() runs the DDL and frees its result set; the original used
  # dbSendQuery() without dbClearResult(), leaking an open result on the
  # connection.
  dbExecute(conn, paste0(
    "ALTER TABLE ", tblname,
    " CHANGE COLUMN V1 id BIGINT,",
    " CHANGE COLUMN V2 label varchar(255),",
    " ADD INDEX `idx_", tblname, "` USING BTREE (label)"
  ))
  # OPTIMIZE TABLE returns a result row in MySQL, so dbGetQuery is correct here.
  dbGetQuery(conn, paste0("OPTIMIZE TABLE ", tblname))
}
# Derive table names from the .tsv files on disk and load each one ONCE.
# The original nested a loop over tables inside a loop over files, importing
# every table once per file (O(files x tables) redundant DB work) — the end
# state is identical after a single pass. It also built metaDataTables via
# lapply(print(...)) purely for the side-effect printing; sub() on the file
# vector does the same job directly.
metaDataTables <- sub("\\.tsv$", "", list.files(pattern = "\\.tsv$"))
for (tbl in metaDataTables) {
  print(tbl)
  loadlookup(tbl)
}
# 7. Create some views for convenience ----
if (!dbExistsTable(conn, "v_browsers")) {
  # NOTE(review): the join condition `browser`.`id` = `browser_type`.`id` is
  # copied from the original — it joins a browser id to a browser-type id,
  # which looks suspicious. Confirm against the datafeed schema.
  # dbExecute() frees the result set the original dbSendQuery() leaked.
  dbExecute(conn, paste0(
    "CREATE VIEW `rstudio`.`v_browsers` AS ",
    "SELECT `browser`.`id` AS `browser_id`, ",
    "`browser`.`label` AS `browser`, ",
    "`browser_type`.`label` AS `browser_type` ",
    "FROM (`browser` join `browser_type` on((`browser`.`id` = `browser_type`.`id`)))"
  ))
}
# 8. Extract column headers and event lookup data ----
columnData <- c("column_headers.tsv", "event.tsv")
# for-loop instead of lapply(): untar() is called only for its side effect.
# fixed = TRUE keeps the dots in the pattern literal.
for (archive in as.character(filesToImport$filesOnDisk)) {
  if (grepl("000000.tar.gz", archive, fixed = TRUE) || !dir.exists(columnDataFolder)) {
    print(paste('Extracting column data from', archive))
    untar(archive, files = columnData, exdir = columnDataFolder)
  }
}
# 9. Extract all Hit Data and save to a separate folder ---- | |
lapply(as.character(filesToImport$filesOnDisk), function(x){ | |
untar(x, files="hit_data.tsv", exdir=hitData ); | |
hitDataFile = paste0(hitData, "/hit_data.tsv"); | |
# rename the hit_data file to match enclosing filename. | |
newHitDataFile = paste0(hitData, "/", sub(".tar.gz",".tsv",x)) | |
file.rename(hitDataFile, newHitDataFile) | |
}) | |
# 10. Load clickstream data into DB ----
# Set directory to the hit-data folder to avoid paste()-built paths below.
setwd(paste0(baseFolder, "/", hitData))

# Column headers for server calls ----
# colnames() already returns a character vector, so the original's unlist()
# was a no-op.
column_headers <- colnames(read.delim(
  paste0(baseFolder, "/", columnDataFolder, "/", "column_headers.tsv"),
  stringsAsFactors = FALSE
))
# Extra "file" column records which datafeed each row came from (used by the
# de-duplication query at the top of the script).
column_headers <- c(column_headers, "file")

# During testing, always drop the clickstream table. ----
# if (dbExistsTable(conn, "clickstream")) { dbRemoveTable(conn, "clickstream") }
# 11. Loop over list of files selected for import ---- | |
for(file in filesToImport$file){ | |
print(file) | |
# During testing, just use first 5 rows of data so things load faster. | |
# hit_data = read.csv2(file, sep = "\t", header = FALSE, stringsAsFactors = FALSE, colClasses = "character", nrows=5) | |
# Use the below line for prod | |
hit_data = read.csv2(file, sep = "\t", header = FALSE, stringsAsFactors = FALSE, colClasses = "character") | |
# Add the filename | |
hit_data$file <- file | |
colnames(hit_data) = column_headers | |
hit_data = hit_data[ , !grepl( "^(evar|prop)" , names( hit_data ) ) ] | |
dbWriteTable(conn, name = 'clickstream', | |
value = hit_data, | |
field.types = list(df$column_type), | |
append = T, overwrite = F, col.names=column_headers) | |
rm(hit_data) | |
} | |
# Run OPTIMIZE in MySQL so the query planner has accurate statistics.
# (OPTIMIZE TABLE returns a result row, hence dbGetQuery.)
dbGetQuery(conn, "OPTIMIZE TABLE clickstream")

# Remove the now-imported hit_data files. file.remove() is vectorised, so the
# original per-file loop was unnecessary; the pattern is anchored so only
# .tsv extensions match.
setwd(paste0(baseFolder, "/", hitData))
file.remove(list.files(pattern = "\\.tsv$"))

dbDisconnect(conn)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Import a folder of Adobe Analytics datafeed files into MySQL
Based on the script shared in the Adobe Analytics Clickstream Data Feed: Loading To Relational Database blog post by Randy Zwitch.
I have added a few niceties:
The script has been tweaked to use MySQL instead of Postgres, but it should be fairly easy to change it to any RDBMS supported by the DBI package.