Skip to content

Instantly share code, notes, and snippets.

@transfluxus
Last active March 28, 2023 10:23
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save transfluxus/a2bef148adbd1dca260b1aaf9dc03a67 to your computer and use it in GitHub Desktop.
Save transfluxus/a2bef148adbd1dca260b1aaf9dc03a67 to your computer and use it in GitHub Desktop.
LCA-ana
import json
from pathlib import Path
from typing import Any, Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from frictionless import extract
class Experiment:
    """Working directory and analysis helpers for one experiment result file.

    The experiment is described by a single tabular file.  All derived
    artefacts are stored in a sibling folder named after the file stem::

        <stem>/
            results/plots/
            results/simple_trees/
            split-scenario/
            split-scenario-indicator/

    The methods below read the columns ``Scenario``, ``Indicator``,
    ``Processor`` (a dot-separated hierarchy path), ``Dendrogram level`` and
    ``Value`` from the data.
    """

    # Sub-folders created below ``experiment_path`` by ``setup_folder``.
    _SUBFOLDERS = (
        "results",
        "results/plots",
        "results/simple_trees",
        "split-scenario",
        "split-scenario-indicator",
    )

    def __init__(self, experiment_file: str):
        """Remember the input file and create the experiment folder layout.

        Args:
            experiment_file: Path of the experiment result file.

        Raises:
            ValueError: If ``experiment_file`` does not exist.
        """
        self.file_path = Path(experiment_file)
        self.experiment_path = self.file_path.parent.joinpath(self.file_path.stem)
        if not self.file_path.exists():
            raise ValueError(f"File {self.file_path} does not exist")
        self.setup_folder()

    def setup_folder(self) -> None:
        """Create the experiment folder and its sub-folders if missing."""
        self.experiment_path.mkdir(exist_ok=True)
        for sub_folder in self._SUBFOLDERS:
            self.experiment_path.joinpath(sub_folder).mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _read_rows(path: Union[str, Path]) -> list[dict[str, Any]]:
        """Return the rows of the first table that ``extract`` reads from ``path``.

        ``extract`` is used as returning a mapping of table name to rows.
        Taking the first entry avoids depending on how that name is derived
        from the file name.

        Raises:
            ValueError: If ``extract`` yields no table at all.
        """
        tables = extract(Path(path).as_posix())
        if not tables:
            raise ValueError(f"No tabular data found in {path}")
        return next(iter(tables.values()))

    def _write_json(self, file_name: str, payload: Any) -> None:
        """Write ``payload`` as indented JSON into the experiment folder."""
        with self.experiment_path.joinpath(file_name).open("w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)

    def select_file(self, scenario: str, indicator: Optional[str] = None) -> str:
        """Return the POSIX path of a split file written by ``split_experiment``.

        Args:
            scenario: Scenario name.
            indicator: Indicator name; when falsy the per-scenario file is
                selected, otherwise the per-scenario-and-indicator file.

        Raises:
            ValueError: If the selected file does not exist.
        """
        if indicator:
            path = self.experiment_path.joinpath(
                f"split-scenario-indicator/{scenario}_{indicator}.csv"
            )
        else:
            path = self.experiment_path.joinpath(f"split-scenario/{scenario}.csv")
        if not path.exists():
            raise ValueError(f"File {path} does not exist")
        return path.as_posix()

    def get_data(self, scenario: str, indicator: Optional[str] = None) -> list[dict[str, Any]]:
        """Load the rows of the split file for ``scenario`` (and ``indicator``)."""
        return self._read_rows(self.select_file(scenario, indicator))

    def split_experiment(self) -> None:
        """Split the experiment file into per-scenario and per-indicator CSVs.

        Writes

        * ``split-scenario/<scenario>.csv`` for every scenario,
        * ``split-scenario-indicator/<scenario>_<indicator>.csv`` for every
          scenario/indicator combination,
        * ``scenarios.json`` and ``indicators.json`` with the unique values,
        * ``processor.json`` with every unique processor path split on ``"."``.
        """
        complete_df = pd.DataFrame(self._read_rows(self.file_path))

        for scenario, group in complete_df.groupby("Scenario"):
            # The file name must match what ``select_file`` looks up.
            scenario_file = self.experiment_path.joinpath(f"split-scenario/{scenario}.csv")
            group.to_csv(scenario_file.as_posix(), index=False)

            for indicator_name, indicator_group in group.groupby("Indicator"):
                indicator_file = self.experiment_path.joinpath(
                    f"split-scenario-indicator/{scenario}_{indicator_name}.csv"
                )
                indicator_group.to_csv(indicator_file.as_posix(), index=False)

        self._write_json("scenarios.json", complete_df["Scenario"].unique().tolist())
        self._write_json("indicators.json", complete_df["Indicator"].unique().tolist())
        self._write_json(
            "processor.json",
            [p.split(".") for p in complete_df["Processor"].unique().tolist()],
        )

    def get_scenario_indicator_data(self,
                                    scenario: str,
                                    indicator: str,
                                    processor_name: Union[str, tuple, list]) -> pd.DataFrame:
        """Return the direct children of a processor with their impact values.

        Args:
            scenario: Scenario name.
            indicator: Indicator name.
            processor_name: Hierarchy path of the parent processor, either as
                a dot-separated string or as a sequence of path parts.

        Returns:
            A frame with the columns ``Technology`` (child name), ``Value``
            (as float) and ``Relative`` (``Value`` divided by the value of the
            row with ``Dendrogram level`` 0).

        Raises:
            KeyError: If no processor in the data is deeper than
                ``processor_name``.
            ValueError: If the data does not contain exactly one row with
                ``Dendrogram level`` 0.
        """
        complete_df = pd.DataFrame(self.get_data(scenario, indicator))
        # One "level_<i>" column per part of the dot-separated processor path.
        levels = complete_df["Processor"].str.split(".", expand=True).add_prefix("level_")
        complete_df = complete_df.join(levels)

        if isinstance(processor_name, str):
            processor_name = tuple(processor_name.split("."))
        depth = len(processor_name)
        child_column = f"level_{depth}"
        if child_column not in complete_df.columns:
            raise KeyError(
                f"No processor below {'.'.join(processor_name)!r} in {scenario}/{indicator}"
            )

        # Rows exactly one level below the requested processor path.
        mask = complete_df["Dendrogram level"] == depth
        for i, name in enumerate(processor_name):
            mask = mask & (complete_df[f"level_{i}"] == name)

        # The single top-level row carries the total used for normalisation.
        root_values = complete_df.loc[complete_df["Dendrogram level"] == 0, "Value"]
        if len(root_values) != 1:
            raise ValueError(
                f"Expected exactly one row with 'Dendrogram level' 0, found {len(root_values)}"
            )
        total_value = float(root_values.iloc[0])

        children = complete_df.loc[mask, [child_column, "Value"]]
        children = children.rename(columns={child_column: "Technology"})
        # ``assign`` builds new frames, so no chained-assignment warning.
        children = children.assign(Value=lambda frame: frame["Value"].astype(float))
        return children.assign(Relative=lambda frame: frame["Value"] / total_value)

    def plot_impacts(self,
                     scenarios: Union[str, list[str]],
                     indicator: str,
                     processor_name: Union[str, tuple, list]) -> dict[str, pd.DataFrame]:
        """Collect and print the child impacts of a processor per scenario.

        Args:
            scenarios: One scenario name or several of them.
            indicator: Indicator name.
            processor_name: Parent processor path, see
                ``get_scenario_indicator_data``.

        Returns:
            Mapping of scenario name to the frame produced by
            ``get_scenario_indicator_data``.
        """
        # TODO: the stacked bar charts (absolute and relative values, to be
        # saved under results/plots) are not implemented yet; the values are
        # only printed for now.
        scenario_names = [scenarios] if isinstance(scenarios, str) else list(scenarios)
        scenarios_data = {
            name: self.get_scenario_indicator_data(name, indicator, processor_name)
            for name in scenario_names
        }
        if not scenarios_data:
            return scenarios_data

        # The technologies of the first scenario define what is printed.
        technologies = list(next(iter(scenarios_data.values()))["Technology"])
        for tech in technologies:
            print(tech)
            for frame in scenarios_data.values():
                values = frame.loc[frame["Technology"] == tech, "Value"]
                if values.empty:
                    # This scenario has no such technology; nothing to print.
                    continue
                print(tech, float(values.iloc[0]))
        return scenarios_data

    def build_simple_tree(self, scenario: str, indicator: str, save: bool = True) -> dict[str, Any]:
        """Build a nested dict mirroring the dot-separated processor hierarchy.

        Every node has the shape
        ``{"name": <full dotted path>, "children": {...}, "value": <float>}``
        and is stored under the last part of its path in the parent's
        ``children``.  Rows may come in any order: a parent that has not been
        seen yet is created with ``value`` ``None`` and completed once its own
        row is read.

        Args:
            scenario: Scenario name.
            indicator: Indicator name.
            save: When true, also write the tree to
                ``results/simple_trees/<scenario>_<indicator>.json``.

        Returns:
            The tree, keyed by the top-level processor names.
        """
        tree: dict[str, Any] = {}
        for row in self.get_data(scenario, indicator):
            parts = row["Processor"].split(".")
            siblings = tree
            node: dict[str, Any] = {}
            for depth, part in enumerate(parts, start=1):
                node = siblings.setdefault(
                    part,
                    {"name": ".".join(parts[:depth]), "children": {}, "value": None},
                )
                siblings = node["children"]
            node["value"] = float(row["Value"])

        if save:
            target = self.experiment_path.joinpath(
                f"results/simple_trees/{scenario}_{indicator}.json"
            )
            # ensure_ascii=False emits raw non-ASCII text, so pin the encoding.
            with target.open("w", encoding="utf-8") as f:
                json.dump(tree, f, indent=2, ensure_ascii=False)
        return tree
# Script section: runs on import/execution against the filtered result file.
exp = Experiment("../data/results_filtered.csv")

# One-off preparation steps, kept disabled:
# exp.split_experiment()
# exp.build_simple_tree("installed_2020", "ionising_radiation_ionising_radiation_potential__IRP_")

# Processor path is given as its individual hierarchy parts.
df = exp.plot_impacts(
    "installed_2020",
    "ionising_radiation_ionising_radiation_potential__IRP_",
    ["electricity_production", "renewables", "others_renew"],
)
# print(df)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment