Created
March 29, 2019 08:43
-
-
Save LucasMagnum/06c15640edc4769c2007923496b86123 to your computer and use it in GitHub Desktop.
Pipeline Costs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
from collections import defaultdict | |
from googleapiclient.discovery import build | |
from prettytable import PrettyTable | |
service = build("dataflow", "v1b3") | |
def get_costs(project_id, cpu_batch_rate, cpu_streaming_rate, memory_rate, pd_rate=0, ssd_rate=0):
    """
    Print per-pipeline cost tables for batch and streaming Dataflow jobs.

    Rates are empirically derived: invoice cost divided by total metered
    usage for the billing period (see the __main__ block for examples).

    Args:
        project_id: GCP project whose Dataflow jobs are inspected.
        cpu_batch_rate: cost per vCPU-hour for batch jobs.
        cpu_streaming_rate: cost per vCPU-hour for streaming jobs.
        memory_rate: cost per GB-hour of memory.
        pd_rate: cost per GB-hour of persistent disk (currently unused).
        ssd_rate: cost per GB-hour of SSD (currently unused).
    """
    metrics = get_metrics(project_id)

    print("Batch pipelines")
    print(_build_cost_table(metrics["batch"], cpu_batch_rate, memory_rate))

    # Bug fix vs. original: streaming pipelines were priced with the
    # batch CPU rate; they now use cpu_streaming_rate.
    print("Streaming pipelines")
    print(_build_cost_table(metrics["streaming"], cpu_streaming_rate, memory_rate))


def _build_cost_table(pipeline_metrics, cpu_rate, memory_rate):
    """Return a PrettyTable of pipeline costs, most expensive first."""
    table = PrettyTable()
    table.field_names = ["Name", "CPU Cost", "Memory Cost", "Total Cost"]

    rows = []
    for name, metric in pipeline_metrics.items():
        cpu_cost = round(metric["TotalVcpuTime"] * cpu_rate, 2)
        memory_cost = round(metric["TotalMemoryUsage"] * memory_rate, 2)
        rows.append([name, cpu_cost, memory_cost, cpu_cost + memory_cost])

    # Bug fix vs. original: rows were fed through a lazy map() whose
    # result was never consumed, so both tables printed empty in
    # Python 3. Add the rows with an explicit loop instead.
    for row in sorted(rows, key=lambda row: row[-1], reverse=True):
        table.add_row(row)
    return table
def get_metrics(project_id):
    """
    Aggregate usage metrics per job name, normalised to per-day hours.

    Returns:
        {"batch": {job_name: {metric_name: hours, ...}, ...},
         "streaming": {...}}

    The Dataflow API reports these counters in seconds; they are
    converted to hours here because billing is hourly. TotalMemoryUsage
    is additionally converted from MB to GB. Streaming usage is divided
    by the number of days the job has been running; a batch execution
    counts as one day.
    """
    tracked = ("TotalVcpuTime", "TotalMemoryUsage", "TotalPdUsage", "TotalSsdUsage")
    batch, streaming = get_jobs(project_id)

    metrics_general = {}
    for job_type, jobs in (("batch", batch), ("streaming", streaming)):
        per_job = defaultdict(dict)
        for job_name, executions in jobs.items():
            for execution in executions:
                response = service.projects().locations().jobs().getMetrics(
                    projectId=project_id,
                    jobId=execution["id"],
                    location=execution["location"],
                ).execute()

                if execution["type"] == "JOB_TYPE_BATCH":
                    days = 1
                else:
                    started = datetime.datetime.strptime(
                        execution["createTime"], "%Y-%m-%dT%H:%M:%S.%fZ"
                    )
                    days = (datetime.datetime.now() - started).days

                for metric in response["metrics"]:
                    # Tentative metrics are provisional duplicates; skip.
                    if metric["name"]["context"].get("tentative"):
                        continue
                    name = metric["name"]["name"]
                    if name not in tracked:
                        continue
                    # Seconds -> hours, since we pay by the hour.
                    value = round(metric["scalar"] / (60.0 * 60.0), 3)
                    # Memory is measured in MB but billed per GB.
                    if name == "TotalMemoryUsage":
                        value = value / 1024
                    per_day = value / (days if days > 0 else 1)
                    bucket = per_job[job_name]
                    bucket[name] = bucket.get(name, 0) + per_day
        metrics_general[job_type] = per_job
    return metrics_general
def get_jobs(project_id):
    """
    Return (batch, streaming) mappings of job name -> list of job dicts.

    Each job dict is the raw API resource, e.g.:
        {
            'createTime': '2019-02-11T12:12:02.051283Z',
            'currentState': 'JOB_STATE_RUNNING',
            'currentStateTime': '2019-02-11T12:12:09.291512Z',
            'id': '2019-02-11_04_12_00-2569853007502191210',
            'jobMetadata': {'sdkVersion': {...}},
            'location': 'us-central1',
            'name': 'job-name',
            'projectId': 'projectID',
            'startTime': '2019-02-11T12:12:02.051283Z',
            'type': 'JOB_TYPE_STREAMING'
        }

    Batch jobs created more than 4 days ago and cancelled jobs of any
    type are excluded. Only the listed locations are queried.
    """
    locations = ["europe-west1", "us-central1"]
    jobs_list = []
    for location in locations:
        response = service.projects().locations().jobs().list(
            projectId=project_id, location=location
        ).execute()
        # Bug fix: the API omits the "jobs" key entirely when a location
        # has no jobs, so the original indexing raised KeyError.
        # NOTE(review): pagination (nextPageToken) is still not handled;
        # only the first page per location is read.
        jobs_list.extend(response.get("jobs", []))

    streaming = defaultdict(list)
    batch = defaultdict(list)
    for job in jobs_list:
        # Ignore old batch jobs for now
        create_time = datetime.datetime.strptime(job["createTime"], "%Y-%m-%dT%H:%M:%S.%fZ")
        if job["type"] == "JOB_TYPE_BATCH" and (datetime.date.today() - create_time.date()).days > 4:
            continue
        # Ignore cancelled jobs
        if job["currentState"] == "JOB_STATE_CANCELLED":
            continue
        if job["type"] == "JOB_TYPE_BATCH":
            batch[job["name"]].append(job)
        else:
            streaming[job["name"]].append(job)
    return batch, streaming
if __name__ == "__main__":
    # Rates are empirical: invoice cost divided by the metered usage
    # it paid for (per billing period).
    project_id = "projectId"
    cpu_batch_rate = 997.18 / 19311.48
    cpu_streaming_rate = 1111.03 / 17631.54
    memory_rate = 683.84 / 187286.09

    get_costs(project_id, cpu_batch_rate, cpu_streaming_rate, memory_rate)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment