from functools import cache
import json
import logging
import math
import abc
from collections import defaultdict
import csv
from datetime import datetime
import inspect
import pathlib
import re
import sys
import types
from typing import Any
import numpy as np
import numpy.typing as npt
from .Data import Data, Flag
from .Station import Station
from ..mathutils import haversine
try:
# Optional dependencies required for relative altitude filter.
import xarray as xr
from cf_units import Unit
except ImportError:
pass
logger = logging.getLogger(__name__)
[docs]
class Filter(abc.ABC):
"""Base-class for all filters used from pyaro-Readers"""
time_format = "%Y-%m-%d %H:%M:%S"
def __init__(self, **kwargs):
"""constructor of Filters. All filters must have a default constructor without kwargs
for an empty filter object"""
return
[docs]
def args(self) -> dict[str, Any]:
"""retrieve the kwargs possible to retrieve a new object of this filter with filter restrictions
:return: a dictionary possible to use as kwargs for the new method
"""
ba = inspect.signature(self.__class__.__init__).bind(0)
ba.apply_defaults()
args = ba.arguments
args.pop("self")
return args
[docs]
@abc.abstractmethod
def init_kwargs(self) -> dict:
"""return the init kwargs"""
[docs]
@abc.abstractmethod
def name(self) -> str:
"""Return a unique name for this filter
:return: a string to be used by FilterFactory
"""
[docs]
def filter_data(
self, data: Data, stations: dict[str, Station], variables: list[str]
) -> Data:
"""Filtering of data
:param data: Data of e.g. a Reader.data(varname) call
:param stations: List of stations, e.g. from a Reader.stations() call
:param variables: variables, i.e. from a Reader.variables() call
:return: a updated Data-object with this filter applied
"""
return data
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
"""Filtering of stations list
:param stations: List of stations, e.g. from a Reader.stations() call
:return: dict of filtered stations
"""
return stations
[docs]
def filter_variables(self, variables: list[str]) -> list[str]:
"""Filtering of variables
:param variables: List of variables, e.g. from a Reader.variables() call
:return: List of filtered variables.
"""
return variables
def __repr__(self):
return f"{type(self).__name__}(**{self.init_kwargs()})"
[docs]
class DataIndexFilter(Filter):
"""A abstract baseclass implementing filter_data by an abstract method
filter_data_idx"""
[docs]
@abc.abstractmethod
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
):
"""Filter data to an index which can be applied to Data.slice(idx) later
:return: a index for Data.slice(idx)
"""
pass
[docs]
def filter_data(
self, data: Data, stations: dict[str, Station], variables: list[str]
) -> Data:
idx = self.filter_data_idx(data, stations, variables)
return data.slice(idx)
class FilterFactoryException(Exception):
pass
class FilterException(Exception):
pass
[docs]
class FilterFactory:
def __new__(cls):
if not hasattr(cls, "instance"):
cls.instance = super(FilterFactory, cls).__new__(cls)
cls.instance._filters = {}
return cls.instance
[docs]
def register(self, filter: Filter):
"""Register a new filter to the factory
with a filter object (might be empty)
:param filter: a filter implementation
"""
if filter.name() in self._filters:
raise FilterFactoryException(
f"Cannot use {filter}: {filter.name()} already in use by {self.get(filter.name())}"
)
self._filters[filter.name()] = filter
[docs]
def get(self, name, **kwargs):
"""Get a filter by name. If kwargs are given, they will be send to the
filters new method
:param name: a filter-name
:return: a filter, optionally initialized
"""
filter = self._filters[name]
return filter.__class__(**kwargs)
[docs]
def list(self) -> dict[str, Filter]:
"""List all available filter-names and their initializations"""
return types.MappingProxyType(self._filters)
filters = FilterFactory()
def registered_filter(filter_class):
"""Simple decorator to register a FilterClass to the FilterFactory on construction
:param filter_class: class to register
"""
filters.register(filter_class())
return filter_class
class FilterCollectionException(Exception):
pass
[docs]
class FilterCollection:
"""A collection of DataIndexFilters which can be applied together.
:param filterlist: _description_, defaults to []
:return: _description_
"""
def __init__(self, filterlist=[]):
self._filters = []
tmp_filterlist = []
if isinstance(filterlist, dict):
for name, kwargs in filterlist.items():
tmp_filterlist.append(filters.get(name, **kwargs))
else:
tmp_filterlist = filterlist
for f in tmp_filterlist:
self.add(f)
[docs]
def add(self, difilter: DataIndexFilter):
if not isinstance(difilter, DataIndexFilter):
raise FilterCollectionException(
f"filter not a DataIndexFilter, so can't add to collection"
)
else:
self._filters.append(difilter)
[docs]
def filter_data(
self, data: Data, stations: dict[str, Station], variables: str
) -> Data:
"""Filter data with all filters in this collection.
:param data: Data from a timeseries-reader, i.e. retrieved by ts.data(varname)
:param stations: stations-dict of a reader, i.e. retrieved by ts.stations()
:param variables: variables of a reader, i.e. retrieved by ts.variables()
:return: _description_
"""
for fi in self._filters:
data = fi.filter_data(data, stations, variables)
return data
[docs]
def filter(self, ts_reader, variable: str) -> Data:
"""Filter the data for a variable of a reader with all filters in this collection.
:param ts_reader: a timeseries-reader instance
:param variable: a valid variable-name
:return: filtered data
"""
stations = ts_reader.stations()
variables = ts_reader.variables()
data = ts_reader.data(variable)
return self.filter_data(data, stations, variables)
def __iter__(self):
return self._filters.__iter__()
[docs]
@registered_filter
class VariableNameFilter(Filter):
"""Filter to change variable-names and/or include/exclude variables
:param reader_to_new: dictionary from readers-variable names to new variable-names,
e.g. used in your project, defaults to {}
:param include: list of variables to include only (new names if changed), defaults to []
meaning keep all variables unless excluded.
:param exclude: list of variables to exclude (new names if changed), defaults to []
"""
def __init__(
self,
reader_to_new: dict[str, str] = {},
include: list[str] = [],
exclude: list[str] = [],
):
self._reader_to_new = reader_to_new
self._new_to_reader = {v: k for k, v in reader_to_new.items()}
self._include = set(include)
self._exclude = set(exclude)
return
[docs]
def init_kwargs(self):
return {
"reader_to_new": self._reader_to_new,
"include": list(self._include),
"exclude": list(self._exclude),
}
[docs]
def name(self):
return "variables"
[docs]
def reader_varname(self, new_variable: str) -> str:
"""convert a new variable name to a reader-variable name
:param new_variable: variable name after translation
:return: variable name in the original reader
"""
return self._new_to_reader.get(new_variable, new_variable)
[docs]
def new_varname(self, reader_variable: str) -> str:
"""convert a reader-variable to a new variable name
:param reader_variable: variable as used in the reader
:return: variable name after translation
"""
return self._reader_to_new.get(reader_variable, reader_variable)
[docs]
def filter_data(self, data, stations, variables) -> Data:
"""Translate data's variable"""
from .Wrappers import VariableNameChangingReaderData
return VariableNameChangingReaderData(
data, self._reader_to_new.get(data.variable, data.variable)
)
[docs]
def filter_variables(self, variables: list[str]) -> list[str]:
"""change variable name and reduce variables applying include and exclude parameters
:param variables: variable names as in the reader
:return: valid variable names in translated nomenclature
"""
newlist = []
for x in variables:
newvar = self.new_varname(x)
if self.has_variable(newvar):
newlist.append(newvar)
return newlist
[docs]
def has_variable(self, variable) -> bool:
"""check if a variable-name is in the list of variables applying include and exclude
:param variable: variable name in translated, i.e. new scheme
:return: True or False
"""
if len(self._include) > 0:
if not variable in self._include:
return False
if variable in self._exclude:
return False
return True
[docs]
def has_reader_variable(self, variable) -> bool:
"""Check if variable-name is in the list of variables applying include and exclude
:param variable: variable as returned from the reader
:return: True or False
"""
new_var = self.new_varname(variable)
return self.has_variable(new_var)
class StationReductionFilter(DataIndexFilter):
"""Abstract method for all filters, which work on reducing the number of stations only.
The filtering of stations has to be implemented by subclasses, while filtering of data
is already implemented.
"""
@abc.abstractmethod
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
pass
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
):
stat_names = self.filter_stations(stations).keys()
dstations = data.stations
stat_names = np.fromiter(stat_names, dtype=dstations.dtype)
index = np.isin(dstations, stat_names)
return index
[docs]
@registered_filter
class StationFilter(StationReductionFilter):
def __init__(self, include: list[str] = [], exclude: list[str] = []):
self._include = set(include)
self._exclude = set(exclude)
return
[docs]
def init_kwargs(self):
return {"include": list(self._include), "exclude": list(self._exclude)}
[docs]
def name(self):
return "stations"
[docs]
def has_station(self, station) -> bool:
if len(self._include) > 0:
if station not in self._include:
return False
if station in self._exclude:
return False
return True
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
return {s: v for s, v in stations.items() if self.has_station(s)}
[docs]
@registered_filter
class CountryFilter(StationReductionFilter):
"""Filter countries by ISO2 names (capitals!)
:param include: countries to include, defaults to [], meaning all countries
:param exclude: countries to exclude, defaults to [], meaning none
"""
def __init__(self, include: list[str] = [], exclude: list[str] = []):
self._include = set(include)
self._exclude = set(exclude)
return
[docs]
def init_kwargs(self):
return {"include": list(self._include), "exclude": list(self._exclude)}
[docs]
def name(self):
return "countries"
[docs]
def has_country(self, country) -> bool:
if len(self._include) > 0:
if country not in self._include:
return False
if country in self._exclude:
return False
return True
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
return {s: v for s, v in stations.items() if self.has_country(v.country)}
class BoundingBoxException(Exception):
pass
[docs]
@registered_filter
class BoundingBoxFilter(StationReductionFilter):
"""Filter using geographical bounding-boxes. Coordinates should be given in the range
[-180,180] (degrees_east) for longitude and [-90,90] (degrees_north) for latitude.
Order of coordinates is clockwise starting with north, i.e.: (north, east, south, west) = NESW
:param include: bounding boxes to include. Each bounding box is a tuple of four float for
(NESW), defaults to [] meaning no restrictions
:param exclude: bounding boxes to exclude. Defaults to []
:raises BoundingBoxException: on any errors of the bounding boxes
"""
def __init__(
self,
include: list[tuple[float, float, float, float]] = [],
exclude: list[tuple[float, float, float, float]] = [],
):
for tup in include:
self._test_bounding_box(tup)
for tup in exclude:
self._test_bounding_box(tup)
self._include = set(include)
self._exclude = set(exclude)
return
def _test_bounding_box(self, tup):
"""_summary_
:param tup: A bounding-box tuple of form (north, east, south, west)
:raises BoundingBoxException: on any errors of the bounding box
"""
if len(tup) != 4:
raise BoundingBoxException(f"({tup}) has not four NESW elements")
if not (-90 <= tup[0] <= 90):
raise BoundingBoxException(f"north={tup[0]} not within [-90,90] in {tup}")
if not (-90 <= tup[2] <= 90):
raise BoundingBoxException(f"south={tup[2]} not within [-90,90] in {tup}")
if not (-180 <= tup[1] <= 180):
raise BoundingBoxException(f"east={tup[1]} not within [-180,180] in {tup}")
if not (-180 <= tup[3] <= 180):
raise BoundingBoxException(f"west={tup[3]} not within [-180,180] in {tup}")
if tup[0] < tup[2]:
raise BoundingBoxException(f"north={tup[0]} < south={tup[2]} in {tup}")
if tup[1] < tup[3]:
raise BoundingBoxException(f"east={tup[1]} < west={tup[3]} in {tup}")
return True
[docs]
def init_kwargs(self):
return {"include": list(self._include), "exclude": list(self._exclude)}
[docs]
def name(self):
return "bounding_boxes"
[docs]
def has_location(self, latitude, longitude):
"""Test if the locations coordinates are part of this filter.
:param latitude: latitude coordinate in degree_north [-90, 90]
:param longitude: longitude coordinate in degree_east [-180, 180]
"""
if len(self._include) == 0:
inside_include = True
else:
inside_include = False
for n, e, s, w in self._include:
if not inside_include: # one inside test is enough
if s <= latitude <= n:
if w <= longitude <= e:
inside_include = True
if not inside_include:
return False # no more tests required
outside_exclude = True
for n, e, s, w in self._exclude:
if (
outside_exclude
): # if known to be inside of any other exclude BB, no more tests
if s <= latitude <= n:
if w <= longitude <= e:
outside_exclude = False
return inside_include & outside_exclude
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
return {
s: v
for s, v in stations.items()
if self.has_location(v.latitude, v.longitude)
}
[docs]
@registered_filter
class FlagFilter(DataIndexFilter):
"""Filter data by Flags
:param include: flags to include, defaults to [], meaning all flags
:param exclude: flags to exclude, defaults to [], meaning none
"""
def __init__(self, include: list[Flag] = [], exclude: list[Flag] = []):
self._include = set(include)
if len(include) == 0:
all_include = set([f for f in Flag])
else:
all_include = self._include
self._exclude = set(exclude)
self._valid = all_include.difference(self._exclude)
return
[docs]
def name(self):
return "flags"
[docs]
def init_kwargs(self):
return {"include": list(self._include), "exclude": list(self._exclude)}
[docs]
def usable_flags(self):
return self._valid
[docs]
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
):
validflags = np.fromiter(self._valid, dtype=data.flags.dtype)
index = np.isin(data.flags, validflags)
return index
# Upper and lower bound inclusive
TimeBound = tuple[str | np.datetime64 | datetime, str | np.datetime64 | datetime]
# Internal representation
_TimeBound = tuple[np.datetime64, np.datetime64]
class TimeBoundsException(Exception):
pass
[docs]
@registered_filter
class TimeBoundsFilter(DataIndexFilter):
"""Filter data by start and/or end-times of the measurements. Each timebound consists
of a bound-start and bound-end (both included). Timestamps are given as YYYY-MM-DD HH:MM:SS in UTC
:param start_include: list of tuples of start-times, defaults to [], meaning all
:param start_exclude: list of tuples of start-times, defaults to []
:param startend_include: list of tuples of start and end-times, defaults to [], meaning all
:param startend_exclude: list of tuples of start and end-times, defaults to []
:param end_include: list of tuples of end-times, defaults to [], meaning all
:param end_exclude: list of tuples of end-times, defaults to []
:raises TimeBoundsException: on any errors with the time-bounds
Examples:
end_include: `[("2023-01-01 10:00:00", "2024-01-01 07:00:00")]`
will only include observations where the end time of each observation
is within the interval specified
(i.e. "end" >= 2023-01-01 10:00:00 and "end" <= "2024-01-01 07:00:00")
Including multiple bounds will act as an OR, allowing multiple selections.
If we want every observation in January for 2021, 2022, 2023, and 2024 this
could be made as the following filter::
startend_include: [
("2021-01-01 00:00:00", "2021-02-01 00:00:00"),
("2022-01-01 00:00:00", "2022-02-01 00:00:00"),
("2023-01-01 00:00:00", "2023-02-01 00:00:00"),
("2024-01-01 00:00:00", "2024-02-01 00:00:00"),
]
"""
def __init__(
self,
start_include: list[TimeBound] = [],
start_exclude: list[TimeBound] = [],
startend_include: list[TimeBound] = [],
startend_exclude: list[TimeBound] = [],
end_include: list[TimeBound] = [],
end_exclude: list[TimeBound] = [],
):
self._start_include = self._timebounds_canonicalise(start_include)
self._start_exclude = self._timebounds_canonicalise(start_exclude)
self._startend_include = self._timebounds_canonicalise(startend_include)
self._startend_exclude = self._timebounds_canonicalise(startend_exclude)
self._end_include = self._timebounds_canonicalise(end_include)
self._end_exclude = self._timebounds_canonicalise(end_exclude)
[docs]
def name(self):
return "time_bounds"
def _timebounds_canonicalise(self, tuple_list: list[TimeBound]) -> list[_TimeBound]:
retlist = []
for start, end in tuple_list:
if isinstance(start, str):
start_dt = np.datetime64(datetime.strptime(start, self.time_format))
else:
start_dt = np.datetime64(start)
if isinstance(end, str):
end_dt = np.datetime64(datetime.strptime(end, self.time_format))
else:
end_dt = np.datetime64(end)
if start_dt > end_dt:
raise TimeBoundsException(
f"(start later than end) for (f{start} > f{end})"
)
retlist.append((start_dt, end_dt))
return retlist
def _datetime_list_to_str_list(self, tuple_list) -> list[tuple[str, str]]:
retlist = []
for start_dt, end_dt in tuple_list:
retlist.append(
(
start_dt.astype(datetime).strftime(self.time_format),
end_dt.astype(datetime).strftime(self.time_format),
)
)
return retlist
[docs]
def init_kwargs(self) -> dict[str, list[tuple[str, str]]]:
return {
"start_include": self._datetime_list_to_str_list(self._start_include),
"start_exclude": self._datetime_list_to_str_list(self._start_exclude),
"startend_include": self._datetime_list_to_str_list(self._startend_include),
"startend_exclude": self._datetime_list_to_str_list(self._startend_exclude),
"end_include": self._datetime_list_to_str_list(self._startend_include),
"end_exclude": self._datetime_list_to_str_list(self._startend_exclude),
}
def _index_from_include_exclude(
self,
times1: npt.NDArray[np.datetime64],
times2: npt.NDArray[np.datetime64],
includes: list[_TimeBound],
excludes: list[_TimeBound],
):
if len(includes) == 0:
idx = np.repeat(True, len(times1))
else:
idx = np.repeat(False, len(times1))
for start, end in includes:
idx |= (start <= times1) & (times2 <= end)
for start, end in excludes:
idx &= (times1 < start) | (end < times2)
return idx
[docs]
def has_envelope(self) -> bool:
"""Check if this filter has an envelope, i.e. a earliest and latest time"""
return bool(
len(self._start_include)
or len(self._startend_include)
or len(self._end_include)
)
[docs]
def envelope(self) -> tuple[datetime, datetime]:
"""Get the earliest and latest time possible for this filter.
:return: earliest start and end-time (approximately)
:raises TimeBoundsException: if has_envelope() is False, or internal errors
"""
if not self.has_envelope():
raise TimeBoundsException(
"TimeBounds-envelope called but no envelope exists"
)
start = np.datetime64(datetime.max)
end = np.datetime64(datetime.min)
for s, e in self._start_include + self._startend_include + self._end_include:
start = min(start, s)
end = max(end, e)
if end < start:
raise TimeBoundsException(
f"TimeBoundsEnvelope end < start: {end} < {start}"
)
return (start.astype(datetime), end.astype(datetime))
[docs]
def contains(
self, dt_start: npt.NDArray[np.datetime64], dt_end: npt.NDArray[np.datetime64]
) -> npt.NDArray[np.bool_]:
"""Test if datetimes in dt_start, dt_end belong to this filter
:param dt_start: start of each observation as a numpy array of datetimes
:param dt_end: end of each observation as a numpy array of datetimes
:return: numpy boolean array with True/False values
"""
idx = self._index_from_include_exclude(
dt_start, dt_start, self._start_include, self._start_exclude
)
idx &= self._index_from_include_exclude(
dt_start, dt_end, self._startend_include, self._startend_exclude
)
idx &= self._index_from_include_exclude(
dt_end, dt_end, self._end_include, self._end_exclude
)
return idx
[docs]
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
) -> npt.NDArray[np.bool_]:
return self.contains(data.start_times, data.end_times)
[docs]
@registered_filter
class TimeVariableStationFilter(DataIndexFilter):
"""Exclude combinations of variable station and time from the data
This filter is really a cleanup of the database, but sometimes it is not possible to
modify the original database and the cleanup needs to be done on a filter basis.
:param exclude: tuple of 4 elements: start-time, end-time, variable, station
:param exclude_from_csvfile: this is a helper option to enable a large list of excludes
to be read from a "\t" separated file with columns
start \t end \t variable \t station
where start and end are timestamps of format YYYY-MM-DD HH:MM:SS in UTC, e.g.
the year 2020 is:
2020-01-01 00:00:00 \t 2020-12-31 23:59:59 \t ...
"""
def __init__(self, exclude=[], exclude_from_csvfile=""):
csvexclude = self._excludes_from_csv(exclude_from_csvfile)
self._exclude = self._order_exclude(exclude + csvexclude)
def _excludes_from_csv(self, file):
csvexcludes = []
if file:
with open(file, "rt", newline="") as fh:
crd = csv.reader(fh, delimiter="\t")
for row in crd:
try:
if len(row) == 0:
continue
if row[0].startswith("#"):
continue
if len(row) < 4:
raise Exception(f"need 4 elements in row, got {len(row)}")
datetime.strptime(row[0], self.time_format)
datetime.strptime(row[1], self.time_format)
csvexcludes.append((row[0], row[1], row[2], row[3]))
except Exception as ex:
raise Exception(
f"malformated TimeVariableStationFilter exclude file, row: {row}",
ex,
)
return csvexcludes
def _order_exclude(self, exclude):
"""Order excludes to a dict of: [variable][start_time][end_time] -> list[stations]
:param excludes: tuples of start-time, end-time, variable, station
"""
retval = defaultdict(lambda: defaultdict(lambda: defaultdict(lambda: [])))
for start_time, end_time, variable, station in exclude:
# make sure start and end_time can be parsed
datetime.strptime(start_time, self.time_format)
datetime.strptime(end_time, self.time_format)
retval[variable][start_time][end_time].append(station)
return retval
[docs]
def init_kwargs(self):
retval = []
for var, start_times in sorted(self._exclude.items()):
for start_time, end_times in sorted(start_times.items()):
for end_time, stations in sorted(end_times.items()):
for station in sorted(stations):
retval.append((start_time, end_time, var, station))
# sort by start_time
retval.sort(key=lambda x: x[1])
return {"exclude": retval}
[docs]
def name(self):
return "time_variable_station"
[docs]
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
):
idx = data.start_times.astype(bool)
idx |= True
if data.variable in self._exclude:
for start_time, end_times in self._exclude[data.variable].items():
start_time_dt = datetime.strptime(start_time, self.time_format)
for end_time, stations in end_times.items():
end_time_dt = datetime.strptime(end_time, self.time_format)
dstations = data.stations
stat_names = np.fromiter(stations, dtype=dstations.dtype)
exclude_idx = np.isin(dstations, stat_names)
exclude_idx &= (start_time_dt <= data.start_times) & (
end_time_dt > data.start_times
)
idx &= np.logical_not(exclude_idx)
return idx
[docs]
@registered_filter
class DuplicateFilter(DataIndexFilter):
"""remove duplicates from the data. By default, data with common
station, start_time, end_time are consider duplicates. Only one of the duplicates
is kept.
:param duplicate_keys: list of data-fields/columns, defaults to None, being the same
as ["stations", "start_times", "end_times"]
"""
default_keys = ["stations", "start_times", "end_times"]
def __init__(self, duplicate_keys: list[str] | None = None):
self._keys = duplicate_keys
[docs]
def init_kwargs(self):
if self._keys is None:
return {}
else:
return {"duplicate_keys": self._keys}
[docs]
def name(self):
return "duplicates"
[docs]
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
):
if self._keys is None:
xkeys = self.default_keys
else:
xkeys = self._keys
return np.unique(data[xkeys], return_index=True)[1]
[docs]
@registered_filter
class TimeResolutionFilter(DataIndexFilter):
"""The timeresolution filter allows to restrict the observation data to
certain time-resolutions. Time-resolutions are not exact, and might be interpreted
slightly differently by different observation networks.
Default named time-resolutions are
* minute: 59 to 61 s (+-1sec)
* hour: 59*60 s to 61*60 s (+-1min)
* day: 22:59:00 to 25:01:00 to allow for leap-days and a extra min
* week: 6 to 8 days (+-1 day)
* month: 27-33 days (30 +- 3 days)
* year: 360-370 days (+- 5days)
:param resolutions: a list of wanted time resolutions. A resolution consists of a integer
number and a time-resolution name, e.g. 3 hour (no plural).
"""
pattern = re.compile(r"\s*(\d+)\s*(\w+)\s*")
named_resolutions = dict(
minute=(59, 61),
hour=(59 * 60, 61 * 60),
day=(60 * (59 + (60 * 22)), 60 * (1 + (60 * 25))),
week=(6 * 24 * 60 * 60, 8 * 24 * 60 * 60),
month=(27 * 24 * 60 * 60, 33 * 24 * 60 * 60),
year=(360 * 24 * 60 * 60, 370 * 24 * 60 * 60),
)
def __init__(self, resolutions: list[str] = []):
self._resolutions = resolutions
self._minmax = self._resolve_resolutions()
def _resolve_resolutions(self):
minmax_list = []
for res in self._resolutions:
minmax = None
if m := self.pattern.match(res):
count = int(m[1])
name = m[2]
if name in self.named_resolutions:
minmax = tuple(count * x for x in self.named_resolutions[name])
if minmax is None:
raise FilterException(f"Cannot parse time-resolution of {res}")
else:
minmax_list.append(minmax)
return minmax_list
[docs]
def init_kwargs(self):
if len(self._resolutions) == 0:
return {}
else:
return {"resolutions": self._resolutions}
[docs]
def name(self):
return "time_resolution"
[docs]
def filter_data_idx(
self, data: Data, stations: dict[str, Station], variables: list[str]
):
idx = data.start_times.astype(bool)
idx[:] = True
if len(self._minmax) > 0:
idx[:] = False
data_resolution = (data.end_times - data.start_times) / np.timedelta64(
1, "s"
)
for minmax in self._minmax:
idx |= (minmax[0] <= data_resolution) & (data_resolution <= minmax[1])
return idx
[docs]
@registered_filter
class AltitudeFilter(StationReductionFilter):
"""
Filter which filters stations based on their altitude. Can be used to filter for a
minimum and/or maximum altitude.
:param min_altitude : float of minimum altitude in meters required to keep the station (inclusive).
:param max_altitude : float of maximum altitude in meters required to keep the station (inclusive).
If station elevation is nan, it is always excluded.
"""
def __init__(
self, min_altitude: float | None = None, max_altitude: float | None = None
):
if min_altitude is not None and max_altitude is not None:
if min_altitude > max_altitude:
raise ValueError(
f"min_altitude ({min_altitude}) > max_altitude ({max_altitude})."
)
self._min_altitude = min_altitude
self._max_altitude = max_altitude
[docs]
def init_kwargs(self):
return {"min_altitude": self._min_altitude, "max_altitude": self._max_altitude}
[docs]
def name(self):
return "altitude"
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
if self._min_altitude is not None:
stations = {
n: s
for n, s in stations.items()
if (
not math.isnan(s["altitude"])
and s["altitude"] >= self._min_altitude
)
}
if self._max_altitude is not None:
stations = {
n: s
for n, s in stations.items()
if (
not math.isnan(s["altitude"])
and s["altitude"] <= self._max_altitude
)
}
return stations
[docs]
@registered_filter
class RelativeAltitudeFilter(StationFilter):
"""
Filter class which filters stations based on the relative difference between
the station altitude, and the gridded topography altitude.
:param topo_file: A .nc file from which to read gridded topography data.
:param topo_var: Name of variable that stores altitude.
:param rdiff: Relative difference (in meters).
Note:
-----
- Stations will be kept if abs(altobs-altmod) <= rdiff.
- Stations will not be kept if station altitude is NaN.
Note:
-----
This filter requires additional dependencies (xarray, netcdf4, cf-units) to function. These can be installed
with `pip install .[optional]
"""
# https://cfconventions.org/Data/cf-conventions/cf-conventions-1.11/cf-conventions.html#latitude-coordinate
_UNITS_LAT = set(
[
"degrees_north",
"degree_north",
"degree_N",
"degrees_N",
"degreeN",
"degreesN",
]
)
# https://cfconventions.org/Data/cf-conventions/cf-conventions-1.11/cf-conventions.html#longitude-coordinate
_UNITS_LON = set(
["degrees_east", "degree_east", "degree_E", "degrees_E", "degreeE", "degreesE"]
)
def __init__(
self,
topo_file: str | None = None,
topo_var: str = "topography",
rdiff: float = 0,
):
if "cf_units" not in sys.modules:
logger.info(
"relaltitude filter is missing dependency 'cf-units'. Please install to use."
)
if "xarray" not in sys.modules:
logger.info(
"relaltitude filter is missing dependency 'xarray'. Please install to use."
)
self._topo_file = topo_file
self._topo_var = topo_var
self._rdiff = rdiff
# topography and unit-m property initialization
self._topography = None
self._UNITS_METER = None
@property
def UNITS_METER(self):
"""internal representation of units, don't use
:return: m-unit in internal representation
:meta private:
"""
if self._UNITS_METER is None:
self._UNITS_METER = Unit("m")
return self._UNITS_METER
@property
def topography(self):
"""Internal property, don't use.
:raises ModuleNotFoundError: if cf-units or xarray is not installed
:raises FilterException: if topograpy file is not provided
:return: topography as internal representation
:meta private:
"""
if "cf_units" not in sys.modules:
raise ModuleNotFoundError(
"relaltitude filter is missing required dependency 'cf-units'. Please install to use this filter."
)
if "xarray" not in sys.modules:
raise ModuleNotFoundError(
"relaltitude filter is missing required dependency 'xarray'. Please install to use this filter."
)
if self._topography is None:
if self._topo_file is None:
raise FilterException(
f"No topography data provided (topo_file='{self._topo_file}'). Relative elevation filtering will not be applied."
)
else:
try:
with xr.open_dataset(self._topo_file) as topo:
self._topography = self._convert_altitude_to_meters(topo)
lat, lon = self._find_lat_lon_variables(topo)
self._extract_bounding_box(lat, lon)
except Exception as ex:
raise FilterException(
f"Cannot read topography from '{self._topo_file}:{self._topo_var}' : {ex}"
)
return self._topography
def _convert_altitude_to_meters(self, topo_xr):
"""
Method which attempts to convert the altitude variable in the gridded topography data
to meters.
:param topo_xr xarray dataset containing topo
:raises TypeError
If conversion isn't possible.
:return xr.DataArray
"""
# Convert altitude to meters
units = Unit(topo_xr[self._topo_var].units)
if units.is_convertible(self.UNITS_METER):
topography = topo_xr[self._topo_var]
topography.values = self.UNITS_METER.convert(
topography.values, self.UNITS_METER
)
topography["units"] = str(self.UNITS_METER)
else:
raise TypeError(
f"Expected altitude units to be convertible to 'm', got '{units}'"
)
return topography
def _find_lat_lon_variables(self, topo_xr):
"""
Find and load DataArrays from topo which represent the latitude and longitude
dimensions in the topography data.
These are assigned to self._lat, self._lon, respectively for later use.
:param topo_xr: xr.Dataset of topography
:return: lat, lon DataArrays
"""
for var_name in self._topography.coords:
unit_str = self._topography[var_name].attrs.get("units", None)
if unit_str in self._UNITS_LAT:
lat = topo_xr[var_name]
continue
if unit_str in self._UNITS_LON:
lon = topo_xr[var_name]
continue
if any(x is None for x in [lat, lon]):
raise ValueError(
f"Required variable names for lat, lon dimensions could not be found in file '{self._topo_file}"
)
return lat, lon
def _extract_bounding_box(self, lat, lon):
"""
Extract the bounding box of the grid, sets self._boundary_(north|east|south|west)
:param lat: latitude (DataArray)
:param lon: longitude (DataArray)
"""
self._boundary_west = float(lon.min())
self._boundary_east = float(lon.max())
self._boundary_south = float(lat.min())
self._boundary_north = float(lat.max())
logger.info(
"Bounding box (NESW) of topography: %.2f, %.2f, %.2f, %.2f",
self._boundary_north,
self._boundary_east,
self._boundary_south,
self._boundary_west,
)
def _gridded_altitude_from_lat_lon(
self, lat: np.ndarray, lon: np.ndarray
) -> np.ndarray:
altitude = self.topography.sel(
{
"lat": xr.DataArray(lat, dims="latlon"),
"lon": xr.DataArray(lon, dims="latlon"),
},
method="nearest",
)
return altitude.values[0]
def _is_close(
self, alt_gridded: np.ndarray, alt_station: np.ndarray
) -> npt.NDArray[np.bool_]:
"""
Function to check if two altitudes are within a relative tolerance of each
other.
:param alt_gridded : Gridded altitude (in meters).
:param alt_station : Observation / station altitude (in meters).
:returns :
True if the absolute difference between alt_gridded and alt_station is
<= self._rdiff
"""
return np.abs(alt_gridded - alt_station) <= self._rdiff
[docs]
def init_kwargs(self):
return {
"topo_file": self._topo_file,
"topo_var": self._topo_var,
"rdiff": self._rdiff,
}
[docs]
def name(self):
return "relaltitude"
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
if self.topography is None:
return stations
names = np.ndarray(len(stations), dtype=np.dtypes.StrDType)
lats = np.ndarray(len(stations), dtype=np.float64)
lons = np.ndarray(len(stations), dtype=np.float64)
alts = np.ndarray(len(stations), dtype=np.float64)
for i, name in enumerate(stations):
station = stations[name]
names[i] = name
lats[i] = station["latitude"]
lons[i] = station["longitude"]
alts[i] = station["altitude"]
out_of_bounds_mask = np.logical_or(
np.logical_or(lons < self._boundary_west, lons > self._boundary_east),
np.logical_or(lats < self._boundary_south, lats > self._boundary_north),
)
if np.sum(out_of_bounds_mask) > 0:
logger.warning(
"Some stations were removed due to being out of bounds of the gridded topography"
)
topo = self._gridded_altitude_from_lat_lon(lats, lons)
within_rdiff_mask = self._is_close(topo, alts)
mask = np.logical_and(~out_of_bounds_mask, within_rdiff_mask)
selected_names = names[mask]
return {name: stations[name] for name in selected_names}
[docs]
@registered_filter
class ValleyFloorRelativeAltitudeFilter(StationFilter):
"""
Filter for filtering stations based on the difference between the station altitude and valley
floor altitude (defined as the lowest altitude within a radius around the station). This ensures
that plateau sites are treated like "surface" sites, while sites in hilly or mountaineous areas
(eg. Schauinsland) are considered mountain sites. This approach has been used by several papers
(eg. Fowler et al., Lloibl et al. 1994).
:param topo: Topography file path (either a file or a directory). Must be a dataset openable by
xarray, with latitude and longitude stored as "lat" and "lon" respectively. The variable
that contains elevation data is assumed to be in meters. If `topo` is a directory, a
metadata.json file containing the geographic bounds of each file must be present (see below
for example).
:param radius: Radius (in meters)
:param topo_var: Variable name to use in topography dataset
:param lower: Optional lower bound needed for relative altitude for station to be kept (in meters)
:param upper: Optional upper bound needed for relative altitude for station to be kept (in meters)
:param keep_nan: Whether to keep values where relative altitude is calculated as nan. Defaults to True.
Note: Since the topography does not contain values for oceans this may happen for small islands and
coastal stations.
:raises ModuleNotFoundError: If necessary required additional dependencies (cf_units, xarray) are
not available.
Note
----
This implementation is only tested with GTOPO30 dataset to far.
Available versions of gtopo30 can be found here:
`/lustre/storeB/project/aerocom/aerocom1/AEROCOM_OBSDATA/GTOPO30/`
Note
----
metadata.json should contain a mapping from each nc file, to it's geographic latitude/longitude
bounds.
For example:
```
{
"N.nc": {
"w": -180,
"e": 180,
"n": 90,
"s": -10
},
"S.nc": {
"w": -180,
"e": 180,
"n": -10,
"s": -90
}
}
```
"""
def __init__(
self,
topo: str | None = None,
*,
radius: float = 5000,
topo_var: str = "Band1",
lower: float | None = None,
upper: float | None = None,
keep_nan: bool = True,
):
if "cf_units" not in sys.modules:
logger.info(
"valleyfloor_relaltitude filter is missing required dependency 'cf-units'. Please install to use this filter."
)
if "xarray" not in sys.modules:
logger.info(
"valleyfloor_relaltitude filter is missing required dependency 'xarray'. Please install to use this filter."
)
self._topo = None
if topo is not None:
try:
self._topo = pathlib.Path(topo)
except TypeError as e:
raise TypeError(
f"Topo needs to be an instance of str. Got {type(topo)}."
) from e
if not self._topo.exists():
logger.warning(
f"Provided location for topography data ({self._topo}) does not exist. It should be either a .nc file, or a folder with several .nc files and a metadata.json file."
)
self._topo_var = topo_var
self._radius = radius
self._lower = lower
self._upper = upper
self._keep_nan = keep_nan
@property
@cache
def _metadata(self) -> dict:
if not self._topo.is_dir():
raise RuntimeError("Should be impossible...")
metadata_file = self._topo / "metadata.json"
try:
with open(metadata_file) as f:
return json.load(f)
except FileNotFoundError as e:
raise FileNotFoundError(
f"No 'metadata.json' file found in directory."
) from e
[docs]
def init_kwargs(self):
return {
# Converting to string for serialization purposes.
"topo": None if self._topo is None else str(self._topo),
"topo_var": self._topo_var,
"radius": self._radius,
"lower": self._lower,
"upper": self._upper,
"keep_nan": self._keep_nan,
}
[docs]
def name(self):
return "valleyfloor_relaltitude"
def _get_topo_file_path(self, lat: float, lon: float) -> pathlib.Path:
"""Returns the path of the topofile that needs to be read for the lat / lon pair.
:param lat: Latitude
:param lon: Longitude
:raises FileNotFoundError: If self._topo does not exist.
:raises FileNotFoundError: If self._topo is a directory and 'metadata.json' does not exist.
:return: Boolean indicating whether _topo_file changed.
"""
file_path = None
if self._topo.is_file():
file_path = self._topo
if self._topo.is_dir():
metadata = self._metadata
file = None
for file in metadata:
if lat < metadata[file]["s"] or lat > metadata[file]["n"]:
continue
if lon < metadata[file]["w"] or lon > metadata[file]["e"]:
continue
file_path = self._topo / file
break
else:
raise FileNotFoundError(
f"No matching topography file found for coordinate pair (lat={lat:.6f}; lon={lon:.6f})"
)
if file_path is None:
raise FileNotFoundError
return file_path
def _batch_stations(
self, stations: dict[str, Station]
) -> dict[pathlib.Path, dict[str, Station]]:
"""Batches a stations dict according to the topography file that needs to be read in order
to calculate relative altitude.
:param stations: Dict mapping of str id to Station (as passed to .filter_stations()).
:return: A dict mapping the topography file path to a Stations dict.
"""
result = {}
for k, v in stations.items():
topo_file = self._get_topo_file_path(v.latitude, v.longitude)
if topo_file not in result:
result[topo_file] = {}
result[topo_file][k] = v
return result
[docs]
def filter_stations(self, stations: dict[str, Station]) -> dict[str, Station]:
if self._topo is None or (self._upper is None and self._lower is None):
# Default initialized filter should not do anything, so return unfiltered stations.
return stations
if "cf_units" not in sys.modules:
raise ModuleNotFoundError(
"valleyfloor_relaltitude filter is missing required dependency 'cf-units'. Please install to use this filter."
)
if "xarray" not in sys.modules:
raise ModuleNotFoundError(
"valleyfloor_relaltitude filter is missing required dependency 'xarray'. Please install to use this filter."
)
if not self._topo.exists():
raise FileNotFoundError(
f"Provided location for topography data ({self._topo}) does not exist. It should be either a .nc file, or a folder with several .nc files and a metadata.json file."
)
filtered_stations = {}
batches = self._batch_stations(stations)
for topo_file, stations in batches.items():
topo = xr.load_dataset(topo_file)
names = np.array([k for k in stations.keys()])
latitudes = np.array([s.latitude for s in stations.values()])
longitudes = np.array([s.longitude for s in stations.values()])
altitudes = np.array([s.altitude for s in stations.values()])
stats = np.array(list(stations.values()))
ralt = self._calculate_relative_altitude(
latitudes,
longitudes,
radius=self._radius,
altitudes=altitudes,
topo=topo,
)
mask = np.ones_like(ralt)
if self._lower is not None:
mask = np.logical_and(mask, (ralt >= self._lower))
if self._upper is not None:
mask = np.logical_and(mask, (ralt <= self._upper))
if self._keep_nan:
mask = np.logical_or(mask, np.isnan(ralt))
for name, stat in zip(names[mask], stats[mask]):
filtered_stations[name] = stat
return filtered_stations
def _calculate_relative_altitude(
self,
lats: np.ndarray,
lons: np.ndarray,
*,
radius: float,
altitudes: np.ndarray,
topo: "xr.Dataset",
) -> np.ndarray:
"""Calculates relative altitude for multiple latitude-longitude pairs
:param lats: Array of latitudes
:param lons: Array of longitudes
:param radius: Radius for base altitude calculation (in meters)
:param altitudes: Array of station altitudes (in meters)
:param topo: Topography dataset
:return:
Array of relative altitudes (in meters)
"""
nptopo = topo[self._topo_var].values
topolat = topo[self._topo_var]["lat"].values
topolon = topo[self._topo_var]["lon"].values
# Indexes of the latitude and longitude of the stations in the topo dataset.
latidx = np.searchsorted(topolat, lats)
lonidx = np.searchsorted(topolon, lons)
relative_altitudes = np.empty_like(lats, dtype=np.float64)
# Margin for rough slicing of topo data, to avoid expensive distance calculation.
dist = abs(topolat[1] - topolat[0])
margin = int(0.1 + (1 / dist) * (radius / 1_000) / 100)
for i, (lat, lon, altitude) in enumerate(zip(lats, lons, altitudes)):
# For small radiuses, do a rough slicing of topo dataset to avoid expensive distance
# calculation for distant points.
if radius < 100_000:
lat_slice = slice(latidx[i] - margin, latidx[i] + margin)
lat_subset = topolat[lat_slice]
if lat >= 88 or lat <= -88:
# Include 360deg longitude near poles
subset_topo = nptopo[lat_slice, :]
else:
lon_slice = slice(lonidx[i] - margin, lonidx[i] + margin)
subset_topo = nptopo[lat_slice, lon_slice]
lon_subset = topolon[lon_slice]
else:
subset_topo = nptopo
lon_subset = topolon
# Distance calculation for each point.
coord = np.meshgrid(lon_subset, lat_subset)
distances = haversine(coord[0], coord[1], lon, lat)
values_within_radius = subset_topo[distances <= radius]
min_value = np.nanmin(values_within_radius)
relative_altitudes[i] = altitude - max(min_value, 0)
return relative_altitudes