Source code for PseudoNetCDF.cmaqfiles._griddesc

import PseudoNetCDF as pnc
from PseudoNetCDF.pncwarn import warn
import os
import io
import re
from datetime import datetime
from collections import OrderedDict
import numpy as np
from ._ioapi import ioapi_base

_prjp = ('GDTYP', 'P_ALP', 'P_BET', 'P_GAM', 'XCENT', 'YCENT')
_grdp = (
    'PRJNAME', 'XORIG', 'YORIG', 'XCELL', 'YCELL', 'NCOLS', 'NROWS', 'NTHIK'
)


[docs] class griddesc(ioapi_base): """ griddesc is designed to read IOAPI griddesc files and create a file with basic CF metadata. An example format of GRIDDESC is show below with two grids on one projection. ' ' 'LCC' 2 33.000 45.000 -97.000 -97.000 40.000 ' ' 'SE52BENCH' 'LCC' 792000.000 -1080000.000 12000.000 12000.000 100 80 1 '36US3' 'LCC' -2952000.000 -2772000.000 36000.000 36000.000 172 148 1 ' ' """ def __init__( self, path, GDNAM=None, VGLVLS=(1., 0.), VGTOP=5000., FTYPE=1, VGTYP=7, SDATE=-635, STIME=0, TSTEP=0, var_kwds=None, nsteps=1, withcf=True, **prop_kw ): """ Arguments --------- path : str or file-like If path is has a read method, it will be used directly. If path is a path to a file on disk, it will be read. If none of these, treat the path as text content. If path is None, load a griddesc using properties provided. GDNAM : str Name the grid to be used. If not provided, the first will be used VGLVLS : tuple Iterable of layer edge (k+1) values for the vertical grid. VGTOP : float Top of vertical grid. VGTYP : int Determines the units of VGLVLS and VGTOP. (7: WRF sigma-p, 6: meters asl, for more details see IOAPI documentation) SDATE : int Starting julian date (YYYYJJJ) or -635 STIME : int Starting time as HHMMSS TSTEP : int IOAPI time step in HHMMSS FTYPE : int 1 for gridded; 2 for boundary var_kwds: tuple, list, or dict See setdefvars; Defaults to {'DUMMY': {'units': 'unknown'}} nsteps : int Number of time steps to use. Should be coupled with SDATE, STIME, and TSTEP. nsteps > 1 is not compatible with time independent (TSTEP=0) or SDATE=-635 withcf : bool If true, then CF compatible variables that describe dimensions and time are added. prop_kw: mappable Can provide additional IOAPI (or other) properties for the file. """ now = datetime.now() prj = {} grd = OrderedDict() if hasattr(path, 'read'): gdf = path path = '<inline>' elif path is None: required_props = _prjp[:] + _grdp[1:] + ('GDNAM',) grid_kw = prop_kw.copy() if GDNAM is None: grid_kw['GDNAM'] = 'UNKNOWN' else: grid_kw['GDNAM'] = GDNAM grid_kw.setdefault('NTHIK', 1) grid_kw.setdefault('PRJNAME', 'UNKNOWN') for rpk in required_props: if rpk not in grid_kw: if path is None and 'GRIDDESC' in os.environ: path = os.environ['GRIDDESC'] gdf = open(path, 'r') break raise KeyError( f'{rpk} not found.' + f'If path is None, {required_props} are required' ) else: gdf = io.StringIO(""" ' ' '{PRJNAME}' {GDTYP} {P_ALP} {P_BET} {P_GAM} {XCENT} {YCENT} ' ' '{GDNAM}' '{PRJNAME}' {XORIG} {YORIG} {XCELL} {YCELL} {NCOLS} {NROWS} {NTHIK} ' '""".format(**grid_kw)) path = '<inline>' elif os.path.exists(path): gdf = open(path, 'r') else: gdf = io.StringIO(path) path = '<inline>' gdtxt = gdf.read().replace(',', ' ').strip() # Fortran allows exponential notation to use D or d # instead of E or e, while Python does not. dble = re.compile('([\d.])[Dd]([\d])') gdtxt = dble.sub(r'\1e\2', gdtxt) gdlines = gdtxt.split('\n') gdlines = [line.strip() for line in gdlines] # Remove comments that start with ! gdlines = [line.split('!')[0].strip() for line in gdlines] # IOAPI does not verify first line, simply discards. # dscgrid.f#L153 # assert (gdlines[0].replace(' ', '') == "''") assert (gdlines[-1].replace(' ', '') == "''") i = 0 blanks = [] while i < len(gdlines): line = gdlines[i] if line.strip() in ("' '", "''", ""): blanks.append(i) else: i += 1 parts = [eval(p) for p in gdlines[i].split()] key = eval(line) if len(blanks) == 1: prj[key] = dict(zip(_prjp, parts)) elif len(blanks) == 2: grd[key] = dict(zip(_grdp, parts)) else: pass i += 1 self._prj = prj self._grd = grd self.FTYPE = FTYPE self.VGTYP = VGTYP self.VGTOP = np.float32(VGTOP) self.VGLVLS = np.array(VGLVLS, dtype='f') self.NLAYS = len(VGLVLS) - 1 self.UPNAM = 'GRIDDESC' self.EXEC_ID = 'GRIDDESC' self.FILEDESC = 'GRIDDESC' self.HISTORY = 'made from ' + path self.CDATE = int(now.strftime('%Y%j')) self.CTIME = int(now.strftime('%H%M%S')) self.WDATE = self.CDATE self.WTIME = self.CTIME self.SDATE = SDATE self.STIME = STIME self.TSTEP = TSTEP self._synthvars = None # user supplied properties should overwrite # defaults above for pk, pv in prop_kw.items(): self.setncattr(pk, pv) self.setdefvars(var_kwds) self.addtime(nsteps) self.setgrid(key=GDNAM, withcf=withcf) self.updatemeta() self.getVarlist()
[docs] def setdefvars(self, var_kwds): """ Arguments --------- var_kwds: list, tuple, or dictionary Variables with units or just a list of variables """ if var_kwds is None: var_kwds = {'DUMMY': {'units': 'unknown'}} elif isinstance(var_kwds, (tuple, list)): var_kwds = {k: 'unknown' for k in var_kwds} elif hasattr(var_kwds, 'items'): pass else: raise TypeError('var_kwds should be a list, tuple, or dictionary') self._var_kwds = {} for k, v in var_kwds.items(): if isinstance(v, str): self._var_kwds[k] = {'units': v.ljust(16)} elif isinstance(v, dict): self._var_kwds[k] = {_k: _v for _k, _v in v.items()} else: raise TypeError( 'var_kwds should have either str values (units) or a dict' + ' of all properties' )
[docs] def setgrid(self, key=None, withcf=True): """ Remakes the file to use the grid specified by key Arguments --------- key : str GDNAM to set the grid of the file withcf : bool Passed to adddims Returns ------- None """ if key is None: if getattr(self, 'GDNAM', None) is None: self.GDNAM = list(self._grd)[0] key = self.GDNAM.strip() grd = self._grd[key] prj = self._prj[grd['PRJNAME'].strip()] self.setncattr('GDNAM', key) self.setncatts(prj) self.setncatts(grd) self.adddims(withcf=withcf)
[docs] def addtime(self, nsteps=1): """ Adds TFLAG variable Arguments --------- nsteps : int Number of time steps to add Returns ------- None """ self.createDimension('TSTEP', nsteps).setunlimited(True) self.createDimension('DATE-TIME', 2) self.NVARS = len(self._var_kwds) self.createDimension('VAR', self.NVARS) try: self.updatetflag(overwrite=True) except Exception as e: warn(str(e)) tflag = self.createVariable( 'TFLAG', 'i', ('TSTEP', 'VAR', 'DATE-TIME'), units='<YYYYJJJ,HHMMSS>', long_name='TFLAG'.ljust(16), var_desc='TFLAG'.ljust(80) ) tflag[:, :, 0] = self.SDATE tflag[:, :, 1] = 0
[docs] def adddims(self, withcf=True): """ Add spatial dimensions Arguments --------- withcf : bool If true, add Climate and Forecasting convention variables Returns ------- None """ for k in 'LAY ROW COL PERIM'.split(): if k in self.dimensions: del self.dimensions[k] self.createDimension('LAY', self.NLAYS) if self.FTYPE == 1: self.createDimension('ROW', self.NROWS) self.createDimension('COL', self.NCOLS) dims = ('TSTEP', 'LAY', 'ROW', 'COL') elif self.FTYPE == 2: perim = (self.NCOLS * 2 + self.NROWS * 2 + self.NTHIK * 4) self.createDimension('PERIM', perim * self.NTHIK) dims = ('TSTEP', 'LAY', 'PERIM') else: raise ValueError( 'Only supports FTYPE 1 or 2; received ' + str(self.FTYPE) ) for vark, varkw in self._var_kwds.items(): self.createVariable( vark, 'f', dims, long_name=vark.ljust(16), var_desc=vark.ljust(80), **varkw ) if self._synthvars is None: oldkeys = set(self.variables) else: for key in self._synthvars: if key in self.variables: del self.variables[key] if withcf: pnc.conventions.ioapi.add_cf_from_ioapi(self) if self._synthvars is None: self._synthvars = set(self.variables).difference(oldkeys)
if __name__ == '__main__': f = griddesc('GRIDDESC', '36US3')