Last active
January 9, 2023 22:08
-
-
Save MaxDragonheart/46445a150aac9d528dadd2ec877203a5 to your computer and use it in GitHub Desktop.
Personal solution for issue https://github.com/geopandas/geopandas/issues/1035
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pathlib import PosixPath, Path | |
import fiona | |
from typing import Union | |
import pandas as pd | |
import geopandas as gpd | |
def find_duplicates(input_df: pd.DataFrame, column_id: Union[str, int]) -> dict:
    """Split the values of ``column_id`` into unique and duplicated ones.

    Args:
        input_df: DataFrame to inspect.
        column_id: Name of the column whose values are checked.

    Returns:
        dict with two keys:
            'single': values that occur exactly once, in order of appearance.
            'duplicates': values that occur more than once, listed once each,
                in order of first appearance.
    """
    ids = input_df[column_id]
    # True for every row whose value appears more than once in the column;
    # one vectorized pass replaces the former list + isin() scan.
    duplicated_mask = ids.duplicated(keep=False)
    id_single_list = ids[~duplicated_mask].to_list()
    # Keep one representative (the first occurrence) of each duplicated value.
    id_duplicates_list = ids[duplicated_mask].drop_duplicates().to_list()
    return {
        'single': id_single_list,
        'duplicates': id_duplicates_list,
    }
def read_geodata(file_path: Union[str, PosixPath, Path], layer: str = None) -> gpd.geodataframe.GeoDataFrame:
    """Read a vector dataset and guarantee a unique integer 'id' column.

    Geopackages go through fiona (via ``_read_geopackage``) so the feature
    ids are preserved; every other format is read with ``gpd.read_file``.

    Args:
        file_path: Path of the dataset to read.
        layer: Layer name, for multi-layer containers such as Geopackage.

    Returns:
        geopandas.GeoDataFrame sorted by the 'id' column.

    Raises:
        Exception: if an existing 'id' column contains duplicated values.
    """
    file_path = Path(file_path)
    if file_path.suffix == '.gpkg':
        read_file = _read_geopackage(input_data=file_path, layer=layer)
    else:
        read_file = gpd.read_file(file_path, layer=layer)
    if 'id' in read_file.columns:
        # The existing 'id' column must hold unique, non-null integer values.
        # NOTE(review): filling NaN with 0 can itself create a duplicate when
        # a real id 0 exists — confirm this is acceptable for the data source.
        read_file['id'] = read_file['id'].fillna(0)
        read_file = read_file.astype({'id': 'int64'})
        duplicates = find_duplicates(input_df=read_file, column_id='id')['duplicates']
        # Raise whenever duplicates exist, regardless of a 'fid' column:
        # the error is about 'id' uniqueness, not about 'fid' being present.
        if duplicates:
            raise Exception("The id column must contain unique values.")
        # 'id' is the identifier; a leftover 'fid' column is redundant.
        if 'fid' in read_file.columns:
            read_file.drop(columns=['fid'], inplace=True)
    elif 'fid' in read_file.columns:
        # No 'id' column: promote 'fid' to 'id'.
        read_file.rename(columns={'fid': 'id'}, inplace=True)
    else:
        # Neither column exists: derive 'id' from the row index.
        read_file.reset_index(inplace=True)
        read_file.rename(columns={'index': 'id'}, inplace=True)
    read_file.sort_values(by='id', inplace=True)
    return read_file
def _read_geopackage(input_data: Union[PosixPath, Path], layer: str) -> gpd.geodataframe.GeoDataFrame:
    """Read one layer of a Geopackage, keeping the fiona feature ids.

    ``gpd.read_file`` drops the Geopackage 'fid' (geopandas issue 1035), so
    the features are read with fiona directly and each feature's id is
    stored in an 'id' column.

    Args:
        input_data: Path of the .gpkg file.
        layer: Name of the layer to read.

    Returns:
        geopandas.GeoDataFrame with an integer 'id' column.
    """
    with fiona.open(input_data, layer=layer, mode='r') as src:
        # NOTE(review): crs['init'] is the legacy proj4-dict access and is
        # deprecated in newer fiona/pyproj — confirm installed versions.
        crs = src.crs['init']
        # Materialize features while the collection is open.
        features = list(src)
        feature_ids = [int(feature['id']) for feature in features]
    # Build a single GeoDataFrame instead of concatenating one per feature;
    # this also yields a clean 0..n-1 index rather than an all-zeros one.
    gdf = gpd.GeoDataFrame.from_features(features, crs=crs)
    gdf['id'] = feature_ids
    return gdf
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment