# Source code for stglib.exo

import numpy as np
import pandas as pd
import xarray as xr

from .core import qaqc, utils


def read_exo(filnam, skiprows=25, encoding="utf-8"):
    """Read data from a YSI EXO multiparameter sonde .csv file into an xarray
    Dataset.

    Parameters
    ----------
    filnam : string
        The filename
    skiprows : int, optional
        How many header rows to skip. Default 25
    encoding : string, optional
        File encoding. Default 'utf-8'

    Returns
    -------
    xarray.Dataset
        An xarray Dataset of the EXO data

    Raises
    ------
    NotImplementedError, ValueError
        Re-raised (after printing guidance) when the csv cannot be parsed;
        without a DataFrame there is nothing further to do.
    """
    try:
        exo = pd.read_csv(
            filnam,
            skiprows=skiprows,
            # first two columns are date & time; the header text varies
            # between KOR versions, so select them by position
            parse_dates=[[0, 1]],
            encoding=encoding,
        )
    except UnicodeDecodeError:
        # old versions of Mac Excel write Mac OS Western encoding
        exo = pd.read_csv(
            filnam,
            skiprows=skiprows,
            parse_dates=[[0, 1]],
            encoding="mac-roman",
        )
    except NotImplementedError as e:
        print(
            (
                " *** Could not decode file. Try saving the csv file using "
                "UTF-8 encoding and retrying\n"
            ),
            e,
        )
        # re-raise: previously execution fell through and died with an
        # unrelated NameError on the undefined `exo`
        raise
    except ValueError as e:
        print(
            (
                " *** Could not decode header. "
                "Have you specified skiprows correctly?\n"
            ),
            e,
        )
        raise

    # rename first column to time. Need to do this because the format of the
    # date/time header can change between versions
    exo.rename(columns={exo.columns[0]: "time"}, inplace=True)
    exo.set_index("time", inplace=True)
    exo.rename(columns=lambda x: x.replace(" ", "_"), inplace=True)
    exo.rename(columns=lambda x: x.replace("/", "_per_"), inplace=True)

    pvar = None
    if "Press_psi_a" in exo.columns:
        pvar = "Press_psi_a"
    elif "Pressure_psi_a" in exo.columns:
        pvar = "Pressure_psi_a"
    else:
        print(
            "*** Could not find pressure (Press_psi_a, Pressure_psi_a) in source data file. Have you exported pressure if this instrument was equipped with a pressure sensor?"
        )

    if pvar:
        # Convert from PSI to dbar
        exo["Press_dbar"] = exo[pvar] * 0.689476

    exo = xr.Dataset(exo)
    hdr = read_exo_header(filnam, encoding=encoding)
    exo.attrs["serial_number"] = hdr["serial_number"]
    exo.attrs["instrument_type"] = "YSI EXO2 Multiparameter Sonde"

    def _first_in_hdr(candidates):
        # return the first candidate key present in the header, else None
        for c in candidates:
            if c in hdr:
                return c
        return None

    # Apply sensor serial numbers to each sensor. Header key names differ
    # between KOR export versions, so try the known variants in order.
    for k in exo.variables:
        hdrvar = None
        if "fDOM" in k:
            hdrvar = _first_in_hdr(["fDOM", "fDOM QSU"])
        elif "Chlorophyll" in k or "BGA" in k or "TAL" in k:
            hdrvar = _first_in_hdr(
                ["Total Algae BGA-PE", "BGA PE RFU", "TAL PE RFU"]
            )
        elif "Temp" in k or "Cond" in k or "Sal" in k:
            hdrvar = _first_in_hdr(["Unknown CT", "Wiped CT"]) or "Sal psu"
        elif "ODO" in k:
            hdrvar = _first_in_hdr(["Optical DO", "ODO % sat"])
        elif k == "Turbidity":
            hdrvar = "Turbidity"
        elif k == "Turbidity_NTU":
            hdrvar = "Turbidity NTU"
        elif k == "Turbidity_FNU":
            hdrvar = "Turbidity FNU"
        elif "pH" in k:
            hdrvar = "pH"
        elif "Press" in k or "Depth" in k:
            hdrvar = _first_in_hdr(
                ["Depth Non-Vented 0-10m", "Depth m", "Pressure psi a"]
            )
        # guard: previously an unmatched header variant left hdrvar unbound
        # (NameError) or None (KeyError on hdr[None]); now the attribute is
        # simply not set for that variable
        if hdrvar is not None and hdrvar in hdr:
            exo[k].attrs["sensor_serial_number"] = hdr[hdrvar][
                "sensor_serial_number"
            ]

    return exo
def csv_to_cdf(metadata):
    """
    Process EXO .csv file to a raw .cdf file
    """
    csvfile = metadata["basefile"] + ".csv"
    nrows = metadata["skiprows"]

    try:
        ds = read_exo(csvfile, skiprows=nrows)
    except UnicodeDecodeError:
        # try reading as Mac OS Western for old versions of Mac Excel
        ds = read_exo(csvfile, skiprows=nrows, encoding="mac-roman")

    metadata.pop("skiprows")

    # write out metadata first, then deal exclusively with xarray attrs
    ds = utils.write_metadata(ds, metadata)
    del metadata

    ds = utils.ensure_cf(ds)
    ds = utils.shift_time(ds, 0)

    # configure file
    cdf_filename = ds.attrs["filename"] + "-raw.cdf"
    ds.to_netcdf(cdf_filename, unlimited_dims=["time"])

    print("Finished writing data to %s" % cdf_filename)

    return ds
def cdf_to_nc(cdf_filename, atmpres=False):
    """
    Load a raw .cdf file and generate a processed .nc file
    """
    # Load raw .cdf data
    ds = xr.load_dataset(cdf_filename)

    # remove units in case we change and we can use larger time steps
    # (default avoids KeyError when the encoding has no units entry)
    ds.time.encoding.pop("units", None)

    # Clip data to in/out water times or via good_ens
    ds = utils.clip_ds(ds)

    ds = ds_rename_vars(ds)

    # drop raw/housekeeping variables not wanted downstream
    for k in [
        "Press_psi_a",
        "Pressure_psi_a",
        "Site_Name",
        "Fault_Code",
        "Time_(Fract._Sec)",
        "TDS_mg_per_L",
        "TSS_mg_per_L",
        "Wiper_Position_volt",
        "Cable_Pwr_V",
        # https://www.ysi.com/file%20library/documents/manuals/exo-user-manual-web.pdf
        # nLF_Cond_µS_per_cm: "This convention is typically used in German markets." pp. 85
        "nLF_Cond_µS_per_cm",
        "nLF_Cond_mS_per_cm",
        "Vertical_Position_m",
        "pH_mV",
    ]:
        if k in ds:
            ds = ds.drop(k)

    ds = qaqc.drop_vars(ds)

    if atmpres:
        ds = utils.atmos_correct(ds, atmpres)

    ds = exo_qaqc(ds)

    # assign min/max:
    ds = utils.add_min_max(ds)
    ds = utils.add_start_stop_time(ds)
    ds = utils.add_delta_t(ds)

    # add lat/lon coordinates
    ds = utils.ds_add_lat_lon(ds)
    ds = utils.create_nominal_instrument_depth(ds)
    ds = utils.create_z(ds)

    ds = ds_add_attrs(ds)

    # No longer report depth
    if "Depth_m" in ds:
        ds = ds.drop("Depth_m")

    # Write to .nc file
    print("Writing cleaned/trimmed data to .nc file")
    nc_filename = ds.attrs["filename"] + "-a.nc"

    ds.to_netcdf(
        nc_filename, unlimited_dims=["time"], encoding={"time": {"dtype": "i4"}}
    )
    utils.check_compliance(nc_filename, conventions=ds.attrs["Conventions"])

    print("Done writing netCDF file", nc_filename)


def ds_rename_vars(ds):
    """Convert conductivity units to S/m and rename instrument variable
    names to their EPIC equivalents.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset with raw EXO variable names

    Returns
    -------
    xarray.Dataset
        Dataset with renamed variables
    """
    if "Cond_mS_per_cm" in ds:
        ds["Cond_mS_per_cm"].values = (
            ds["Cond_mS_per_cm"].values / 10
        )  # convert from mS/cm to S/m
    if "Cond_µS_per_cm" in ds:
        ds["Cond_µS_per_cm"].values = (
            ds["Cond_µS_per_cm"].values / 10000
        )  # convert from µS/cm to S/m
    if "SpCond_mS_per_cm" in ds:
        ds["SpCond_mS_per_cm"].values = (
            ds["SpCond_mS_per_cm"].values / 10
        )  # convert from mS/cm to S/m
    if "SpCond_µS_per_cm" in ds:
        ds["SpCond_µS_per_cm"].values = (
            ds["SpCond_µS_per_cm"].values / 10000
        )  # convert from µS/cm to S/m

    # set up dict of instrument -> EPIC variable names
    varnames = {
        "Press_dbar": "P_1",
        "Battery_V": "Bat_106",
        "fDOM_RFU": "fDOMRFU",
        "fDOM_QSU": "fDOMQSU",
        # capitalization based on Chincoteague names
        "Chlorophyll_RFU": "CHLrfu",
        "Chlorophyll_µg_per_L": "Fch_906",
        "Chlorophyll_ug_per_L": "Fch_906",  # added variable name
        "BGA-PE_RFU": "TALPErfu",  # BGA is old variable name
        "BGA_PE_RFU": "TALPErfu",  # BGA is old variable name
        "BGA-PE_µg_per_L": "TALPE",  # BGA is old variable name
        "BGA_PE_ug_per_L": "TALPE",  # BGA is old variable name
        "TAL_PE_RFU": "TALPErfu",  # added variable name
        "TAL_PE_ug_per_L": "TALPE",  # added variable name
        "Temp_°C": "T_28",
        "Temp_∞C": "T_28",
        "Cond_mS_per_cm": "C_51",
        "Cond_µS_per_cm": "C_51",
        "SpCond_mS_per_cm": "SpC_48",
        "SpCond_µS_per_cm": "SpC_48",
        "Sal_psu": "S_41",
        "ODO_%_sat": "OST_62",
        "ODO_mg_per_L": "DO",
        "Turbidity_NTU": "Turb",
        "Turbidity_FNU": "Turb_FNU",
        "pH": "pH_159",
    }

    # check to make sure they exist before trying to rename
    newvars = {k: v for k, v in varnames.items() if k in ds}

    return ds.rename(newvars)


def ds_add_attrs(ds):
    """Update variable attributes for EPIC and STG compliance.

    Each variable's attributes are only set when the variable is present,
    so datasets from sondes without a given sensor do not raise KeyError.
    """
    ds = utils.ds_coord_no_fillvalue(ds)

    ds["time"].attrs.update(
        {"standard_name": "time", "axis": "T", "long_name": "time (UTC)"}
    )

    if "Bat_106" in ds:
        ds["Bat_106"].attrs.update(
            {"units": "V", "long_name": "Battery voltage", "epic_code": 106}
        )

    if "fDOMRFU" in ds:
        ds["fDOMRFU"].attrs.update(
            {
                "units": "percent",
                "long_name": "Fluorescent dissolved organic matter, RFU",
                "comments": "Relative fluorescence units (RFU)",
            }
        )

    if "fDOMQSU" in ds:
        ds["fDOMQSU"].attrs.update(
            {
                "units": "1e-9",
                "long_name": "Fluorescent dissolved organic matter, QSU",
                "comments": "Quinine sulfate units (QSU)",
            }
        )

    if "CHLrfu" in ds:
        ds["CHLrfu"].attrs.update(
            {
                "units": "percent",
                "long_name": "Chlorophyll A, RFU",
                "comments": "Relative fluorescence units (RFU)",
            }
        )

    if "Fch_906" in ds:
        ds["Fch_906"].attrs.update(
            {
                "units": "ug/L",
                "long_name": "Chlorophyll A",
                "epic_code": 906,
                "standard_name": "mass_concentration_of_chlorophyll_in_sea_water",
                "comments": "from calibration of sensor with rhodamine W/T in lab",
            }
        )

    if "TALPErfu" in ds:
        ds["TALPErfu"].attrs.update(
            {
                "units": "percent",
                "long_name": "Total algae phycoerythrin, RFU",
                "comments": "Relative fluorescence units (RFU); formerly called BGAPErfu (Blue green algae phycoerythrin, RFU)",
            }
        )

    if "TALPE" in ds:
        ds["TALPE"].attrs.update(
            {
                "units": "ug/L",
                "long_name": "Total algae phycoerythrin",
                "comments": "Formerly called BGAPE (Blue green algae phycoerythrin)",
            }
        )

    if "T_28" in ds:
        ds["T_28"].attrs.update(
            {
                "units": "degree_C",
                "long_name": "Temperature",
                "epic_code": 28,
                "standard_name": "sea_water_temperature",
            }
        )

    if "C_51" in ds:
        ds["C_51"].attrs.update(
            {
                "units": "S/m",
                "long_name": "Conductivity",
                "epic_code": 51,
                "standard_name": "sea_water_electrical_conductivity",
            }
        )

    if "SpC_48" in ds:
        ds["SpC_48"].attrs.update(
            {
                "units": "S/m",
                "long_name": "Specific Conductivity",
                "comment": "Temperature compensated to 25 °C",
                "epic_code": 48,
                # "standard_name": "sea_water_electrical_conductivity",
            }
        )

    if "S_41" in ds:
        ds["S_41"].attrs.update(
            {
                "units": "1",
                "long_name": "Salinity, PSU",
                "comments": "Practical salinity units (PSU)",
                "epic_code": 41,
                "standard_name": "sea_water_practical_salinity",
            }
        )

    if "OST_62" in ds:
        ds["OST_62"].attrs.update(
            {
                "units": "percent",
                "long_name": "Oxygen percent saturation",
                "epic_code": 62,
                "standard_name": "fractional_saturation_of_oxygen_in_sea_water",
            }
        )

    if "DO" in ds:
        ds["DO"].attrs.update(
            {
                "units": "mg/L",
                "long_name": "Dissolved oxygen",
                "standard_name": "mass_concentration_of_oxygen_in_sea_water",
            }
        )

    if "Turb" in ds:
        ds["Turb"].attrs.update(
            {
                "units": "1",
                "long_name": "Turbidity, NTU",
                "comments": "Nephelometric turbidity units (NTU)",
                "standard_name": "sea_water_turbidity",
            }
        )

    if "Turb_FNU" in ds:
        ds["Turb_FNU"].attrs.update(
            {
                "units": "1",
                "long_name": "Turbidity, FNU",
                "comments": "Formazin nephelometric units (FNU)",
                "standard_name": "sea_water_turbidity",
            }
        )

    if "pH_159" in ds.variables:
        ds["pH_159"].attrs.update(
            {
                "units": "1",
                "standard_name": "sea_water_ph_reported_on_total_scale",
                "epic_code": 159,
            }
        )

    if "P_1" in ds:
        ds["P_1"].attrs.update(
            {
                "units": "dbar",
                "long_name": "Uncorrected pressure",
                "epic_code": 1,
                "standard_name": "sea_water_pressure",
            }
        )

    if "P_1ac" in ds:
        ds["P_1ac"].attrs.update(
            {
                "units": "dbar",
                "long_name": "Corrected pressure",
                "standard_name": "sea_water_pressure_due_to_sea_water",
            }
        )
        if "P_1ac_note" in ds.attrs:
            ds = utils.insert_note(ds, "P_1ac", ds.attrs["P_1ac_note"] + " ")

    return ds


def read_exo_header(filnam, encoding="utf-8"):
    """Read sonde and sensor serial numbers from an EXO .csv file header.

    Handles both the old and new KOR export file layouts.

    Returns
    -------
    dict
        'serial_number' for the sonde plus, per sensor name, a dict with
        'sensor_serial_number' (and, for old-format files, 'data_columns').
    """
    header = {}
    try:
        # Old version of KOR export file
        hdr = pd.read_csv(filnam, skiprows=None, encoding=encoding)
        hdr = pd.DataFrame(hdr.iloc[:, 0:4])
        header["serial_number"] = (
            hdr[hdr["KOR Export File"] == "Sonde ID"].values[0][1].split(" ")[1]
        )
        for var in [
            "fDOM",
            "Total Algae BGA-PE",
            "Wiped CT",
            "Unknown CT",
            "Optical DO",
            "Turbidity",
            "pH",
            "Depth Non-Vented 0-10m",
        ]:
            vals = hdr[hdr["KOR Export File"] == var]
            if not vals.empty:
                header[var] = {}
                header[var]["sensor_serial_number"] = vals.values[0][1]
                header[var]["data_columns"] = [
                    int(x) for x in vals.values[0][3].split(";")
                ]
    except (pd.errors.ParserError, KeyError):
        # new version of KOR export file
        hdr = pd.read_csv(filnam, skiprows=4, encoding=encoding)
        hdr = pd.DataFrame(hdr.iloc[:, 3:])
        # get instrument SN from filename -- this will fail on files not named
        # according to the Kor default file-naming convention but I'm not sure
        # how else to get it
        header["serial_number"] = filnam.split("/")[-1].split("_")[1]
        row = np.where(hdr.iloc[:, 0] == "SENSOR SERIAL NUMBER:")
        # pair each sensor name (row below the marker) with its serial number
        a = np.vstack(
            [hdr.iloc[row[0] + 1, :].values, hdr.iloc[row[0], :].values]
        ).T
        for v in a:
            if v[0] != "Site Name":
                header[v[0]] = {}
                header[v[0]]["sensor_serial_number"] = v[1]
                # try to get serial_number attribute from Battery V
                if v[0] == "Battery V":
                    header["serial_number"] = v[1]

    return header


def exo_qaqc(ds):
    """
    QA/QC
    Trim EXO data based on metadata
    """
    varlist = [
        "S_41",
        "C_51",
        "SpC_48",
        "T_28",
        "Turb",
        "fDOMRFU",
        "fDOMQSU",
        "CHLrfu",
        "Fch_906",
        "TALPErfu",
        "TALPE",
        "OST_62",
        "DO",
        "pH_159",
        "P_1ac",
        "P_1",
    ]

    # include any remaining data variables not already listed
    # (was a list comprehension used only for its append side effect)
    varlist.extend(k for k in ds.data_vars if k not in varlist)

    for var in varlist:
        ds = qaqc.trim_min(ds, var)
        ds = qaqc.trim_max(ds, var)
        ds = qaqc.trim_min_diff(ds, var)
        ds = qaqc.trim_min_diff_pct(ds, var)
        ds = qaqc.trim_max_diff(ds, var)
        ds = qaqc.trim_max_diff_pct(ds, var)
        ds = qaqc.trim_med_diff(ds, var)
        ds = qaqc.trim_med_diff_pct(ds, var)
        ds = qaqc.trim_bad_ens(ds, var)

    for var in varlist:
        # re-run and trim by other variables as necessary
        ds = qaqc.trim_by_any(ds, var)

    return ds