"""Module to manage and prepare the data for other modules
"""
import datetime as dt
import logging
import numpy as np
import pandas as pd
import xarray as xr
from .filters import Filtering
from .lidar_code import GetLidarData
module_logger = logging.getLogger("lidarwind.data_operator")
module_logger.debug("loading data_operator")
[docs]
def wc_fixed_preprocessing(ds: xr.Dataset, azimuth_resolution: int = 1):
ds["azimuth"] = ds["azimuth"].round(azimuth_resolution)
# Avoid ambiguity on 360 degrees
ds["azimuth"] = ds["azimuth"].where(ds.azimuth != 360, 0)
assert "elevation" in ds
assert ds["elevation"].dims == ("time",)
assert ds.dims["time"] == 1
ds["elevation"] = ds["elevation"].squeeze()
return ds.expand_dims("elevation").set_coords("elevation")
[docs]
class DataOperations:
"""Basic data manager
It performs some basic operations. For example: rounds
the values from the azimuth coordinate and separates
the vertical observations from the slanted observations.
It is helpful first use this class to process all data
and then save the data as NetCDF files to speed up the
derivation of wind properties later.
Examples
--------
>>> merged_ds = lidarwind.DataOperations(file_list).merged_data
>>> merged_ds.to_netcdf(output_file_path)
Parameters
----------
data_paths : list
List of paths of the original WindCube's output.
Returns
-------
object : object
it returns an object containing an instance of the
original files merged (.merged_data)
"""
[docs]
def __init__(self, data_paths, verbose=False):
self.logger = logging.getLogger(
"lidarwind.data_operator.DataOperations"
)
self.logger.info("creating an instance of DataOperations")
if bool(data_paths) is False:
self.logger.error(
"lidarwind stopped due to an empty list of files."
)
raise FileNotFoundError
self.verbose = verbose
self.data_paths = data_paths
self.tmp90 = xr.Dataset()
self.tmp_non_90 = xr.Dataset()
self.elevation_filter()
self.rename_var_90()
self.get_merge_data()
[docs]
def elevation_filter(self):
"""
It groups the data from the vertical and slanted observations
and rounds the azimuth coordinate
"""
self.logger.info("coverting azimuth: from 360 to 0 degrees")
for file_path in self.data_paths:
try:
tmp_file = GetLidarData(file_path).open_lidar_file()
self.logger.debug(f"reading file: {file_path}")
except:
self.logger.warning(f"This file has a problem: {file_path}")
try:
elevation = tmp_file.elevation.round(1)
tmp_file["elevation"] = elevation
tmp_file["azimuth"] = tmp_file.azimuth.round(1)
tmp_file["azimuth"][tmp_file.azimuth == 360] = 0
except:
self.logger.info(f"Problems reading elv and axm: {file_path}")
try:
self.tmp90 = xr.merge(
[self.tmp90, tmp_file.where(elevation == 90, drop=True)]
)
except:
self.logger.info(
f"This file does not have 90 elv: {file_path}"
)
try:
self.tmp_non_90 = xr.merge(
[
self.tmp_non_90,
tmp_file.where(elevation != 90, drop=True),
]
)
except:
self.logger.info(f"This file only has 90 elv: {file_path}")
return self
[docs]
def rename_var_90(self):
"""
It renames the vertical coordinate
"""
self.logger.info(
"renaming range coordinate from vertical measurements"
)
self.tmp90 = self.tmp90.rename_dims({"range": "range90"})
for var in self.tmp90.variables:
if "range90" in self.tmp90[var].dims:
self.tmp90 = self.tmp90.rename({var: f"{var}90"})
return self
[docs]
def get_merge_data(self):
"""
It merges all readable data
"""
self.logger.info("merging vertical and non-vertical measurements")
self.merged_data = xr.merge([self.tmp90, self.tmp_non_90])
return self
[docs]
class ReadProcessedData:
"""Pre-processed data reader
It reads all data pre-processed by data_operator.DbsOperations
and merges them.
Examples
--------
>>> merged_data = lidarwind.ReadProcessedData(file_list).merge_data()
Parameters
----------
file_list : list
list of pre-processed NetCDF files
Returns
-------
object : object
an instance of all pre-processed data
"""
[docs]
def __init__(self, file_list):
self.logger = logging.getLogger(
"lidarwind.data_operator.ReadProcessedData"
)
self.logger.info("creating an instance of ReadProcessedData")
if bool(file_list) is False:
self.logger.error(
"lidarwind stopped due to an empty list of files."
)
raise FileNotFoundError
self.file_list = file_list
[docs]
def merge_data(self):
"""
It merges all data from the file_list. It can choose between
two different methods. One uses xr.open_mfdataset and the other
uses xr.open_dataset.
"""
# open_msfdataset was massing up the dimensions
# from radial observations.
self.logger.info("merging pre-processed data")
try:
tmp_merged = self.merge_data_method_1()
except:
print("switching from xr.open_mfdataset to xr.open_dataset")
tmp_merged = self.merge_data_method_2()
return tmp_merged
[docs]
def merge_data_method_1(self):
"""
It merges data using xr.open_mfdatset
"""
self.logger.info("mergin files using xr.open_mfdataset")
tmp_merged = xr.open_mfdataset(self.file_list, parallel=True)
return tmp_merged
[docs]
def merge_data_method_2(self):
"""
It merges data using xr.open_dataset
"""
self.logger.info("mergin files using xr.open_dataset")
tmp_merged = xr.Dataset()
for file_name in sorted(self.file_list):
try:
self.logger.info(f"opening {file_name}")
tmp_merged = xr.merge([tmp_merged, xr.open_dataset(file_name)])
except:
self.logger.info(f"problems with: {file_name}")
pass
return tmp_merged
[docs]
class GetRestructuredData:
"""Data re-structurer
It prepares the data structure for using the wind retrieval
modules.
Examples
--------
>>> wind_prop = lidarwind.GetRestructuredData(merged_data)
Parameters
----------
data : xr.Dataset
a xr.Dataset of pre-processed data
snr : bool, int, optional
if an interger is given it is used to
as threshold to filter the data based on
the signal to noise ratio
status : bool, optional
if true it filters the data using the status
variable generated by the WindCube's software
sProf : int, optional
number of profiles used to calculate the anomaly for
partially filter the second trip echoes
(IT GOES TO FILTER MODULE)
center : bool, optional
(IT GOES TO FILTER MODULE)
min_periods : int, optional
(IT GOES TO FILTER MODULE)
n_std : int, optional
size of the standard deviation window used
to partially remove the second trip echoes
(IT GOES TO FILTER MODULE)
check90 : bool, optional
If True, it checks if the vertical observations
are available. If False, the verification is ignored.
Returns
-------
object : object
an instance of the prepared for the retrieval
"""
[docs]
def __init__(
self,
data: xr.Dataset,
snr=False,
status=True,
n_prof=500,
center=True,
min_periods=30,
n_std=2,
check90=True,
):
self.logger = logging.getLogger(
"lidarwind.data_operator.GetRestructuredData"
)
self.logger.info("creating an instance of GetRestructuredData")
if not isinstance(data, xr.Dataset):
self.logger.error("wrong data type: expecting a xr.Dataset")
raise TypeError
self.data = data
self.snr = snr
self.status = status
self.n_prof = n_prof
self.center = center
self.min_periods = min_periods
self.n_std = n_std
self.vertical_component_check(check90)
self.get_coord_non_90()
self.data_transform()
self.data_transform_90()
[docs]
def get_coord_non_90(self):
"""
It identifies and selects the slanted data
"""
self.logger.info("identifying and selecting the slanted observations")
self.elv_non_90 = np.unique(
self.data.elevation.where(self.data.elevation != 90, drop=True)
)
self.azm_non_90 = np.unique(
self.data.azimuth.where(self.data.elevation != 90, drop=True)
)
self.azm_non_90 = np.sort(self.azm_non_90)
self.time_non_90 = self.data.time.where(
self.data.elevation != 90, drop=True
)
self.range_non_90 = self.data.range
return self
[docs]
def vertical_component_check(self, check90):
if check90:
if not hasattr(self.data, "radial_wind_speed90"):
raise KeyError
else:
print("Vertical component check was ignored.")
[docs]
class GetResampledData:
"""Alternative basic data resample
This class is used to resample the data
into a given temporal grid.
It mainly used internal processings of
the package.
Parameters
-----------
xr_data_array : xr.DataArray
varaiable that will be resampled
vert_coord : str
name of the vertical coordinate
time_freq : str
size of the window e.g.: '15s'
tolerance : int
maximum separation from the reference
time_coord : str
name of the time coordinate
Returns
-------
data : xr.DataArray
time resampled variable
"""
def __init__(
self,
xr_data_array: xr.DataArray,
vert_coord="range",
time_freq="15s",
tolerance=10,
time_coord="time",
):
self.logger = logging.getLogger(
"lidarwind.data_operator.GetResampledData"
)
self.logger.info("creating an instance of GetResampledData")
if not isinstance(xr_data_array, xr.DataArray):
self.logger.error("wrong data type: expecting a xr.DataArray")
raise TypeError
self.var_name = xr_data_array.name
self.attrs = xr_data_array.attrs
data = xr_data_array
date = pd.to_datetime(data[time_coord].values[0])
self.time_ref = self.get_time_ref(date, time_freq)
self.vert_coord = data[vert_coord]
time_ref_sec = np.array(self.time_ref, float) * 10 ** (-9)
time_orig_sec = np.array(data[time_coord].values, float) * 10 ** (-9)
delta_grid = self.calc_delt_grid(time_ref_sec, time_orig_sec)
time_index_array = self.get_nearest_index_method_2(
delta_grid, tolerance
)
self.values = self.time_resample(
data, time_index_array, self.vert_coord
)
self.resampled = self.convert_to_data_array()
[docs]
def get_time_ref(self, date, time_freq="1s"):
"""
Genetates the time reference grid used for
resampling the data
Parameters
----------
date : Timestamp
date for resampling (pandas Timestamp)
dateFreq: str
resolution of the reference grid (str, default=1s)
Returns
-------
time_ref : DatetimeIndex
time reference grid (DatetimeIndex)
"""
self.logger.info("defining the reference time")
start = dt.datetime(date.year, date.month, date.day, 0, 0, 0)
end = dt.datetime(date.year, date.month, date.day, 23, 59, 59)
time_ref = pd.date_range(start, end, freq=time_freq)
return time_ref
[docs]
def calc_delt_grid(self, ref_grid, orig_grid):
"""
Calculates the distance between the reference grid
and the radar grid (time or range)
Parameters
----------
ref_grid : numpy.array
reference grid (array[n])
radarGrid : numpy.array
radar grid (array[m])
Returns
-------
delta_grid : numpy.array
distance between each element from
the reference grid to each element from the
radar grid
"""
self.logger.info("calculating the distance to the reference")
tmp_grid_2_d = np.ones((len(ref_grid), len(orig_grid))) * orig_grid
delta_grid = tmp_grid_2_d - np.reshape(ref_grid, (len(ref_grid), 1))
return delta_grid
[docs]
def get_nearest_index_method_2(self, delta_grid, tolerance):
"""
Identify the index of the delta_grid that fulfil
the resampling tolerance
Parameters
----------
delta_grid : numpy.array
output from calcRadarDeltaGrid
tolerance : int
tolerance distance for detecting
the closest neighbour (time or range)
Returns
-------
grid_index : np.array
array of indexes that fulfil the resampling
tolerance
"""
self.logger.info("identifying index that fulfil the tolerance")
grid_index = np.argmin(abs(delta_grid), axis=1)
delta_grid_min = np.min(abs(delta_grid), axis=1)
grid_index = np.array(grid_index, float)
grid_index[delta_grid_min > tolerance] = np.nan
return grid_index
[docs]
def time_resample(self, data, time_index_array, vert_coord):
"""
It resamples a given radar variable using the
time and range index calculated by get_nearest_index_method_2
Parameters
----------
var : str
radar variable name to be resampled
xrDataset : xarray.dataset
xarray dataset containing the variables to
be resampled
timeIdexArray : np.array
time resampling index (output from get_nearest_index_method_2)
rangeIndexArray : np.array
range resampling index (output from get_nearest_index_method_2)
Returns
-------
resampledArr : xarray.dataArray
time/range resampled numpy array
"""
self.logger.info(f"time resampling of: {self.var_name}")
resampled_time_arr = (
np.ones((time_index_array.shape[0], self.vert_coord.shape[0]))
* np.nan
)
for position, time_index in enumerate(time_index_array):
try:
resampled_time_arr[position] = data.values[int(time_index)]
except:
pass
return resampled_time_arr
[docs]
def convert_to_data_array(self):
"""
It creates a DataArray of the resampled data.
"""
self.logger.info(
f"generating the new resampled DataArray: {self.var_name}"
)
tmp_dt = xr.DataArray(
self.values,
dims=("time_ref", self.vert_coord.name),
coords={
"time_ref": self.time_ref,
self.vert_coord.name: self.vert_coord.values,
},
name=self.var_name,
attrs=self.attrs,
)
tmp_dt[self.vert_coord.name].attrs = self.vert_coord.attrs
return tmp_dt
[docs]
class DbsOperations:
"""DBS file manager
This class extracts the variables required to
retrieve the wind information from the DBS files.
Parameters
----------
file_list : list
list of DBS files
var_list : list
list of variables to be extracted from the DBS files
Returns
-------
object : object
it returns an object containing an instance of the
merged files (.merged_ds)
"""
[docs]
def __init__(self, file_list, var_list):
self.logger = logging.getLogger(
"lidarwind.data_operator.DbsOperations"
)
self.logger.info("creating an instance of DbsOperations")
self.merged_ds = xr.Dataset()
self.file_list = file_list
self.var_list = var_list
self.merge_data(file_list, var_list)
[docs]
def merge_data(self, file_list, var_list):
"""
This method merges all files from a list of DBS files
Parameters
----------
file_list : list
list of files to be merged
var_list : list
list of variables to be merged
"""
self.logger.info("merging all DBS files")
if bool(file_list) is False:
self.logger.error(
"lidarwind stopped due to an empty list of DBS files."
)
raise FileNotFoundError
if bool(var_list) is False:
self.logger.error(
"lidarwind stopped due to an empty list of variable"
)
raise KeyError
for file in file_list:
try:
file_to_merge = GetLidarData(file).open_lidar_file()
self.logger.debug(f"reading file: {file}")
except:
self.logger.warning(f"This file has a problem: {file}")
raise
file_to_merge = self.mean_time_derivation(file_to_merge)
# file_to_merge = self.add_mean_time(file_to_merge)
try:
self.merge_2_ds(file_to_merge, var_list)
except:
self.logger.warning(f"Merging not possible: {file}")
# raise
[docs]
def add_mean_time(self, lidar_ds):
"""
This method adds the mean time to each file from
the DBS scan strategy.
Parameters
----------
lidar_ds : xarray.DataSet
a dataset from a sequence of scans
"""
self.logger.info("calculating the mean DBS time for each file")
mean_time_ns = np.array(lidar_ds.time.values, np.float64).mean()
mean_time = pd.to_datetime(
np.ones(len(lidar_ds.time.values)) * mean_time_ns
)
mean_time_da = xr.DataArray(
data=mean_time,
dims=("time"),
coords={"time": lidar_ds.time},
name="scan_mean_time",
)
lidar_ds = lidar_ds.merge(mean_time_da)
return lidar_ds
[docs]
def merge_2_ds(self, file_to_merge, var_list):
"""
This method merges the variables extracted from
the single DBS file with the storage dataset (merged_ds).
Parameters
----------
file_to_merge : xarray.DataSet
a single file dataset
var_list : list
a list of variables to be merged
"""
self.logger.info("merging single DBS file")
for var in var_list:
self.merged_ds = xr.merge([self.merged_ds, file_to_merge[var]])
self.merged_ds = xr.merge(
[self.merged_ds, file_to_merge["scan_mean_time"]]
)
[docs]
def mean_time_derivation(self, data):
data.azimuth.values = np.round(data.azimuth.values)
data.azimuth.values[data.azimuth.values == 360] = 0
data.elevation.values = np.round(data.elevation.values, 1)
azm_ref = data.azimuth.values[0]
index_new_scan = data.ray_index.where(
(data.elevation != 90) & (data.azimuth == azm_ref)
)
index_complete_scans = index_new_scan.values[
np.isfinite(index_new_scan.values)
]
if not len(data.time) in index_complete_scans:
index_complete_scans = np.append(
index_complete_scans, len(data.time)
)
groups = data.groupby_bins(
"ray_index", index_complete_scans, right=False
)
new_data = xr.Dataset()
for grp in groups.groups:
new_data = xr.merge([new_data, self.add_mean_time(groups[grp])])
return new_data