# Source code for stglib.wxt

import pandas as pd
import xarray as xr

from .core import utils


def read_wxt(filnam, skiprows=7, encoding="utf-8"):
    """Read data from a Vaisala WXT met .csv file into an xarray Dataset.

    Parameters
    ----------
    filnam : string
        The filename
    skiprows : int, optional
        How many header rows to skip. Default 7
    encoding : string, optional
        File encoding. Default 'utf-8'

    Returns
    -------
    xarray.Dataset
        An xarray Dataset of the WXT data
    """

    df = pd.read_csv(
        filnam,
        skiprows=skiprows,
        header=0,
        na_values=[-9999],
        encoding=encoding,
        index_col=False,
    )
    df["Date and Time in UTC"] = pd.to_datetime(df["Date and Time in UTC"])
    df.rename(columns={"Date and Time in UTC": "time"}, inplace=True)
    # Make the timestamps the index so they become the time coordinate of the
    # Dataset. Merely renaming the RangeIndex to "time" (the previous code)
    # leaves time as a data variable over an integer dimension and collides
    # with the "time" column name in Dataset.from_dataframe.
    df.set_index("time", inplace=True)
    wxt = xr.Dataset.from_dataframe(df)

    return wxt
# Make raw CDF
def csv_to_cdf(metadata):
    """Read a WXT .csv file and write the data to a raw netCDF (.cdf) file.

    ``metadata`` must contain at least ``basefile`` (filename stem) and
    ``skiprows``; ``skiprows`` is consumed here and removed from the dict.
    Returns the raw Dataset after writing <filename>-raw.cdf.
    """

    ds = read_wxt(metadata["basefile"] + ".csv", skiprows=metadata["skiprows"])
    # skiprows is a read-time option, not something to store as an attribute
    metadata.pop("skiprows")

    ds = utils.write_metadata(ds, metadata)
    ds = utils.ensure_cf(ds)

    # configure file
    cdf_filename = ds.attrs["filename"] + "-raw.cdf"
    ds.to_netcdf(cdf_filename, unlimited_dims=["time"])

    print("Finished writing data to %s" % cdf_filename)

    return ds
# Process data and write to .nc file
def cdf_to_nc(cdf_filename):
    """
    Load a raw .cdf file and generate a processed .nc file
    """

    # Load raw .cdf data
    ds = xr.open_dataset(cdf_filename)

    # remove units in case we change and we can use larger time steps;
    # use a default so a file without time units doesn't raise KeyError
    ds.time.encoding.pop("units", None)

    # Get rid of unneeded variables
    for k in [
        "SampNum",
        "Battery",
        "BoardTemp",
        "signalPercent",
    ]:
        if k in ds:
            ds = ds.drop_vars(k)

    # create vert dim z
    ds = utils.create_z(ds)

    # Rename variables to CF compliant names
    ds = ds_rename_vars(ds)

    # the wind-direction variables actually present in this deployment
    wind_dirs = [v for v in ("WD_min", "WD_410", "WD_gust") if v in ds]

    # force WD vars to floats
    for v in wind_dirs:
        ds[v] = ds[v].astype(float)

    # If sensor wasn't pointing to magnetic north, apply offset to direction
    if "dir_offset" in ds.attrs:
        print(f"Applying dir_offset of {ds.attrs['dir_offset']}")
        # float() handles plain Python numbers, strings, and numpy scalars;
        # .astype() exists only on numpy types and fails on the others
        offset = float(ds.attrs["dir_offset"])
        for v in wind_dirs:
            ds[v] = ds[v] + offset

    # Convert direction from magnetic to true with magnetic declination
    print(
        f"Rotating directions from magnetic north to true north by applying magnetic_variation of {ds.attrs['magnetic_variation']}"
    )
    declination = float(ds.attrs["magnetic_variation"])
    for v in wind_dirs:
        ds[v] = _wrap_direction(ds[v] + declination)

    # Run utilities
    ds = utils.clip_ds(ds)
    ds = utils.add_start_stop_time(ds)
    ds = utils.ds_add_lat_lon(ds)
    ds = utils.add_min_max(ds)
    ds = utils.add_delta_t(ds)

    # Add attributes
    ds = ds_add_attrs(ds)

    # Write to .nc file
    print("Writing cleaned/trimmed data to .nc file")
    nc_filename = ds.attrs["filename"] + "-a.nc"

    ds.to_netcdf(
        nc_filename, unlimited_dims=["time"], encoding={"time": {"dtype": "i4"}}
    )
    utils.check_compliance(nc_filename, conventions=ds.attrs["Conventions"])

    print("Done writing netCDF file", nc_filename)


def _wrap_direction(da):
    """Round a direction DataArray and wrap it into the [0, 360) range."""
    da = da.round(0)
    da[da < 0.0] = da[da < 0.0] + 360.0
    da[da >= 360.0] = da[da >= 360.0] - 360.0
    da[da == 0.0] = 0.0  # convert any -0. to 0.
    return da
# Rename variables to be CF compliant
def ds_rename_vars(ds):
    """Rename raw WXT variable names to their CF-compliant equivalents."""
    varnames = {
        "WXTDn": "WD_min",
        "WXTDm": "WD_410",
        "WXTDx": "WD_gust",
        "WXTSn": "WS_min",
        "WXTSm": "WS_401",
        "WXTSx": "WG_402",
        "WXTTa": "T_21",
        "WXTUa": "RH_910",
        "WXTPa": "BPR_915",
        "WXTRc": "Rn_963",
    }

    # Check to make sure they exist before trying to rename
    return ds.rename({old: new for old, new in varnames.items() if old in ds})


# Add attributes: units, standard name from CF website, epic code
def ds_add_attrs(ds):
    """Attach units, long names, CF standard names, and epic codes to variables."""
    ds = utils.ds_coord_no_fillvalue(ds)

    ds["time"].attrs.update(
        {"standard_name": "time", "axis": "T", "long_name": "time (UTC)"}
    )

    # per-variable attribute table; only applied to variables present in ds
    attr_table = {
        "WD_min": {
            "units": "degrees",
            "long_name": "minimum wind from direction relative to true north",
        },
        "WD_410": {
            "units": "degrees",
            "long_name": "mean wind from direction relative to true north",
            "standard_name": "wind_from_direction",
            "epic_code": "410",
        },
        "WD_gust": {
            "units": "degrees",
            "long_name": "maximum wind from direction relative to true north",
            "standard_name": "wind_gust_from_direction",
        },
        "WS_min": {"units": "m/s", "long_name": "minimum wind speed"},
        "WS_401": {
            "units": "m/s",
            "long_name": "mean wind speed",
            "standard_name": "wind_speed",
            "epic_code": "401",
        },
        "WG_402": {
            "units": "m/s",
            "long_name": "maximum wind speed",
            "standard_name": "wind_speed_of_gust",
            "epic_code": "402",
        },
        "T_21": {
            "units": "degree_C",
            "standard_name": "air_temperature",
            "epic_code": "21",
        },
        "RH_910": {
            "units": "percent",
            "standard_name": "relative_humidity",
            "epic_code": "910",
        },
        "BPR_915": {
            "units": "pascals",
            "standard_name": "air_pressure",
            "epic_code": "915",
        },
        "Rn_963": {
            "units": "mm",
            "standard_name": "thickness_of_rainfall_amount",
            "epic_code": "963",
        },
    }
    for name, attrs in attr_table.items():
        if name in ds:
            ds[name].attrs.update(attrs)

    # add initial height information and fill values to variables
    def add_attributes(var, dsattrs):
        var.attrs["initial_instrument_height"] = dsattrs["initial_instrument_height"]
        var.attrs["height_depth_units"] = "m"
        if "initial_instrument_height_note" in dsattrs:
            var.attrs["initial_instrument_height_note"] = dsattrs[
                "initial_instrument_height_note"
            ]
        if "sensor_type" not in dsattrs:
            var.attrs["sensor_type"] = "Vaisala WXT536"

    # don't include all attributes for coordinates that are also variables
    for var in ds.variables:
        if (var not in ds.coords) and ("time" not in var):
            add_attributes(ds[var], ds.attrs)

    return ds