Skip to content

Instantly share code, notes, and snippets.

@nathanleclaire
Created December 14, 2023 18:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nathanleclaire/508757d47ca2bd42d3c9bc5914ac52ff to your computer and use it in GitHub Desktop.
Save nathanleclaire/508757d47ca2bd42d3c9bc5914ac52ff to your computer and use it in GitHub Desktop.
project:
  id: 8854c0fe-1367-4a83-8632-6193d2a20609
  name: traces
defaults:
  python_version: 3.11
import bauplan
# Full flattened-span schema produced by extract_data_from_json:
# resource-level attributes, span-level attributes, raw span fields,
# and the derived duration/datetime columns appended by read_json_from_url.
all_cols = [
    # resource attributes: service / cloud / kubernetes identity
    'resource_attributes.service.name',
    'resource_attributes.cloud.provider',
    'resource_attributes.cloud.region',
    'resource_attributes.k8s.deployment.name',
    'resource_attributes.k8s.cluster.name',
    'resource_attributes.k8s.pod.name',
    'resource_attributes.k8s.namespace.name',
    'resource_attributes.k8s.container.name',
    # span attributes and raw OTLP span fields
    'span_attributes.load_generator.seq_num',
    'span_attributes.version',
    'span_attributes.customer',
    'span_attributes.starter',
    'span_traceId',
    'span_spanId',
    'span_parentSpanId',
    'span_name',
    'span_kind',
    'span_startTimeUnixNano',
    'span_endTimeUnixNano',
    'span_status',
    # additional resource attributes
    'resource_attributes.host.type',
    'span_attributes.region',
    'resource_attributes.host.name',
    'resource_attributes.instrument.name',
    'resource_attributes.http.method',
    'resource_attributes.customer',
    'resource_attributes.client.platform',
    # synthetic load-generator attributes
    'span_attributes.falling-pond',
    'span_attributes.restless-shadow',
    'span_attributes.restless-sound',
    'span_attributes.quiet-dream',
    'span_attributes.purple-bush',
    'span_attributes.spring-cherry',
    'span_attributes.blue-field',
    'span_attributes.muddy-butterfly',
    'span_attributes.bold-fog',
    'span_attributes.bold-mountain',
    # derived columns added after load
    'span_durationNano',
    'span_startDatetime',
    'span_endDatetime',
]
def extract_attribute_value(attr):
    """Unwrap an OTLP-style attribute into a plain Python value.

    `attr` is a dict like ``{'key': ..., 'value': {'stringValue': 'x'}}``;
    the OTLP JSON encoding stores the payload under exactly one typed key
    inside ``value``.

    Returns the unwrapped str/bool/int/float, or None (after printing the
    unrecognized value for debugging) when the type key is unknown.
    """
    value = attr['value']
    if 'stringValue' in value:
        return value['stringValue']
    elif 'boolValue' in value:
        return bool(value['boolValue'])
    elif 'intValue' in value:
        # OTLP JSON serializes 64-bit ints as strings, so coerce explicitly.
        return int(value['intValue'])
    elif 'doubleValue' in value:
        # FIX: doubleValue is a legal OTLP AnyValue variant; previously it
        # fell through to the debug-print branch and returned None.
        return float(value['doubleValue'])
    else:
        # Best-effort: surface the unhandled variant rather than crash.
        print(value)
        return None
def extract_data_from_json(json_obj):
    """Flatten one OTLP trace export object into a list of per-span dicts.

    Each row combines, in order of increasing precedence:
    resource attributes (prefixed 'resource_attributes.'),
    span attributes (prefixed 'span_attributes.'),
    and the remaining raw span fields (prefixed 'span_').
    """
    rows = []
    for resource_span in json_obj['resourceSpans']:
        # Resource-level attributes are shared by every span underneath.
        resource_cols = {}
        for attr in resource_span['resource'].get('attributes', []):
            resource_cols[f"resource_attributes.{attr['key']}"] = extract_attribute_value(attr)
        for scope_span in resource_span['scopeSpans']:
            for span in scope_span['spans']:
                row = dict(resource_cols)
                for attr in span.get('attributes', []):
                    row[f"span_attributes.{attr['key']}"] = extract_attribute_value(attr)
                # Copy the raw span fields last, skipping the already
                # flattened 'attributes' entry.
                for field, val in span.items():
                    if field != 'attributes':
                        row[f"span_{field}"] = val
                rows.append(row)
    return rows
def read_json_from_url(url):
    """Download newline-delimited OTLP JSON from `url` and return a flattened
    polars DataFrame of spans.

    When the nanosecond timestamp columns are present, also adds the derived
    columns span_durationNano, span_startDatetime, and span_endDatetime.

    Raises Exception if the HTTP response status is not 200.
    """
    # Imported inside the function so the module stays importable without
    # these packages installed.
    import requests, json, polars as pl
    response = requests.get(url)
    if response.status_code != 200:
        raise Exception(f'Failed to download JSON from {url}')
    # The payload is NDJSON: one OTLP export object per non-empty line.
    lines = response.text.strip().split('\n')
    all_data = []
    for line in lines:
        if line:
            json_obj = json.loads(line)
            extracted_data = extract_data_from_json(json_obj)
            all_data.extend(extracted_data)
    df = pl.DataFrame(all_data)
    # The timestamps arrive as strings (OTLP encodes uint64 nanos as JSON
    # strings); cast to Int64 so duration arithmetic works.
    if 'span_startTimeUnixNano' in df.columns:
        df = df.with_columns(
            df['span_startTimeUnixNano'].cast(pl.Int64).alias('span_startTimeUnixNano')
        )
    if 'span_endTimeUnixNano' in df.columns:
        df = df.with_columns(
            df['span_endTimeUnixNano'].cast(pl.Int64).alias('span_endTimeUnixNano')
        )
    if 'span_startTimeUnixNano' in df.columns and 'span_endTimeUnixNano' in df.columns:
        span_durationNano = df['span_endTimeUnixNano'] - df['span_startTimeUnixNano']
        df = df.with_columns(
            span_durationNano.alias('span_durationNano'),
            # NOTE(review): this cast assumes polars interprets the Int64
            # values in the Datetime's default (microsecond) unit, not
            # nanoseconds — confirm against polars 0.19.19 that the
            # resulting datetimes are in the intended scale.
            pl.col('span_startTimeUnixNano').cast(pl.Datetime).alias('span_startDatetime'),
            pl.col('span_endTimeUnixNano').cast(pl.Datetime).alias('span_endDatetime')
        )
    return df
@bauplan.python('3.11', pip={
    'polars': '0.19.19',
    'requests': '2.31.0',
})
@bauplan.model(
    columns=[],
    materialize=True,
)
def traces_all(
    data=bauplan.Model(
        'root',
        columns=[],
    )
):
    """Materialized model: the complete flattened span dataset.

    The upstream `data` input ('root') is intentionally unused; the raw
    NDJSON trace export is downloaded from S3 instead and flattened by
    read_json_from_url.

    Returns the spans as an Arrow table.
    """
    # FIX: dropped the unused local imports (polars, requests, json,
    # StringIO) — read_json_from_url performs its own imports, and the
    # pip= spec above keeps the packages installed in the runtime.
    url = 'https://12-9-2023-hackathon-blobs.s3.us-east-1.amazonaws.com/traces.json'
    df = read_json_from_url(url)
    return df.to_arrow()
@bauplan.python('3.11', pip={
    'polars': '0.19.19',
})
@bauplan.model(columns=[])
def summary(
    trace_stats=bauplan.Model(
        'trace_stats',
        columns=[],
    ),
):
    """Print headline statistics for the trace_stats model and pass the
    data through unchanged as an Arrow table."""
    import polars as pl
    frame = pl.from_arrow(trace_stats)
    print("STATS")
    print("TRACE COUNT:", len(frame))
    print("ESTIMATED BYTES:", frame.estimated_size())
    # trace_stats is ordered slowest-first, so head(10) shows the worst.
    print("SLOWEST TRACES:")
    print(frame.head(10))
    return frame.to_arrow()
@bauplan.python('3.11', pip={
    'polars': '0.19.19',
    'scipy': '1.11.4',
    'requests': '2.31.0',
    'numpy': '1.26.2',
})
@bauplan.model(columns=[])
def system_samples_5pct(
    samples=bauplan.Model(
        'sampled_spans_5pct_system',
        columns=[],
    ),
):
    """Score the 5% system sample against the full trace population.

    Builds latency histograms over a shared set of bin edges for the
    sampled spans and for the full dataset (re-downloaded from S3), then
    prints the summed KL divergence between them. The sample is passed
    through unchanged.
    """
    import polars as pl
    from scipy.special import kl_div
    import requests
    import numpy as np
    samples_df = pl.from_arrow(samples)
    baseline_df = read_json_from_url('https://12-9-2023-hackathon-blobs.s3.us-east-1.amazonaws.com/traces.json')
    sample_ns = samples_df['span_durationNano'].to_numpy()
    full_ns = baseline_df['span_durationNano'].to_numpy()
    # Shared bin edges so the two densities are directly comparable.
    edges = np.histogram_bin_edges(np.concatenate([sample_ns, full_ns]), bins='auto')
    sample_density, _ = np.histogram(sample_ns, bins=edges, density=True)
    full_density, _ = np.histogram(full_ns, bins=edges, density=True)
    # Small epsilon keeps KL finite when a bin is empty on one side.
    epsilon = 1e-10
    total_kl = np.sum(kl_div(sample_density + epsilon, full_density + epsilon))
    print("")
    print("SYSTEM SUMMED KL DIVERGENCE (LATENCY)")
    print(total_kl * 100000000)
    print("")
    return samples_df.to_arrow()
@bauplan.python('3.11', pip={
    'polars': '0.19.19',
    'scipy': '1.11.4',
    'requests': '2.31.0',
    'numpy': '1.26.2',
})
@bauplan.model(columns=[])
def bernoulli_samples_5pct(
    samples=bauplan.Model(
        'sampled_spans_5pct_bernoulli',
        columns=[],
    ),
):
    """Score the 5% Bernoulli sample against the full trace population.

    Builds latency histograms over a shared set of bin edges for the
    sampled spans and for the full dataset (re-downloaded from S3), then
    prints the summed KL divergence between them. The sample is passed
    through unchanged.
    """
    import polars as pl
    from scipy.special import kl_div
    import requests
    import numpy as np
    samples_df = pl.from_arrow(samples)
    baseline_df = read_json_from_url('https://12-9-2023-hackathon-blobs.s3.us-east-1.amazonaws.com/traces.json')
    sample_ns = samples_df['span_durationNano'].to_numpy()
    full_ns = baseline_df['span_durationNano'].to_numpy()
    # Shared bin edges so the two densities are directly comparable.
    edges = np.histogram_bin_edges(np.concatenate([sample_ns, full_ns]), bins='auto')
    sample_density, _ = np.histogram(sample_ns, bins=edges, density=True)
    full_density, _ = np.histogram(full_ns, bins=edges, density=True)
    # Small epsilon keeps KL finite when a bin is empty on one side.
    epsilon = 1e-10
    total_kl = np.sum(kl_div(sample_density + epsilon, full_density + epsilon))
    print("")
    print("BERNOULLI SUMMED KL DIVERGENCE (LATENCY)")
    print(total_kl * 100000000)
    print("")
    return samples_df.to_arrow()
-- not actually used for anything
-- data is loaded in over network
-- (placeholder: counts taxi trips picked up inside a one-second window)
SELECT COUNT(*)
FROM
taxi_fhvhv
WHERE
pickup_datetime >= '2023-01-01T00:01:01-05:00' AND
pickup_datetime < '2023-01-01T00:01:02-05:00';
-- Trace-complete 5% Bernoulli sample: first pick ~5% of spans uniformly
-- (per-row Bernoulli), then pull back EVERY span whose traceId appeared
-- in that pick, so no sampled trace is left partial.
SELECT * FROM traces_all
WHERE span_traceId IN
(
SELECT span_traceId
FROM
(SELECT * FROM traces_all
USING SAMPLE bernoulli(5%))
);
-- Trace-complete 5% sample using the engine's default sampling method
-- (presumably system/block-based sampling, matching the
-- 'sampled_spans_5pct_system' model name — confirm for the engine in use),
-- then expanded to all spans of the selected traces.
SELECT * FROM traces_all
WHERE span_traceId IN
(
SELECT span_traceId
FROM
(SELECT * FROM traces_all
USING SAMPLE 5%)
);
-- trace_stats: one row per trace with its distinct span count and the
-- duration (milliseconds) of its longest span, sorted slowest-first.
-- NOTE(review): MAX(span_durationNano) stands in for the root span's
-- duration; that holds only if the root span is always the longest.
SELECT
span_traceId AS trace_id,
COUNT(DISTINCT span_spanId) AS total_trace_spans,
MAX(span_durationNano) / 1000000 AS root_duration_ms
FROM
traces_all
GROUP BY
trace_id
ORDER BY
root_duration_ms DESC
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment