@LucasMagnum
Created March 29, 2019 08:43
Pipeline Costs
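Estimate per-pipeline Google Cloud Dataflow costs from the Dataflow API's job metrics, using vCPU and memory rates derived from a billing report.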
import datetime
from collections import defaultdict

from googleapiclient.discovery import build
from prettytable import PrettyTable

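# NOTE: build() below assumes Application Default Credentials are configured
# (e.g. via the GOOGLE_APPLICATION_CREDENTIALS environment variable).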
service = build("dataflow", "v1b3")
def get_costs(project_id, cpu_batch_rate, cpu_streaming_rate, memory_rate, pd_rate=0, ssd_rate=0):
    """
    Calculating rates:
        - Get the costs divided by the usage
    """
    metrics = get_metrics(project_id)

    print("Batch pipelines")
    batch = PrettyTable()
    batch.field_names = ["Name", "CPU Cost", "Memory Cost", "Total Cost"]

    batch_pipelines = []
    for metric_name, metric in metrics["batch"].items():
        cpu_cost = round(metric["TotalVcpuTime"] * cpu_batch_rate, 2)
        memory_cost = round(metric["TotalMemoryUsage"] * memory_rate, 2)
        batch_pipelines.append([metric_name, cpu_cost, memory_cost, cpu_cost + memory_cost])

    # map() is lazy in Python 3, so add the rows with an explicit loop
    for row in sorted(batch_pipelines, key=lambda x: x[-1], reverse=True):
        batch.add_row(row)
    print(batch)

    print("Streaming pipelines")
    streaming = PrettyTable()
    streaming.field_names = ["Name", "CPU Cost", "Memory Cost", "Total Cost"]

    streaming_pipelines = []
    for metric_name, metric in metrics["streaming"].items():
        # Streaming usage is billed at the streaming vCPU rate, not the batch rate
        cpu_cost = round(metric["TotalVcpuTime"] * cpu_streaming_rate, 2)
        memory_cost = round(metric["TotalMemoryUsage"] * memory_rate, 2)
        streaming_pipelines.append([metric_name, cpu_cost, memory_cost, cpu_cost + memory_cost])

    for row in sorted(streaming_pipelines, key=lambda x: x[-1], reverse=True):
        streaming.add_row(row)
    print(streaming)
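# Illustrative output (made-up numbers) -- PrettyTable renders something like:
#
# Batch pipelines
# +----------+----------+-------------+------------+
# |   Name   | CPU Cost | Memory Cost | Total Cost |
# +----------+----------+-------------+------------+
# | job-name |  12.34   |     1.56    |    13.9    |
# +----------+----------+-------------+------------+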
def get_metrics(project_id):
    batch, streaming = get_jobs(project_id)

    metrics_general = {}
    for job_type, jobs in [("batch", batch), ("streaming", streaming)]:
        metrics = defaultdict(dict)
        for job_name, execution_details in jobs.items():
            for execution in execution_details:
                metrics_data = service.projects().locations().jobs().getMetrics(
                    projectId=project_id,
                    jobId=execution["id"],
                    location=execution["location"]
                ).execute()

                # Batch jobs run once, but streaming jobs accumulate usage since
                # creation, so average their totals over the days they have run.
                # Timestamps are UTC, so compare against utcnow().
                number_of_days = (
                    1 if execution["type"] == "JOB_TYPE_BATCH" else
                    (datetime.datetime.utcnow() - datetime.datetime.strptime(execution["createTime"], "%Y-%m-%dT%H:%M:%S.%fZ")).days
                )

                for metric in metrics_data["metrics"]:
                    if metric["name"]["context"].get("tentative"):
                        continue

                    if metric["name"]["name"] in ("TotalVcpuTime", "TotalMemoryUsage", "TotalPdUsage", "TotalSsdUsage"):
                        # We pay by the hour, so it's easier to convert seconds to hours here to get a better overview
                        metric_value = round(metric["scalar"] / (60.0 * 60.0), 3)

                        # Memory is measured in MB and we pay per GB
                        if metric["name"]["name"] == "TotalMemoryUsage":
                            metric_value = metric_value / 1024

                        metric_in_hours = metric_value / (number_of_days if number_of_days > 0 else 1)
                        metrics[job_name][metric["name"]["name"]] = metrics[job_name].get(metric["name"]["name"], 0) + metric_in_hours

        metrics_general[job_type] = metrics
    return metrics_general
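# Worked example of the conversion above (illustrative numbers): a streaming job
# created 10 days ago reporting TotalVcpuTime = 864000 vCPU-seconds becomes
# 864000 / 3600 = 240 vCPU-hours, averaged to 24 vCPU-hours per day. Batch jobs
# use number_of_days = 1, so their figures are totals rather than daily averages.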
def get_jobs(project_id):
    """
    Return two dicts (batch, streaming), each mapping a job name to its executions:
    {
        "job_name": [{
            'createTime': '2019-02-11T12:12:02.051283Z',
            'currentState': 'JOB_STATE_RUNNING',
            'currentStateTime': '2019-02-11T12:12:09.291512Z',
            'id': '2019-02-11_04_12_00-2569853007502191210',
            'jobMetadata': {'sdkVersion': {'sdkSupportStatus': 'STALE',
                                           'version': '2.8.0',
                                           'versionDisplayName': 'Apache Beam SDK for Java'}},
            'location': 'us-central1',
            'name': 'job-name',
            'projectId': 'projectID',
            'startTime': '2019-02-11T12:12:02.051283Z',
            'type': 'JOB_TYPE_STREAMING'
        }]
    }
    """
    locations = ["europe-west1", "us-central1"]

    jobs_list = []
    for location in locations:
        # NOTE: only the first page of results is fetched; use nextPageToken
        # to paginate if the project has many jobs.
        jobs_list.extend(
            service.projects().locations().jobs().list(
                projectId=project_id, location=location
            ).execute().get("jobs", [])
        )

    streaming = defaultdict(list)
    batch = defaultdict(list)

    for job in jobs_list:
        # Ignore batch jobs older than 4 days for now
        create_time = datetime.datetime.strptime(job["createTime"], "%Y-%m-%dT%H:%M:%S.%fZ")
        if job["type"] == "JOB_TYPE_BATCH" and (datetime.date.today() - create_time.date()).days > 4:
            continue

        # Ignore cancelled jobs
        if job["currentState"] == "JOB_STATE_CANCELLED":
            continue

        if job["type"] == "JOB_TYPE_BATCH":
            batch[job["name"]].append(job)
        else:
            streaming[job["name"]].append(job)

    return batch, streaming
if __name__ == "__main__":
    project_id = "projectId"

    # Rates are derived from a billing report: total cost divided by total usage
    cpu_batch_rate = (997.18 / 19311.48)
    cpu_streaming_rate = (1111.03 / 17631.54)
    memory_rate = (683.84 / 187286.09)
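    # e.g. a bill of $997.18 for 19,311.48 vCPU-hours of batch usage gives a
    # batch rate of roughly $0.0516 per vCPU-hour.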
    get_costs(project_id, cpu_batch_rate, cpu_streaming_rate, memory_rate)