from __future__ import print_function
__all__ = ['NO', 'NP', 'NOP', 'MO', 'MP', 'MdnO', 'MdnP', 'STDO',
'STDP', 'RM', 'RMdn', 'MB', 'MdnB', 'WDMB', 'WDMdnB',
'FB', 'MNB', 'MdnNB', 'NMB', 'NMdnB', 'USUTPB', 'PSUTMNPB',
'PSUTMdnNPB', 'PSUTNMPB', 'PSUTNMdnPB', 'ME', 'MdnE', 'WDME',
'WDMdnE', 'FE', 'MNE', 'MdnNE', 'NME', 'NMdnE', 'USUTPE',
'PSUTMNPE', 'PSUTMdnNPE', 'PSUTNMPE', 'PSUTNMdnPE', 'R2',
'RMSE', 'RMSEs', 'RMSEu', 'E1', 'IOA', 'd1', 'AC', 'WDIOA',
'WDRMSE', 'WDAC']
from .pncload import createconsole
import numpy as np
def STDO(obs, mod, axis=None):
    """ Standard deviation of Observations """
    # mod is unused; all metrics share the (obs, mod, axis) signature.
    return np.ma.std(obs, axis=axis)
def STDP(obs, mod, axis=None):
    """ Standard deviation of Predictions """
    # obs is unused; all metrics share the (obs, mod, axis) signature.
    return np.ma.std(mod, axis=axis)
def MNB(obs, mod, axis=None):
    """ Mean Normalized Bias (%)"""
    # masked_invalid hides inf/nan from division where obs == 0
    return np.ma.masked_invalid((mod - obs) / obs).mean(axis=axis) * 100.
def MNE(obs, mod, axis=None):
    """ Mean Normalized Gross Error (%)"""
    # absolute normalized error; masked_invalid hides division by obs == 0
    ne = np.ma.abs(mod - obs) / obs
    return np.ma.masked_invalid(ne).mean(axis=axis) * 100.
def MdnNB(obs, mod, axis=None):
    """ Median Normalized Bias (%)"""
    # median of per-pair normalized bias; invalid (obs == 0) pairs masked
    nb = (mod - obs) / obs
    return np.ma.median(np.ma.masked_invalid(nb), axis=axis) * 100.
def MdnNE(obs, mod, axis=None):
    """ Median Normalized Gross Error (%)"""
    # median of per-pair absolute normalized error; obs == 0 pairs masked
    ne = np.ma.abs(mod - obs) / obs
    return np.ma.median(np.ma.masked_invalid(ne), axis=axis) * 100.
def NO(obs, mod, axis=None):
    """ N Observations (#)"""
    # count of unmasked observation values
    return (~np.ma.getmaskarray(obs)).sum(axis=axis)
def NOP(obs, mod, axis=None):
    """ N Observations/Prediction Pairs (#)"""
    # count pairs valid in BOTH arrays (masks are unioned by matchmasks)
    obsc, modc = matchmasks(obs, mod)
    return (~np.ma.getmaskarray(obsc)).sum(axis=axis)
def NP(obs, mod, axis=None):
    """ N Predictions (#)"""
    # count of unmasked prediction values
    return (~np.ma.getmaskarray(mod)).sum(axis=axis)
def MO(obs, mod, axis=None):
    """ Mean Observations (obs unit)"""
    # mod is unused; uniform (obs, mod, axis) signature
    return obs.mean(axis=axis)
def MP(obs, mod, axis=None):
    """ Mean Predictions (model unit)"""
    # obs is unused; uniform (obs, mod, axis) signature
    return mod.mean(axis=axis)
def MdnO(obs, mod, axis=None):
    """ Median Observations (obs unit)"""
    # np.ma.median honors masked elements
    return np.ma.median(obs, axis=axis)
def MdnP(obs, mod, axis=None):
    """ Median Predictions (model unit)"""
    # np.ma.median honors masked elements
    return np.ma.median(mod, axis=axis)
def RM(obs, mod, axis=None):
    """ Mean Ratio Observations/Predictions (none)"""
    # masked_invalid hides inf/nan where mod == 0
    return np.ma.masked_invalid(obs / mod).mean(axis=axis)
def RMdn(obs, mod, axis=None):
    """ Median Ratio Observations/Predictions (none)"""
    # masked_invalid hides inf/nan where mod == 0
    return np.ma.median(np.ma.masked_invalid(obs / mod), axis=axis)
def MB(obs, mod, axis=None):
    """ Mean Bias"""
    # positive when the model over-predicts
    return (mod - obs).mean(axis=axis)
def MdnB(obs, mod, axis=None):
    """ Median Bias"""
    # positive when the model over-predicts
    return np.ma.median(mod - obs, axis=axis)
def WDMB(obs, mod, axis=None):
    """ Wind Direction Mean Bias"""
    # circlebias wraps angular differences into [-180, 180]
    return circlebias(mod - obs).mean(axis=axis)
def WDMdnB(obs, mod, axis=None):
    """ Wind Direction Median Bias"""
    # circlebias wraps angular differences into [-180, 180]
    return np.ma.median(circlebias(mod - obs), axis=axis)
def NMB(obs, mod, axis=None):
    """ Normalized Mean Bias (%)"""
    # total bias relative to total observation
    return (mod - obs).sum(axis=axis) / obs.sum(axis=axis) * 100.
def NMdnB(obs, mod, axis=None):
    """ Normalized Median Bias (%)"""
    # median bias relative to median observation
    mdnb = np.ma.median(mod - obs, axis=axis)
    mdno = np.ma.median(obs, axis=axis)
    return mdnb / mdno * 100.
def FB(obs, mod, axis=None):
    """ Fractional Bias (%)"""
    # bias normalized by the pair average; masked where mod + obs == 0
    halffb = (mod - obs) / (mod + obs)
    return ((np.ma.masked_invalid(halffb)).mean(axis=axis) * 2.) * 100.
def ME(obs, mod, axis=None):
    """ Mean Gross Error (model and obs unit)"""
    # mean absolute difference
    return np.ma.abs(mod - obs).mean(axis=axis)
def MdnE(obs, mod, axis=None):
    """ Median Gross Error (model and obs unit)"""
    # median absolute difference
    return np.ma.median(np.ma.abs(mod - obs), axis=axis)
def WDME(obs, mod, axis=None):
    """ Wind Direction Mean Gross Error (model and obs unit)"""
    # wrap differences before taking absolute value
    return np.ma.abs(circlebias(mod - obs)).mean(axis=axis)
def WDMdnE(obs, mod, axis=None):
    """ Wind Direction Median Gross Error (model and obs unit)"""
    # wrap differences before taking absolute value
    cb = circlebias(mod - obs)
    return np.ma.median(np.ma.abs(cb), axis=axis)
def NME(obs, mod, axis=None):
    """ Normalized Mean Error (%)"""
    # total absolute error relative to total observation
    out = (np.ma.abs(mod - obs).sum(axis=axis) / obs.sum(axis=axis)) * 100
    return out
def NMdnE(obs, mod, axis=None):
    """ Normalized Median Error (%)"""
    # median absolute error relative to median observation
    out = np.ma.median(np.ma.abs(mod - obs), axis=axis) / \
        np.ma.median(obs, axis=axis) * 100
    return out
def FE(obs, mod, axis=None):
    """ Fractional Error (%)"""
    # mask inf/nan where mod + obs == 0, consistent with FB
    halffe = np.ma.masked_invalid(np.ma.abs(mod - obs) / (mod + obs))
    return halffe.mean(axis=axis) * 2. * 100.
def USUTPB(obs, mod, axis=None):
    """ Unpaired Space/Unpaired Time Peak Bias (%)"""
    # maxima are compared independently; peaks need not be collocated
    modmax = mod.max(axis=axis)
    obsmax = obs.max(axis=axis)
    return ((modmax - obsmax) / obsmax) * 100.
def USUTPE(obs, mod, axis=None):
    """ Unpaired Space/Unpaired Time Peak Error (%)"""
    # absolute peak difference relative to the observed peak
    obsmax = obs.max(axis=axis)
    return (np.ma.abs(mod.max(axis=axis) - obsmax) / obsmax) * 100.
def MNPB(obs, mod, paxis, axis=None):
    """ Mean Normalized Peak Bias (%)"""
    # peak values along paxis, then average the normalized bias
    peak_obs = obs.max(axis=paxis)
    peak_mod = mod.max(axis=paxis)
    npb = (peak_mod - peak_obs) / peak_obs
    return npb.mean(axis=axis) * 100.
def MdnNPB(obs, mod, paxis, axis=None):
    """ Median Normalized Peak Bias (%)"""
    # peak values along paxis, then the median of the normalized bias
    peak_obs = obs.max(axis=paxis)
    peak_mod = mod.max(axis=paxis)
    npb = (peak_mod - peak_obs) / peak_obs
    return 100. * np.ma.median(npb, axis=axis)
def MNPE(obs, mod, paxis, axis=None):
    """ Mean Normalized Peak Error (%)"""
    # absolute peak difference normalized by observed peak, averaged
    peak_obs = obs.max(axis=paxis)
    npe = np.ma.abs(mod.max(axis=paxis) - peak_obs) / peak_obs
    return npe.mean(axis=axis) * 100.
def MdnNPE(obs, mod, paxis, axis=None):
    """ Median Normalized Peak Error (%)"""
    # docstring corrected: this is the gross error (absolute), not bias
    modmax = mod.max(axis=paxis)
    obsmax = obs.max(axis=paxis)
    pe = np.ma.abs(modmax - obsmax)
    return np.ma.median(pe / obsmax, axis=axis) * 100.
def NMPB(obs, mod, paxis, axis=None):
    """ Normalized Mean Peak Bias (%)"""
    # mean peak bias relative to mean observed peak
    peak_obs = obs.max(axis=paxis)
    peak_mod = mod.max(axis=paxis)
    bias = (peak_mod - peak_obs).mean(axis=axis)
    return bias / peak_obs.mean(axis=axis) * 100.
def NMdnPB(obs, mod, paxis, axis=None):
    """ Normalized Median Peak Bias (%)"""
    # median peak bias relative to median observed peak
    peak_obs = obs.max(axis=paxis)
    peak_mod = mod.max(axis=paxis)
    mdn_bias = np.ma.median(peak_mod - peak_obs, axis=axis)
    return mdn_bias / np.ma.median(peak_obs, axis=axis) * 100.
def NMPE(obs, mod, paxis, axis=None):
    """ Normalized Mean Peak Error (%)"""
    # mean absolute peak error relative to mean observed peak
    peak_obs = obs.max(axis=paxis)
    err = np.ma.abs(mod.max(axis=paxis) - peak_obs)
    return err.mean(axis=axis) / peak_obs.mean(axis=axis) * 100.
def NMdnPE(obs, mod, paxis, axis=None):
    """ Normalized Median Peak Error (%)"""
    # docstring corrected: this is the gross error (absolute), not bias
    modmax = mod.max(axis=paxis)
    obsmax = obs.max(axis=paxis)
    return (np.ma.median(np.ma.abs(modmax - obsmax), axis=axis) /
            np.ma.median(obsmax, axis=axis) * 100.)
def PSUTMNPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Mean Normalized Peak Bias (%)"""
    # peaks over time (paxis=0); forward the caller's axis (the original
    # hard-coded axis=None, silently ignoring the axis argument)
    return MNPB(obs, mod, paxis=0, axis=axis)
def PSUTMdnNPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Median Normalized Peak Bias (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return MdnNPB(obs, mod, paxis=0, axis=axis)
def PSUTMNPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Mean Normalized Peak Error (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return MNPE(obs, mod, paxis=0, axis=axis)
def PSUTMdnNPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Median Normalized Peak Error (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return MdnNPE(obs, mod, paxis=0, axis=axis)
def PSUTNMPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Mean Peak Bias (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return NMPB(obs, mod, paxis=0, axis=axis)
def PSUTNMPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Mean Peak Error (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return NMPE(obs, mod, paxis=0, axis=axis)
def PSUTNMdnPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Median Peak Bias (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return NMdnPB(obs, mod, paxis=0, axis=axis)
def PSUTNMdnPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Median Peak Error (%)"""
    # peaks over time (paxis=0); forward the caller's axis (was hard-coded
    # to None, ignoring the axis argument)
    return NMdnPE(obs, mod, paxis=0, axis=axis)
def R2(obs, mod, axis=None):
    """ Coefficient of Determination (unit squared)"""
    # masked-aware Pearson correlation, squared
    from scipy.stats.mstats import pearsonr
    if axis is None:
        return pearsonr(obs, mod)[0]**2
    else:
        # evaluate pair-wise along the requested axis
        r = pearsonr
        return apply_along_axis_2v(lambda x, y: r(x, y)[0]**2, axis, obs, mod)
def apply_along_axis_2v(func, axis, v1, v2, *args):
    # Apply a two-argument function to paired slices of v1 and v2 taken
    # along ``axis``. Both arrays are rolled so ``axis`` becomes the last
    # dimension, then func is applied slice-by-slice.
    v1r = np.rollaxis(v1, axis, v1.ndim)
    v2r = np.rollaxis(v2, axis, v2.ndim)
    # NOTE(review): ndindex over shape[:1] iterates only the FIRST leading
    # dimension; for inputs with ndim > 2 this looks like it should be
    # shape[:-1] -- confirm intended behavior before relying on 3-D input.
    result = np.array([func(v1r[idx], v2r[idx])
                       for idx in np.ndindex(v2r.shape[:1])])
    # Re-insert a trailing axis and roll it back toward ``axis`` so the
    # output aligns with the reduced input layout.
    result = np.rollaxis(result[..., None], result.ndim, axis)
    return result
def RMSE(obs, mod, axis=None):
    """ Root Mean Square Error (model unit)"""
    # sqrt of mean squared difference
    return np.ma.sqrt(((mod - obs)**2).mean(axis=axis))
def WDRMSE(obs, mod, axis=None):
    """ Wind Direction Root Mean Square Error (model unit)"""
    # wrap angular differences before squaring
    return np.ma.sqrt(((circlebias(mod - obs))**2).mean(axis=axis))
def RMSEs(obs, mod, axis=None):
    """Root Mean Squared Error systematic (obs, mod_hat)"""
    from scipy.stats.mstats import linregress
    if axis is None:
        try:
            # systematic part: RMSE between obs and the linear fit of mod
            # on obs (mod_hat = b + m * obs)
            m, b, rval, pval, stderr = linregress(obs, mod)
            mod_hat = b + m * obs
            return RMSE(obs, mod_hat)
        except ValueError:
            # linregress raises on degenerate input (e.g., too few points)
            return None
    else:
        # fit per-slice along axis, then reassemble regression parameters
        myvals = apply_along_axis_2v(
            lambda x, y: linregress(x, y), axis, obs, mod)
        myvals = np.rollaxis(myvals, myvals.ndim - 1, 0).astype(obs.dtype)
        m, b, rval, pval, stderr = myvals
        mod_hat = b + m * obs
        result = RMSE(obs, mod_hat, axis=axis)
        return result
def matchmasks(a1, a2):
    """Mask both arrays wherever either is masked; return the pair."""
    joint = np.ma.getmaskarray(a1) | np.ma.getmaskarray(a2)
    return (np.ma.masked_where(joint, a1),
            np.ma.masked_where(joint, a2))
def matchedcompressed(a1, a2):
    """Return paired 1-D arrays of values valid in both inputs."""
    c1, c2 = matchmasks(a1, a2)
    return c1.compressed(), c2.compressed()
def RMSEu(obs, mod, axis=None):
    """Root Mean Squared Error unsystematic (mod_hat, mod)"""
    from scipy.stats.mstats import linregress
    if axis is None:
        try:
            # unsystematic part: RMSE between the linear fit of mod on obs
            # (mod_hat = b + m * obs) and mod itself
            m, b, rval, pval, stderr = linregress(obs, mod)
            mod_hat = b + m * obs
            return RMSE(mod_hat, mod)
        except ValueError:
            # linregress raises on degenerate input (e.g., too few points)
            return None
    else:
        # fit per-slice along axis, then reassemble regression parameters
        myvals = apply_along_axis_2v(
            lambda x, y: linregress(x, y), axis, obs, mod)
        myvals = np.rollaxis(myvals, myvals.ndim - 1, 0).astype(obs.dtype)
        m, b, rval, pval, stderr = myvals
        mod_hat = b + m * obs
        result = RMSE(mod_hat, mod, axis=axis)
        return result
def d1(obs, mod, axis=None):
    """ Modified Index of Agreement, d1"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # re-insert the reduced axis so the mean broadcasts against obs/mod
        # (consistent with IOA; the original inlined obs.mean(axis=axis),
        # which misaligns when axis is not None)
        obsmean = np.expand_dims(obsmean, axis=axis)
    return (1.0 -
            np.ma.abs(obs - mod).sum(axis=axis) /
            (np.ma.abs(mod - obsmean) +
             np.ma.abs(obs - obsmean)).sum(axis=axis))
def E1(obs, mod, axis=None):
    """ Modified Coefficient of Efficiency, E1"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # re-insert the reduced axis so the mean broadcasts against obs
        # (consistent with IOA)
        obsmean = np.expand_dims(obsmean, axis=axis)
    return (1.0 -
            np.ma.abs(obs - mod).sum(axis=axis) /
            (np.ma.abs(obs - obsmean)).sum(axis=axis))
def IOA(obs, mod, axis=None):
    """ Index of Agreement, IOA"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # re-insert the reduced axis so the mean broadcasts against obs/mod
        obsmean = np.expand_dims(obsmean, axis=axis)
    return (1.0 -
            (np.ma.abs(obs - mod)**2).sum(axis=axis) /
            ((np.ma.abs(mod - obsmean) +
              np.ma.abs(obs - obsmean))**2).sum(axis=axis))
def circlebias(b):
    """Wrap angular differences (degrees) into the [-180, 180] range."""
    over = b > 180
    b = np.ma.where(over, b - 360, b)
    under = b < -180
    b = np.ma.where(under, b + 360, b)
    return b
def WDIOA(obs, mod, axis=None):
    """ Wind Direction Index of Agreement, IOA"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # re-insert the reduced axis so the mean broadcasts against obs/mod
        obsmean = np.expand_dims(obsmean, axis=axis)
    # all angular differences are wrapped into [-180, 180]
    b = circlebias(mod - obs)
    bhat = circlebias(mod - obsmean)
    ohat = circlebias(obs - obsmean)
    return (1.0 - (np.ma.abs(b)**2).sum(axis=axis) /
            ((np.ma.abs(bhat) + np.ma.abs(ohat))**2).sum(axis=axis))
def AC(obs, mod, axis=None):
    """ Anomaly Correlation """
    obs_bar = obs.mean(axis=axis)
    if axis is not None:
        # re-insert the reduced axis so the mean broadcasts against obs/mod
        obs_bar = np.expand_dims(obs_bar, axis=axis)
    # correlation of anomalies relative to the observed mean
    p1 = ((mod - obs_bar) * (obs - obs_bar)).sum(axis=axis)
    p2 = (((mod - obs_bar)**2).sum(axis=axis) *
          ((obs - obs_bar)**2).sum(axis=axis))**0.5
    return p1 / p2
def WDAC(obs, mod, axis=None):
    """ Wind Direction Anomaly Correlation """
    obs_bar = obs.mean(axis=axis)
    if axis is not None:
        # re-insert the reduced axis so the mean broadcasts against obs/mod
        obs_bar = np.expand_dims(obs_bar, axis=axis)
    # anomalies are wrapped into [-180, 180] before correlating
    p1 = (circlebias(mod - obs_bar) *
          circlebias(obs - obs_bar)).sum(axis=axis)
    p2 = ((circlebias(mod - obs_bar)**2).sum(axis=axis) *
          (circlebias(obs - obs_bar)**2).sum(axis=axis))**0.5
    return p1 / p2
def stat_spatial(ifile0, ifile1, funcs=__all__,
                 variables=['O3'], counties=False):
    """
    Map each statistic evaluated at each site.

    ifile0 - left hand side of equation
    ifile1 - right hand side of equation
    funcs - names of statistic functions to evaluate
    variables - list of variables to plot
    counties - overlay county boundaries when True
    """
    from mpl_toolkits.basemap import Basemap
    from matplotlib.pyplot import figure, show
    lonlatcoords = getattr(ifile0, 'lonlatcoords',
                           getattr(ifile1, 'lonlatcoords', ''))
    # 'lon,lat/lon,lat/...' -> lon and lat arrays. A list comprehension is
    # used instead of map() so this works on Python 3, where
    # np.array(map(...)) would wrap an iterator instead of the values.
    lonlat = np.array([[float(v) for v in pair.split(',')]
                       for pair in lonlatcoords.split('/')])
    lon, lat = lonlat.T
    latmin, latmax = lat.min(), lat.max()
    lonmin, lonmax = lon.min(), lon.max()
    bmap = Basemap(llcrnrlat=latmin, llcrnrlon=lonmin,
                   urcrnrlat=latmax, urcrnrlon=lonmax, resolution='i')
    for vark in variables:
        for statname in funcs:
            # statistic functions are resolved by name from this module;
            # funcs comes from __all__/CLI, not arbitrary user code
            statfunc = eval(statname)
            fig = figure()
            ax = fig.add_subplot(111)
            ax.set_title(vark + ' ' + statname)
            var_0 = ifile0.variables[vark]
            var_1 = ifile1.variables[vark]
            try:
                pidx = list(var_0.dimensions).index('points')
            except Exception:
                pidx = list(var_1.dimensions).index('points')
            statvs = []
            for sitei in range(var_0.shape[pidx]):
                val_0 = var_0[:].take([sitei], axis=pidx).ravel()
                val_1 = var_1[:].take([sitei], axis=pidx).ravel()
                statvs.append(statfunc(val_0, val_1))
            dots = bmap.scatter(x=lon, y=lat, c=statvs, ax=ax, cmap='jet')
            bmap.drawcoastlines(ax=ax)
            bmap.drawcountries(ax=ax)
            bmap.drawstates(ax=ax)
            fig.colorbar(dots)
            if counties:
                # was bmap.counties(ax=ax); Basemap's drawing API is
                # drawcounties -- confirm against the installed Basemap
                bmap.drawcounties(ax=ax)
    show()
def stat_timeseries(ifile0, ifile1, variables=['O3'], counties=False):
    """For each variable, plot every statistic in __all__ over time."""
    from pylab import figure, show
    for vark in variables:
        for statname in __all__:
            # resolve the statistic function by name from this module
            statfunc = eval(statname)
            fig = figure()
            axes = fig.add_subplot(111)
            axes.set_title(vark + ' ' + statname)
            var_0 = ifile0.variables[vark]
            var_1 = ifile1.variables[vark]
            tidx = list(var_0.dimensions).index('time')
            series = []
            for timei in range(var_0.shape[tidx]):
                slice_0 = var_0[:].take([timei], axis=tidx).ravel()
                slice_1 = var_1[:].take([timei], axis=tidx).ravel()
                series.append(statfunc(slice_0, slice_1))
            axes.plot(series)
    show()
def pnceval(args):
    """Evaluate each statistic in args.funcs comparing two input files
    (args.ifiles) and print the results, optionally in CSV form."""
    from warnings import warn
    from collections import OrderedDict
    from PseudoNetCDF.core._functions import pncbfunc
    if args.variables is None:
        # default to all non-coordinate variables of the first file
        args.variables = set(
            args.ifiles[0].variables.keys()).difference(args.coordkeys)
    console = createconsole(args.ifiles, args)
    ifile0, ifile1 = args.ifiles
    from PseudoNetCDF.coordutil import gettimes
    print('# ifile0=' + args.ipath[0])
    print('# ifile1=' + args.ipath[1])
    print('# Stats calculated as func(ifile0, ifile1)')
    print('# Generally: func(obs, mod, ....)')
    # Best-effort metadata header; each piece warns instead of failing.
    for ipath, ifile in zip(args.ipath, args.ifiles):
        print('# Meta-data from %s' % ipath)
        try:
            times = gettimes(ifile)
            tstart = times[:].min()
            tstop = times[:].max()
            print('# Date Range: ' + str(tstart) + ' to ' + str(tstop))
        except Exception as e:
            warn(str(e))
        try:
            lon = ifile.variables['longitude']
            print('# Longitude Range: ' + str(float(lon.min())) +
                  ' to ' + str(float(lon.max())))
        except Exception as e:
            warn(str(e))
        try:
            lat = ifile.variables['latitude']
            print('# Latitude Range: ' + str(float(lat.min())) +
                  ' to ' + str(float(lat.max())))
        except Exception as e:
            warn(str(e))
    # statistics routinely divide by zero on degenerate data; silence
    # numpy warnings for the duration of the evaluation
    np.seterr(divide='ignore', invalid='ignore')
    output = OrderedDict()
    for k in args.funcs:
        # resolve the statistic function by name and expose it (and its
        # output file) in the interactive console namespace
        console.locals[k] = func = eval(k)
        output[k] = OrderedDict()
        if args.csv:
            print('# %s: %s' % (k, func.__doc__.strip()))
        try:
            console.locals[k + '_f'] = ofile = pncbfunc(func, ifile0, ifile1)
        except Exception as e:
            warn("Skipped " + k + ';' + str(e))
            continue
        for vk in args.variables:
            if vk in ('time', 'TFLAG'):
                continue
            if args.csv:
                # collect for the CSV table printed after the loop
                outv = ofile.variables[vk][:]
                outv = np.ma.array(outv)
                output[k][vk] = outv.ravel()[0]
            else:
                print('%s,%s,%s,%f' % (vk, func.__doc__.strip(),
                                       k, ofile.variables[vk].ravel()[0]))
    if args.csv:
        # one row per variable, one column per statistic
        print(','.join(['VAR'] + args.funcs))
        for vk in args.variables:
            print(','.join([vk] + ['%f' % output[fk].get(vk, np.nan)
                                   for fk in args.funcs]))
    np.seterr(divide='warn', invalid='warn')
    if args.interactive:
        console.interact()
def main():
    """Command-line entry point: parse options and run pnceval."""
    from .pncparse import getparser, pncparse
    parser = getparser(has_ofile=False, plot_options=False, interactive=True)
    parser.add_argument('--csv', dest='csv', default=False,
                        action='store_true',
                        help='Print all data in CSV format')
    # --funcs takes a comma-separated list of statistic names from __all__
    parser.add_argument('--funcs', default=__all__,
                        type=lambda x: x.split(','),
                        help=('Functions to evaluate split by , ' +
                              '(default: %s)') % ','.join(__all__))
    ifiles, args = pncparse(has_ofile=False, interactive=True, parser=parser)
    args.ifiles = ifiles
    pnceval(args)
# Allow direct execution as a script.
if __name__ == '__main__':
    main()