Source code for stglib.indexvel

import numpy as np
import pandas as pd
import scipy.stats
import xmltodict


def read_areacomp_stationarea(filename):
    return read_areacomp(filename)


def read_areacomp_stationstage(filename):
    return read_areacomp(filename)


def read_areacomp(filename):
    return pd.read_csv(filename, skiprows=4)


def read_qrev_xml(filename, encoding="utf-8"):
    with open(filename, encoding=encoding) as fd:
        return xmltodict.parse(fd.read())


[docs]def parse_qrev_xml(doc, negateq=False, xarray=False): """ Parse XML output from QRev and return as a Pandas DataFrame Parameters ---------- doc : dict XML document as generated by `xmltodict.parse()` of the QRev XML file. negateq : bool or list of bools, optional, default False Negate q (discharge) values. Useful for changing the upstream/ downstream convention. Default False If negateq is a single bool, negate all transects. If negateq is a a list of bools, only negate those whose value is True xarray : bool, optional, default False If True, return an xarray Dataset Returns ------- pandas.DataFrame pandas DataFrame of relevant values extracted from the QRev XML tree. if xarray is True, returns an xarray Dataset """ adcp = {} dct = doc["Channel"]["Transect"] r = range(len(dct)) adcp["starttime"] = pd.to_datetime([dct[n]["StartDateTime"]["#text"] for n in r]) adcp["endtime"] = pd.to_datetime([dct[n]["EndDateTime"]["#text"] for n in r]) adcp["q"] = np.asarray([float(dct[n]["Discharge"]["Total"]["#text"]) for n in r]) adcp["AreaQrev"] = np.asarray([float(dct[n]["Other"]["Area"]["#text"]) for n in r]) adcp["Width"] = np.asarray([float(dct[n]["Other"]["Width"]["#text"]) for n in r]) adcp["QoverA"] = np.asarray([float(dct[n]["Other"]["QoverA"]["#text"]) for n in r]) adcp["filename"] = np.asarray([dct[n]["Filename"]["#text"] for n in r]) adcp["qTop"] = np.asarray([float(dct[n]["Discharge"]["Top"]["#text"]) for n in r]) adcp["qMiddle"] = np.asarray( [float(dct[n]["Discharge"]["Middle"]["#text"]) for n in r] ) adcp["qBottom"] = np.asarray( [float(dct[n]["Discharge"]["Bottom"]["#text"]) for n in r] ) adcp["qLeft"] = np.asarray([float(dct[n]["Discharge"]["Left"]["#text"]) for n in r]) adcp["qRight"] = np.asarray( [float(dct[n]["Discharge"]["Right"]["#text"]) for n in r] ) adcp["LeftDistance"] = np.asarray( [float(dct[n]["Edge"]["LeftDistance"]["#text"]) for n in r] ) adcp["RightDistance"] = np.asarray( [float(dct[n]["Edge"]["RightDistance"]["#text"]) for n in r] ) adcp["time"] = pd.to_datetime( np.mean( [adcp["starttime"].view("i8"), adcp["endtime"].view("i8")], axis=0 ).astype("datetime64[ns]") ) adcp["qnegated"] = negateq if np.any(negateq): for v in ["q", "QoverA"]: adcp[v][adcp["qnegated"]] = -adcp[v][adcp["qnegated"]] df = pd.DataFrame( adcp, columns=[ "time", "q", "AreaQrev", "Width", "QoverA", "qnegated", "starttime", "endtime", "filename", "qTop", "qMiddle", "qBottom", "qLeft", "qRight", "LeftDistance", "RightDistance", ], ) df = df.set_index("time") if xarray: return df.to_xarray() else: return df
def linregress(adcp): """ Perform a linear regression and return slope, intercept, r value, p value, and standard error of the slope. This is just a wrapper around `scipy.stats.linregress()` """ ( adcp["slope"], adcp["intercept"], adcp["r_value"], adcp["p_value"], adcp["std_err"], ) = scipy.stats.linregress(adcp["veli"], adcp["Vca"]) return adcp