Source code for stglib.hobo

import csv
import string

import numpy as np
import pandas as pd
import xarray as xr

from .core import qaqc, utils


def read_hobo(
    filnam,
    skiprows=1,
    skipfooter=0,
    names=["#", "DateTime", "AbsPres_kPa", "Temp_C"],
    encoding="utf-8",
):
    """Read data from an Onset HOBO pressure sensor .csv file into an xarray
    Dataset.

    Parameters
    ----------
    filnam : string
        The filename
    skiprows : int, optional
        How many header rows to skip. Default 1
    skipfooter : int, optional
        How many footer rows to skip. Default 0
    names : list of str, optional
        Column names to assign. Default ["#", "DateTime", "AbsPres_kPa", "Temp_C"]
    encoding : str, optional
        File encoding to use when reading. Default "utf-8"

    Returns
    -------
    xarray.Dataset
        An xarray Dataset of the HOBO data
    """
    hobo = pd.read_csv(
        filnam,
        usecols=np.arange(len(names)),
        names=names,
        engine="python",
        skiprows=skiprows,
        skipfooter=skipfooter,
        encoding=encoding,
    )
    hobo["time"] = pd.to_datetime(hobo["DateTime"])
    hobo.set_index("time", inplace=True)

    return xr.Dataset(hobo)
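
# A minimal usage sketch (the filename is hypothetical). HOBOware .csv
# exports typically start with a plot-title row followed by a column-header
# row, so skiprows=2 is common when passing explicit column names:
#
#     ds = read_hobo("hobo_pressure.csv", skiprows=2)
#     print(ds["AbsPres_kPa"])
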
def csv_to_cdf(metadata):
    """
    Process HOBO .csv file to a raw .cdf file
    """

    basefile = metadata["basefile"]

    kwargs = {"skiprows": metadata["skiprows"], "skipfooter": metadata["skipfooter"]}

    if "names" in metadata:
        kwargs["names"] = metadata["names"]
    else:
        kwargs["names"] = get_col_names(basefile + ".csv", metadata)

    try:
        ds = read_hobo(basefile + ".csv", **kwargs)
    except UnicodeDecodeError:
        # try reading as Mac OS Western for old versions of Mac Excel
        ds = read_hobo(basefile + ".csv", encoding="mac-roman", **kwargs)

    metadata.pop("skiprows")
    metadata.pop("skipfooter")
    if "ncols" in metadata:
        metadata.pop("ncols")

    # write out metadata first, then deal exclusively with xarray attrs
    ds = utils.write_metadata(ds, metadata)

    del metadata

    ds = utils.ensure_cf(ds)

    ds = utils.shift_time(ds, 0)

    ds = drop_vars(ds)

    ds.attrs["serial_number"] = get_serial_number(basefile + ".csv")

    # configure file
    cdf_filename = ds.attrs["filename"] + "-raw.cdf"

    ds.to_netcdf(cdf_filename, unlimited_dims=["time"])

    print("Finished writing data to %s" % cdf_filename)

    return ds


def drop_vars(ds):
    todrop = ["#", "DateTime"]
    return ds.drop_vars([x for x in todrop if x in ds])


def ds_rename_vars(ds):
    # convert some units
    if "Conductance_uSpercm" in ds:
        ds["Conductance_uSpercm"].values = (
            ds["Conductance_uSpercm"].values / 10000
        )  # convert from µS/cm to S/m
    if "SpecificConductance_uSpercm" in ds:
        ds["SpecificConductance_uSpercm"].values = (
            ds["SpecificConductance_uSpercm"].values / 10000
        )  # convert from µS/cm to S/m
    if "AbsPresBarom_kPa" in ds:
        ds["AbsPresBarom_kPa"].values = (
            ds["AbsPresBarom_kPa"].values * 10
        )  # convert from kPa to millibars
        ds = ds.rename({"AbsPresBarom_kPa": "AbsPresBarom_mbar"})
    if "AbsPres_kPa" in ds:
        ds["AbsPres_kPa"].values = (
            ds["AbsPres_kPa"].values / 10
        )  # convert from kPa to decibars
        ds = ds.rename({"AbsPres_kPa": "AbsPres_dbar"})

    # check to see if the logger was deployed as a barometer
    if ds.attrs["instrument_type"] == "hwlb":
        if "AbsPres_dbar" in ds:
            ds["AbsPres_dbar"].values = (
                ds["AbsPres_dbar"].values * 100
            )  # convert from dbar to millibars
            ds = ds.rename({"AbsPres_dbar": "AbsPresBarom_mbar"})
        if "Temp_C" in ds:
            ds = ds.rename({"Temp_C": "Atemp_C"})

    # set up dict of instrument -> EPIC variable names
    varnames = {
        "AbsPres_dbar": "P_1",
        "Temp_C": "T_28",
        "AbsPresBarom_mbar": "BPR_915",
        "SensorDepth_meters": "D_3",
        "Conductance_uSpercm": "C_51",
        "SpecificConductance_uSpercm": "SpC_48",
        "Salinity_ppt": "S_41",
        "DOPercentSat_percent": "OST_62",
        "DOconc_mgperL": "DO",
        "DOAdjConc_mgperL": "DO_Adj",
        "Atemp_C": "T_21",
    }

    # check to make sure they exist before trying to rename
    newvars = {}
    for k in varnames:
        if k in ds:
            newvars[k] = varnames[k]

    # drop unneeded vars
    todrop = ["FullRange_uSpercm"]
    ds = ds.drop_vars([x for x in todrop if x in ds])

    return ds.rename(newvars)
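
# A sketch of the metadata expected by csv_to_cdf (the values are
# hypothetical; stglib normally assembles metadata from YAML config files,
# and utils.write_metadata may expect additional global attributes):
#
#     metadata = {
#         "basefile": "hobo_dep1",    # reads hobo_dep1.csv
#         "filename": "hobo_dep1",    # writes hobo_dep1-raw.cdf
#         "skiprows": 2,
#         "skipfooter": 0,
#         "instrument_type": "hwlb",  # "hwlb" means deployed as a barometer
#     }
#     ds = csv_to_cdf(metadata)
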
"condhi_uScm" in ds: ds = ds.rename({"condhi_uScm": "SpC_48_hi"}) ds["SpC_48_hi"].attrs.update( { "units": "uS/cm", "long_name": "Conductivity", "comment": "Temperature compensated to 25 °C; high range", "epic_code": 48, "standard_name": "sea_water_electrical_conductivity", } ) ds["S_41_hi"] = utils.salinity_from_spcon(ds["SpC_48_hi"]) ds["S_41_hi"].attrs.update( { "units": "1", "long_name": "Salinity; high range, PSU", "epic_code": 41, "standard_name": "sea_water_practical_salinity", } ) # End of legacy code section if "T_28" in ds: ds["T_28"].attrs.update( { "units": "degree_C", "long_name": "Temperature", "epic_code": 28, "standard_name": "sea_water_temperature", } ) if "C_51" in ds: ds["C_51"].attrs.update( { "units": "S/m", "long_name": "Conductivity", "epic_code": 51, "standard_name": "sea_water_electrical_conductivity", } ) if "SpC_48" in ds: ds["SpC_48"].attrs.update( { "units": "S/m", "long_name": "Specific Conductivity", "comment": "Temperature compensated to 25 C", "epic_code": 48, "standard_name": "sea_water_electrical_conductivity", } ) if "S_41" in ds: ds["S_41"].attrs.update( { "units": "1", "long_name": "Salinity, PSU", "comments": "Practical salinity units (PSU)", "epic_code": 41, "standard_name": "sea_water_practical_salinity", } ) if "OST_62" in ds: ds["OST_62"].attrs.update( { "units": "percent", "long_name": "Oxygen percent saturation", "epic_code": 62, "standard_name": "fractional_saturation_of_oxygen_in_sea_water", } ) if "DO" in ds: ds["DO"].attrs.update( { "units": "mg/L", "long_name": "Dissolved oxygen", "standard_name": "mass_concentration_of_oxygen_in_sea_water", } ) if "DO_Adj" in ds: ds["DO"].values = ds["DO_Adj"].values if "DO_note" in ds.attrs: # ds = utils.insert_note(ds, "DO", ds.attrs["DO_note"] + " ") ds["DO"].attrs.update({"note": ds.attrs["DO_note"]}) else: ds["DO"].attrs.update( { "note": "Using adjusted DO concentration", } ) ds = ds.drop_vars("DO_Adj") if "P_1" in ds: ds["P_1"].attrs.update( { "units": "dbar", "long_name": "Uncorrected pressure", "epic_code": 1, "standard_name": "sea_water_pressure", } ) if "P_1ac" in ds: ds["P_1ac"].attrs.update( { "units": "dbar", "long_name": "Corrected pressure", "standard_name": "sea_water_pressure_due_to_sea_water", } ) if "P_1ac_note" in ds.attrs: # ds = utils.insert_note(ds, "P_1ac", ds.attrs["P_1ac_note"] + " ") ds["P_1ac"].attrs.update({"note": ds.attrs["P_1ac"]}) if "D_3" in ds: ds["D_3"].attrs.update( { "units": f"{ds.depth.attrs['units']}", "long_name": "depth below sea surface", "standard_name": "depth", "positive": f"{ds.depth.attrs['positive']}", } ) if "D_3_note" in ds.attrs: # ds = utils.insert_note(ds, "D_3", ds.attrs["D_3_note"] + " ") ds["D_3"].attrs.update({"note": ds.attrs["D_3_note"]}) if "BPR_915" in ds: ds["BPR_915"].attrs.update( { "units": "mbar", "long_name": "Barometric pressure", "epic_code": 915, "standard_name": "air_pressure", } ) if "T_21" in ds: ds["T_21"].attrs.update( { "units": "degree_C", "long_name": "Air Temperature", "epic_code": 21, "standard_name": "air_temperature", } ) return ds def get_serial_number(filnam): """get the serial number of the instrument""" with open(filnam) as f: f.readline() line2 = f.readline() sn = line2.split("LGR S/N: ")[1].split(",")[0] return sn def strip_non_printable(strin): """Returns the string without non printable characters""" printable = set(string.printable) return "".join(filter(lambda x: x in printable, strin)) def get_col_names(filnam, metadata): """get column names and column units from instrument input data file""" with open(filnam) 
def get_col_names(filnam, metadata):
    """get column names and column units from instrument input data file"""
    with open(filnam) as f:
        rdr = csv.reader(f)
        # skip rows until the header line, whose first value is "#"
        hdrline = next(rdr)
        while hdrline[0] != "#":
            hdrline = next(rdr)

    collist = [x.split(" (")[0] for x in hdrline]
    colnames = []
    colunits = []
    for x in collist:
        spl = x.replace(" ", "").split(",")
        colnames.append(spl[0].strip("."))
        if len(spl) > 1:
            colunits.append(spl[1].strip())
        else:
            colunits.append("")

    if "ncols" in metadata:
        colnames = colnames[: metadata["ncols"]]
        colunits = colunits[: metadata["ncols"]]

    # make dict of names and units
    dcols = {}
    for i in range(len(colnames)):
        d = {colnames[i]: colunits[i]}
        dcols.update(d)

    # remove special characters, and those not allowed in var or dim names,
    # from the units
    for k in dcols:
        # first try replacing values
        if "µ" in dcols[k]:
            dcols[k] = dcols[k].replace("µ", "u")
        if "°" in dcols[k]:
            dcols[k] = dcols[k].replace("°", "")
        if "%" in dcols[k]:
            dcols[k] = dcols[k].replace("%", "percent")
        if "Temp" in k:
            if "C" in dcols[k]:
                dcols[k] = "C"
            elif "F" in dcols[k]:
                dcols[k] = "F"
        if "/" in dcols[k]:
            dcols[k] = dcols[k].replace("/", "per")
        # then strip non-printable characters
        dcols[k] = strip_non_printable(dcols[k])

    names = []
    for k in dcols:
        if k == "#" or k == "DateTime":
            names.append(k)
        else:
            names.append(k + "_" + dcols[k])

    return names


def cdf_to_nc(cdf_filename):
    """
    Load a raw .cdf file and generate a processed .nc file
    """

    # Load raw .cdf data
    ds = xr.open_dataset(cdf_filename)

    # remove units from the time encoding so they can be regenerated on
    # write, in case the time range changes and larger time steps can be used
    ds.time.encoding.pop("units")

    # Clip data to in/out water times or via good_ens
    ds = utils.clip_ds(ds)

    # rename variables
    ds = ds_rename_vars(ds)

    # QA/QC trimming; TODO: refactor this loop into a function
    for var in ds.data_vars:
        ds = qaqc.trim_min(ds, var)
        ds = qaqc.trim_max(ds, var)
        ds = qaqc.trim_min_diff(ds, var)
        ds = qaqc.trim_max_diff(ds, var)
        ds = qaqc.trim_med_diff(ds, var)
        ds = qaqc.trim_med_diff_pct(ds, var)
        ds = qaqc.trim_bad_ens(ds, var)
        ds = qaqc.trim_maxabs_diff_2d(ds, var)
        ds = qaqc.trim_fliers(ds, var)

    # after trimming, check for masking vars by other vars
    for var in ds.data_vars:
        ds = qaqc.trim_mask(ds, var)

    # check for drop_vars in config yaml
    if "drop_vars" in ds.attrs:
        ds = qaqc.drop_vars(ds)

    ds = utils.create_z(ds)  # added 7/31/2023

    ds = ds_add_attrs(ds)

    # assign min/max
    ds = utils.add_min_max(ds)

    ds = utils.add_start_stop_time(ds)

    ds = utils.add_delta_t(ds)

    # add lat/lon coordinates
    ds = utils.ds_add_lat_lon(ds)

    if "vert_dim" in ds.attrs:
        vdim = ds.attrs["vert_dim"]
        # the axis attr is set on z in utils.create_z, so it must be moved
        # if the vertical dimension is something other than z
        if ds.attrs["vert_dim"] != "z":
            ds[vdim].attrs["axis"] = "Z"
            del ds["z"].attrs["axis"]

    # Write to .nc file
    print("Writing cleaned/trimmed data to .nc file")
    nc_filename = ds.attrs["filename"] + "-a.nc"

    ds.to_netcdf(
        nc_filename, unlimited_dims=["time"], encoding={"time": {"dtype": "i4"}}
    )
    utils.check_compliance(nc_filename, conventions=ds.attrs["Conventions"])

    print("Done writing netCDF file", nc_filename)
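
# End-to-end sketch of the processing pipeline (hypothetical metadata as in
# the csv_to_cdf example above; in practice stglib's run scripts drive these
# two steps from YAML config files):
#
#     ds = csv_to_cdf(metadata)                     # .csv -> *-raw.cdf
#     cdf_to_nc(ds.attrs["filename"] + "-raw.cdf")  # *-raw.cdf -> *-a.nc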