Source code for aqmbc.models.geoscf

__all__ = ['download_window']
import PseudoNetCDF as pnc


def tryandtime(tmpf, label, verbose=1):
    """
    This function tries to download data in tmpf via OpenDAP. If it fails,
    then it tries again up to maxtries. The entire process is timed.

    Arguments
    ---------
    tmpf : xarray.Dataset
        File from GEOS-CF
    verbose : int
        Level of verbosity

    Returns
    -------
    dt : float
        Time elapsed.
    """
    import time

    maxtries = 10
    t0 = time.time()
    ntries = 0
    success = False
    while not success and ntries < maxtries:
        ntries += 1
        try:
            # Might fail
            tmpf.load()
            success = True
        except Exception as e:
            if verbose > 0:
                print(str(e), end='.retry.')

    t1 = time.time()
    dt = t1 - t0
    if verbose > 0:
        msg = f'{label} {dt:.0f}s'
        if ntries > 1:
            msg = f'{msg} ({ntries} tries)'
        print(msg)

    return dt


[docs]def download_window( gdnam, dates, sleep=60, metvars=None, chmvars=None, xgcvars=None ): """ Arguments --------- gdnam : str Grid name with definition in GRIDDESC at environ['GRIDDESC'], ./GRIDDESC or in the GRIDDESC distributed with aqmbc dates : list Dates to process. Each will be saved separately on disk. sleep : int GEOS-CF OpenDAP will crash if too many calls are made sequentially. Heuristically, a minute between calls prevents crashes. metvars : list Optional list of metvars to subset. See GEOS-CF fluid documentation. chemvars : list Optional list of chmvars to subset. See GEOS-CF fluid documentation. xgcvars : list Optional list of xgcvars to subset. See GEOS-CF fluid documentation. Returns ------- outpaths : list List of paths that were created by download """ import pandas as pd import aqmbc import numpy as np import xarray as xr import os import time sleep = 60 dates = pd.to_datetime(dates) metaf = aqmbc.options.getmetaf(bctype='bcon', gdnam=gdnam) lonp = metaf.variables['longitude'] xlim = slice(*np.quantile(lonp, [0, 1]) + np.array([-1, 1])) latp = metaf.variables['latitude'] ylim = slice(*np.quantile(latp, [0, 1]) + np.array([-1, 1])) rooturl = 'https://opendap.nccs.nasa.gov/dods/gmao/geos-cf/assim' meturl = f'{rooturl}/met_tavg_1hr_g1440x721_v36' chmurl = f'{rooturl}/chm_tavg_1hr_g1440x721_v36' xgcurl = f'{rooturl}/xgc_tavg_1hr_g1440x721_v36' mf = xr.open_dataset(meturl) cf = xr.open_dataset(chmurl) xf = xr.open_dataset(xgcurl) if metvars is None: metvars = ['zl', 'airdens', 'ps', 'delp', 'q', 't'] else: metvars = list(metvars) if chmvars is None: chmvars = list(cf.data_vars) else: chmvars = list(chmvars) if xgcvars is None: xgcvars = list(xf.data_vars) else: xgcvars = list(xgcvars) outpaths = [] for t in dates: tv = mf.time.sel(time=t, method='nearest').values stime = pd.to_datetime(tv).round('1s').to_pydatetime() outdir = f'GEOSCF/{gdnam}/{stime:%Y/%m/%d}' pathsuf = f'{stime:%Y-%m-%dT%H%M}Z.nc' outpath = f'{outdir}/geoscf_mcx_tavg_1hr_g1440x721_v36_{pathsuf}' if os.path.exists(outpath): print(f'Keeping cached: {outpath}') outpaths.append(outpath) continue print(f'Making: {outpath}') tmpmf = mf[metvars].sel(time=tv, lat=ylim, lon=xlim) tryandtime(tmpmf, 'met') mergefs = [tmpmf] if len(chmvars) > 0: tmpcf = cf[chmvars].sel(time=tv, lat=ylim, lon=xlim) tryandtime(tmpcf, 'chm') mergefs.append(tmpcf) if len(xgcvars) > 0: tmpxf = xf[xgcvars].sel(time=tv, lat=ylim, lon=xlim) tryandtime(tmpxf, 'xgc') mergefs.append(tmpxf) outf = xr.merge(mergefs) outf.attrs['data_source'] = f'{meturl}, {chmurl}, {xgcurl}' os.makedirs(os.path.dirname(outpath), exist_ok=True) outf.expand_dims(time=[stime]).to_netcdf(outpath) outpaths.append(outpath) if len(dates) > 1: time.sleep(sleep) return outpaths
class geoscf(pnc.PseudoNetCDFFile): def __init__(self, *args, **kwds): """ Thin wrapper around raqms files to add ll2ij, getTimes, and interpSigma functions. """ f = pnc.pncopen(*args, **kwds) self.dimensions = f.dimensions self.variables = f.variables for k in f.ncattrs(): setattr(self, k, f.getncattr(k)) self.setCoords( ['time', 'lat', 'lon', 'lev'] ) def ll2ij(self, lon, lat, bounds='warn', clean='clip'): import numpy as np lon = np.asarray(lon) lat = np.asarray(lat) i = self.val2idx('lon', lon) j = self.val2idx('lat', lat) ni = self.variables['lon'].size nj = self.variables['lat'].size ic = np.minimum(np.maximum(i, 0), ni - 1) jc = np.minimum(np.maximum(j, 0), nj - 1) if ((ic != i).any() | (ic != i).any()): import warnings warnings.warn('Some cells are outside the source domain') return ic, jc def getTimes(self, datetype='datetime', bounds=False): import cftime import numpy as np time = self.variables['time'] out = np.asarray(cftime.num2pydate(time[:], units=time.units)) return out def interpSigma( self, vglvls, vgtop=None, interptype='linear', extrapolate=False, fill_value='extrapolate', verbose=0 ): """ Parameters ---------- vglvls : iterable the new vglvls (edges) vgtop : scalar Converting to new vgtop interptype : string 'linear' uses a linear interpolation 'conserve' uses a mass conserving interpolation extrapolate : boolean allow extrapolation beyond bounds with linear, default False fill_value : boolean set fill value (e.g, nan) to prevent extrapolation or edge continuation Returns ------- outf : ioapi_base PseudoNetCDFFile with all variables interpolated Notes ----- When extrapolate is false, the edge values are used for points beyond the inputs. """ from PseudoNetCDF.coordutil import sigma2coeff as sigma2coeff_con from .util import sigma2coeff_lin import numpy as np psfc = self.variables['ps'][:][:, None, ...] delp = self.variables['delp'][:] plow = psfc - np.cumsum(delp[:, ::-1], axis=1)[:, ::-1] pedges = np.concatenate([plow, psfc], axis=1) pmid = plow + delp * 0.5 ptop = psfc - delp.sum(1) pedges = np.concatenate([ptop, ptop + np.cumsum(delp, axis=1)], axis=1) newshape = list(delp.shape) nzout = len(vglvls) - 1 newshape.insert(2, nzout) itershape = [newshape[0]] + newshape[3:] sigma = (pedges - vgtop) / (psfc - vgtop) if interptype not in ('linear', 'conserve'): print(f'Unknown interptype {interptype}; default to linear') interptype = 'linear' if interptype == 'linear': sigma2coeff = sigma2coeff_lin else: sigma2coeff = sigma2coeff_con tmpv = np.zeros(newshape, dtype='f') for idx in np.ndindex(*itershape): srcidx = (idx[0], slice(None)) + tuple(idx[1:]) destidx = (idx[0], slice(None), slice(None)) + tuple(idx[1:]) fromvglvls = sigma[srcidx] # sigma2coeff expects vglvls to decrease (i.e., surface to top), # but RAQMS is ordered top-to-surface. So, the vglvls are reversed # and the results are reversed as well. tmpv[destidx] = sigma2coeff( fromvglvls[::-1], vglvls )[::-1] pweight = tmpv[:] * pmid[:, :, None, ...] pnorm = pweight.sum(1) exprkeys = [ key for key, var in self.variables.items() if var.dimensions[:2] == ('time', 'lev') ] outdims = self.variables[exprkeys[0]].dimensions outf = pnc.PseudoNetCDFFile() for k, d in self.dimensions.items(): if k == 'lev': outf.createDimension(k, nzout) else: outf.createDimension(k, len(d)) for k in self.ncattrs(): outf.setncattr(k, self.getncattr(k)) outf.setncattr('VGLVLS', vglvls) outf.setncattr('VGTOP', vgtop) outf.setncattr('VGTYP', 7) for key in exprkeys: invar = self.variables[key] props = {k: invar.getncattr(k) for k in self.variables['delp'].ncattrs()} outvar = outf.createVariable( key, invar.dtype.char, outdims, **props ) outvar[:] = ( invar[:][:, :, None, ...] * pweight ).sum(1) / pnorm for key, var in self.variables.items(): if key not in exprkeys and key not in self.dimensions: outf.copyVariable(var, key=key) return outf