Skip to content

Instantly share code, notes, and snippets.

@tallcoleman
Last active March 28, 2026 23:44
Show Gist options
  • Select an option

  • Save tallcoleman/6771559900a856ee0938940eca1bd1e9 to your computer and use it in GitHub Desktop.

Select an option

Save tallcoleman/6771559900a856ee0938940eca1bd1e9 to your computer and use it in GitHub Desktop.
Data formatting script for migrating from Umami cloud to self-hosted
# /// script
# requires-python = ">=3.14"
# dependencies = [
# "pandas>=3.0.0",
# ]
# ///
import uuid
from datetime import datetime
from pathlib import Path
from typing import List
from zoneinfo import ZoneInfo
import pandas as pd
# HOW TO USE:
# - configure the four variables below
# - run `uv run umami_import.py`

# Enter the website ID from your new self-hosted instance
NEW_WEBSITE_ID = ""
# Folder where the export files from your cloud instance are saved
# (must contain website_event.csv and, optionally, event_data.csv)
EXPORT_FOLDER = ""
# Folder where you want to save your generated import files
# (created automatically if it does not exist yet)
IMPORT_FOLDER = ""
# Optional timezone shift applied to all created_at values, which are
# interpreted as UTC (recommended if migrating from "us" Umami cloud,
# put None if migrating from "eu" Umami cloud)
SHIFT_TIMEZONE_TO = ZoneInfo("America/New_York")

# Based on scripts from https://github.com/RoversX/umami-csv-import-script with several improvements

# strptime/strftime format for the created_at column (should not need to edit)
DATE_TIME_FORMAT = r"%Y-%m-%d %H:%M:%S"
def main(
    website_id_new: str,
    shift_to_timezone: ZoneInfo | None,
    website_event_old_file: Path,
    session_new_file: Path,
    website_event_new_file: Path,
    event_data_old_file: Path,
    event_data_new_file: Path,
) -> None:
    """Run every table-generation step for the cloud-to-self-hosted migration."""
    # Arguments shared by all three generators.
    common_kwargs = dict(
        website_id_new=website_id_new,
        shift_to_timezone=shift_to_timezone,
        website_event_old_file=website_event_old_file,
    )
    generate_website_event(
        website_event_new_file=website_event_new_file,
        **common_kwargs,
    )
    generate_session(
        session_new_file=session_new_file,
        **common_kwargs,
    )
    generate_event_data(
        event_data_old_file=event_data_old_file,
        event_data_new_file=event_data_new_file,
        **common_kwargs,
    )
def shift_timezone(
df: pd.DataFrame,
datetime_cols: List[str],
shift_to_timezone: ZoneInfo,
datetime_format: str = DATE_TIME_FORMAT,
context_description: str | None = None,
) -> pd.DataFrame:
"""Shifts datetime values based on the difference between UTC and the specified timezone. Assumes datetimes are represented as strings, and retains the datetime string format specified."""
for col in datetime_cols:
df[col] = [
datetime.strptime(dt, datetime_format)
.replace(tzinfo=ZoneInfo("UTC"))
.astimezone(shift_to_timezone)
.strftime(datetime_format)
for dt in df[col]
]
# log information about the time zone shift
print(
"",
context_description,
f"Shifting time zones from UTC to {shift_to_timezone}",
"Adjusted frequency breakdown of events by hour in output time:",
df[col].str.extract(r"\s(\d{2}):").value_counts().sort_index(),
"",
sep="\n",
)
return df
def generate_website_event(
website_id_new: str,
shift_to_timezone: ZoneInfo | None,
website_event_old_file: Path,
website_event_new_file: Path,
):
"""Generate a new website_event table using the website_event file from the Umami data export. Replaces the old website_id with website_id_new."""
# Load the original CSV file
website_event_old = pd.read_csv(website_event_old_file)
# Update the website_id column with the user-provided website ID
website_event_old["website_id"] = website_id_new
# Define the columns required for the website_event table
# Check in psql with `\d website_event`
# Must be in exactly the same order as the table schema
website_event_columns = [
"event_id",
"website_id",
"session_id",
"created_at",
"url_path",
"url_query",
"referrer_path",
"referrer_query",
"referrer_domain",
"page_title",
"event_type",
"event_name",
"visit_id",
"tag",
"fbclid",
"gclid",
"li_fat_id",
"msclkid",
"ttclid",
"twclid",
"utm_campaign",
"utm_content",
"utm_medium",
"utm_source",
"utm_term",
"hostname",
]
# Create a new DataFrame for the website_event data with the required columns
df_website_event = website_event_old[website_event_columns]
website_event_unique = df_website_event.groupby(
"event_id", as_index=False
).aggregate("first")
# Convert timestamps if needed
website_event_timeshifted = (
shift_timezone(
website_event_unique,
datetime_cols=["created_at"],
shift_to_timezone=shift_to_timezone,
context_description="website_event table",
)
if shift_to_timezone is not None
else website_event_unique
)
# Save the new website_event data to a CSV file
website_event_new_file.parent.mkdir(exist_ok=True, parents=True)
website_event_timeshifted.to_csv(website_event_new_file, index=False)
print(f"Successfully generated {website_event_new_file}")
def generate_session(
website_id_new: str,
shift_to_timezone: ZoneInfo | None,
website_event_old_file: Path,
session_new_file: Path,
):
"""Generate a new session table using the website_event file from the Umami data export. Replaces the old website_id with website_id_new."""
# Load the original CSV file
website_event_old = pd.read_csv(website_event_old_file)
# Update the website_id column with the user-provided website ID
website_event_old["website_id"] = website_id_new
# Define the columns required for the session table
# Check in psql with `\d session`
# Must be in exactly the same order as the table schema
sessions_aggregations = {
# 'session_id' will be the key for the aggregation
"website_id": "first",
"browser": "first",
"os": "first",
"device": "first",
"screen": "first",
"language": "first",
"country": "first",
"region": "first",
"city": "first",
"created_at": "min", # pick first date-time
"distinct_id": "first",
}
unique_sessions = website_event_old.groupby("session_id", as_index=False).aggregate(
sessions_aggregations
)
# Convert timestamps if needed
sessions_timeshifted = (
shift_timezone(
unique_sessions,
datetime_cols=["created_at"],
shift_to_timezone=shift_to_timezone,
context_description="session table",
)
if shift_to_timezone is not None
else unique_sessions
)
# Save the new session data to a CSV file
session_new_file.parent.mkdir(exist_ok=True, parents=True)
sessions_timeshifted.to_csv(session_new_file, index=False)
print(f"Successfully generated {session_new_file}")
def generate_event_data(
website_id_new: str,
shift_to_timezone: ZoneInfo | None,
website_event_old_file: Path,
event_data_old_file: Path,
event_data_new_file: Path,
):
"""
Generate a new event_data table using the website_event and event_data files from the Umami data export. Replaces the old website_id with website_id_new.
The event_id column from the event_data export file may not always match event_id entries from the website_event export file. If this is the case, the function will try to find a match based on unique timestamps if possible. If there are still event_data rows that are not successfully matched to website_event rows, the function will print a warning about it and output a file of the un-matched event_data rows.
"""
# skip if there is no event_data_old file
if not event_data_old_file.exists():
print(
f"No event_data export file found at {event_data_old_file}, skipping event_data import file generation."
)
return
event_data_old = pd.read_csv(event_data_old_file)
# skip if there is no event data
if len(event_data_old) == 0:
print(
f"Event data export file found at {event_data_old_file} is empty, skipping event event_data import file generation."
)
return
website_event_old = pd.read_csv(website_event_old_file)
# Check in psql with `\d event_data`
# Must be in exactly the same order as the table schema
event_data_columns = [
"event_data_id",
"website_id",
"website_event_id",
"data_key",
"string_value",
"number_value",
"date_value",
"data_type",
"created_at",
]
# website_event_id is further checked to make sure it matches website_event table, so column name is prefixed with '_' here
event_data_new = pd.DataFrame().assign(
event_data_id=[uuid.uuid4() for _ in range(len(event_data_old.index))],
website_id=website_id_new,
_website_event_id=event_data_old["event_id"],
data_key=event_data_old["data_key"],
string_value=event_data_old["string_value"],
number_value=event_data_old["number_value"],
date_value=event_data_old["date_value"],
data_type=event_data_old["data_type"],
created_at=event_data_old["created_at"],
)
# pull matches betweeen _website_event_id and website_event.event_id
# both direct matches and cases where there is a 1:1 match using created_at
event_data_new = event_data_new.merge(
pd.DataFrame().assign(
_website_event_id=website_event_old["event_id"],
_weid_match=website_event_old["event_id"],
),
how="left",
on="_website_event_id",
).merge(
pd.DataFrame()
.assign(
created_at=website_event_old["created_at"],
_weid_created_at_match=website_event_old["event_id"],
)
.drop_duplicates(subset=["created_at"], keep=False),
how="left",
on="created_at",
)
# assign website_event_id based on direct matches with a fallback to matches based on 1:1 created_at matches
event_data_new = event_data_new.assign(
website_event_id=event_data_new["_weid_match"].fillna(
event_data_new["_weid_created_at_match"]
)
)
# replace \N string for null with None
event_data_new = event_data_new.replace(r"\N", None)
# set column order to match db schema
event_data_new = event_data_new[event_data_columns]
# Convert timestamps if needed
event_data_timeshifted = (
shift_timezone(
event_data_new,
datetime_cols=["created_at"],
shift_to_timezone=shift_to_timezone,
context_description="event_data table",
)
if shift_to_timezone is not None
else event_data_new
)
# warn and save a list of unmatched events, if there are any
unmatched_event_data = event_data_timeshifted[
event_data_timeshifted["website_event_id"].isna()
]
if len(unmatched_event_data) > 0:
unmatched_path = event_data_new_file.with_name(
event_data_new_file.stem + "_unmatched" + event_data_new_file.suffix
)
print(
f"{len(unmatched_event_data)} event_data rows were unable to be matched to a website event. Saving list of unmatched entries to {unmatched_path}"
)
unmatched_path.parent.mkdir(exist_ok=True, parents=True)
unmatched_event_data.to_csv(unmatched_path, index=False)
# save validated event data to csv
print(f"Successfully generated {event_data_new_file}")
matched_event_data = event_data_timeshifted[
~event_data_timeshifted["website_event_id"].isna()
]
event_data_new_file.parent.mkdir(exist_ok=True, parents=True)
matched_event_data.to_csv(event_data_new_file, index=False)
if __name__ == "__main__":
    # Resolve the configured folders once, then run the full migration.
    export_dir = Path(EXPORT_FOLDER)
    import_dir = Path(IMPORT_FOLDER)
    main(
        website_id_new=NEW_WEBSITE_ID,
        shift_to_timezone=SHIFT_TIMEZONE_TO,
        website_event_old_file=export_dir / "website_event.csv",
        session_new_file=import_dir / "session_new.csv",
        website_event_new_file=import_dir / "website_event_new.csv",
        event_data_old_file=export_dir / "event_data.csv",
        event_data_new_file=import_dir / "event_data_new.csv",
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment