Source code for pyaro.timeseries.Data

import abc
from enum import IntEnum, unique
import numpy as np


@unique
class Flag(IntEnum):
    """Quality flag attached to a single measurement.

    Every member is a plain integer (the class derives from IntEnum), and
    ``@unique`` guarantees that no two members share the same value.
    """

    VALID = 0
    INVALID = 1
    BELOW_THRESHOLD = 2
class Data(abc.ABC):
    """Abstract interface of the data handed out by a pyaro.timeseries.Reader.

    The properties declared here are the minimum set of columns every reader
    has to provide. Readers may return their own subclass of Data.
    """

    @abc.abstractmethod
    def keys(self):
        """Names of all data-fields.

        ``variable`` and ``units`` are regarded as metadata and are not
        part of the returned names.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def slice(self, index):  # return type would be Self from python 3.11 on
        """Create a copy of this dataset restricted to ``index``.

        :param index: boolean index with the size of the data, or an
            integer array
        :return: a new Data object
        """
        raise NotImplementedError

    def __getitem__(self, key):
        # indexing is a shorthand for slice()
        return self.slice(key)

    @abc.abstractmethod
    def __len__(self) -> int:
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def variable(self) -> str:
        """Name of the variable shared by all data.

        :return: variable name
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def units(self) -> str:
        """Units in CF-notation; one unit is valid for all values.

        :return: Units in CF-notation
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def values(self) -> np.ndarray:
        """The measured values as 1-dimensional float array.

        :return: 1dim array of floats
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def stations(self) -> np.ndarray:
        """Station identifiers (strings, usually the name) as 1-dimensional array.

        :return: 1dim array of strings, max-length 64-chars
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def latitudes(self) -> np.ndarray:
        """Latitudes (float) as 1-dimensional array.

        :return: 1dim array of floats
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def longitudes(self) -> np.ndarray:
        """Longitudes (float) as 1-dimensional array.

        :return: 1dim array of floats
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def altitudes(self) -> np.ndarray:
        """Altitudes (float) as 1-dimensional array.

        :return: 1dim array of floats
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def start_times(self) -> np.ndarray:
        """Start of each measurement as 1-dimensional array of int64 datetimes.

        :return: 1dim array of datetime64
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def end_times(self) -> np.ndarray:
        """End of each measurement as 1-dimensional array of int64 datetimes.

        :return: 1dim array of datetime64
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def flags(self) -> np.ndarray:
        """Flags, as defined in pyaro, as 1-dimensional array.

        :return: 1dim array of ints
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def standard_deviations(self) -> np.ndarray:
        """Standard deviations as 1-dimensional array.

        A NaN marks a measurement without an available standard deviation.

        :return: 1dim array of floats
        """
        raise NotImplementedError
class DynamicRecArrayException(Exception):
    """Raised by DynamicRecArray.append_array on missing or mis-sized input arrays."""


class DynamicRecArray:
    """A numpy structured array that can grow by rows or by whole arrays.

    The backing array ``_data`` may be larger (``capacity``) than the number
    of rows actually filled (``length``); the ``data`` property hands out
    only the filled part.
    """

    def __init__(self, dtype):
        """
        :param dtype: anything accepted by ``numpy.dtype`` describing the
            fields of one record
        """
        self.dtype = np.dtype(dtype)
        self.length = 0
        self.capacity = 10
        self._data = np.empty(self.capacity, dtype=self.dtype)

    def __len__(self):
        """Number of filled rows (not the allocated capacity)."""
        return self.length

    def keys(self):
        """all available data-fields, excluding variable and units which are
        considered metadata"""
        return self._data.dtype.names

    def append(self, rec):
        """Append a single record.

        :param rec: a tuple with one entry per field, in dtype order
        """
        if self.length == self.capacity:
            # backing array is full: new capacity = 10 + 1.125 * capacity
            self.capacity += 10 + (self.capacity >> 3)
            self._data = np.resize(self._data, self.capacity)
        self._data[self.length] = rec
        self.length += 1

    def append_array(self, **kwargs):
        """Append several rows at once, given one array per field.

        The array passed as ``values`` defines the number of rows to add;
        every other array must have the same length.

        :param kwargs: one array per field name returned by keys()
        :raises DynamicRecArrayException: if a field is missing from kwargs
            or an array's length differs from the length of ``values``
        """
        for key in self.keys():
            if key not in kwargs:
                raise DynamicRecArrayException(f"missing key {key} in arguments")
            if kwargs[key].shape[0] != kwargs["values"].shape[0]:
                # report the size of the offending array, not of values twice
                raise DynamicRecArrayException(
                    f"array {key} size ({kwargs[key].shape[0]}) != values size ({kwargs['values'].shape[0]})"
                )
        add_len = kwargs["values"].shape[0]
        if add_len > 0:
            last_pos = len(self)
            # self.data is trimmed to the filled rows, so the new rows start
            # exactly at last_pos in the resized copy
            data = np.resize(self.data, last_pos + add_len)
            for key in self.keys():
                data[key][last_pos:] = kwargs[key]
            self.set_data(data)

    def set_data(self, data):
        """Replace the backing array; all rows of ``data`` count as filled.

        :param data: the new structured array
        """
        self.length = len(data)
        self.capacity = len(data)
        self._data = data

    @property
    def data(self):
        """The filled rows of the backing array.

        If spare capacity exists, the backing array is first cut down to the
        filled rows, so capacity equals length afterwards.
        """
        if self.capacity != self.length:
            self._data = self._data[:][: self.length]
            self.capacity = len(self._data)
        return self._data
class NpStructuredData(Data):
    """An implementation of Data using numpy Structured Arrays.

    This is the minimum set of columns required for a reader to return.
    A reader is welcome to return a self-implemented subclass of Data.

    Data can be added by rows (or by whole numpy arrays) with the append
    method, or a completed numpy structured array can be submitted using
    set_data.
    """

    # field layout of one record; the order is the order of the row tuple
    # built in append()
    _dtype = [
        ("values", "f"),
        ("stations", "U64"),
        ("latitudes", "f"),
        ("longitudes", "f"),
        ("altitudes", "f"),
        ("start_times", "datetime64[s]"),
        ("end_times", "datetime64[s]"),
        ("flags", "i2"),
        ("standard_deviations", "f"),
    ]

    def __init__(self, variable: str = "", units: str = "") -> None:
        """
        :param variable: variable name
        :param units: variable units
        """
        self._variable = variable
        self._units = units
        self._data = DynamicRecArray(self._dtype)

    def __len__(self) -> int:
        """Number of data-points"""
        return len(self._data)

    def __getitem__(self, key):
        """access the data as a dict

        Unlike Data.__getitem__, which delegates to slice(), this indexes the
        underlying numpy structured array directly and returns what numpy
        returns for ``key``.
        """
        return self._data.data[key]

    def keys(self):
        """all available data-fields, excluding variable and units which are
        considered metadata"""
        return self._data.keys()

    def append(
        self,
        value,
        station,
        latitude,
        longitude,
        altitude,
        start_time,
        end_time,
        flag=Flag.VALID,
        standard_deviation=np.nan,
    ):
        """append with a new data-row, or numpy arrays

        If ``value`` is a numpy array with at least one dimension, all
        arguments are treated as arrays of the same length and appended in
        one go; a scalar ``flag`` or ``standard_deviation`` (e.g. the
        defaults) is then repeated for every row. The station-name length
        is not checked in this mode. Otherwise a single row is appended.

        :param value
        :param station
        :param latitude
        :param longitude
        :param altitude
        :param start_time
        :param end_time
        :param flag: defaults to Flag.VALID
        :param standard_deviation: defaults to np.nan
        :raises Exception: if a single station name is longer than 64 chars
        """
        # isinstance instead of a module-name check: numpy scalars such as
        # np.float32 must take the single-row path, they have no shape[0]
        if isinstance(value, np.ndarray) and value.ndim > 0:
            count = value.shape[0]
            # scalar defaults have no .shape; expand them to one entry per row
            if np.ndim(flag) == 0:
                flag = np.full(count, flag, dtype="i2")
            if np.ndim(standard_deviation) == 0:
                standard_deviation = np.full(count, standard_deviation, dtype="f")
            self._data.append_array(
                values=value,
                stations=station,
                latitudes=latitude,
                longitudes=longitude,
                altitudes=altitude,
                start_times=start_time,
                end_times=end_time,
                flags=flag,
                standard_deviations=standard_deviation,
            )
            return

        if len(station) > 64:
            raise Exception(f"station name too long, max 64char: {station}")
        self._data.append(
            (
                value,
                station,
                latitude,
                longitude,
                altitude,
                start_time,
                end_time,
                flag,
                standard_deviation,
            )
        )
        return

    def set_data(self, variable: str, units: str, data: np.ndarray):
        """Initialization code for the data. Only known data-fields will be
        read from data, i.e. it is not possible to extend TimeseriesData
        without subclassing.

        :param variable: variable name
        :param units: variable units
        :param data: a numpy structured array with all fields (see append)
        :raises KeyError: on missing field
        :raises Exception: if not all data-ndarrays have same size
        :raises Exception: if not all data-fields are ndarrays
        """
        for key in self.keys():
            if key not in data.dtype.names:
                raise KeyError(f"{key} not in data: {data.dtype}")
            if not isinstance(data[key], (np.ndarray, np.generic)):
                raise Exception(f"data[{key}] is not a numpy.ndarray")
            if len(data[key]) != len(data["values"]):
                raise Exception(f"values and {key} not of same size")
        self._variable = variable
        self._units = units
        self._data.set_data(data)
        return

    def slice(self, index):
        """Get a copy of this dataset as a slice.

        :param index: index applied to the underlying structured array
        :return: a new NpStructuredData with the same variable and units
        """
        newData = NpStructuredData()
        newData.set_data(self.variable, self.units, self._data.data[index])
        return newData

    @property
    def variable(self) -> str:
        """Variable name for all the data

        :return: variable name
        """
        return self._variable

    @property
    def units(self) -> str:
        """Units in CF-notation, the same unit applies to all values

        :return: Units in CF-notation
        """
        return self._units

    @property
    def values(self) -> np.ndarray:
        """A 1-dimensional float array of values.

        :return: 1dim array of floats
        """
        return self["values"]

    @property
    def stations(self) -> np.ndarray:
        """A 1-dimensional array of station identifiers (strings, usually name)

        :return: 1dim array of strings, max-length 64-chars
        """
        return self["stations"]

    @property
    def latitudes(self) -> np.ndarray:
        """A 1-dimensional array of latitudes (float)

        :return: 1dim array of floats
        """
        return self["latitudes"]

    @property
    def longitudes(self) -> np.ndarray:
        """A 1-dimensional array of longitudes (float)

        :return: 1dim array of floats
        """
        return self["longitudes"]

    @property
    def altitudes(self) -> np.ndarray:
        """A 1-dimensional array of altitudes (float)

        :return: 1dim array of floats
        """
        return self["altitudes"]

    @property
    def start_times(self) -> np.ndarray:
        """A 1-dimensional array of int64 datetimes indicating the start of
        the measurement

        :return: 1dim array of datetime64
        """
        return self["start_times"]

    @property
    def end_times(self) -> np.ndarray:
        """A 1-dimensional array of int64 datetimes indicating the end of
        the measurement

        :return: 1dim array of datetime64
        """
        return self["end_times"]

    @property
    def flags(self) -> np.ndarray:
        """A 1-dimensional array of flags as defined in pyaro

        :return: 1dim array of ints
        """
        return self["flags"]

    @property
    def standard_deviations(self) -> np.ndarray:
        """A 1-dimensional array of stdevs. NaNs describe not available
        stdev per measurement

        :return: 1dim array of floats
        """
        return self["standard_deviations"]

    def __str__(self):
        return f"{self.variable}, {self.units}, {self._data.data}"
if __name__ == "__main__":
    # micro-benchmark: average wall time of filling a dataset row by row
    import timeit

    def append_data():
        """Append 100000 identical rows to a fresh NpStructuredData."""
        da = NpStructuredData("var", "km")
        for _ in range(100000):
            # the row is rebuilt on every pass, so that its construction
            # is part of the timed work
            row = (
                0.3,  # value
                "123",  # station
                3.2,  # latitude
                4.3,  # longitude
                100,  # altitude
                np.datetime64("1997-01-01 00:00:00"),  # start
                np.datetime64("1997-01-01 00:00:00"),  # end
                Flag.VALID,
                np.nan,
            )
            da.append(*row)

    number = 3
    total = timeit.timeit("append_data()", globals=globals(), number=number)
    print(total / number)