@vsuharnikov
Last active January 4, 2024 08:17
Parse LOG file tables of RocksDB
import numbers
import re
from datetime import datetime
import numpy as np
import pandas as pd


def convert_to_bytes(size_str):
    """
    Convert sizes like '10 B', '123 KB', '44 MB' to bytes.

    Parameters:
    size_str (str): A string representing the size with a unit.

    Returns:
    int: The size in bytes.
    """
    units = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
    if isinstance(size_str, str):
        size, unit = size_str.split()
        return np.int64(np.float64(size) * units[unit])
    else:
        return size_str


def convert_to_number(size_str):
    """
    Convert counts like '1', '32M', '2566K' to numbers: 'M' stands for millions, 'K' for thousands.

    Parameters:
    size_str (str): A string representing the count with an optional unit suffix.

    Returns:
    int: The numerical value.
    """
    units = {"K": np.uint64(1000), "M": np.uint64(1000000)}
    if isinstance(size_str, str) and size_str[-1] in units:
        return np.uint64(size_str[:-1]) * units[size_str[-1]]
    else:
        return np.uint64(size_str)


# https://stackoverflow.com/a/67403354
def auto_opt_pd_dtypes(df: pd.DataFrame):
    """
    Automatically downcast numeric dtypes in a DataFrame to the smallest possible size.

    Parameters:
    df (pd.DataFrame): The DataFrame to be optimized.

    This function modifies the DataFrame in place.
    """
    for col in df.columns:
        # integers
        if issubclass(df[col].dtypes.type, numbers.Integral):
            if df[col].min() >= 0:
                df[col] = pd.to_numeric(df[col], downcast="unsigned")
            else:
                df[col] = pd.to_numeric(df[col], downcast="integer")
        # other real numbers
        elif issubclass(df[col].dtypes.type, numbers.Real):
            df[col] = pd.to_numeric(df[col], downcast="float")


class RocksDBLogParser:
    def __init__(self, enable_stats=False, uptime_from=None, uptime_to=None):
        """
        Initialize the RocksDBLogParser object.

        Parameters:
        enable_stats (bool): If True, enables parsing of statistics.
        uptime_from (float): The minimum uptime from which to start parsing.
        uptime_to (float): The maximum uptime up to which parsing is allowed.
        """
        self.enable_stats = enable_stats
        self.uptime_from = uptime_from
        self.uptime_to = uptime_to

        self.category_columns = ["Level", "Priority", "CF"]

        self.state = "expect_start_time"
        self.compaction = pd.DataFrame()
        self.stats = pd.DataFrame()
        self.start_date = None
        self.uptime = 0.0
        self.column_family = ""
        self.columns = []
        self.rows = []
        self.curr_stats = {}
        self.stat_rows = []
        self.num_of_tables = {}
        self.events = []

        self.start_time_regex = re.compile(r'(\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2}\.\d+) \d+ RocksDB version')
        self.db_stats_regex = re.compile(r"^\*\* DB Stats \*\*")
        self.uptime_regex = re.compile(r"^Uptime\(secs\):\s+(\d+\.\d+)\s+total")
        self.heading_regex = re.compile(r"^\*\* Compaction Stats \[(.*?)\]")
        self.columns_regex = re.compile(r"\s{1,}")
        self.row_regex = re.compile(r"\s+(?![BKMGT]{2}\b)")
        self.stats_heading_regex = re.compile(r'(\d{4}/\d{2}/\d{2}-\d{2}:\d{2}:\d{2}\.\d+).+STATISTICS:')
        self.stat_regex = re.compile(
            r"^\s?(?P<label>rocksdb\.[^\s]+)(?P<type_values> [^\n]+)$"
        )
        self.stat_value_regex = re.compile(
            r"(?P<type>COUNT|SUM|P\d+) : (?P<value>[\d\.]+)"
        )
        self.event_regex = re.compile(
            r'EVENT_LOG_v1.+"time_micros": (?P<time>\d+),.*"event": "(?P<event>[^"]+)"'
        )

    def parse_file(self, file_path):
        """
        Parse the contents of a file line by line.

        Parameters:
        file_path (str): The path to the file that needs to be parsed.

        Returns:
        dict: A dict with the 'compaction', 'events' and 'stats' DataFrames.
        """
        with open(file_path, "r") as file:
            for line in file:
                if not self._parse_line(line.strip()):
                    break
        return self._finalize()

    def _parse_line(self, line):
        """
        Parse a single line from the input.

        Parameters:
        line (str): A line from the log or input file.

        This method updates the internal state based on the parsed line.

        Returns:
        bool: False if parsing should be stopped.
        """
        match = self.event_regex.search(line)
        if match:
            self.events.append([match.group("time"), match.group("event")])

        # print(f'table.state: {self.state}')
        match self.state:
            case "uptime_limit":
                return False
            case "expect_start_time":
                match = self.start_time_regex.search(line)
                if match:
                    self.start_date = datetime.strptime(match.group(1), "%Y/%m/%d-%H:%M:%S.%f")
                    self.state = "expect_first_db_stats"
            case "expect_first_db_stats" if self.db_stats_regex.search(line):
                self.state = "expect_uptime"
            case "expect_uptime":
                match = self.uptime_regex.search(line)
                if match:
                    self.uptime = float(match.group(1))
                    if self.uptime_from is None or self.uptime >= self.uptime_from:
                        if self.uptime_to is None or self.uptime <= self.uptime_to:
                            self.state = "expect_heading"
                        else:
                            self.state = "uptime_limit"
                    else:
                        return True  # Don't change the state, still waiting for uptime >= uptime_from
            case "expect_heading":
                match = self.heading_regex.search(line)
                if match:
                    self.column_family = match.group(1)
                    self.state = "expect_header"
            case "expect_heading_or_stats_header":
                match = self.stats_heading_regex.search(line)
                if match:
                    parsed_date = datetime.strptime(match.group(1), "%Y/%m/%d-%H:%M:%S.%f")
                    self.uptime = (parsed_date - self.start_date).total_seconds()
                    self.state = "expect_stats_row"
                else:
                    match = self.heading_regex.search(line)
                    if match:
                        self.column_family = match.group(1)
                        self.state = "expect_header"
            case "expect_header":
                self.columns = self.columns_regex.split(line)
                self.state = "skip_separator"
            case "skip_separator":
                self.state = "expect_row"
            case "expect_row":
                if line == "":
                    self.compaction = pd.concat(
                        [self.compaction, self._curr_compaction_to_pd()],
                        ignore_index=True,
                    )
                    self.rows = []
                    self.num_of_tables[self.column_family] = (
                        self.num_of_tables.get(self.column_family, 0) + 1
                    )
                    self.state = (
                        "expect_heading_or_stats_header"
                        if self.enable_stats
                        else "expect_heading"
                    )
                else:
                    row = self.row_regex.split(line)
                    self.rows.append(row)
            case "expect_stats_row":
                match = self.stat_regex.search(line)
                if match:
                    for m in self.stat_value_regex.finditer(match["type_values"]):
                        k = match.group("label") + "." + m.group("type")
                        self.curr_stats[k] = m.group("value")
                else:
                    self.curr_stats["CF"] = self.column_family
                    self.curr_stats["Uptime(s)"] = str(self.uptime)
                    self.stat_rows.append(self.curr_stats)
                    self.curr_stats = {}
                    self.state = "expect_heading"
        return True

    def _finalize(self):
        """
        Finalize the parsing process and prepare data for output.

        Returns:
        dict: A dict with the 'compaction', 'events' and 'stats' DataFrames.
        """
        for c in self.category_columns:
            if c in self.compaction:
                self.compaction[c] = self.compaction[c].astype("category")
        self.compaction[["Files(cnt)", "Files(comp)"]] = self.compaction["Files"].apply(
            lambda x: pd.Series(RocksDBLogParser._split_files(x))
        )

        stats_df = pd.DataFrame(self.stat_rows)
        self.stat_rows = []
        for c in stats_df.columns:
            if c == "Files" or c in self.category_columns:
                continue
            stats_df[c] = pd.to_numeric(stats_df[c])
        auto_opt_pd_dtypes(stats_df)

        events_df = pd.DataFrame(self.events, columns=["Timestamp", "Name"])
        events_df["Name"] = events_df["Name"].astype("category")
        events_df["Timestamp"] = events_df["Timestamp"].astype(np.uint64)
        start_time = events_df["Timestamp"].iloc[0]
        events_df["Uptime(s)"] = (events_df["Timestamp"] - start_time) / 1000000
        auto_opt_pd_dtypes(events_df)

        return {
            "compaction": self.compaction.copy(),
            "events": events_df,
            "stats": stats_df.copy(),
        }

    def _curr_compaction_to_pd(self):
        """
        Convert the current compaction data to a pandas DataFrame.

        Returns:
        pd.DataFrame: A DataFrame representing the current compaction data.
        """
        df = pd.DataFrame(self.rows, columns=self.columns)
        df["CF"] = self.column_family
        df["Uptime(s)"] = float(self.uptime)

        if "Size" in df.columns:
            df["Size"] = df["Size"].apply(convert_to_bytes)

        for c in ["KeyIn", "KeyDrop"]:
            if c in df.columns:
                df[c] = df[c].apply(convert_to_number)

        gb = np.float64(1024**3)
        for c in [
            "Read(GB)",
            "Rn(GB)",
            "Rnp1(GB)",
            "Write(GB)",
            "Wnew(GB)",
            "Moved(GB)",
            "Rblob(GB)",
            "Wblob(GB)",
        ]:
            if c in df.columns:
                df[c] = df[c].apply(lambda x: np.int64(np.float64(x) * gb))

        for c in df.columns:
            if c == "Files" or c in self.category_columns:
                continue
            df[c] = pd.to_numeric(df[c])
        auto_opt_pd_dtypes(df)
        return df

    @staticmethod
    def _split_files(file_str):
        """
        Split a string formatted as 'count/comp' into two separate values.

        Parameters:
        file_str (str): A string representing file count and compaction.

        Returns:
        tuple: A tuple of two integers (count, comp).
        """
        cnt, comp = file_str.split("/")
        return np.uint32(cnt), np.uint32(comp)


r = RocksDBLogParser(enable_stats=True).parse_file('path/to/LOG/file')
r['compaction']  # other keys: 'stats', 'events'
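
A quick sketch of what can be done with the result (a hypothetical example, not part of the parser; it assumes your LOG actually produced a "Write(GB)" column, per-table "Sum" rows and EVENT_LOG_v1 lines, so adjust the names to your RocksDB version):

# Bytes written per column family over uptime, taken from the "Sum" row of each
# compaction table ("Write(GB)" values were converted to plain bytes by the parser).
comp = r['compaction']
sum_rows = comp[comp['Level'] == 'Sum']
sum_rows.pivot_table(index='Uptime(s)', columns='CF', values='Write(GB)').tail()

# Number of logged events per event name (e.g. flush_finished, compaction_finished).
r['events'].groupby('Name', observed=True).size()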