Skip to content

Instantly share code, notes, and snippets.

@snopoke
Created June 13, 2019 11:41
Show Gist options
  • Save snopoke/08ca7cb40ac04350893fc8d02dc15104 to your computer and use it in GitHub Desktop.
Save snopoke/08ca7cb40ac04350893fc8d02dc15104 to your computer and use it in GitHub Desktop.
PostgreSQL log parsing into SQL
# Load and parse CSV logs from PostgreSQL
import os
import re
import sys
import postgres_copy
import six
import sqlalchemy
# DDL for the raw log table. The column list mirrors PostgreSQL's csvlog
# output format (one column per CSV field), so files can be COPY-loaded
# directly. Idempotent via IF NOT EXISTS.
CREATE_LOG_TABLE = """
CREATE TABLE if not exists postgres_log
(
log_time timestamp(3) with time zone,
user_name text,
database_name text,
process_id integer,
connection_from text,
session_id text,
session_line_num bigint,
command_tag text,
session_start_time timestamp with time zone,
virtual_transaction_id text,
transaction_id bigint,
error_severity text,
sql_state_code text,
message text,
detail text,
hint text,
internal_query text,
internal_query_pos integer,
context text,
query text,
query_pos integer,
location text,
application_name text,
PRIMARY KEY (session_id, session_line_num)
);
"""
# DDL for the table that receives rows extracted from postgres_log:
# one row per logged statement, with its duration and (optionally) the
# bound parameter text. Idempotent via IF NOT EXISTS.
CREATE_PARSED_TABLE = """
create table if not exists pg_log_parsed(
session_id text,
session_line_num bigint,
log_time timestamp(3) with time zone,
database_name text,
duration float,
query text,
params text,
PRIMARY KEY (session_id, session_line_num)
);
"""
def _create_tables(engine):
    """Create the raw and parsed log tables if they do not already exist.

    Both DDL statements use IF NOT EXISTS, so this is safe to call on every
    run.

    :param engine: SQLAlchemy engine connected to the target database.
    """
    # Bug fix: was `engine.being()`, a typo that raised AttributeError at
    # runtime; `engine.begin()` opens a transactional connection context.
    with engine.begin() as conn:
        conn.execute(CREATE_LOG_TABLE)
        conn.execute(CREATE_PARSED_TABLE)
def _insert_raw_logs(log_path, engine):
    """Bulk-load one PostgreSQL CSV log file into the ``postgres_log`` table.

    :param log_path: path to a ``.csv`` log file produced by PostgreSQL.
    :param engine: SQLAlchemy engine connected to the target database.
    """
    # Reflect the existing table definition so postgres_copy knows the columns.
    meta = sqlalchemy.MetaData(bind=engine)
    meta.reflect(bind=engine, extend_existing=True)
    log_table = meta.tables['postgres_log']

    # copy_from wants bytes-typed options on Python 2 and str on Python 3.
    csv_format = 'csv' if six.PY3 else b'csv'
    null_marker = '' if six.PY3 else b''
    with open(log_path, 'r') as log_file:
        postgres_copy.copy_from(
            log_file, log_table, engine,
            format=csv_format, null=null_marker,
        )
def parse_logs(engine):
    """Extract duration/query/parameter info from raw log rows.

    Reads SELECT and COPY rows from ``postgres_log``, pulls the statement
    duration and text out of ``message`` (and bound parameters out of
    ``detail`` when present), and inserts one row per log line into
    ``pg_log_parsed``.

    :param engine: SQLAlchemy engine connected to the target database.
    """
    duration_query_re = re.compile(r'duration: ([\d.]+).*(?:execute [\w<>]*|statement): (.*)')
    parameters_re = re.compile(r'parameters: (.*)')
    with engine.begin() as conn:
        log_rows = conn.execute(
            "select session_id, session_line_num, log_time, database_name, message, detail"
            " from postgres_log where command_tag in ('SELECT', 'COPY')"
        )
        for log_row in log_rows:
            duration = query = params = None
            msg_match = duration_query_re.match(log_row.message)
            if msg_match is not None:
                duration = msg_match.group(1)
                query = msg_match.group(2)
            if log_row.detail:
                detail_match = parameters_re.match(log_row.detail)
                if detail_match is not None:
                    params = detail_match.group(1)
            conn.execute(
                'insert into pg_log_parsed values(%s, %s, %s, %s, %s, %s, %s)',
                [
                    log_row.session_id,
                    log_row.session_line_num,
                    log_row.log_time,
                    log_row.database_name,
                    None if duration is None else float(duration),
                    query,
                    params,
                ],
            )
if __name__ == '__main__':
    # CLI entry point: load every .csv PostgreSQL log file from a directory
    # into the database, then parse the loaded rows into pg_log_parsed.
    args = sys.argv[1:]
    if len(args) != 2:
        print('Usage: pg_log_loader [path to log dir] [db connection url]')
        # Bug fix: previously execution fell through after printing usage,
        # so the unpack below raised ValueError on wrong arg counts.
        sys.exit(1)
    log_path, db_url = args
    engine = sqlalchemy.create_engine(db_url)
    # NOTE(review): _create_tables() is defined but never invoked, so the
    # target tables must already exist — consider calling it here.
    for file in os.listdir(log_path):
        fpath = os.path.join(log_path, file)
        if os.path.isfile(fpath) and file.endswith('.csv'):
            _insert_raw_logs(fpath, engine)
    parse_logs(engine)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment