Skip to content

Instantly share code, notes, and snippets.

@jerryan999
Created May 17, 2023 11:44
Show Gist options
  • Save jerryan999/c07e150d4f09503e0a7792ffe2ba3a8d to your computer and use it in GitHub Desktop.
Save jerryan999/c07e150d4f09503e0a7792ffe2ba3a8d to your computer and use it in GitHub Desktop.
How to use DuckDB to query local or S3 files and then do some analysis
import duckdb
import json
import os
import shutil
import time
import gzip
import os
# Open (or create) the local DuckDB database file; read_only=False because the
# script creates a table and inserts rows below.
conn = duckdb.connect(database="cloudfront-log2.db", read_only=False)
# Install extensions and configure S3 access:
# - httpfs: lets DuckDB read files over HTTP(S)/S3 paths
# - json:   JSON reader functions
# NOTE(review): the S3 key id/secret are blank — presumably redacted; fill
# them in (or use another credential mechanism) before querying s3:// URLs.
conn.execute('''
INSTALL 'httpfs';
LOAD 'httpfs';
INSTALL json;
LOAD json;
SET s3_region='ap-northeast-1';
SET s3_access_key_id='';
SET s3_secret_access_key='';
SET enable_progress_bar = true;
''')
# Create the table holding the raw CloudFront access-log fields.  Every column
# is STRING because the gzipped TSV values are loaded verbatim; cast at query
# time as needed.  IF NOT EXISTS makes the script safe to re-run against the
# persistent cloudfront-log2.db file (a plain CREATE TABLE would raise on the
# second run).
conn.execute('''
CREATE TABLE IF NOT EXISTS logs (
date STRING,
time STRING,
x_edge_location STRING,
sc_bytes STRING,
c_ip STRING,
cs_method STRING,
cs_host STRING,
cs_uri_stem STRING,
sc_status STRING,
cs_Referer STRING,
cs_User_Agent STRING,
cs_uri_query STRING,
cs_Cookie STRING,
x_edge_result_type STRING,
x_edge_request_id STRING,
x_host_header STRING,
cs_protocol STRING,
cs_bytes STRING,
time_taken STRING,
x_forwarded_for STRING,
ssl_protocol STRING,
ssl_cipher STRING,
x_edge_response_result_type STRING,
cs_protocol_version STRING,
fle_status STRING,
fle_encrypted_fields STRING,
c_port STRING,
time_to_first_byte STRING,
x_edge_detailed_result_type STRING,
sc_content_type STRING,
sc_content_len STRING,
sc_range_start STRING,
sc_range_end STRING
)
''')
# Glob patterns for the gzipped CloudFront log archives to ingest — one
# pattern per day.  Generated from a date range instead of hand-writing seven
# nearly-identical literals; extend the range by changing _START/_END.
from datetime import date, timedelta

_LOG_DIR = '/Users/zhibin.an/cloudfront'
_DISTRIBUTION_ID = 'E1JPIDV7XCO3TK'
_START = date(2023, 4, 24)
_END = date(2023, 4, 30)  # inclusive

# str(date) is ISO "YYYY-MM-DD", matching the filename convention.
file_regs = [
    f"{_LOG_DIR}/{_DISTRIBUTION_ID}.{_START + timedelta(days=offset)}*.gz"
    for offset in range((_END - _START).days + 1)
]
# Bulk-load every file matching each glob pattern into the `logs` table.
# read_csv reads the gzipped TSVs directly: skip the 2 header lines, tolerate
# malformed rows, and let DuckDB sniff column types.
_INSERT_TEMPLATE = (
    "INSERT INTO logs SELECT * FROM read_csv('{}', "
    "skip=2, sep='\t', ignore_errors=true, auto_detect=TRUE)"
)
for pattern in file_regs:
    print("file_reg: ", pattern)
    conn.execute(_INSERT_TEMPLATE.format(pattern))
# Sanity-check the load: preview the first ten rows as a pandas DataFrame.
# (Plain strings — the originals used f-strings with no placeholders, F541.)
df = conn.execute("SELECT * FROM logs limit 10").fetchdf()
print(df.head(10))

# Total number of ingested log lines.
d = conn.execute("SELECT count(*) FROM logs").fetchall()
print(d)
# Per-edge-location breakdown: for each location of interest, count requests
# grouped by (edge location, URI stem) and print the 30 most-requested URIs.
locations = ['DUB56-P1', 'DUB2-C1', 'BAH52-C1', 'BAH53-C1']
# Hoisted out of the loop and parameterized (?) so the location value is
# bound by DuckDB rather than interpolated into the SQL text.
_BREAKDOWN_SQL = """
    SELECT
        x_edge_location,
        cs_uri_stem,
        count(*) AS cnt
    FROM logs
    WHERE x_edge_location = ?
    GROUP BY 1, 2
    ORDER BY cnt DESC
"""
for loc in locations:
    df = conn.execute(_BREAKDOWN_SQL, [loc]).fetchdf()
    print('------------------ {} start ---------------------'.format(loc))
    print(df.head(30))
    print('------------------- {} end ----------------------'.format(loc))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment