Source code for stglib.rsk.cdf2nc

import numpy as np
import xarray as xr

from ..core import qaqc, utils


[docs]def cdf_to_nc(cdf_filename, atmpres=None, writefile=True, format="NETCDF4"): """ Load raw .cdf file, trim, apply QAQC, and save to .nc """ # Load raw .cdf data ds = open_raw_cdf(cdf_filename) is_profile = ( (ds.attrs["sample_mode"] == "CONTINUOUS") and ("featureType" in ds.attrs) and (ds.attrs["featureType"] == "profile") ) if is_profile: ds = profile_clip_ds(ds) else: # Clip data to in/out water times or via good_ens ds = utils.clip_ds(ds) ds = utils.create_nominal_instrument_depth(ds) if atmpres is not None and is_profile is False: ds = utils.atmos_correct(ds, atmpres) elif atmpres is not None and is_profile: ds = atmos_correct_profile(ds, atmpres) # ds = utils.shift_time(ds, # ds.attrs['burst_interval'] * # ds.attrs['sample_interval'] / 2) ds = ds_add_attrs(ds) # if "P_1" in ds: # ds = ds_add_depth_dim(ds) # add lat/lon coordinates to each variable # no longer want to do this according to the canonical forms on stellwagen # for var in ds.data_vars: # if 'time' not in var: # ds = utils.add_lat_lon(ds, var) ds = qaqc.drop_vars(ds) # trim by minimum pressure for instruments that go out of water_depth for v in ["P_1", "P_1ac"]: ds = trim_min(ds, v) ds = qaqc.trim_bad_ens(ds, v) for v in ["Turb", "C_51", "S_41", "T_28", "SpC_48"]: if v in ds: ds = qaqc.trim_min(ds, v) ds = qaqc.trim_max(ds, v) ds = qaqc.trim_min_diff(ds, v) ds = qaqc.trim_min_diff_pct(ds, v) ds = qaqc.trim_max_diff(ds, v) ds = qaqc.trim_max_diff_pct(ds, v) ds = qaqc.trim_bad_ens(ds, v) if not is_profile: # add z coordinate dim ds = utils.create_z(ds) ds = utils.add_min_max(ds) ds = utils.add_start_stop_time(ds) if not is_profile: ds = utils.ds_add_lat_lon(ds) ds = utils.ds_coord_no_fillvalue(ds) ds = utils.add_history(ds) ds = dw_add_delta_t(ds) # if we are dealing with continuous instruments, drop sample since it is a singleton dimension if "sample" in ds: if len(ds["sample"]) == 1: ds = ds.squeeze(dim="sample") if writefile: # Write to .nc file print("Writing cleaned/trimmed data to .nc file") if "burst" in ds or "sample" in ds: nc_filename = ds.attrs["filename"] + "b-cal.nc" elif is_profile: nc_filename = ds.attrs["filename"] + "prof-cal.nc" elif (ds.attrs["sample_mode"] == "CONTINUOUS") and ( "burst" not in ds or "sample" not in ds ): nc_filename = ds.attrs["filename"] + "cont-cal.nc" else: nc_filename = ds.attrs["filename"] + "-a.nc" if is_profile: ds.to_netcdf(nc_filename, format=format, unlimited_dims=["obs"]) else: ds.to_netcdf(nc_filename, format=format, unlimited_dims=["time"]) utils.check_compliance(nc_filename, conventions=ds.attrs["Conventions"]) print("Done writing netCDF file", nc_filename) if ( "split_profiles" in ds.attrs and ds.attrs["split_profiles"].lower() == "true" ): split_profiles = True else: split_profiles = False if is_profile and split_profiles: do_split_profiles(ds) return ds
def open_raw_cdf(cdf_filename): ds = xr.load_dataset(cdf_filename) # remove units in case we change and we can use larger time steps ds.time.encoding.pop("units") return ds def get_slice(ds, profile): rscs = ds.row_size.cumsum() row_start = ds.row_start.sel(profile=profile).values row_size = ds.row_size.sel(profile=profile).values return slice(row_start, row_start + row_size - 1) def atmos_correct_profile(ds, atmpres): met = xr.load_dataset(atmpres) # need to save attrs before the subtraction, otherwise they are lost attrs = ds["P_1"].attrs # apply the correction for each profile in turn. Is there a better way to do this? ds["P_1ac"] = xr.full_like(ds["P_1"], np.nan) for profile in ds.profile: ds["P_1ac"].loc[dict(obs=get_slice(ds, profile))] = ( ds["P_1"].loc[dict(obs=get_slice(ds, profile))] - met["atmpres"].sel(time=ds["time"].sel(profile=profile)).values - met["atmpres"].offset ) ds["P_1ac"].attrs = attrs ds = utils.insert_history( ds, f"Atmospherically correcting using time-series from {atmpres} and offset of {met['atmpres'].offset}", ) ds.attrs["atmospheric_pressure_correction_file"] = atmpres ds.attrs["atmospheric_pressure_correction_offset_applied"] = met["atmpres"].attrs[ "offset" ] if "comment" in met["atmpres"].attrs: ds.attrs["atmospheric_pressure_correction_comment"] = met["atmpres"].attrs[ "comment" ] return ds def do_split_profiles(ds): max_profile_len = len(str(ds.profile.max().values)) for profile in ds.profile.values: dss = ds.sel(obs=get_slice(ds, profile), profile=profile).copy(deep=True) for v in dss.data_vars: if "obs" not in dss[v].coords: dss[v] = dss[v].expand_dims("profile") # for CF compliance for v in dss.data_vars: allnan = True if "obs" in dss[v].coords: if not np.all(dss[v].isnull()): allnan = False dss = utils.insert_history(dss, f"Processed to individual profile #{profile}") if allnan: print( f"All NaN values encountered for profile {profile}; not writing this cast to netCDF" ) else: nc_filename = f"{dss.attrs['filename']}prof_{str(profile).zfill(max_profile_len)}-cal.nc" # the old unlimited_dims of obs sticks around, so need to specify empty dss.to_netcdf(nc_filename, unlimited_dims=[]) print("Done writing netCDF file", nc_filename) def trim_min(ds, var): if var + "_min" in ds.attrs: print("%s: Trimming using minimum value of %f" % (var, ds.attrs[var + "_min"])) # remove full burst if any of the burst values are less than # the indicated value if "sample" in ds: bads = (ds[var] < ds.attrs[var + "_min"]).any(dim="sample") ds[var][bads, :] = np.nan else: ds[var][ds[var] < ds.attrs[var + "_min"]] = np.nan notetxt = "Values filled where less than %f units. " % ds.attrs[var + "_min"] ds = utils.insert_note(ds, var, notetxt) return ds # def ds_add_depth_dim(ds): # print("Creating depth dimension") # if "P_1ac" in ds: # p = "P_1ac" # else: # p = "P_1" # # if "NAVD88_ref" in ds.attrs: # ds["depth"] = xr.DataArray( # [-ds.attrs["NAVD88_ref"] - ds.attrs["initial_instrument_height"]], # dims="depth", # ) # ds["depth"].attrs["VERT_DATUM"] = "NAVD88" # ds["depth"].attrs["NOTE"] = ( # "Computed as platform depth " # "[m NAVD88] minus " # "initial_instrument_height" # ) # else: # dim = ["time"] # if "sample" in ds: # dim.append("sample") # ds["depth"] = xr.DataArray(np.atleast_1d(ds[p].mean(dim=dim)), dims="depth") # ds["depth"].attrs["NOTE"] = "Computed as mean of the pressure sensor" # ds["depth"].attrs["positive"] = "down" # ds["depth"].attrs["axis"] = "Z" # ds["depth"].attrs["units"] = "m" # ds["depth"].attrs["epic_code"] = 3 # ds["depth"].attrs["standard_name"] = "depth" # # return ds def ds_add_attrs(ds): # Update attributes for EPIC and STG compliance ds = utils.ds_coord_no_fillvalue(ds) ds["time"].attrs.update( {"standard_name": "time", "axis": "T", "long_name": "time (UTC)"} ) if (ds.attrs["sample_mode"] == "CONTINUOUS") and ("sample" not in ds): if utils.check_time_fits_in_int32(ds, "time"): ds["time"].encoding["dtype"] = "i4" else: print("time variable will not fit in int32; casting to double") ds["time"].encoding["dtype"] = "double" else: if utils.check_time_fits_in_int32(ds, "time"): ds["time"].encoding["dtype"] = "i4" if "sample" in ds: if not utils.check_fits_in_int32(ds, "sample"): raise ValueError() ds["sample"].encoding["dtype"] = "i4" ds["sample"].attrs["long_name"] = "sample number" ds["sample"].attrs["units"] = "1" if "P_1" in ds: ds["P_1"].attrs["standard_name"] = "sea_water_pressure" ds["P_1"].attrs["long_name"] = "Uncorrected pressure" ds["P_1"].attrs["epic_code"] = 1 if "P_1ac" in ds: ds["P_1ac"].attrs.update( { "long_name": "Corrected pressure", "standard_name": "sea_water_pressure_due_to_sea_water", } ) if "P_1ac_note" in ds.attrs: ds["P_1ac"].attrs.update({"note": ds.attrs["P_1ac_note"]}) if "burst" in ds: if not utils.check_fits_in_int32(ds, "burst"): raise ValueError() ds["burst"].encoding["dtype"] = "i4" ds["burst"].attrs["units"] = "1" ds["burst"].attrs["long_name"] = "Burst number" if "Turb" in ds: ds["Turb"].attrs.update( {"long_name": "Turbidity (NTU)", "standard_name": "sea_water_turbidity"} ) if "T_28" in ds: ds["T_28"].attrs.update({"standard_name": "sea_water_temperature"}) if "S_41" in ds: ds["S_41"].attrs.update({"standard_name": "sea_water_practical_salinity"}) if "C_51" in ds: ds["C_51"].attrs.update({"standard_name": "sea_water_electrical_conductivity"}) if "SpC_48" in ds: ds["SpC_48"].attrs.update( { "standard_name": "sea_water_electrical_conductivity", "comment": "Temperature compensated to 25 °C", } ) def add_attributes(var, dsattrs): var.attrs.update( { "initial_instrument_height": dsattrs["initial_instrument_height"], "height_depth_units": "m", } ) # for var in ds.variables: # if (var not in ds.coords) and ("time" not in var): # add_attributes(ds[var], ds.attrs) return ds def dw_add_delta_t(ds): if "burst_interval" in ds.attrs: ds.attrs["DELTA_T"] = int(ds.attrs["burst_interval"]) return ds def profile_clip_ds(ds): print( f"first profile in full file: {ds['time'].min().values}, idx {np.argmin(ds['time'].values)}" ) print( f"last profile in full file: {ds['time'].max().values}, idx {np.argmax(ds['time'].values)}" ) if "good_ens" in ds.attrs: # we have good ensemble indices in the metadata # so we can deal with multiple good_ens ranges, or just a single range # these are good profile numbers good_ens = ds.attrs["good_ens"] goods = [] for n in range(0, len(good_ens), 2): goods.append(np.arange(good_ens[n], good_ens[n + 1])) goods = np.hstack(goods) # these are good obs numbers goodobs = [] for profile in ds.profile.values: if profile in goods: s = get_slice(ds, profile) goodobs.append(list(range(s.start, s.stop + 1))) goodobs = np.hstack(goodobs) ds = ds.sel(profile=goods, obs=goodobs) histtext = "Data clipped using good_ens values of {}.".format(str(good_ens)) ds = utils.insert_history(ds, histtext) else: print("Did not clip data; no values specified in metadata") print( f"first profile in clipped file: {ds['time'].min().values}, idx {np.argmin(ds['time'].values)}" ) print( f"last profile in clipped file: {ds['time'].max().values}, idx {np.argmax(ds['time'].values)}" ) return ds