Source code for PseudoNetCDF.camxfiles.finst.Memmap

from __future__ import print_function
__all__ = ['finst']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- finst Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx fine
              instantaneus/avg files.  See PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

from warnings import warn
# Distribution packages
import unittest
import struct

# Site-Packages
from numpy import zeros, memmap, dtype, fromfile

# This Package modules
from PseudoNetCDF.camxfiles.units import get_uamiv_units
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class finst(PseudoNetCDFFile): """ finst provides a PseudoNetCDF interface for CAMx finst files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> finst_path = 'camx_finst.bin' >>> finstfile = finst(finst_path) >>> finstfile.variables.keys() ['TFLAG', 'O3', 'NO', 'NO2', ...] >>> v = finstfile.variables['V'] >>> tflag = finstfile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> finstfile.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ __messagefmt = '>240S' __nestspcfmt = dtype(dict(names=['nnest', 'nspec'], formats=['>i', '>i'])) __nestparmsfmt = dtype(dict(names=['SPAD', 'ibeg', 'jbeg', 'iend', 'jend', 'mesh', 'ione', 'nx', 'ny', 'nz', 'iparent', 'ilevel', 'EPAD'], formats=['>i'] * 13)) __time_hdr_fmt = dtype(dict(names=['time', 'date'], formats=['>f', '>i'])) __spc_fmt = dtype(">10S") __ione = 1 __idum = 0 __rdum = 0. __buffersize = 4 def __init__(self, rf, mode='r'): """ Initialization included reading the header and learning about the format. see __readheader and __gettimestep() for more info """ self.__rffile = rf # Establish dimensions self.createDimension('DATE-TIME', 2) self.__readheader(mode) # Add IOAPI metavariables self.NLAYS = len(self.dimensions['LAY']) self.NROWS = len(self.dimensions['ROW']) self.NCOLS = len(self.dimensions['COL']) self.NVARS = len(self.dimensions['VAR']) self.NSTEPS = len(self.dimensions['TSTEP']) varlist = "".join([i.ljust(16) for i in self.__var_names__]) setattr(self, 'VAR-LIST', varlist) self.GDTYP = 2 # Create variables self.variables = PseudoNetCDFVariables( self.__variables, self.__var_names__ + ['TFLAG']) # Initialize time maps date = self.__memmap__['DATE']['DATE'] time = self.__memmap__['DATE']['TIME'] self.variables['TFLAG'] = ConvertCAMxTime(date, time, self.NVARS) self.SDATE, self.STIME = self.variables['TFLAG'][0, 0, :] def __checkfilelen(self): f = open(self.__rffile, 'rb') f.seek(0, 2) flen = f.tell() f.close() return flen def __readheader(self, mode): f = open(self.__rffile) f.seek(92) self.NNEST, self.NSPEC = fromfile(f, '>i', 2) f.seek(108) self.SPECIES = fromfile(f, self.__spc_fmt, self.NSPEC) offset = f.tell() + 4 f.close() del f self.__var_names__ = [ i.decode().strip() for i in self.SPECIES.tolist() ] self.NEST_PARMS = memmap( self.__rffile, dtype=self.__nestparmsfmt, mode=mode, offset=offset, shape=self.NNEST) offset += self.__nestparmsfmt.itemsize * self.NNEST date_fmt = dtype( dict(names=['SPAD', 'TIME', 'DATE', 'EPAD'], formats=['>i', '>f', '>i', '>i'])) spc_1_lay_fmt = [] for i in range(self.NNEST): nny = self.NEST_PARMS[i]['ny'] nnx = self.NEST_PARMS[i]['nx'] nnfmts = ['>i', '(%d,%d)>f' % (nny, nnx), '>i'] spc_1_lay_fmt.append(dtype(dict(names=['SPAD', 'DATA', 'EPAD'], formats=nnfmts))) grid_fmt = [] for i in range(self.NNEST): gfmts = [dtype((spc_1_lay_fmt[i], (int(self.NEST_PARMS[i]['nz']), )))] * self.NSPEC grid_fmt.append(dtype(dict(names=self.__var_names__, formats=gfmts))) mmnames = ['DATE'] + ['grid%d' % i for i in range(self.NNEST)] mmdt = dtype(dict(names=mmnames, formats=[date_fmt] + grid_fmt)) self.__memmap__ = memmap(self.__rffile, mode=mode, offset=offset, dtype=mmdt) ntimes = self.__memmap__.shape[0] if int(ntimes) != ntimes: raise ValueError("Not an even number of times") self.createDimension('TSTEP', ntimes) self.createDimension('VAR', self.NSPEC) self.chooseGrid(0)
[docs] def chooseGrid(self, ngrid): self.createDimension('LAY', self.NEST_PARMS[ngrid]['nz']) self.createDimension('COL', self.NEST_PARMS[ngrid]['nx']) self.createDimension('ROW', self.NEST_PARMS[ngrid]['ny']) self.CURRENT_GRID = 'grid%d' % ngrid
def __variables(self, k): dimensions = ('TSTEP', 'LAY', 'ROW', 'COL') outvals = self.__memmap__[self.CURRENT_GRID][k]['DATA'][:, :, :, :] unit = get_uamiv_units('INSTANT ', k) return PseudoNetCDFVariable(self, k, 'f', dimensions, values=outvals, units=unit)
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testFinst(self): import PseudoNetCDF.testcase finstfile = finst(PseudoNetCDF.testcase.camxfiles_paths['finst']) vars = [i for i in finstfile.variables.keys() if i != 'TFLAG'] for var in vars: v = finstfile.variables[var] warn("Test case is not fully implemented. " + "Review values for 'reasonability.'") datetime = finstfile.variables['TFLAG'] minv = v.min(1).min(1).min(1) meanv = v.mean(1).mean(1).mean(1) maxv = v.max(1).max(1).max(1) for i, (d, t) in enumerate(datetime[:, 0, :]): print(var, d, t, minv[i], meanv[i], maxv[i]) if __name__ == '__main__': unittest.main()