import time
import numpy as np
import xarray as xr
from dask.diagnostics import ProgressBar
from ..aqd import aqdutils
from ..core import utils
# import os
def cdf_to_nc(cdf_filename, atmpres=False):
"""
Load a raw .cdf file and generate a processed .nc file
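
    Parameters
    ----------
    cdf_filename : str
        Path to the raw .cdf file to process
    atmpres : optional
        If not False, passed to aqdutils.atmos_correct() to apply an
        atmospheric pressure correction to the pressure record (default False)

    Returns
    -------
    xarray.Dataset
        The processed Dataset, which is also written to a *-cal.nc file
        named according to the data_type attribute

    A minimal usage sketch (the filename here is hypothetical):

    >>> ds = cdf_to_nc("mooring_sig_burst-raw.cdf")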
"""
print(f"Loading {cdf_filename}")
# print(os.listdir())
start_time = time.time()
ds = xr.open_dataset(cdf_filename, chunks={"time": 200000, "bindist": 48})
# check for user specified chunks
if "chunks" in ds.attrs:
chunksizes = dict(zip(ds.attrs["chunks"][::2], ds.attrs["chunks"][1::2]))
ds.close()
        # make sure values are type int; they are str because they are written to a global attribute in the -raw.cdf file
for key in chunksizes:
if isinstance(chunksizes[key], str):
chunksizes[key] = int(chunksizes[key])
print(f"Using user specified chunksizes = {chunksizes}")
ds = xr.open_dataset(cdf_filename, chunks=chunksizes)
end_time = time.time()
print(f"Finished loading {cdf_filename} in {end_time-start_time:.1f} seconds")
ds = aqdutils.check_attrs(ds, inst_type="SIG")
# Add atmospheric pressure offset
if atmpres is not False:
ds = aqdutils.atmos_correct(ds, atmpres)
ds = utils.create_nominal_instrument_depth(ds)
# print(ds)
# create Z depending on orientation
# ds, T, T_orig = aqdutils.set_orientation(ds, ds["Burst_Beam2xyz"].values)
ds = utils.create_z(ds)
# Clip data to in/out water times or via good_ens
# Need to clip data after coord transform when using dolfyn
ds = utils.clip_ds(ds)
if ds.attrs["data_type"] == "Burst" or ds.attrs["data_type"] == "BurstHR":
# Create separate vel variables first
ds["U"] = ds["VelEast"]
ds["V"] = ds["VelNorth"]
ds["W1"] = ds["VelUp1"]
ds["W2"] = ds["VelUp2"]
ds = aqdutils.magvar_correct(ds)
ds = aqdutils.trim_vel(ds, data_vars=["U", "V", "W1", "W2"])
    # make beam-dimensioned variables for the per-beam data (vel, amp & corr)
ds["beam"] = xr.DataArray(range(1, ds["NBeams"][0].values + 1), dims="beam")
if "VelBeam1" in ds:
ds["vel"] = xr.concat(
[ds[f"VelBeam{i}"] for i in range(1, ds["NBeams"][0].values + 1)],
dim="beam",
)
if "CorBeam1" in ds:
ds["corr"] = xr.concat(
[ds[f"CorBeam{i}"] for i in range(1, ds["NBeams"][0].values + 1)],
dim="beam",
)
if "AmpBeam1" in ds:
ds["amp"] = xr.concat(
[ds[f"AmpBeam{i}"] for i in range(1, ds["NBeams"][0].values + 1)],
dim="beam",
)
ds = aqdutils.make_bin_depth(ds)
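    # expand packed magnetometer/accelerometer samples and AHRS fields (if present) into dimensioned variables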
ds = ds_make_magaccel_vars(ds)
ds = ds_make_ahrs_vars(ds)
# Rename DataArrays for EPIC compliance
ds = fix_encoding(ds)
ds = aqdutils.ds_rename(ds) # for common variables
ds = ds_drop(ds)
ds = ds_rename_sig(ds) # for signature vars not in aqds or vecs
# swap vert dim to z or user specified in vert_dim
ds = aqdutils.ds_swap_dims(ds)
ds = utils.ds_add_lat_lon(ds)
# Add EPIC and CMG attributes
ds = aqdutils.ds_add_attrs(ds, inst_type="SIG") # for common adcp vars
ds = ds_add_attrs_sig(ds) # for signature vars
# Add min/max values
# if ds.attrs["data_type"] not in ["Echo1"]:
print("add max/min variable attributes")
ds = utils.add_min_max(ds)
# Add DELTA_T for EPIC compliance
# ds = utils.add_delta_t(ds)
# Add start_time and stop_time attrs
ds = utils.add_start_stop_time(ds)
# Add history showing file used
ds = utils.add_history(ds)
ds = utils.add_standard_names(ds) # add common standard names
# ds = drop_unused_dims(ds)
# ds = fix_encoding(ds)
ds = utils.ds_add_lat_lon(ds)
ds = utils.ds_coord_no_fillvalue(ds)
"""
ds["time"].attrs.update(
{"standard_name": "time", "axis": "T", "long_name": "time (UTC)"}
)
"""
# write out nc file by data_type
if "prefix" in ds.attrs:
nc_filename = ds.attrs["prefix"] + ds.attrs["filename"]
else:
nc_filename = ds.attrs["filename"]
if ds.attrs["data_type"] == "Burst" or ds.attrs["data_type"] == "BurstHR":
nc_out = nc_filename + "b-cal.nc"
print("writing Burst (b) data to netCDF nc file")
delayed_obj = ds.to_netcdf(nc_out, compute=False)
with ProgressBar():
delayed_obj.compute()
print("Done writing netCDF file", nc_out)
elif ds.attrs["data_type"] == "IBurst" or ds.attrs["data_type"] == "IBurstHR":
nc_out = nc_filename + "b5-cal.nc"
print("writing IBurst (b5) data to netCDF nc file")
delayed_obj = ds.to_netcdf(nc_out, compute=False)
with ProgressBar():
delayed_obj.compute()
print("Done writing netCDF file", nc_out)
elif ds.attrs["data_type"] == "Echo1":
nc_out = nc_filename + "e1-cal.nc"
print("writing Echo1 (echo1) data to netCDF nc file")
delayed_obj = ds.to_netcdf(nc_out, compute=False)
with ProgressBar():
delayed_obj.compute()
print("Done writing netCDF file", nc_out)
utils.check_compliance(nc_out, conventions=ds.attrs["Conventions"])
end_time = time.time()
print(
f"Proceesing cdf2nc for Signature data type {ds.attrs['data_type']} in deployment {ds.attrs['filename']} completed"
)
print(f"elapsed time = {end_time-start_time:.1f} seconds")
return ds
def drop_unused_dims(ds):
"""only keep dims that will be in the final files"""
thedims = []
for v in ds.data_vars:
for x in ds[v].dims:
thedims.append(x)
for x in ds.dims:
if x not in thedims:
ds = ds.drop_vars(x)
return ds
def ds_drop(ds):
"""
Drop old DataArrays from Dataset that won't make it into the final .nc file
"""
todrop = [
"ExtStatus",
"NBeams",
"NCells",
"PressureSensorTemperature",
"RTCTemperature",
"MagnetometerTemperature",
"Magnetometer",
"Accelerometer",
"VEL1",
"VEL2",
"VEL3",
"AMP1",
"AMP2",
"AMP3",
"VelEast",
"VelNorth",
"VelUp1",
"VelUp2",
"VelSpeed",
"VelDirection",
"VelX",
"VelY",
"VelZ1",
"VelZ2",
"Beam2xyz",
"AHRSRotationMatrix",
"AHRSGyroX",
"AHRSGyroY",
"AHRSGyroZ",
"AHRSQuaternionW",
"AHRSQuaternionX",
"AHRSQuaternionY",
"AHRSQuaternionZ",
"VelBeam1",
"VelBeam2",
"VelBeam3",
"VelBeam4",
"AmpBeam1",
"AmpBeam2",
"AmpBeam3",
"AmpBeam4",
"CorBeam1",
"CorBeam2",
"CorBeam3",
"CorBeam4",
"AltimeterStatus",
]
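    # keep the analog input channels only when enabled via the AnalogInput1/AnalogInput2 attributes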
if ("AnalogInput1" in ds.attrs) and (ds.attrs["AnalogInput1"].lower() == "true"):
todrop.remove("AnalogInput1")
if ("AnalogInput2" in ds.attrs) and (ds.attrs["AnalogInput2"].lower() == "true"):
todrop.remove("AnalogInput2")
return ds.drop_vars([t for t in todrop if t in ds.variables])
def ds_rename_sig(ds, waves=False):
"""
Rename DataArrays within Dataset for compliance
"""
varnames = {
"EnsembleCount": "sample",
"AmbiguityVel": "ambig_vel",
"Status": "status",
"Error": "error",
"TransmitEnergy": "xmit_energy",
"U": "u_1205",
"V": "v_1206",
"W1": "w_1204",
"W2": "w2_1204",
"VelSpeed": "CS_300",
"VelDirection": "CD_310",
"VelBeam1": "vel1_1277",
"VelBeam2": "vel1_1278",
"VelBeam3": "vel1_1279",
"VelBeam4": "vel1_1280",
"AmpBeam1": "Sv1_1233",
"AmpBeam2": "Sv2_1234",
"AmpBeam3": "Sv3_1235",
"AmpBeam4": "Sv4_1236",
"CorBeam1": "cor1_1285",
"CorBeam2": "cor2_1286",
"CorBeam3": "cor3_1287",
"CorBeam4": "cor4_1288",
"AltimeterDistanceLE": "brangeLE",
"AltimeterQualityLE": "alt_quality",
"AltimeterDistanceAST": "brangeAST",
"AltimeterQualityAST": "ast_quality",
"AltimeterTimeOffsetAST": "ast_offset_time",
"AltimeterPressure": "ast_pressure",
"NominalCorrelation": "corr_nominal",
"VelBeam5": "vel_b5",
"AmpBeam5": "amp_b5",
"CorBeam5": "corr_b5",
"Echo": "echo_amp",
}
for v in varnames:
if v in ds:
ds = ds.rename({v: varnames[v]})
"""
for v in [
"avgamp1",
"avgamp2",
"avgamp3",
"U",
"V",
"W",
"Depth",
"water_depth",
"cellpos",
"vel", # Signature velocity
]:
if v in ds:
ds = ds.drop_vars(v)
"""
return ds
def fix_encoding(ds):
"""ensure we don't set dtypes uint for CF compliance"""
if "sample" not in ds.dims:
print(
"make time encoding to dtype double because no sample dimension, round to milliseconds first"
)
ds["time"] = ds["time"].dt.round("ms") # round time to milliseconds first
ds["time"].encoding["dtype"] = "double"
"""if ds[var].max() > 2**31 - 1 or ds[var].min() < -(2**31):
print(
f"warning {var} may be too big to fit in int32: min {ds[var].min().values}, max {ds[var].max().values} so make double"
)
ds[var].encoding["dtype"] = "double"
"""
else:
ds["time"].encoding["dtype"] = "i4"
if "beam" in ds.dims:
ds["beam"].encoding["dtype"] = "i4"
for var in ds.data_vars:
if ds[var].dtype == "uint32" or ds[var].dtype == "uint8":
ds[var].encoding["dtype"] = "int32"
if var in ["corr", "corr_b5"]:
ds[var].encoding["dtype"] = "float32"
if ds[var].dtype == "float64":
ds[var].encoding["dtype"] = "float32"
return ds
def ds_add_attrs_sig(ds):
"""
add attributes to xarray Dataset
"""
"""
ds["time"].attrs.update(
{"standard_name": "time", "axis": "T", "long_name": "time (UTC)"}
)
"""
if "earth" in ds:
ds["earth"].attrs.update(
{
"units": "1",
"long_name": "Earth Reference Frame",
}
)
if "inst" in ds:
ds["inst"].attrs.update(
{
"units": "1",
"long_name": "Instrumnet Reference Frame",
}
)
if "beam" in ds:
ds["beam"].attrs.update(
{
"units": "1",
"long_name": "Beam Reference Frame",
}
)
if "q" in ds:
ds["q"].attrs.update(
{
"units": "1",
"long_name": "Quaternion Vector Components",
}
)
if "orientmat" in ds:
ds["orientmat"].attrs.update(
{
"units": "1",
"long_name": "Rotation Matrix from AHRS",
}
)
if "gyro" in ds:
ds["gyro"].attrs.update(
{
"units": "rad s-1",
"long_name": "Angular Velocity from AHRS",
}
)
if "quaternions" in ds:
ds["quaternions"].attrs.update(
{
"units": "1",
"long_name": "Quaternions",
}
)
if "mag" in ds:
ds["mag"].attrs.update(
{
"units": "uT",
"long_name": "Magnetometer Data",
}
)
if "accel" in ds:
ds["accel"].attrs.update(
{
"units": "m s-2",
"long_name": "Vector Acceleration of Instrument",
}
)
if "status" in ds:
ds["status"].attrs.update(
{
"units": "1",
"long_name": "Status Code",
}
)
if "error" in ds:
ds["error"].attrs.update(
{
"units": "1",
"long_name": "Error Code",
}
)
if "vel" in ds:
ds["vel"].attrs.update(
{
"units": "m s-1",
"standard_name": "radial_sea_water_velocity_away_from_instrument",
"long_name": "Beam Velocity",
}
)
if "vel_b5" in ds:
ds["vel_b5"].attrs.update(
{
"units": "m s-1",
"standard_name": "radial_sea_water_velocity_away_instrument",
"long_name": "Beam5 Velocity",
}
)
if "amp" in ds:
ds["amp"].attrs.update(
{
"units": "dB",
"standard_name": "signal_intensity_from_multibeam_acoustic_doppler_velocity_sensor_in_sea_water",
"long_name": "Acoustic Signal Amplitude",
}
)
if "amp_b5" in ds:
ds["amp_b5"].attrs.update(
{
"units": "dB",
"standard_name": "signal_intensity_from_multibeam_acoustic_doppler_velocity_sensor_in_sea_water",
"long_name": "Beam5 Acoustic Signal Amplitude",
}
)
if "echo_amp" in ds:
ds["echo_amp"].attrs.update(
{
"units": "dB",
"standard_name": "signal_intensity_from_multibeam_acoustic_doppler_velocity_sensor_in_sea_water",
"long_name": "Echosounder Signal Amplitude",
}
)
if "corr" in ds:
ds["corr"].attrs.update(
{
"units": "percent",
"standard_name": "beam_consistency_indicator_from_multibeam_acoustic_doppler_velocity_profiler_in_sea_water",
"long_name": "Acoustic Signal Correlation",
}
)
if "corr_b5" in ds:
ds["corr_b5"].attrs.update(
{
"units": "percent",
"standard_name": "beam_consistency_indicator_from_multibeam_acoustic_doppler_velocity_profiler_in_sea_water",
"long_name": "Beam5 Acoustic Signal Correlation",
}
)
if "corr_nominal" in ds:
ds["corr_nominal"].attrs.update(
{
"units": "percent",
"long_name": "Nominal Correlation",
}
)
if "SV_80" in ds:
ds["SV_80"].attrs.update(
{
"units": "m s-1",
"long_name": "Speed of Sound",
}
)
if "ambig_vel" in ds:
ds["ambig_vel"].attrs.update(
{
"units": "m s-1",
"long_name": "Ambiguity velocity",
}
)
if "brange" in ds:
ds["brange"].attrs.update(
{
"units": "m",
"standard_name": "altimeter_range",
"long_name": "Altimeter Range",
}
)
if "alt_quality" in ds:
ds["alt_quality"] = ds["ast_quality"] / 100
ds["alt_quality"].encoding["dtype"] = "float32"
ds["alt_quality"].attrs.update(
{
"units": "dB",
"long_name": "Altimeter Quality",
}
)
if "brangeAST" in ds:
ds["brangeAST"].attrs.update(
{
"units": "m",
# "standard_name": "altimeter_range",
"long_name": "Acoustic Surface Tracking (AST) Range",
}
)
if "ast_quality" in ds:
ds["ast_quality"] = ds["ast_quality"] / 100
ds["ast_quality"].encoding["dtype"] = "float32"
ds["ast_quality"].attrs.update(
{
"units": "dB",
"long_name": "Acoustic Surface Tracking (AST) Quality",
}
)
if "ast_pressure" in ds:
ds["ast_pressure"].attrs.update(
{
"units": "dbar",
"long_name": "Acoustic Surface Tracking (AST) Pressure",
}
)
if "ast_offset_time" in ds:
ds["ast_offset_time"].attrs.update(
{
"units": "s",
"long_name": "Acoustic Surface Tracking (AST) Offset Time",
}
)
if "xmit_energy" in ds:
ds["xmit_energy"].attrs.update(
{
"units": "dB",
"long_name": "Transmit Energy",
}
)
if "vel1_1277" in ds:
ds["vel1_1277"].attrs.update(
{
"units": "m s-1",
"long_name": "Beam 1 Velocity",
"epic_code": 1277,
}
)
if "vel2_1278" in ds:
ds["vel2_1278"].attrs.update(
{
"units": "m s-1",
"long_name": "Beam 2 Velocity",
"epic_code": 1278,
}
)
if "vel3_1279" in ds:
ds["vel3_1279"].attrs.update(
{
"units": "m s-1",
"long_name": "Beam 3 Velocity",
"epic_code": 1279,
}
)
if "vel4_1280" in ds:
ds["vel4_1280"].attrs.update(
{
"units": "m s-1",
"long_name": "Beam 4 Velocity",
"epic_code": 1280,
}
)
if "Sv1_1233" in ds:
ds["Sv1_1233"].attrs.update(
{
"units": "dB",
"long_name": "Backscatter Strength Beam 1",
"epic_code": 1233,
}
)
if "Sv2_1234" in ds:
ds["Sv2_1233"].attrs.update(
{
"units": "dB",
"long_name": "Backscatter Strength Beam 2",
"epic_code": 1234,
}
)
if "Sv3_1235" in ds:
ds["Sv3_1235"].attrs.update(
{
"units": "dB",
"long_name": "Backscatter Strength Beam 3",
"epic_code": 1235,
}
)
if "Sv4_1236" in ds:
ds["Sv1_1236"].attrs.update(
{
"units": "dB",
"long_name": "Backscatter Strength Beam 4",
"epic_code": 1236,
}
)
if "Sv_1232" in ds:
ds["Sv_1232"].attrs.update(
{
"units": "dB",
"long_name": "Mean Backscatter Strength",
"epic_code": 1232,
}
)
if "cor1_1285" in ds:
ds["cor1_1285"].attrs.update(
{
"units": "percent",
"long_name": "Beam 1 Correlation",
"epic_code": 1285,
}
)
if "cor2_1286" in ds:
ds["cor2_1286"].attrs.update(
{
"units": "percent",
"long_name": "Beam 2 Correlation",
"epic_code": 1286,
}
)
if "cor3_1286" in ds:
ds["cor2_1286"].attrs.update(
{
"units": "percent",
"long_name": "Beam 2 Correlation",
"epic_code": 1286,
}
)
return ds
def ds_make_magaccel_vars(ds):
"""
add magnetometer and accelerometer data to xarray Dataset
"""
# make mag and accel vars
if "Magnetometer" in ds:
if "inst" not in ds.dims:
ds["inst"] = xr.DataArray(["X", "Y", "Z"], dims="inst")
ds["mag"] = xr.DataArray(
ds["Magnetometer"].values.reshape(-1, 3).transpose(), dims=["inst", "time"]
)
ds["mag"] = ds["mag"] / 10 # convert milligauss to microtesla
if "Accelerometer" in ds:
if "inst" not in ds.dims:
ds["inst"] = xr.DataArray(["X", "Y", "Z"], dims="inst")
ds["accel"] = xr.DataArray(
ds["Accelerometer"].values.reshape(-1, 3).transpose(), dims=["inst", "time"]
)
ds["accel"] = ds["accel"] * 9.81 # convert gravity (g) to m s-2
return ds
def ds_make_ahrs_vars(ds):
"""
    add AHRS vars to xarray Dataset (if they exist)
"""
    # instruments with an AHRS will have these variables
    # make rotation/orientation matrix if it exists in the dataset
if "AHRSRotationMatrix" in ds:
if "earth" not in ds.dims:
ds["earth"] = xr.DataArray(["E", "N", "U"], dims="earth")
if "inst" not in ds.dims:
ds["inst"] = xr.DataArray(["X", "Y", "Z"], dims="inst")
ds["orientmat"] = xr.DataArray(
ds["AHRSRotationMatrix"]
.values.reshape(
-1,
3,
3,
)
.transpose((2, 1, 0)),
dims=["earth", "inst", "time"],
)
# make gyro var
if "AHRSGyroX" in ds:
if "inst" not in ds.dims:
ds["inst"] = xr.DataArray(["X", "Y", "Z"], dims="inst")
ds["gyro"] = xr.concat(
[ds[f"AHRSGyro{i}"] for i in ds["inst"].values], dim="inst"
)
ds["gyro"] = ds["gyro"] * np.pi / 180 # convert to rad s-1
if "AHRSQuaternionW" in ds:
if "q" not in ds.dims:
ds["q"] = xr.DataArray(["W", "X", "Y", "Z"], dims="q")
ds["quaternions"] = xr.concat(
[ds[f"AHRSQuaternion{i}"] for i in ds["q"].values], dim="q"
)
return ds