""""""
"""
This file is part of WaveBreaking.
WaveBreaking provides indices to detect, classify
and track Rossby Wave Breaking (RWB) in climate and weather data.
The tool was developed during my master thesis at the University of Bern.
Link to thesis: https://occrdata.unibe.ch/students/theses/msc/406.pdf
---
Streamer calculation
"""
__author__ = "Severin Kaderli"
__license__ = "MIT"
__email__ = "severin.kaderli@unibe.ch"
# import modules
import numpy as np
import xarray as xr
import pandas as pd
import geopandas as gpd
from tqdm import tqdm
import itertools as itertools
from shapely.geometry import LineString, Polygon
from sklearn.metrics import DistanceMetric
dist = DistanceMetric.get_metric("haversine")
from wavebreaking.utils.index_utils import (
calculate_properties,
transform_polygons,
combine_shared,
)
from wavebreaking.utils.data_utils import get_dimension_attributes, check_argument_types
from wavebreaking.indices.contour_index import decorator_contour_calculation
[docs]@check_argument_types(["data"], [xr.DataArray])
@get_dimension_attributes("data")
@decorator_contour_calculation
def calculate_streamers(
data,
contour_level,
contours=None,
geo_dis=800,
cont_dis=1500,
intensity=None,
periodic_add=120,
*args,
**kwargs
):
"""
Identify streamer structures based on the Streamer Index
developed by Wernli and Sprenger (2007).
The default parameters for the streamer identification
are based on the study by Wernli and Sprenger (2007).
Dimension names ("time_name", "lon_name", "lat_name"), size ("ntime", "nlon", "nlat")
and resolution ("dlon", "dlat") can be passed as key=value argument.
Before the index calculation, the contour lines are calculated if not provided.
Parameters
----------
data : xarray.DataArray
data for the contour and streamer calculation
contour_levels : array_like
levels for contour calculation
contours : geopandas.GeoDataFrame, optional
contours calculated with wavebreaking.calculate_contours(...,
original_coordinates=False)
geo_dis : int or float, optional
Maximal geographic distance between two contour points that describe a streamer
cont_dis : int or float, optional
Minimal distance along the contour line between two points that describe a streamer
intensity : xarray.DataArray, optional
data for the intensity calculation (hint: use wb_spatial.calculate_momentum_flux)
periodic_add: int or float, optional
number of longitudes in degrees to expand the dataset
to correctly capture undulations at the date border
if the input field is not periodic, use periodic_add = 0
Returns
-------
streamers: geopandas.GeoDataFrame:
GeoDataFrame containing different characteristics of the streamers
* "date": date of the streamers
* "level": level of the contour line
* "com": center of mass in the format (x,y)
* "mean_var": mean of the variable used for the streamer calculations
* "event_area": area of a streamer
* "intensity": sum of the intensity (momentum flux)
* "geometry": (Multi)Polygon with coordinates in the format (x,y)
"""
# filter contours from contour iteration
if contours is None:
contours = kwargs["contours"]
contours = contours[contours.exp_lon == contours.exp_lon.max()].reset_index(
drop=True
)
# loop over contours
streamers = []
for index, series in tqdm(
contours.iterrows(),
total=contours.shape[0],
desc="Calculating streamers ",
leave=True,
position=0,
):
# calculate all possible basepoints combinations
# (1) get coordinates of the contour points
contour_index = pd.DataFrame(
np.asarray(series.geometry.coords.xy).T, columns=["x", "y"]
).astype("int")
contour_coords = np.c_[
data[kwargs["lat_name"]].values[contour_index.y],
data[kwargs["lon_name"]].values[contour_index.x % kwargs["nlon"]],
]
# (2) calculate geographic distance between all contour coordinates
geo_matrix = dist.pairwise((np.radians(contour_coords))) * 6371
# (3) calculate the distance connecting all combinations of contour points
on = np.insert(np.diagonal(geo_matrix, 1), 0, 0)
on_mat = np.triu(np.tile(on, (len(on), 1)), k=1)
cont_matrix = np.cumsum(on_mat, axis=1)
# (4) get indices of all contour coordinates that fulfill both conditions
check = (geo_matrix < geo_dis) * (cont_matrix > cont_dis)
check = np.transpose(check.nonzero())
# (5) store the coordinates of the point combinations in a DataFrame
df1 = (
contour_index[["x", "y"]]
.iloc[check[:, 0]]
.reset_index()
.rename(columns={"x": "x1", "y": "y1", "index": "ind1"})
)
df2 = (
contour_index[["x", "y"]]
.iloc[check[:, 1]]
.reset_index()
.rename(columns={"x": "x2", "y": "y2", "index": "ind2"})
)
df_bp = pd.concat([df1, df2], axis=1)
# (6) drop combinations that are invalid
df_bp = df_bp.drop(df_bp[np.abs(df_bp.x1 - df_bp.x2) > 120].index)
# apply several routines that neglect invalid basepoints
def check_duplicates(df):
"""
Check if there are basepoint duplicates
due to the periodic expansion in the longitudinal direction
"""
# drop duplicates after mapping the coordinates to the original grid
temp = pd.concat(
[df.x1 % kwargs["nlon"], df.y1, df.x2 % kwargs["nlon"], df.y2], axis=1
)
temp = temp[temp.duplicated(keep=False)]
if len(temp) == 0:
check = []
else:
check = list(
np.asarray(
temp.groupby(list(temp))
.apply(lambda x: tuple(x.index))
.tolist()
)[:, 1]
)
return df.drop(check)
def check_intersections(df):
"""
Check for intersections of the basepoints with the contour line
"""
# calculate line connecting each basepoint pair
# and drop the ones that intersect with the contour line
df["dline"] = [
LineString([(row.x1, row.y1), (row.x2, row.y2)])
for index, row in df.iterrows()
]
check_intersections = [
row.dline.touches(series.geometry) for index, row in df.iterrows()
]
return df[check_intersections]
def check_overlapping(df):
"""
Check for overlapping of the contour segments each described by basepoints
"""
# calculate contour segment for each basepoint pair
# and drop the pairs that are fully covered by another pair
ranges = [range(int(r2.ind1), int(r2.ind2 + 1)) for i2, r2 in df.iterrows()]
index_combinations = list(itertools.permutations(df.index, r=2))
check_overlapping = set(
[
item[0]
for item in index_combinations
if (
ranges[item[0]][0] in ranges[item[1]]
and ranges[item[0]][-1] in ranges[item[1]]
)
]
)
return df.drop(check_overlapping)
def check_groups(df):
"""
Check if there are still several basepoints describing the same streamer
"""
# group basepoints that describe the same streamer
index_combinations = np.asarray(
list(itertools.combinations_with_replacement(df.index, r=2))
)
check_crossing = [
df.iloc[item[0]].dline.intersects(df.iloc[item[1]].dline)
for item in index_combinations
]
groups = combine_shared(index_combinations[check_crossing])
keep_index = [
item[
np.argmax(
[
on[int(row.ind1) : int(row.ind2) + 1].sum()
for index, row in df.iloc[item].iterrows()
]
)
]
for item in groups
]
return df.iloc[keep_index]
# apply all four routines and stop if no basepoints are left after a routine
routines = [
check_duplicates,
check_intersections,
check_overlapping,
check_groups,
]
i = 0
while len(df_bp.index) > 1 and i <= 3:
df_bp = routines[i](df_bp).reset_index(drop=True)
i += 1
# define Polygons
dates = pd.DataFrame({"date": [series.date for i in range(0, len(df_bp))]})
levels = pd.DataFrame({"level": [series.level for i in range(0, len(df_bp))]})
polys = [
Polygon(contour_index[int(row.ind1) : int(row.ind2) + 1])
for index, row in df_bp.iterrows()
]
streamers.append(
gpd.GeoDataFrame(pd.concat([dates, levels], axis=1), geometry=polys)
)
# concat GeoDataFrames
if len(streamers) == 0:
return gpd.GeoDataFrame()
else:
gdf = pd.concat(streamers).reset_index(drop=True)
gdf = gdf.reset_index().rename(columns={"index": "id"})
# calculate properties and transform polygons
return gpd.GeoDataFrame(
calculate_properties(gdf, data, intensity, periodic_add, **kwargs),
geometry=transform_polygons(gdf, data, **kwargs).geometry,
)