Source code for PseudoNetCDF.camxfiles.irr.Memmap

__all__ = ['irr']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- irr Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              irr files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
import unittest
import struct
from warnings import warn
from collections import defaultdict
from functools import partial

# Site-Packages
from numpy import zeros, memmap, dtype

# This Package modules
from PseudoNetCDF.camxfiles.timetuple import timeadd, timerange
from PseudoNetCDF.camxfiles.FortranFileUtil import OpenRecordFile
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


def _isMine(path):
    testf = OpenRecordFile(path)
    if testf.record_size == 80:
        testf.next()
        if testf.record_size == 16:
            testf.next()
            if testf.record_size == 28:
                return True
    return False


[docs] class irr(PseudoNetCDFFile): """ irr provides a PseudoNetCDF interface for CAMx irr files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> irr_path = 'camx_irr.bin' >>> rows,cols = 65,83 >>> irrfile = irr(irr_path,rows,cols) >>> irrfile.variables.keys() ['TFLAG', 'RXN_01', 'RXN_02', 'RXN_03', ...] >>> v = irrfile.variables['RXN_01'] >>> tflag = irrfile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> irrfile.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ id_fmt = "if5i" data_fmt = "f"
[docs] @classmethod def isMine(self, path): return _isMine(path)
def __init__(self, rf, multi=False): """ Initialization included reading the header and learning about the format. see __readheader and __gettimestep() for more info """ self.__rffile = OpenRecordFile(rf) self.__readheader() irr_record_type = dtype( dict(names=(['SPAD', 'DATE', 'TIME', 'PAGRID', 'NEST', 'I', 'J', 'K'] + ['RXN_%02d' % i for i in range(1, self.nrxns + 1)] + ['EPAD']), formats=(['>i', '>i', '>f', '>i', '>i', '>i', '>i', '>i'] + ['>f' for i in range(1, self.nrxns + 1)] + ['>i']))) varkeys = [i for i in irr_record_type.names[8:-1]] + ['TFLAG'] self.groups = defaultdict(PseudoNetCDFFile) padatatype = [] pavarkeys = [] self.createDimension('TSTEP', self.time_step_count) self.createDimension('VAR', len(varkeys) - 1) self.createDimension('DATE-TIME', 2) for di, domain in enumerate(self._padomains): dk = 'PA%02d' % (di + 1) prefix = dk + '_' pavarkeys.extend([prefix + k for k in varkeys]) grp = self.groups[dk] for propk, propv in domain.items(): setattr(grp, propk, propv) grp.createDimension('TSTEP', self.time_step_count) grp.createDimension('VAR', len(varkeys) - 1) grp.createDimension('DATE-TIME', 2) grp.createDimension('COL', domain['iend'] - domain['istart'] + 1) grp.createDimension('ROW', domain['jend'] - domain['jstart'] + 1) grp.createDimension('LAY', domain['tlay'] - domain['blay'] + 1) padatatype.append((dk, irr_record_type, (len(grp.dimensions['ROW']), len(grp.dimensions['COL']), len(grp.dimensions['LAY'])))) if len(self._padomains) == 1: pncols = domain['iend'] - domain['istart'] + 1 pnrows = domain['jend'] - domain['jstart'] + 1 pnlays = domain['tlay'] - domain['blay'] + 1 self.createDimension('COL', pncols) self.createDimension('ROW', pnrows) self.createDimension('LAY', pnlays) for propk, propv in domain.items(): setattr(grp, propk, propv) varget = partial(self.__variables, dk) if len(self._padomains) == 1: self.variables = PseudoNetCDFVariables(varget, varkeys) else: grp.variables = PseudoNetCDFVariables(varget, varkeys) self.__memmaps = memmap(self.__rffile.infile.name, dtype( padatatype), 'r', self.data_start_byte) def __del__(self): try: del self.__memmaps except Exception: pass def __decorator(self, name, pncfv): def decor(k): return dict(units='ppm/hr', var_desc=k.ljust(16), long_name=k.ljust(16)) for k, v in decor(name).items(): setattr(pncfv, k, v) return pncfv def __variables(self, pk, rxn): pncvar = PseudoNetCDFVariable if rxn == 'TFLAG': return ConvertCAMxTime(self.__memmaps[pk][:, 0, 0, 0]['DATE'], self.__memmaps[pk][:, 0, 0, 0]['TIME'], len(self.groups[pk].dimensions['VAR'])) tmpvals = self.__memmaps[pk][rxn].swapaxes(1, 3).swapaxes(2, 3) return self.__decorator(rxn, pncvar(self, rxn, 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=tmpvals)) def __readheader(self): """ __readheader reads the header section of the ipr file it initializes each header field (see CAMx Users Manual for a list) as properties of the ipr class """ assert (self.__rffile.record_size == 80) self.runmessage = self.__rffile.read("80s") self.start_date, self.start_time, self.end_date, self.end_time = \ self.__rffile.read("ifif") self.time_step = 100. self.time_step_count = len([i for i in self.timerange()]) self._grids = [] for grid in range(self.__rffile.read("i")[-1]): self._grids.append( dict( zip( ['orgx', 'orgy', 'ncol', 'nrow', 'xsize', 'ysize', 'iutm'], self.__rffile.read("iiiiiii") ) ) ) self._padomains = [] for padomain in range(self.__rffile.read("i")[-1]): self._padomains.append( dict( zip( ['grid', 'istart', 'iend', 'jstart', 'jend', 'blay', 'tlay'], self.__rffile.read("iiiiiii") ) ) ) self.nrxns = self.__rffile.read('i')[-1] self.data_start_byte = self.__rffile.record_start self.record_fmt = self.id_fmt + str(self.nrxns) + self.data_fmt self.record_size = self.__rffile.record_size self.padded_size = self.record_size + 8 domain = self._padomains[0] self.records_per_time = (domain['iend'] - domain['istart'] + 1) *\ (domain['jend'] - domain['jstart'] + 1) *\ (domain['tlay'] - domain['blay'] + 1) self.time_data_block = self.padded_size * self.records_per_time self.time_step = 100.
[docs] def timerange(self): return timerange((self.start_date, self.start_time + self.time_step), timeadd((self.end_date, self.end_time), (0, self.time_step)), self.time_step)
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testIRR(self): warn('Test not implemented; data too big for distribution') if __name__ == '__main__': unittest.main()