import numpy as np
import pandas as pd
import scipy.stats
import xmltodict
def read_areacomp_stationarea(filename):
    """Read `filename` by delegating to `read_areacomp()`.

    This is a thin alias: it forwards `filename` unchanged and returns
    whatever `read_areacomp()` returns.
    """
    table = read_areacomp(filename)
    return table
def read_areacomp_stationstage(filename):
    """Read `filename` by delegating to `read_areacomp()`.

    This is a thin alias: it forwards `filename` unchanged and returns
    whatever `read_areacomp()` returns.
    """
    table = read_areacomp(filename)
    return table
def read_areacomp(filename):
    """Load a CSV file into a pandas DataFrame, ignoring its first four lines.

    Parameters
    ----------
    filename : str or path-like
        File handed to `pandas.read_csv()`.

    Returns
    -------
    pandas.DataFrame
        Parsed table; the fifth line of the file is used as the header row.
    """
    # The first four lines are skipped before pandas looks for the header.
    frame = pd.read_csv(filename, skiprows=4)
    return frame
def read_qrev_xml(filename, encoding="utf-8"):
    """Read an XML file and return it parsed by `xmltodict.parse()`.

    Parameters
    ----------
    filename : str or path-like
        Path of the XML file to open.
    encoding : str, optional, default "utf-8"
        Text encoding used when opening the file.

    Returns
    -------
    dict
        The document tree produced by `xmltodict.parse()`.
    """
    with open(filename, encoding=encoding) as xml_file:
        raw_xml = xml_file.read()
    return xmltodict.parse(raw_xml)
def parse_qrev_xml(doc, negateq=False, xarray=False):
    """
    Parse XML output from QRev and return as a Pandas DataFrame

    Parameters
    ----------
    doc : dict
        XML document as generated by `xmltodict.parse()` of the QRev XML file.
        ``doc["Channel"]["Transect"]`` may be a list of transects or a single
        transect mapping (which is what `xmltodict` yields when the file
        contains only one ``<Transect>`` element).
    negateq : bool or list of bools, optional, default False
        Negate q (discharge) values. Useful for changing the upstream/
        downstream convention. Default False
        If negateq is a single bool, negate all transects.
        If negateq is a list of bools, only negate those whose value is True
    xarray : bool, optional, default False
        If True, return an xarray Dataset

    Returns
    -------
    pandas.DataFrame
        pandas DataFrame of relevant values extracted from the QRev XML tree,
        indexed by ``time`` (the midpoint between each transect's start and
        end time). The ``q`` and ``QoverA`` columns are sign-flipped where
        ``qnegated`` is True.
        if xarray is True, returns an xarray Dataset

    Raises
    ------
    KeyError
        If an expected element is missing from the document.
    ValueError
        If `negateq` is a sequence that cannot be broadcast to the number of
        transects.
    """
    transects = doc["Channel"]["Transect"]
    # A lone <Transect> is parsed into a mapping rather than a one-item list;
    # normalise so single-transect files are handled like any other.
    if not isinstance(transects, (list, tuple)):
        transects = [transects]

    def texts(*path):
        """Collect the "#text" payload found at `path` in every transect."""
        collected = []
        for transect in transects:
            node = transect
            for key in path:
                node = node[key]
            collected.append(node["#text"])
        return collected

    def floats(*path):
        """Same as `texts`, converted to a float ndarray."""
        return np.asarray([float(value) for value in texts(*path)])

    # One boolean flag per transect, whether a scalar or a list was supplied.
    # Copy because broadcast_to returns a read-only view.
    negated = np.broadcast_to(
        np.asarray(negateq, dtype=bool), (len(transects),)
    ).copy()

    starttime = pd.to_datetime(texts("StartDateTime"))
    endtime = pd.to_datetime(texts("EndDateTime"))

    # Dict insertion order defines the column order of the resulting frame.
    adcp = {
        # Midpoint via datetime arithmetic: exact, and independent of the
        # index resolution or timezone (unlike averaging raw int64 values).
        "time": starttime + (endtime - starttime) / 2,
        "q": floats("Discharge", "Total"),
        "AreaQrev": floats("Other", "Area"),
        "Width": floats("Other", "Width"),
        "QoverA": floats("Other", "QoverA"),
        "qnegated": negated,
        "starttime": starttime,
        "endtime": endtime,
        "filename": np.asarray(texts("Filename")),
        "qTop": floats("Discharge", "Top"),
        "qMiddle": floats("Discharge", "Middle"),
        "qBottom": floats("Discharge", "Bottom"),
        "qLeft": floats("Discharge", "Left"),
        "qRight": floats("Discharge", "Right"),
        "LeftDistance": floats("Edge", "LeftDistance"),
        "RightDistance": floats("Edge", "RightDistance"),
    }

    if negated.any():
        # Only total discharge and Q/A change sign; the partial discharges
        # (top/middle/bottom/left/right) are left as reported.
        for name in ("q", "QoverA"):
            adcp[name][negated] = -adcp[name][negated]

    df = pd.DataFrame(adcp, columns=list(adcp)).set_index("time")

    if xarray:
        return df.to_xarray()
    return df
def linregress(adcp):
    """
    Fit a linear regression of ``adcp["Vca"]`` against ``adcp["veli"]``.

    This is just a wrapper around `scipy.stats.linregress()`. The slope,
    intercept, r value, p value, and standard error of the slope are stored
    in `adcp` under the keys ``"slope"``, ``"intercept"``, ``"r_value"``,
    ``"p_value"`` and ``"std_err"``; the same (mutated) `adcp` object is then
    returned.
    """
    fit = scipy.stats.linregress(adcp["veli"], adcp["Vca"])
    slope, intercept, r_value, p_value, std_err = fit

    labelled = (
        ("slope", slope),
        ("intercept", intercept),
        ("r_value", r_value),
        ("p_value", p_value),
        ("std_err", std_err),
    )
    for key, value in labelled:
        adcp[key] = value
    return adcp