Skip to content

Instantly share code, notes, and snippets.

@tomsail
Created September 22, 2023 07:45
Show Gist options
  • Save tomsail/09a82bf7d7fa617066030faa4e805df4 to your computer and use it in GitHub Desktop.
Save tomsail/09a82bf7d7fa617066030faa4e805df4 to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# GLOSS DATABASE DETIDE, ANALYSIS AND LIVE DE-TIDE FOR OPERATIONAL SURGE MODEL\n",
"## Notebook 2/4\n",
"\n",
"This Notebook is the second part: \n",
" * clean the signal and retain a simple segment of data for the analysis \n",
" * analyse the clean segments using [utide](https://github.com/wesleybowman/UTide) and extract the tidal constituents in json for every sensor of every station\n",
" \n",
"This Notebook uses a simple function for yearly tide analysis, that is part of `analysea`.\n",
"To install analysea, simply do: \n",
"\n",
" pip install git+https://github.com/tomsail/analysea\n",
" \n",
"If you want to be able to run all the Notebooks, run the foollowing to get all the libraries : \n",
"\n",
" mamba create -n seadev python xarray pandas geopandas cartopy pyarrow fastparquet numpy ipykernel git owslib utide scikit-image ruptures ipympl holoviews bokeh jupyter_bokeh jenkspy datashader searvey pyfes netCDF4 -c fbriol\n",
" mamba activate seadev\n",
" pip install git+https://github.com/tomsail/analysea\n",
" \n",
"To download the stations contained in the data folder, you need to run the [previous Notebook](https://gist.github.com/tomsail/6de6b83a95d425ab62ba7e4cc6d293aa)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from __future__ import annotations\n",
"\n",
"import functools\n",
"import io\n",
"import itertools\n",
"import operator\n",
"import sys\n",
"import warnings\n",
"\n",
"import numpy as np\n",
"import matplotlib as mpl\n",
"import matplotlib.pyplot as plt\n",
"import pandas as pd\n",
"import sklearn.cluster as sk_cluster\n",
"from searvey import ioc\n",
"\n",
"from analysea.tide import yearly_tide_analysis, demean_amps_phases\n",
"from analysea.plot import plot_multiyear_tide_analysis\n",
"\n",
"import json\n",
"import pyfes\n",
"import os"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## loading in the IOC dataframe that covers the gloss stations "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from searvey import ioc \n",
"IOC_STATIONS = ioc.get_ioc_stations()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## loading in the gloss dataframe saved from the previous Notebook"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"gloss_ioc = pd.read_csv('gloss_ioc.csv',header=0,index_col=0)\n",
"gloss_ioc"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Split timeseries to segments\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def detect_splits(sr: pd.Series, max_gap: pd.Timedelta) -> pd.DatetimeIndex:\n",
" split_points = pd.DatetimeIndex([sr.index[0], sr.index[-1]])\n",
" condition = (sr.index.to_series().diff() > max_gap)\n",
" for i, point in enumerate(sr[condition].index, 1):\n",
" split_points = split_points.insert(i, point)\n",
" return split_points\n",
"\n",
"def split_series(sr: pd.Series, max_gap: pd.Timedelta = pd.Timedelta(hours=24)) -> pd.Series:\n",
" for (start, stop) in itertools.pairwise(detect_splits(sr=sr, max_gap=max_gap)):\n",
" segment = sr[start:stop]\n",
" yield segment[:-1]\n",
" \n",
"def calc_stats(segments: list[pd.Series]) -> pd.DataFrame:\n",
" data = [] \n",
" for i, segment in enumerate(segments):\n",
" ss = dict(\n",
" start=segment.index[0],\n",
" end=segment.index[-1],\n",
" duration=segment.index[-1] - segment.index[0],\n",
" scount=segment.count(),\n",
" smean=segment.mean(),\n",
" sstd=segment.std(),\n",
" smin=segment.min(),\n",
" s01=segment.quantile(0.01),\n",
" s10=segment.quantile(0.10),\n",
" s25=segment.quantile(0.25),\n",
" s50=segment.quantile(0.50),\n",
" s75=segment.quantile(0.75),\n",
" s90=segment.quantile(0.90),\n",
" s99=segment.quantile(0.99),\n",
" smax=segment.max(),\n",
" sskewness=segment.skew(),\n",
" skurtosis=segment.kurtosis(),\n",
" )\n",
" data.append(ss)\n",
" stats = pd.DataFrame(data)\n",
" return stats"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Outlier removal\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# https://stackoverflow.com/questions/23199796/detect-and-exclude-outliers-in-a-pandas-dataframe\n",
"def remove_outliers(sr: pd.Series, lower: float = 0.25, upper: float = 0.75) -> pd.Series:\n",
" iqr = sr.quantile(upper) - sr.quantile(lower)\n",
" lim = np.abs((sr - sr.median()) / iqr) < 2.22\n",
" return sr[lim]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## cleaning data using multiprocessing"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from searvey.multi import multithread\n",
"from searvey.multi import multiprocess\n",
"import itertools\n",
"\n",
"def generate_stations(stations: pd.DataFrame) -> list[str]:\n",
" codes = []\n",
" for station in stations.ioc_code.unique():\n",
" codes.append(dict(station=station))\n",
" return codes\n",
"\n",
"def clean_one_ioc(station: str):\n",
" # read station data\n",
" s = pd.read_parquet(f\"data/gloss/{station}.parquet\")\n",
" # remove flat areas \n",
" s1 = s[s.diff() != 0]\n",
" # remove steps\n",
" # s1 = remove_outliers(s0)\n",
" ss = s1[abs(s1.diff()) < s1.std()]\n",
" # \n",
" df = pd.DataFrame()\n",
" for sensor in ss.columns: \n",
" sr = ss[sensor]\n",
" max_gap = pd.Timedelta(hours=6)\n",
" segments = list(split_series(sr, max_gap=max_gap))\n",
" # suppress empty segments\n",
" for i in range(len(segments)-1, 0, -1):\n",
" if segments[i].empty:\n",
" del segments[i]\n",
" # \n",
" stats = calc_stats(segments)\n",
" # keep the longest segment\n",
" iseg = np.argmax(stats.duration)\n",
" seg = segments[iseg]\n",
" # remove outliers\n",
" seg = remove_outliers(seg,0.1,0.9)\n",
" if stats.iloc[iseg].duration > pd.Timedelta(days=365):\n",
" d = pd.DataFrame({sensor:seg}, index=seg.index)\n",
" if df.empty:\n",
" df = d\n",
" else : \n",
" df = pd.concat([df,d])\n",
" if len(df)>0:\n",
" df.to_parquet(f\"data/clean/{station}.parquet\")\n",
" del df\n",
" del d \n",
"\n",
" \n",
"def clean_ioc(stations: pd.DataFrame) -> None:\n",
" stations_codes = generate_stations(stations)\n",
" multiprocess(\n",
" clean_one_ioc,\n",
" stations_codes,\n",
" n_workers=20, ##!!CAREFUL!! here adapt the numper of procs to your machine !\n",
" disable_progress_bar=False,\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"clean_ioc(gloss_ioc)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ASTRO = [\"2N2\",\"EPS2\",\"J1\",\"K1\",\"K2\",\"L2\",\"M2\",\"M3\",\"M4\",\"M6\",\"M8\",\"Mf\",\"MKS2\",\n",
" \"Mm\",\"MN4\",\"MS4\",\"MSf\",\"Mu2\",\"N2\",\"N4\",\"Nu2\",\"O1\",\"P1\",\"Q1\",\n",
" \"R2\",\"S1\",\"S2\",\"S4\",\"Sa\",\"Ssa\"] #Using only the constiuents in FES2014 for comparison with the FES dataset, except \"La2\",\"MSqm\",\"Mtm\",\n",
"\n",
"ASTRO_CAP = [ast.upper() for ast in ASTRO]\n",
"ASTRO_LOW = [ast.lower() for ast in ASTRO]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"OPTS = {\n",
" \"conf_int\": \"linear\",\n",
" \"constit\": ASTRO_CAP,\n",
" \"method\": \"ols\", # ols is faster and good for missing data (Ponchaut et al., 2001)\n",
" \"order_constit\": \"frequency\",\n",
" \"Rayleigh_min\": 0.97,\n",
" \"lat\": None,\n",
" \"verbose\": False,\n",
"} # careful if there is only one Nan parameter, the analysis crashes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# create copy of the IOC_STATIONS dataframe\n",
"IOC_OUT = pd.DataFrame(IOC_STATIONS)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tide analysis \n",
"We will proceed to the tide on each clean signal \n",
"If each analysis proceeded without problems, the information from the utide will be saved in the JSON format in the following format: \n",
"\n",
" <station>_<sensor>.json\n",
"\n",
"In order to saved at the json format, we'd need to format all the values of dictionary return by utide"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def json_format(d: Dict[Any, Any]) -> Dict[Any, Any]:\n",
" for key, value in d.items():\n",
" if isinstance(value, dict):\n",
" json_format(value) # Recurse into nested dictionaries\n",
" elif isinstance(value, np.ndarray):\n",
" d[key] = value.tolist() # Convert NumPy array to list\n",
" elif isinstance(value, pd.Timestamp):\n",
" d[key] = value.strftime(\"%Y-%m-%d %H:%M:%S\") # Convert pandas Timestamp to string\n",
" elif isinstance(value, pd.Timedelta):\n",
" d[key] = str(value) # Convert pandas Timedelta to string\n",
" return d"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"class StationInfo: \n",
" def __init__(self,lat,lon,id,df): \n",
" self.lat = lat\n",
" self.lon = lon\n",
" self.first_obs = pd.Timestamp(df.index[0])\n",
" self.last_obs = pd.Timestamp(df.index[-1])\n",
" self.span = (self.last_obs - self.first_obs).days\n",
" self.analysis = ''\n",
" self.id = id\n",
" self.sensor = ''\n",
" self.analysed = False\n",
" self.missing = df.isna().sum()/len(df)\n",
" def end_analysis(self,outJson): \n",
" with open(outJson,'w') as f: \n",
" dout = json_format(self.__dict__)\n",
" json.dump(dout,f)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The 2 following functions also come from `analysea` :"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_const_amps_labels(keep, tide):\n",
" \"\"\"\n",
" recognise & plot only the main constituents\n",
" \"\"\"\n",
" ix = []\n",
" for c in keep:\n",
" if c in tide[\"name\"].tolist():\n",
" i = tide[\"name\"].tolist().index(c)\n",
" ix.append(i)\n",
" amps = np.append(tide[\"A\"][np.sort(ix)], np.zeros(len(keep) - len(ix)))\n",
" const = np.append(tide[\"name\"][np.sort(ix)], np.empty(len(keep) - len(ix)))\n",
" phases = np.append(tide[\"g\"][np.sort(ix)], np.empty(len(keep) - len(ix)))\n",
" return amps, const, phases\n",
"\n",
"def demean_amps_phases(tides, keep_const):\n",
" amps = np.zeros((len(keep_const), len(tides)))\n",
" phases = np.zeros((len(keep_const), len(tides)))\n",
" vect = np.ones(len(tides)) / len(tides)\n",
" #\n",
" for iyear, tide in enumerate(tides):\n",
" _amps, const, _phases = get_const_amps_labels(keep_const, tide)\n",
" amps[:, iyear] = _amps\n",
" phases[:, iyear] = _phases\n",
" #\n",
" mean_amps = np.dot(amps, vect)\n",
" mean_phases = np.dot(phases, vect)\n",
" #\n",
" return mean_amps, mean_phases"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def generate_files(repo: str) -> list[str]:\n",
" files = []\n",
" for file in os.listdir(repo):\n",
" if file.endswith('.parquet'):\n",
" files.append(dict(file=file))\n",
" return files\n",
"\n",
"def analyse_one_ioc(file:str):\n",
" ioc_code = file.split('.parquet')[0]\n",
" _ioc_index = np.where(IOC_STATIONS.ioc_code == ioc_code)[0][0]\n",
" #\n",
" lat = IOC_STATIONS.iloc[_ioc_index].lat\n",
" lon = IOC_STATIONS.iloc[_ioc_index].lon\n",
" OPTS['lat'] = lat\n",
" df = pd.read_parquet('data/clean/'+file)\n",
" for sensor in df.columns:\n",
" sr = df[sensor]\n",
" station = StationInfo(lat,lon,ioc_code,sr)\n",
" station.sensor = sensor\n",
" if (station.missing < 0.3) and (station.span>365):\n",
" filenameOut = 'data/analysed/'+ ioc_code + \"_\" + sensor + \".png\"\n",
" df = df.rename(columns={sensor:'anomaly'})\n",
" # title = IOC_STATIONS.iloc[_ioc_index].location + \" - Sensor \" + sensor\n",
" # \n",
" srt, srs, coefs, years = yearly_tide_analysis(sr,365, OPTS)\n",
" if len(coefs)>0:\n",
" mean_amps, mean_phases = demean_amps_phases(coefs, coefs[0][\"name\"])\n",
" station.coef = json_format(coefs[-1])\n",
" station.coef[\"weights\"] = 0 # weights list is too long and unused in the reconstruction\n",
" station.coef[\"A\"] = mean_amps.tolist()\n",
" station.coef[\"g\"] = mean_phases.tolist()\n",
" station.analysis = \"Success!\"\n",
" station.analysed = True\n",
" station.end_analysis('data/analysed/'+ioc_code+\"_\"+sensor+'.json')\n",
" elif station.span < 365: \n",
" station.analysis = \"Failed: not enough days in record\"\n",
" station.end_analysis('data/analysed/'+ioc_code+\"_\"+sensor+'.json')\n",
" elif station.missing >= 0.3:\n",
" station.analysis = \"Failed: more than 30% missing data for the analysis\"\n",
" station.end_analysis('data/analysed/'+ioc_code+\"_\"+sensor+'.json')\n",
"\n",
"def analyse_ioc(repo): \n",
" stations_codes = generate_files(repo)\n",
" multiprocess(\n",
" analyse_one_ioc,\n",
" stations_codes,\n",
" n_workers=20, ##!!CAREFUL!! here adapt the numper of procs to your machine !\n",
" disable_progress_bar=False,\n",
" )"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"analyse_ioc('data/clean')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The quality of the \"de-tide\" analysis will be inspected in the next Notebook"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "seadev",
"language": "python",
"name": "seadev"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment