"""Utilities module
"""
import glob
import os
import shutil
import gdown
import numpy as np
import pandas as pd
import pooch
import xarray as xr
[docs]
def sample_data(key: str):
if key == "wc_6beam":
file_list = pooch.retrieve(
url="doi:10.5281/zenodo.7312960/wc_6beam.zip",
known_hash="md5:a7ea3c10a6d2f4a97ff955dc4398f930",
path="~/.cache/lidarwind",
processor=pooch.Unzip(),
)
elif key == "wc_long_dbs":
file_list = pooch.retrieve(
url="doi:10.5281/zenodo.7312960/wc_long_dbs.zip",
known_hash="md5:53b4eb6e5dad6dfdaddfbb718dcf8910",
path="~/.cache/lidarwind",
processor=pooch.Unzip(),
)
elif key == "wc_short_dbs":
file_list = pooch.retrieve(
url="doi:10.5281/zenodo.7312960/wc_short_dbs.zip",
known_hash="md5:9cbd93f89052d6c6f4407bcce415e277",
path="~/.cache/lidarwind",
processor=pooch.Unzip(),
)
else:
raise ValueError
return file_list
[docs]
class Util:
"""
This class contains useful tools
"""
[docs]
def get_time_bins(sel_day, freq="10min"):
"""Bins estimation
Creating time bins for a given day and time resolution
"""
start = sel_day.strftime("%Y%m%d")
start_time = pd.to_datetime(f"{start} 00:00:00")
end = (sel_day + pd.to_timedelta(1, "D")).strftime("%Y%m%d")
end_time = pd.to_datetime(f"{end} 00:00:00")
time_bins = pd.date_range(start_time, end_time, freq=freq)
return time_bins
[docs]
def get_sample_data(sample_path, file_type):
"""Downloading data
It downloads the sample needed for the examples.
"""
if file_type == "12-00":
file_id = "1i6iX6KuZOkP_WLuPZHG5uCcvRjlWS-SU"
url = f"https://drive.google.com/uc?export=download&id={file_id}"
if file_type == "dbs":
url = "path"
output = f"{sample_path}{file_type}.zip"
gdown.download(url, output, quiet=False)
print(f"Extracting: {output}")
shutil.unpack_archive(output, sample_path)
os.remove(output)
[docs]
def data_filenames():
"""Sample file list
It searches for the sample files.
If the files do not exist, it downloads them.
"""
home = os.path.expanduser("~")
sample_path = f"{home}/.lidarwindrc/sample_data/"
file_type = "12-00" # change to 6 beam in the future
if os.path.isdir(sample_path):
if os.path.isdir(f"{sample_path}{file_type}/"):
file_list = sorted(glob.glob(f"{sample_path}{file_type}/*.nc"))
if bool(file_list) is False:
Util.get_sample_data(sample_path, file_type)
file_list = sorted(
glob.glob(f"{sample_path}{file_type}/*.nc")
)
else:
Util.get_sample_data(sample_path, file_type)
file_list = sorted(glob.glob(f"{sample_path}{file_type}/*.nc"))
else:
os.makedirs(sample_path)
Util.get_sample_data(sample_path, file_type)
file_list = sorted(glob.glob(f"{sample_path}{file_type}/*.nc"))
return file_list
[docs]
class CloudMask:
"""
This class generates the time-height cloud mask
and the temporal cloud mask using observations
from lidar and ceilometer.
"""
def __init__(self, wc_data=None, ceilo_data=None, radar_data=None):
self.ceilo_data = ceilo_data
self.radar_data = radar_data
self.wc_data = wc_data
self.call_methods()
[docs]
def call_methods(self):
if self.ceilo_data is None or self.radar_data is None:
self.get_time_mask(mask_type="aux")
else:
self.clean_ceilo()
self.clean_radar()
self.get_cloud_mask_2d()
self.get_time_mask(mask_type="real")
[docs]
def clean_ceilo(self):
positive_beta = self.ceilo_data.beta_raw.where(
self.ceilo_data.beta_raw > 0
)
positive_beta = positive_beta.rolling(
time=20, center=True, min_periods=13
).mean()
positive_beta = positive_beta.rolling(
range=10, center=True, min_periods=8
).mean()
# grid interpolation: to lidar time
self.clean_ceilo_data = positive_beta.interp(
{"time": self.wc_data.time}
)
[docs]
def clean_radar(self):
positive_ze = self.radar_data.radar_equivalent_reflectivity.where(
self.radar_data.radar_equivalent_reflectivity > 0
)
# grid interpolation: to lidar time, to ceilo range
self.clean_radar_data = positive_ze.interp(
{"time": self.wc_data.time, "range": self.clean_ceilo_data.range}
)
[docs]
def get_cloud_mask_2d(self):
# CEILOMETER mask
self.clean_ceilo_data.values[
np.isfinite(self.clean_ceilo_data.values)
] = 2
self.clean_ceilo_data.values[
~np.isfinite(self.clean_ceilo_data.values)
] = 0
# RADAR mask
self.clean_radar_data.values[
np.isfinite(self.clean_radar_data.values)
] = 1
self.clean_radar_data.values[
~np.isfinite(self.clean_radar_data.values)
] = 0
# final mask
self.cloud_mask = self.clean_ceilo_data + self.clean_radar_data
[docs]
def get_time_mask(self, mask_type=None):
if mask_type == "aux":
print("aux mask")
aux_cloud_mask = xr.DataArray(
np.ones(len(self.wc_data.time)),
dims="time",
coords={"time": self.wc_data.time.values},
)
self.time_cloud_mask = aux_cloud_mask
elif mask_type == "real":
print("real mask")
# 6500 is the value I defined as maximum range
high_cloud_layer = self.cloud_mask.where(
self.cloud_mask.range > 6500
)
time_cloud_mask = high_cloud_layer.sum(dim="range")
# 1 indicates that there is a cloud above
# the maximum range
time_cloud_mask.values[time_cloud_mask.values > 0] = 1
self.time_cloud_mask = time_cloud_mask
else:
print("mask_type not defined")