# Source code for PseudoNetCDF.pnceval

from __future__ import print_function
__all__ = ['NO', 'NP', 'NOP', 'MO', 'MP', 'MdnO', 'MdnP', 'STDO',
           'STDP', 'RM', 'RMdn', 'MB', 'MdnB', 'WDMB', 'WDMdnB',
           'FB', 'MNB', 'MdnNB', 'NMB', 'NMdnB', 'USUTPB', 'PSUTMNPB',
           'PSUTMdnNPB', 'PSUTNMPB', 'PSUTNMdnPB', 'ME', 'MdnE', 'WDME',
           'WDMdnE', 'FE', 'MNE', 'MdnNE', 'NME', 'NMdnE', 'USUTPE',
           'PSUTMNPE', 'PSUTMdnNPE', 'PSUTNMPE', 'PSUTNMdnPE', 'R2',
           'RMSE', 'RMSEs', 'RMSEu', 'E1', 'IOA', 'd1', 'AC', 'WDIOA',
           'WDRMSE', 'WDAC']

from .pncload import createconsole
import numpy as np


def STDO(obs, mod, axis=None):
    """ Standard deviation of Observations """
    # mod is accepted (uniform obs/mod signature) but unused here.
    return np.ma.std(obs, axis=axis)


def STDP(obs, mod, axis=None):
    """ Standard deviation of Predictions """
    # obs is accepted (uniform obs/mod signature) but unused here.
    return np.ma.std(mod, axis=axis)
def MNB(obs, mod, axis=None):
    """ Mean Normalized Bias (%)"""
    # Masking invalids hides divide-by-zero cells where obs == 0.
    nbias = np.ma.masked_invalid((mod - obs) / obs)
    return nbias.mean(axis=axis) * 100.


def MNE(obs, mod, axis=None):
    """ Mean Normalized Gross Error (%)"""
    nerror = np.ma.masked_invalid(np.ma.abs(mod - obs) / obs)
    return nerror.mean(axis=axis) * 100.


def MdnNB(obs, mod, axis=None):
    """ Median Normalized Bias (%)"""
    nbias = np.ma.masked_invalid((mod - obs) / obs)
    return np.ma.median(nbias, axis=axis) * 100.


def MdnNE(obs, mod, axis=None):
    """ Median Normalized Gross Error (%)"""
    nerror = np.ma.masked_invalid(np.ma.abs(mod - obs) / obs)
    return np.ma.median(nerror, axis=axis) * 100.
def NO(obs, mod, axis=None):
    """ N Observations (#)"""
    # getmaskarray always yields a full boolean mask, even for unmasked input.
    valid = ~np.ma.getmaskarray(obs)
    return valid.sum(axis=axis)


def NOP(obs, mod, axis=None):
    """ N Observations/Prediction Pairs (#)"""
    # Pairs are valid only where BOTH obs and mod are unmasked.
    obsc, modc = matchmasks(obs, mod)
    return (~np.ma.getmaskarray(obsc)).sum(axis=axis)


def NP(obs, mod, axis=None):
    """ N Predictions (#)"""
    valid = ~np.ma.getmaskarray(mod)
    return valid.sum(axis=axis)
def MO(obs, mod, axis=None):
    """ Mean Observations (obs unit)"""
    return obs.mean(axis=axis)


def MP(obs, mod, axis=None):
    """ Mean Predictions (model unit)"""
    return mod.mean(axis=axis)


def MdnO(obs, mod, axis=None):
    """ Median Observations (obs unit)"""
    return np.ma.median(obs, axis=axis)


def MdnP(obs, mod, axis=None):
    """ Median Predictions (model unit)"""
    return np.ma.median(mod, axis=axis)
def RM(obs, mod, axis=None):
    """ Mean Ratio Observations/Predictions (none)"""
    # Invalid ratios (mod == 0) are masked before averaging.
    ratio = np.ma.masked_invalid(obs / mod)
    return ratio.mean(axis=axis)


def RMdn(obs, mod, axis=None):
    """ Median Ratio Observations/Predictions (none)"""
    ratio = np.ma.masked_invalid(obs / mod)
    return np.ma.median(ratio, axis=axis)
def MB(obs, mod, axis=None):
    """ Mean Bias"""
    # Bias convention throughout the module: model minus observation.
    bias = mod - obs
    return bias.mean(axis=axis)


def MdnB(obs, mod, axis=None):
    """ Median Bias"""
    bias = mod - obs
    return np.ma.median(bias, axis=axis)
def WDMB(obs, mod, axis=None):
    """ Wind Direction Mean Bias"""
    # circlebias wraps differences onto (-180, 180] degrees.
    wrapped = circlebias(mod - obs)
    return wrapped.mean(axis=axis)


def WDMdnB(obs, mod, axis=None):
    """ Wind Direction Median Bias"""
    wrapped = circlebias(mod - obs)
    return np.ma.median(wrapped, axis=axis)
def NMB(obs, mod, axis=None):
    """ Normalized Mean Bias (%)"""
    # Ratio of sums (not mean of ratios): one normalization per reduction.
    return (mod - obs).sum(axis=axis) / obs.sum(axis=axis) * 100.


def NMdnB(obs, mod, axis=None):
    """ Normalized Median Bias (%)"""
    medbias = np.ma.median(mod - obs, axis=axis)
    medobs = np.ma.median(obs, axis=axis)
    return medbias / medobs * 100.


def FB(obs, mod, axis=None):
    """ Fractional Bias (%)"""
    # Each term is half the fractional bias; doubled after averaging.
    half = np.ma.masked_invalid((mod - obs) / (mod + obs))
    return half.mean(axis=axis) * 2. * 100.
def ME(obs, mod, axis=None):
    """ Mean Gross Error (model and obs unit)"""
    gross = np.ma.abs(mod - obs)
    return gross.mean(axis=axis)


def MdnE(obs, mod, axis=None):
    """ Median Gross Error (model and obs unit)"""
    gross = np.ma.abs(mod - obs)
    return np.ma.median(gross, axis=axis)
def WDME(obs, mod, axis=None):
    """ Wind Direction Mean Gross Error (model and obs unit)"""
    # Wrap the angular difference before taking magnitudes.
    wrapped = circlebias(mod - obs)
    return np.ma.abs(wrapped).mean(axis=axis)


def WDMdnE(obs, mod, axis=None):
    """ Wind Direction Median Gross Error (model and obs unit)"""
    wrapped = circlebias(mod - obs)
    return np.ma.median(np.ma.abs(wrapped), axis=axis)
def NME(obs, mod, axis=None):
    """ Normalized Mean Error (%)"""
    gross = np.ma.abs(mod - obs)
    return gross.sum(axis=axis) / obs.sum(axis=axis) * 100


def NMdnE(obs, mod, axis=None):
    """ Normalized Median Error (%)"""
    mederr = np.ma.median(np.ma.abs(mod - obs), axis=axis)
    medobs = np.ma.median(obs, axis=axis)
    return mederr / medobs * 100


def FE(obs, mod, axis=None):
    """ Fractional Error (%)"""
    # NOTE(review): unlike FB, invalid terms (mod + obs == 0) are not
    # masked here — preserved as-is.
    frac = np.ma.abs(mod - obs) / (mod + obs)
    return frac.mean(axis=axis) * 2. * 100.
def USUTPB(obs, mod, axis=None):
    """ Unpaired Space/Unpaired Time Peak Bias (%)"""
    # Peaks are taken independently for obs and mod (unpaired).
    obspeak = obs.max(axis=axis)
    modpeak = mod.max(axis=axis)
    return (modpeak - obspeak) / obspeak * 100.


def USUTPE(obs, mod, axis=None):
    """ Unpaired Space/Unpaired Time Peak Error (%)"""
    obspeak = obs.max(axis=axis)
    modpeak = mod.max(axis=axis)
    return np.ma.abs(modpeak - obspeak) / obspeak * 100.
def MNPB(obs, mod, paxis, axis=None):
    """ Mean Normalized Peak Bias (%)"""
    # paxis: axis over which peaks are found (e.g. time);
    # axis: axis over which the resulting peak statistics are reduced.
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return ((modpeak - obspeak) / obspeak).mean(axis=axis) * 100.


def MdnNPB(obs, mod, paxis, axis=None):
    """ Median Normalized Peak Bias (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return np.ma.median((modpeak - obspeak) / obspeak, axis=axis) * 100.


def MNPE(obs, mod, paxis, axis=None):
    """ Mean Normalized Peak Error (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return (np.ma.abs(modpeak - obspeak) / obspeak).mean(axis=axis) * 100.


def MdnNPE(obs, mod, paxis, axis=None):
    # docstring fixed: this computes an (absolute) error, not a bias
    """ Median Normalized Peak Error (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    peakerr = np.ma.abs(modpeak - obspeak)
    return np.ma.median(peakerr / obspeak, axis=axis) * 100.


def NMPB(obs, mod, paxis, axis=None):
    """ Normalized Mean Peak Bias (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return ((modpeak - obspeak).mean(axis=axis) /
            obspeak.mean(axis=axis) * 100.)


def NMdnPB(obs, mod, paxis, axis=None):
    """ Normalized Median Peak Bias (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return (np.ma.median(modpeak - obspeak, axis=axis) /
            np.ma.median(obspeak, axis=axis) * 100.)


def NMPE(obs, mod, paxis, axis=None):
    """ Normalized Mean Peak Error (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return (np.ma.abs(modpeak - obspeak).mean(axis=axis) /
            obspeak.mean(axis=axis) * 100.)


def NMdnPE(obs, mod, paxis, axis=None):
    # docstring fixed: this computes an (absolute) error, not a bias
    """ Normalized Median Peak Error (%)"""
    obspeak = obs.max(axis=paxis)
    modpeak = mod.max(axis=paxis)
    return (np.ma.median(np.ma.abs(modpeak - obspeak), axis=axis) /
            np.ma.median(obspeak, axis=axis) * 100.)
def PSUTMNPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Mean Normalized Peak Bias (%)"""
    # Peaks over the time dimension (paxis=0). The caller's axis is now
    # forwarded; previously it was hard-coded to None and silently ignored.
    return MNPB(obs, mod, paxis=0, axis=axis)


def PSUTMdnNPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Median Normalized Peak Bias (%)"""
    return MdnNPB(obs, mod, paxis=0, axis=axis)


def PSUTMNPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Mean Normalized Peak Error (%)"""
    return MNPE(obs, mod, paxis=0, axis=axis)


def PSUTMdnNPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Median Normalized Peak Error (%)"""
    return MdnNPE(obs, mod, paxis=0, axis=axis)


def PSUTNMPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Mean Peak Bias (%)"""
    return NMPB(obs, mod, paxis=0, axis=axis)


def PSUTNMPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Mean Peak Error (%)"""
    return NMPE(obs, mod, paxis=0, axis=axis)


def PSUTNMdnPB(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Median Peak Bias (%)"""
    return NMdnPB(obs, mod, paxis=0, axis=axis)


def PSUTNMdnPE(obs, mod, axis=None):
    """ Paired Space/Unpaired Time Normalized Median Peak Error (%)"""
    return NMdnPE(obs, mod, paxis=0, axis=axis)
def R2(obs, mod, axis=None):
    """ Coefficient of Determination (unit squared)"""
    from scipy.stats.mstats import pearsonr
    if axis is None:
        return pearsonr(obs, mod)[0]**2

    def _rsq(x, y):
        # squared Pearson correlation for one 1-D slice pair
        return pearsonr(x, y)[0]**2

    return apply_along_axis_2v(_rsq, axis, obs, mod)
def apply_along_axis_2v(func, axis, v1, v2, *args):
    """
    Apply func(slice1, slice2) along `axis` of the paired arrays v1/v2.

    The requested axis is rolled to the last position so that func
    receives 1-D slices taken along it.

    NOTE(review): np.ndindex(v2r.shape[:1]) iterates only the FIRST
    leading dimension — this looks like it assumes 2-D input (where
    shape[:1] == shape[:-1]); for ndim > 2 shape[:-1] was presumably
    intended. TODO confirm before relying on higher-rank input.
    NOTE(review): *args is accepted but never forwarded to func.
    """
    v1r = np.rollaxis(v1, axis, v1.ndim)
    v2r = np.rollaxis(v2, axis, v2.ndim)
    result = np.array([func(v1r[idx], v2r[idx])
                       for idx in np.ndindex(v2r.shape[:1])])
    # Append a singleton dimension and roll it to the reduced axis
    # position; result.ndim here is evaluated before the [..., None].
    result = np.rollaxis(result[..., None], result.ndim, axis)
    return result
def RMSE(obs, mod, axis=None):
    """ Root Mean Square Error (model unit)"""
    sqerr = (mod - obs)**2
    return np.ma.sqrt(sqerr.mean(axis=axis))
def WDRMSE(obs, mod, axis=None):
    """ Wind Direction Root Mean Square Error (model unit)"""
    # Wrap angular differences onto (-180, 180] before squaring.
    wrapped = circlebias(mod - obs)
    return np.ma.sqrt((wrapped**2).mean(axis=axis))
def RMSEs(obs, mod, axis=None):
    """Root Mean Squared Error systematic (obs, mod_hat)"""
    from scipy.stats.mstats import linregress
    if axis is None:
        try:
            # Systematic part: RMSE between obs and the linear fit of
            # mod on obs. linregress raises ValueError on degenerate input.
            slope, intercept = linregress(obs, mod)[:2]
            return RMSE(obs, intercept + slope * obs)
        except ValueError:
            return None
    fitstats = apply_along_axis_2v(
        lambda x, y: linregress(x, y), axis, obs, mod)
    # Move the (slope, intercept, r, p, stderr) tuple axis to the front.
    fitstats = np.rollaxis(fitstats, fitstats.ndim - 1, 0).astype(obs.dtype)
    slope, intercept = fitstats[0], fitstats[1]
    return RMSE(obs, intercept + slope * obs, axis=axis)
def matchmasks(a1, a2):
    """Return a1 and a2 with the union of their masks applied to both."""
    joint = np.ma.getmaskarray(a1) | np.ma.getmaskarray(a2)
    return np.ma.masked_where(joint, a1), np.ma.masked_where(joint, a2)


def matchedcompressed(a1, a2):
    """Return compressed 1-D arrays keeping only jointly-valid elements."""
    m1, m2 = matchmasks(a1, a2)
    return m1.compressed(), m2.compressed()
def RMSEu(obs, mod, axis=None):
    """Root Mean Squared Error unsystematic (mod_hat, mod)"""
    from scipy.stats.mstats import linregress
    if axis is None:
        try:
            # Unsystematic part: RMSE between the linear fit and mod
            # itself. linregress raises ValueError on degenerate input.
            slope, intercept = linregress(obs, mod)[:2]
            return RMSE(intercept + slope * obs, mod)
        except ValueError:
            return None
    fitstats = apply_along_axis_2v(
        lambda x, y: linregress(x, y), axis, obs, mod)
    # Move the (slope, intercept, r, p, stderr) tuple axis to the front.
    fitstats = np.rollaxis(fitstats, fitstats.ndim - 1, 0).astype(obs.dtype)
    slope, intercept = fitstats[0], fitstats[1]
    return RMSE(intercept + slope * obs, mod, axis=axis)
def d1(obs, mod, axis=None):
    """ Modified Index of Agreement, d1"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # Keep the reduced dimension so the mean broadcasts against the
        # full arrays (same treatment as IOA; previously the raw reduced
        # mean could misalign for non-leading axes).
        obsmean = np.expand_dims(obsmean, axis=axis)
    num = np.ma.abs(obs - mod).sum(axis=axis)
    denom = (np.ma.abs(mod - obsmean) +
             np.ma.abs(obs - obsmean)).sum(axis=axis)
    return 1.0 - num / denom
def E1(obs, mod, axis=None):
    """ Modified Coefficient of Efficiency, E1"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # Keep the reduced dimension so the mean broadcasts against the
        # full arrays (same treatment as IOA; previously the raw reduced
        # mean could misalign for non-leading axes).
        obsmean = np.expand_dims(obsmean, axis=axis)
    num = np.ma.abs(obs - mod).sum(axis=axis)
    denom = np.ma.abs(obs - obsmean).sum(axis=axis)
    return 1.0 - num / denom
def IOA(obs, mod, axis=None):
    """ Index of Agreement, IOA"""
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        # Reinsert the reduced dimension so the mean broadcasts.
        obsmean = np.expand_dims(obsmean, axis=axis)
    num = (np.ma.abs(obs - mod)**2).sum(axis=axis)
    denom = ((np.ma.abs(mod - obsmean) +
              np.ma.abs(obs - obsmean))**2).sum(axis=axis)
    return 1.0 - num / denom
def circlebias(b):
    """
    Wrap angular differences (degrees) into [-180, 180].

    Values above 180 lose a full turn; values below -180 gain one.
    Exactly +/-180 is left unchanged.
    """
    over = b > 180
    under = b < -180
    b = np.ma.where(over, b - 360, b)
    b = np.ma.where(under, b + 360, b)
    return b
def WDIOA(obs, mod, axis=None):
    """ Wind Direction Index of Agreement, IOA"""
    # NOTE(review): arithmetic mean of directions, not a circular mean —
    # preserved as-is; confirm intent for wrap-around data.
    obsmean = obs.mean(axis=axis)
    if axis is not None:
        obsmean = np.expand_dims(obsmean, axis=axis)
    err = circlebias(mod - obs)
    moddev = circlebias(mod - obsmean)
    obsdev = circlebias(obs - obsmean)
    num = (np.ma.abs(err)**2).sum(axis=axis)
    denom = ((np.ma.abs(moddev) + np.ma.abs(obsdev))**2).sum(axis=axis)
    return 1.0 - num / denom
def AC(obs, mod, axis=None):
    """ Anomaly Correlation """
    obs_bar = obs.mean(axis=axis)
    if axis is not None:
        # Reinsert the reduced dimension so the mean broadcasts.
        obs_bar = np.expand_dims(obs_bar, axis=axis)
    moddev = mod - obs_bar
    obsdev = obs - obs_bar
    cov = (moddev * obsdev).sum(axis=axis)
    norm = ((moddev**2).sum(axis=axis) *
            (obsdev**2).sum(axis=axis))**0.5
    return cov / norm
def WDAC(obs, mod, axis=None):
    """ Wind Direction Anomaly Correlation """
    obs_bar = obs.mean(axis=axis)
    if axis is not None:
        obs_bar = np.expand_dims(obs_bar, axis=axis)
    # Wrapped anomalies (hoisted; original recomputed circlebias twice).
    moddev = circlebias(mod - obs_bar)
    obsdev = circlebias(obs - obs_bar)
    cov = (moddev * obsdev).sum(axis=axis)
    norm = ((moddev**2).sum(axis=axis) *
            (obsdev**2).sum(axis=axis))**0.5
    return cov / norm
def stat_spatial(ifile0, ifile1, funcs=__all__, variables=['O3'],
                 counties=False):
    """
    Draw one scatter map per (variable, statistic) pair, coloring each
    monitoring site by the statistic computed from its time series.

    ifile0 - left hand side of equation
    ifile1 - right hand side of equation
    variables - list of variables to plot
    """
    from mpl_toolkits.basemap import Basemap
    from matplotlib.pyplot import figure, show
    lonlatcoords = getattr(ifile0, 'lonlatcoords',
                           getattr(ifile1, 'lonlatcoords', ''))
    # Py3 fix: map() returns an iterator, so np.array(map(...)) produced
    # a useless 0-d object array and .T failed; build nested lists.
    lon, lat = np.array([[float(c) for c in pair.split(',')]
                         for pair in lonlatcoords.split('/')]).T
    latmin, latmax = lat.min(), lat.max()
    lonmin, lonmax = lon.min(), lon.max()
    bmap = Basemap(llcrnrlat=latmin, llcrnrlon=lonmin,
                   urcrnrlat=latmax, urcrnrlon=lonmax, resolution='i')
    for vark in variables:
        for statname in funcs:
            statfunc = eval(statname)
            fig = figure()
            ax = fig.add_subplot(111)
            ax.set_title(vark + ' ' + statname)
            var_0 = ifile0.variables[vark]
            var_1 = ifile1.variables[vark]
            # Locate the site dimension on whichever file declares it.
            try:
                pidx = list(var_0.dimensions).index('points')
            except Exception:
                pidx = list(var_1.dimensions).index('points')
            statvs = []
            for sitei in range(var_0.shape[pidx]):
                val_0 = var_0[:].take([sitei], axis=pidx).ravel()
                val_1 = var_1[:].take([sitei], axis=pidx).ravel()
                statvs.append(statfunc(val_0, val_1))
            dots = bmap.scatter(x=lon, y=lat, c=statvs, ax=ax, cmap='jet')
            bmap.drawcoastlines(ax=ax)
            bmap.drawcountries(ax=ax)
            bmap.drawstates(ax=ax)
            fig.colorbar(dots)
            if counties:
                # Fix: Basemap exposes drawcounties(); the original called
                # bmap.counties(ax=ax), which is not a Basemap method.
                bmap.drawcounties(ax=ax)
    show()


def stat_timeseries(ifile0, ifile1, variables=['O3'], counties=False):
    """
    Plot each statistic in __all__ as a time series: the statistic is
    computed across sites at each time step and plotted against time.
    counties is accepted for signature symmetry with stat_spatial; unused.
    """
    from pylab import figure, show
    for vark in variables:
        for statname in __all__:
            statfunc = eval(statname)
            fig = figure()
            ax = fig.add_subplot(111)
            ax.set_title(vark + ' ' + statname)
            var_0 = ifile0.variables[vark]
            var_1 = ifile1.variables[vark]
            tidx = list(var_0.dimensions).index('time')
            statvs = []
            for timei in range(var_0.shape[tidx]):
                val_0 = var_0[:].take([timei], axis=tidx).ravel()
                val_1 = var_1[:].take([timei], axis=tidx).ravel()
                statvs.append(statfunc(val_0, val_1))
            ax.plot(statvs)
    show()


def pnceval(args):
    """
    Evaluate every statistic named in args.funcs over the variable pair
    (args.ifiles[0], args.ifiles[1]) and print results, optionally as CSV.
    Results are also bound into an interactive console when requested.
    """
    from warnings import warn
    from collections import OrderedDict
    from PseudoNetCDF.core._functions import pncbfunc
    if args.variables is None:
        args.variables = set(
            args.ifiles[0].variables.keys()).difference(args.coordkeys)
    console = createconsole(args.ifiles, args)
    ifile0, ifile1 = args.ifiles
    from PseudoNetCDF.coordutil import gettimes
    print('# ifile0=' + args.ipath[0])
    print('# ifile1=' + args.ipath[1])
    print('# Stats calculated as func(ifile0, ifile1)')
    print('# Generally: func(obs, mod, ....)')
    for ipath, ifile in zip(args.ipath, args.ifiles):
        print('# Meta-data from %s' % ipath)
        # Each metadata probe is best-effort: warn and continue on failure.
        try:
            times = gettimes(ifile)
            tstart = times[:].min()
            tstop = times[:].max()
            print('# Date Range: ' + str(tstart) + ' to ' + str(tstop))
        except Exception as e:
            warn(str(e))
        try:
            lon = ifile.variables['longitude']
            print('# Longitude Range: ' + str(float(lon.min())) +
                  ' to ' + str(float(lon.max())))
        except Exception as e:
            warn(str(e))
        try:
            lat = ifile.variables['latitude']
            print('# Latitude Range: ' + str(float(lat.min())) +
                  ' to ' + str(float(lat.max())))
        except Exception as e:
            warn(str(e))
    # Stats routinely divide by zero/invalid cells; silence during eval.
    np.seterr(divide='ignore', invalid='ignore')
    output = OrderedDict()
    for k in args.funcs:
        console.locals[k] = func = eval(k)
        output[k] = OrderedDict()
        if args.csv:
            print('# %s: %s' % (k, func.__doc__.strip()))
        try:
            console.locals[k + '_f'] = ofile = pncbfunc(
                func, ifile0, ifile1)
        except Exception as e:
            warn("Skipped " + k + ';' + str(e))
            continue
        for vk in args.variables:
            if vk in ('time', 'TFLAG'):
                continue
            if args.csv:
                outv = ofile.variables[vk][:]
                outv = np.ma.array(outv)
                output[k][vk] = outv.ravel()[0]
            else:
                print('%s,%s,%s,%f' % (vk, func.__doc__.strip(), k,
                                       ofile.variables[vk].ravel()[0]))
    if args.csv:
        print(','.join(['VAR'] + args.funcs))
        for vk in args.variables:
            print(','.join([vk] + ['%f' % output[fk].get(vk, np.nan)
                                   for fk in args.funcs]))
    np.seterr(divide='warn', invalid='warn')
    if args.interactive:
        console.interact()


def main():
    """Command-line entry point: parse arguments and run pnceval."""
    from .pncparse import getparser, pncparse
    parser = getparser(has_ofile=False, plot_options=False,
                       interactive=True)
    parser.add_argument('--csv', dest='csv', default=False,
                        action='store_true',
                        help='Print all data in CSV format')
    parser.add_argument('--funcs', default=__all__,
                        type=lambda x: x.split(','),
                        help=('Functions to evaluate split by , ' +
                              '(default: %s)') % ','.join(__all__))
    ifiles, args = pncparse(has_ofile=False, interactive=True,
                            parser=parser)
    args.ifiles = ifiles
    pnceval(args)


if __name__ == '__main__':
    main()