Source code for PseudoNetCDF.camxfiles.lateral_boundary.Memmap

__all__ = ['lateral_boundary']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- lateral_boundary Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              lateral_boundary files.  See PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
import unittest
import struct

# Site-Packages
from numpy import zeros, array, memmap, dtype, testing

# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoIOAPIVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime
from PseudoNetCDF.camxfiles.units import get_uamiv_units

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class lateral_boundary(PseudoNetCDFFile): """ lateral_boundary provides a PseudoNetCDF interface for CAMx lateral_boundary files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> lateral_boundary_path = 'camx_lateral_boundary.bin' >>> rows,cols = 65,83 >>> lbf = lateral_boundary(lateral_boundary_path,rows,cols) >>> lbf.variables.keys() ['TFLAG', 'O3', 'NO', 'NO2', ...] >>> tflag = lbf.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v = lbf.variables['O3'] >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> lbf.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ __emiss_hdr_fmt = dtype(dict(names=['SPAD', 'name', 'note', 'itzon', 'nspec', 'ibdate', 'btime', 'iedate', 'etime', 'EPAD'], formats=['>i', '(10,4)>S1', '(60,4)>S1', '>i', '>i', '>i', '>f', '>i', '>f', '>i'])) __grid_hdr_fmt = dtype(dict(names=['SPAD', 'plon', 'plat', 'iutm', 'xorg', 'yorg', 'delx', 'dely', 'nx', 'ny', 'nz', 'iproj', 'istag', 'tlat1', 'tlat2', 'rdum5', 'EPAD'], formats=['>i', '>f', '>f', '>i', '>f', '>f', '>f', '>f', '>i', '>i', '>i', '>i', '>i', '>f', '>f', '>f', '>i'])) __cell_hdr_fmt = dtype(dict(names=['SPAD', 'ione1', 'ione2', 'nx', 'ny', 'EPAD'], formats=['>i', '>i', '>i', '>i', '>i', '>i'])) __time_hdr_fmt = dtype(dict(names=['SPAD', 'ibdate', 'btime', 'iedate', 'etime', 'EPAD'], formats=['>i', '>i', '>f', '>i', '>f', '>i'])) __spc_fmt = dtype("(10,4)>S1") __ione = 1 __idum = 0 __rdum = 0.
[docs] @classmethod def isMine(cls, path): try: # initialization calls readheader, which has an assertion check lateral_boundary(path) return True except Exception: return False
def __init__(self, rf, mode='r'): """ Initialization included reading the header and learning about the format. see __readheader and __gettimestep() for more info """ self.__rffile = rf self.__mode = mode self.createDimension('DATE-TIME', 2) self.__readheader() # Add IOAPI metavariables self.NLAYS = len(self.dimensions['LAY']) self.NROWS = len(self.dimensions['ROW']) self.NCOLS = len(self.dimensions['COL']) self.NVARS = len(self.dimensions['VAR']) self.NSTEPS = len(self.dimensions['TSTEP']) varlist = "".join([i.ljust(16) for i in self.__var_names__]) setattr(self, 'VAR-LIST', varlist) self.GDTYP = 2 name = self.__emiss_hdr['name'][0, :, 0].copy().view('S10')[0] note = self.__emiss_hdr['note'][0, :, 0].copy().view('S60')[0] if hasattr(name, 'decode'): name = name.decode() if hasattr(note, 'decode'): note = note.decode() self.NAME = name self.NOTE = note self.ITZON = self.__emiss_hdr['itzon'][0] # Create variables self.variables = PseudoNetCDFVariables( self.__variables, self.__var_names__ + ['TFLAG', 'ETFLAG']) self.variables['TFLAG'] = ConvertCAMxTime( self.__memmap__['DATE']['BDATE'], self.__memmap__['DATE']['BTIME'], self.NVARS) self.variables['ETFLAG'] = ConvertCAMxTime( self.__memmap__['DATE']['EDATE'], self.__memmap__['DATE']['BTIME'], self.NVARS) self.SDATE, self.STIME = self.variables['TFLAG'][0, 0, :] def __checkfilelen(self): f = open(self.__rffile, 'rb') f.seek(0, 2) flen = f.tell() f.close() return flen def __readheader(self): offset = 0 self.__emiss_hdr = memmap( self.__rffile, mode=self.__mode, dtype=self.__emiss_hdr_fmt, shape=1, offset=offset) nspec = self.__emiss_hdr['nspec'][0] offset += self.__emiss_hdr.dtype.itemsize * self.__emiss_hdr.size self.__grid_hdr = memmap( self.__rffile, mode=self.__mode, dtype=self.__grid_hdr_fmt, shape=1, offset=offset) self.XORIG = self.__grid_hdr['xorg'][0] self.YORIG = self.__grid_hdr['yorg'][0] self.XCELL = self.__grid_hdr['delx'][0] self.YCELL = self.__grid_hdr['dely'][0] self.PLON = self.__grid_hdr['plon'][0] self.PLAT = self.__grid_hdr['plat'][0] self.TLAT1 = self.__grid_hdr['tlat1'][0] self.TLAT2 = self.__grid_hdr['tlat2'][0] self.IUTM = self.__grid_hdr['iutm'][0] self.ISTAG = self.__grid_hdr['istag'][0] self.CPROJ = self.__grid_hdr['iproj'][0] # Map CAMx projection constants to IOAPI GDTYPE = self.GDTYP = {0: 1, 1: 5, 2: 2, 3: 6}[ self.__grid_hdr['iproj'][0]] self.XCENT = self.__grid_hdr['plon'][0] self.YCENT = self.__grid_hdr['plat'][0] if GDTYPE in (1, 2): self.P_ALP = self.__grid_hdr['tlat1'][0] self.P_BET = self.__grid_hdr['tlat2'][0] self.P_GAM = self.__grid_hdr['plon'][0] elif GDTYPE == 5: self.P_ALP = self.__grid_hdr['iutm'][0] self.P_BET = 0. self.P_GAM = 0. elif GDTYPE == 6: self.P_ALP = {90: 1, -90: -1}[self.__grid_hdr['plat'][0]] self.P_BET = self.__grid_hdr['tlat1'][0] self.P_GAM = self.__grid_hdr['plon'][0] else: raise ValueError('Unknown projection') nx = self.__grid_hdr['nx'][0] ny = self.__grid_hdr['ny'][0] nz = max(self.__grid_hdr['nz'], array([1]))[0] offset += self.__grid_hdr.dtype.itemsize * self.__grid_hdr.size self.__cell_hdr = memmap( self.__rffile, mode=self.__mode, dtype=self.__cell_hdr_fmt, shape=1, offset=offset) offset += self.__cell_hdr.dtype.itemsize * self.__cell_hdr.size + 4 self.__spc_hdr = memmap(self.__rffile, mode=self.__mode, dtype=self.__spc_fmt, shape=nspec, offset=offset) offset += self.__spc_hdr.dtype.itemsize * self.__spc_hdr.size + 4 self._boundary_def = {} for bkey, bdim in [('WEST', ny), ('EAST', ny), ('SOUTH', nx), ('NORTH', nx)]: __bound_fmt = dtype(dict(names=['SPAD', 'ione', 'iedge', 'ncell', 'edgedata', 'EPAD'], formats=['>i', '>i', '>i', '>i', '(%d,%d)>i' % (bdim, 4), '>i'])) self._boundary_def[bkey] = memmap( self.__rffile, mode=self.__mode, dtype=__bound_fmt, shape=1, offset=offset) assert (self._boundary_def[bkey]['SPAD'] == (__bound_fmt.itemsize - 8)) offset += __bound_fmt.itemsize date_time_fmt = dtype(dict(names=['SPAD', 'BDATE', 'BTIME', 'EDATE', 'ETIME', 'EPAD'], formats=['>i', '>i', '>f', '>i', '>f', '>i'])) date_time_block_size = 6 spc_we_fmt = dtype(dict(names=['SPAD', 'IONE', 'SPC', 'IEDGE', 'DATA', 'EPAD'], formats=['>i', '>i', '(10,4)>S1', '>i', '(%d,%d)>f' % (ny, nz), '>i'])) spc_sn_fmt = dtype(dict(names=['SPAD', 'IONE', 'SPC', 'IEDGE', 'DATA', 'EPAD'], formats=['>i', '>i', '(10,4)>S1', '>i', '(%d,%d)>f' % (nx, nz), '>i'])) spc_lat_fmt = dtype(dict(names=['WEST', 'EAST', 'SOUTH', 'NORTH'], formats=[spc_we_fmt, spc_we_fmt, spc_sn_fmt, spc_sn_fmt, ])) # Get species names from spc_hdr self.__var_names__ = [] spc_names = [spc[:, 0].copy().view('S10')[0] for spc in self.__spc_hdr] spc_names = [spc.decode() if hasattr(spc, 'decode') else spc for spc in spc_names] self.__spc_names__ = [spc.strip() for spc in spc_names] for spc in self.__spc_names__: for bkey in ['WEST_', 'EAST_', 'SOUTH_', 'NORTH_']: self.__var_names__.append(bkey + spc) spc_lat_block_size = spc_lat_fmt.itemsize // 4 data_block_fmt = dtype(dict( names=['DATE'] + self.__spc_names__, formats=[date_time_fmt] + [spc_lat_fmt] * nspec)) data_block_size = date_time_block_size + nspec * spc_lat_block_size f = open(self.__rffile) f.seek(0, 2) size = f.tell() f.close() del f ntimes = float(size - offset) // 4. // data_block_size if int(ntimes) != ntimes: raise ValueError("Not an even number of times %f" % ntimes) ntimes = int(ntimes) self.createDimension('LAY', nz) self.createDimension('COL', nx) self.createDimension('ROW', ny) self.createDimension('TSTEP', ntimes) self.createDimension('VAR', nspec * 4) self.__memmap__ = memmap( self.__rffile, mode=self.__mode, dtype=data_block_fmt, offset=offset) def __variables(self, k): edgename = k.split('_')[0] spcname = k[len(edgename) + 1:] if edgename in ('WEST', 'EAST'): dimensions = ('TSTEP', 'ROW', 'LAY') else: dimensions = ('TSTEP', 'COL', 'LAY') outvals = self.__memmap__[spcname][edgename]['DATA'] units = get_uamiv_units(self.NAME, spcname) return PseudoIOAPIVariable(self, k, 'f', dimensions, values=outvals, units=units)
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testLB(self): import PseudoNetCDF.testcase latfile = lateral_boundary( PseudoNetCDF.testcase.camxfiles_paths['lateral_boundary']) latfile.variables['TFLAG'] aassert = testing.assert_array_almost_equal checkw = array([5.09086549e-02, 5.15854955e-02, 5.20327762e-02, 4.51020785e-02, 4.81765866e-02, 5.01902141e-02, 5.11084236e-02, 5.30455858e-02, 5.39216697e-02, 5.49603552e-02, 5.52441478e-02, 5.53695373e-02, 4.70436066e-02, 4.88652885e-02, 5.00564687e-02, 4.30268869e-02, 4.65402082e-02, 4.93281633e-02, 4.84376177e-02, 5.15173525e-02, 5.32316864e-02, 5.27723655e-02, 5.43252379e-02, 5.51613718e-02], dtype='f').reshape(2, 4, 3) aassert(latfile.variables['WEST_O3'], checkw) checke = array([4.32289392e-02, 4.32576202e-02, 4.32824045e-02, 4.28533703e-02, 4.29217815e-02, 4.29747701e-02, 4.20787595e-02, 4.21368703e-02, 4.21847440e-02, 4.20016758e-02, 4.20646742e-02, 4.21276242e-02, 4.45125513e-02, 4.45423163e-02, 4.45689932e-02, 4.34477255e-02, 4.35189568e-02, 4.35763896e-02, 4.23655175e-02, 4.24236283e-02, 4.24756072e-02, 4.21473905e-02, 4.22183946e-02, 4.22968678e-02], dtype='f').reshape(2, 4, 3) aassert(latfile.variables['EAST_O3'], checke) checks = array([5.00756986e-02, 5.07836118e-02, 5.12398519e-02, 4.53975834e-02, 4.85784635e-02, 5.06106876e-02, 4.88935970e-02, 5.18746264e-02, 5.34388497e-02, 5.52802160e-02, 5.66427484e-02, 5.73641062e-02, 5.76447174e-02, 5.91668785e-02, 5.98937273e-02, 4.64089997e-02, 4.85925563e-02, 4.98120859e-02, 4.31438088e-02, 4.85876575e-02, 5.05158082e-02, 4.63983566e-02, 5.09891734e-02, 5.34970984e-02, 5.16217351e-02, 5.49310222e-02, 5.70215993e-02, 5.37858233e-02, 5.71687669e-02, 5.90757653e-02], dtype='f').reshape(2, 5, 3) aassert(latfile.variables['SOUTH_O3'], checks) checkw = array([4.67120931e-02, 4.72769439e-02, 4.75757420e-02, 4.63561714e-02, 4.69456315e-02, 4.72794324e-02, 4.45141681e-02, 4.53266129e-02, 4.58004810e-02, 4.20911871e-02, 4.29873653e-02, 4.35210876e-02, 3.29810269e-02, 3.38455141e-02, 3.43711227e-02, 4.65101935e-02, 4.71580848e-02, 4.75305654e-02, 4.55248095e-02, 4.63739596e-02, 4.68796641e-02, 4.44590077e-02, 4.58386838e-02, 4.66508269e-02, 4.33906466e-02, 4.47074845e-02, 4.51488644e-02, 3.23279053e-02, 3.31198536e-02, 3.36514898e-02], dtype='f').reshape(2, 5, 3) aassert(latfile.variables['NORTH_O3'], checkw) def testNCF2LB(self): import PseudoNetCDF.testcase from PseudoNetCDF.pncgen import pncgen import os inpath = PseudoNetCDF.testcase.camxfiles_paths['lateral_boundary'] outpath = inpath + '.check' infile = lateral_boundary(inpath) pncgen(infile, outpath, format='camxfiles.lateral_boundary') orig = open(inpath, 'rb').read() new = open(outpath, 'rb').read() assert (orig == new) os.remove(outpath) if __name__ == '__main__': lb = lateral_boundary('../../testcase/utils/CAMx/lateral_boundary/' + 'BC.vistas_2002gt2a_STL_36_68X68_16L.2002154') # unittest.main()