Adobe Analytics Clickstream Importer
# Load required libraries
library(DBI)
library(RMySQL)
# Init settings
conn <- dbConnect(RMySQL::MySQL(), host="localhost", user="rstudio", password="rstudio", dbname="rstudio");
ftp = list(
  username = "username",
  password = "password",
  hostname = "ftp host",
  path = "/"
)
# 1. Mirror files from the Adobe FTP Server
# Todo:
# * Sync only files we can't find in the clickstream table
# * Download files in parallel (a hedged --parallel sketch follows the mirror command below).
# LFTP tips found at: http://unix.stackexchange.com/questions/98393/lftp-exclude-syntax-confusion
# To exclude .svn directories use:
# mirror -e -x ^\.svn/$ /myhome/ /elsewhere
# To exclude all folders starting with a dot:
# mirror -e -x /\..+/$ /myhome/ /elsewhere
# To exclude all *.bin AND *.so files:
# mirror -e -X *.bin -X *.so /myhome/ /elsewhere
baseFolder = "~/Datafeeds"
lftpCommand = paste0("lftp -c \"open -u ", ftp$username, ",", ftp$password," sftp://", ftp$hostname, "; mirror -c -e ", ftp$path, " ", baseFolder, "\"")
cat("Download files:", lftpCommand)
system(lftpCommand)
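# Hedged sketch for the parallel-download Todo above: lftp's mirror command
# also accepts a --parallel=N option to transfer several files at once. This
# variant is untested against the Adobe host, so it is left commented out
# rather than replacing lftpCommand.
# lftpParallelCommand = paste0("lftp -c \"open -u ", ftp$username, ",", ftp$password,
#                              " sftp://", ftp$hostname, "; mirror -c -e --parallel=4 ",
#                              ftp$path, " ", baseFolder, "\"")
# system(lftpParallelCommand)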
# 2. Setting directory to FTP folder where files incoming from Adobe ----
setwd(baseFolder)
# 3. Sort files into three separate folders ----
hitData = "./hit_data"
metaDataFolder = "./metadata"
columnDataFolder = "./columns"
# 4. Optimisation. Only work on files that aren't already in the DB ----
# Get a list of previously imported datafeeds so we don't put duplicate data into our DB ----
dbResult = dbSendQuery(conn, "SELECT DISTINCT file FROM clickstream")
previouslyImported = dbFetch(dbResult)
dbClearResult(dbResult)
# Compare the files on disk to those in the DB.
filesToImport = cbind(
  as.data.frame(list.files(baseFolder, ".tar.gz")),
  as.data.frame(sub(".tar.gz", ".tsv", list.files(baseFolder, ".tar.gz")))
)
colnames(filesToImport) = c("filesOnDisk","filesToCompare")
# Dataframe with just those we've not seen before.
filesToImport=subset(filesToImport, !(filesToImport$filesToCompare %in% previouslyImported$file ))
print("Importing the following files:")
print(filesToImport$filesOnDisk)
# 5. Extract metadata; browsers, colour_depth, country, languages, etc ----
metaDataFiles = c("browser_type.tsv","browser.tsv","color_depth.tsv","connection_type.tsv","country.tsv","javascript_version.tsv","languages.tsv","operating_systems.tsv","plugins.tsv","referrer_type.tsv","resolution.tsv","search_engines.tsv")
lapply(as.character(filesToImport$filesOnDisk), function(x){
  if(grepl(pattern = "000000.tar.gz", x) || !dir.exists(metaDataFolder)){
    print(paste('Extracting metadata from', x))
    untar(x, files=metaDataFiles, exdir=metaDataFolder)
  }
})
# 6. Importing metadata into MySQL ----
setwd(paste0(baseFolder, "/", metaDataFolder))
loadlookup <- function(tblname){
  df <- read.csv2(paste(tblname, ".tsv", sep=""), sep = "\t", header = FALSE, stringsAsFactors = FALSE)
  if(dbExistsTable(conn, tblname)){ dbRemoveTable(conn, tblname) }
  dbWriteTable(conn, name=tblname, value=df, row.names=FALSE, overwrite=TRUE, append=FALSE)
  dbSendQuery(conn, paste0("ALTER TABLE ", tblname,
                           " CHANGE COLUMN V1 id BIGINT,",
                           " CHANGE COLUMN V2 label varchar(255),",
                           " ADD INDEX `idx_", tblname, "` USING BTREE (label)"
  ))
  dbGetQuery(conn, paste0("OPTIMIZE TABLE ", tblname))
}
# Load each lookup table once (the working directory is now the metadata folder).
metaDataTables = sub("\\.tsv$", "", list.files(pattern = "\\.tsv$"))
for(tbl in metaDataTables){
  print(tbl)
  loadlookup(tbl)
}
# 7. Create some views for convenience ----
if(dbExistsTable(conn, "v_browsers")==FALSE){
  dbSendQuery(conn, paste0("CREATE VIEW `rstudio`.`v_browsers` AS ",
                           "SELECT `browser`.`id` AS `browser_id`, ",
                           "`browser`.`label` AS `browser`, ",
                           "`browser_type`.`label` AS `browser_type` ",
                           "FROM (`browser` join `browser_type` on((`browser`.`id` = `browser_type`.`id`)))"
  ))
}
# 8. Extract column data and events ----
setwd(baseFolder) # step 6 moved into the metadata folder, so come back to the base folder first
columnData = c("column_headers.tsv","event.tsv")
lapply(as.character(filesToImport$filesOnDisk), function(x){
  if(grepl(pattern = "000000.tar.gz", x) || !dir.exists(columnDataFolder)){
    print(paste('Extracting column data from', x))
    untar(x, files=columnData, exdir=columnDataFolder)
  }
})
# 9. Extract all Hit Data and save to a separate folder ----
lapply(as.character(filesToImport$filesOnDisk), function(x){
  untar(x, files="hit_data.tsv", exdir=hitData)
  hitDataFile = paste0(hitData, "/hit_data.tsv")
  # Rename the hit_data file to match the enclosing archive's filename.
  newHitDataFile = paste0(hitData, "/", sub(".tar.gz", ".tsv", x))
  file.rename(hitDataFile, newHitDataFile)
})
# 10. Load clickstream data into DB ----
# Set directory to avoid having to use paste to build file paths
setwd(paste0(baseFolder, "/", hitData))
# Set column headers for server calls ----
column_headers = unlist(colnames(read.delim(
  paste0(baseFolder, "/", columnDataFolder, "/", "column_headers.tsv"),
  stringsAsFactors=FALSE
)))
column_headers = append(column_headers, "file")
# During testing, always drop the clickstream table. ----
#if(dbExistsTable(conn, "clickstream")){dbRemoveTable(conn, "clickstream")}
# 11. Loop over list of files selected for import ----
for(file in filesToImport$filesToCompare){
  print(file)
  # During testing, just use the first 5 rows of data so things load faster.
  # hit_data = read.csv2(file, sep = "\t", header = FALSE, stringsAsFactors = FALSE, colClasses = "character", nrows=5)
  # Use the line below for prod
  hit_data = read.csv2(file, sep = "\t", header = FALSE, stringsAsFactors = FALSE, colClasses = "character")
  # Add the filename so we can tell which feed each row came from
  hit_data$file <- file
  colnames(hit_data) = column_headers
  # Drop the eVar/prop columns to keep the table narrow
  hit_data = hit_data[ , !grepl("^(evar|prop)", names(hit_data)) ]
  dbWriteTable(conn, name = 'clickstream',
               value = hit_data,
               append = TRUE, overwrite = FALSE)
  rm(hit_data)
}
# Run OPTIMIZE TABLE in MySQL so the clickstream table is rebuilt and the query planner has accurate statistics
dbGetQuery(conn, "OPTIMIZE TABLE clickstream")
# Remove hit_data files.
setwd(paste0(baseFolder, "/", hitData))
for(file in list.files(pattern = "*.tsv")){
  file.remove(file)
}
dbDisconnect(conn)
Import a folder of Adobe Analytics datafeed files into MySQL

Based on the script shared in the Adobe Analytics Clickstream Data Feed: Loading To Relational Database blog post by Randy Zwitch.

I've added a few niceties:

  • Got the column headers on hit_data working.
  • Only load data into the DB that hasn't already been inserted.
  • Index the lookup tables.
  • Create some views for convenience (an example query against one of them is sketched below this list).
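
For example, here's the kind of convenience query the v_browsers view is meant to support. This is a hedged sketch: it assumes the hit_data headers from column_headers.tsv include a browser column holding the lookup id, so adjust the column name to your own feed.

browser_share <- dbGetQuery(conn, "
  SELECT b.browser, b.browser_type, COUNT(*) AS hits
  FROM clickstream c
  JOIN v_browsers b ON b.browser_id = c.browser
  GROUP BY b.browser, b.browser_type
  ORDER BY hits DESC")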

The script has been tweaked to use MySQL instead of Postgres, but it should be fairly easy to change it to any RDBMS supported by the DBI package, as sketched below.
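
For instance, moving back to Postgres should mostly come down to the connection line. The sketch below assumes the RPostgres package and is untested here; note the ALTER TABLE and OPTIMIZE TABLE statements in the script are MySQL-flavoured SQL that would still need translating.

library(DBI)
library(RPostgres)
conn <- dbConnect(RPostgres::Postgres(), host="localhost", user="rstudio",
                  password="rstudio", dbname="rstudio")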
