Source code for PseudoNetCDF.camxfiles.ipr.Memmap

__all__ = ['ipr']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- ipr Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              ipr files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""
# Distribution packages
import unittest
import struct
import functools
from warnings import warn

# Site-Packages
from numpy import zeros, dtype, memmap, char

# This Package modules
from PseudoNetCDF.conventions.ioapi import add_cf_from_ioapi
from PseudoNetCDF.camxfiles.timetuple import timeadd, timerange
from PseudoNetCDF.camxfiles.util import cartesian
from PseudoNetCDF.camxfiles.units import get_uamiv_units
from PseudoNetCDF.camxfiles.FortranFileUtil import OpenRecordFile
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class ipr(PseudoNetCDFFile): """ ipr provides a PseudoNetCDF interface for CAMx ipr files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> ipr_path = 'camx_ipr.bin' >>> iprfile = ipr(ipr_path) >>> iprfile.variables.keys() ['TFLAG', 'SPAD_O3', 'DATE_O3', 'TIME_O3', 'SPC_O3', 'PAGRID_O3', 'NEST_O3', 'I_O3', 'J_O3', 'K_O3', 'INIT_O3', 'CHEM_O3', 'EMIS_O3', 'PTEMIS_O3', 'PIG_O3', 'WADV_O3', 'EADV_O3', 'SADV_O3', 'NADV_O3', 'BADV_O3', 'TADV_O3', 'DIL_O3', 'WDIF_O3', 'EDIF_O3', 'SDIF_O3', 'NDIF_O3', 'BDIF_O3', 'TDIF_O3', 'DDEP_O3', 'WDEP_O3', 'INORGACHEM_O3', 'ORGACHEM_O3', 'AQACHEM_O3', 'FCONC_O3', 'UCNV_O3', 'AVOL_O3', 'EPAD_O3'] >>> v = iprfile.variables['CHEM_O3'] >>> tflag = iprfile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> iprfile.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ id_fmt = "if10s5i" dt_fmt = "if" data_fmt = "f" def __init__(self, rf, multi=False, **props): """ Initialization included reading the header and learning about the format. see __readheader and __gettimestep() for more info Keywords (i.e., props) for projection: P_ALP, P_BET, P_GAM, XCENT, YCENT, XORIG, YORIG, XCELL, YCELL """ self.__rffile = OpenRecordFile(rf) self.__readheader() self.__ipr_record_type = { 24: dtype( dict( names=['SPAD', 'DATE', 'TIME', 'SPC', 'PAGRID', 'NEST', 'I', 'J', 'K', 'INIT', 'CHEM', 'EMIS', 'PTEMIS', 'PIG', 'WADV', 'EADV', 'SADV', 'NADV', 'BADV', 'TADV', 'DIL', 'WDIF', 'EDIF', 'SDIF', 'NDIF', 'BDIF', 'TDIF', 'DDEP', 'WDEP', 'AERCHEM', 'FCONC', 'UCNV', 'AVOL', 'EPAD'], formats=['>i', '>i', '>f', '>S10', '>i', '>i', '>i', '>i', '>i', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>i'])), 26: dtype( dict( names=['SPAD', 'DATE', 'TIME', 'SPC', 'PAGRID', 'NEST', 'I', 'J', 'K', 'INIT', 'CHEM', 'EMIS', 'PTEMIS', 'PIG', 'WADV', 'EADV', 'SADV', 'NADV', 'BADV', 'TADV', 'DIL', 'WDIF', 'EDIF', 'SDIF', 'NDIF', 'BDIF', 'TDIF', 'DDEP', 'WDEP', 'INORGACHEM', 'ORGACHEM', 'AQACHEM', 'FCONC', 'UCNV', 'AVOL', 'EPAD'], formats=['>i', '>i', '>f', '>S10', '>i', '>i', '>i', '>i', '>i', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>f', '>i'])) }[len(self.prcnames)] aeroprocs = {24: ['AERCHEM'], 26: ['INORGACHEM', 'ORGACHEM', 'AQACHEM']}[len(self.prcnames)] prcs = (['SPAD', 'DATE', 'TIME', 'PAGRID', 'NEST', 'I', 'J', 'K', 'INIT', 'CHEM', 'EMIS', 'PTEMIS', 'PIG', 'WADV', 'EADV', 'SADV', 'NADV', 'BADV', 'TADV', 'DIL', 'WDIF', 'EDIF', 'SDIF', 'NDIF', 'BDIF', 'TDIF', 'DDEP', 'WDEP'] + aeroprocs + ['FCONC', 'UCNV', 'AVOL', 'EPAD']) varkeys = ['_'.join(i) for i in cartesian(prcs, self.spcnames)] varkeys += ['SPAD', 'DATE', 'TIME', 'PAGRID', 'NEST', 'I', 'J', 'K', 'TFLAG'] self.groups = {} NSTEPS = len([i_ for i_ in self.timerange()]) NVARS = len(varkeys) self.createDimension('VAR', NVARS) self.createDimension('DATE-TIME', 2) self.createDimension('TSTEP', NSTEPS) padatatype = [] pavarkeys = [] for di, domain in enumerate(self.padomains): dk = 'PA%02d' % di prefix = dk + '_' if len(self.padomains) == 1: grp = self.groups[dk] = self else: grp = self.groups[dk] = PseudoNetCDFFile() pavarkeys.extend([prefix + k for k in varkeys]) grp.createDimension('VAR', NVARS) grp.createDimension('DATE-TIME', 2) grp.createDimension('TSTEP', NSTEPS) grp.createDimension('COL', domain['iend'] - domain['istart'] + 1) grp.createDimension('ROW', domain['jend'] - domain['jstart'] + 1) grp.createDimension('LAY', domain['tlay'] - domain['blay'] + 1) padatatype.append((dk, self.__ipr_record_type, (len(grp.dimensions['ROW']), len(grp.dimensions['COL']), len(grp.dimensions['LAY'])))) grp.varget = functools.partial(self.__variables, dk) # grp.varget = eval("lambda k: self._ipr__variables('%s', k)""" % # dk, dict(self=self), locals()) if len(self.padomains) == 1: self.variables = PseudoNetCDFVariables(grp.varget, varkeys) else: grp.variables = PseudoNetCDFVariables(grp.varget, varkeys) nspc = len(self.spcnames) self.__memmaps = memmap(self.__rffile.infile.name, dtype(padatatype), 'r', self.data_start_byte).reshape(NSTEPS, nspc) for k, v in props.items(): setattr(self, k, v) try: add_cf_from_ioapi(self) except Exception: pass def __del__(self): try: del self.__memmaps except Exception: pass def __decorator(self, name, pncfv): spc = name.split('_')[-1] prc = name.split('_')[0] # IPR units are consistent with 'IPR' if prc == 'UCNV': units = 'm**3/mol' elif prc == 'AVOL': units = 'm**3' else: units = get_uamiv_units('IPR', spc) def decor(k): return dict(units=units, var_desc=k.ljust(16), long_name=k.ljust(16)) for k, v in decor(name).items(): setattr(pncfv, k, v) return pncfv def __variables(self, pk, proc_spc): if proc_spc in self.__ipr_record_type.names: proc = proc_spc proc_spc = proc_spc + '_' + self.spcnames[0] tmpvals = self.__memmaps[pk][:, 0, :, :, :][proc] tmpvals = tmpvals.swapaxes(1, 3).swapaxes(2, 3) return PseudoNetCDFVariable(self, proc_spc, 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=tmpvals) if proc_spc == 'TFLAG': thisdate = self.__memmaps[pk][:, 0, :, :, :]['DATE'].swapaxes( 1, 3).swapaxes(2, 3)[..., 0, 0, 0] thistime = self.__memmaps[pk][:, 0, :, :, :]['TIME'].swapaxes( 1, 3).swapaxes(2, 3)[..., 0, 0, 0] nvar = len(self.groups[pk].dimensions['VAR']) return ConvertCAMxTime(thisdate, thistime, nvar) for k in self.__ipr_record_type.names: proc = proc_spc[:len(k)] spc = proc_spc[len(k) + 1:] if proc == k and spc in self.spcnames: spc = self.spcnames.index(spc) dvals = self.__memmaps[pk][:, spc][proc].swapaxes( 1, 3).swapaxes(2, 3) tmpvar = PseudoNetCDFVariable(self, proc_spc, 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=dvals) return self.__decorator(proc_spc, tmpvar) raise KeyError("Bad!") def __readheader(self): """ __readheader reads the header section of the ipr file it initializes each header field (see CAMx Users Manual for a list) as properties of the ipr class """ self.runmessage = self.__rffile.read("80s") self.start_date, self.start_time, self.end_date, self.end_time = \ self.__rffile.read("ifif") self.grids = [] for grid in range(self.__rffile.read("i")[-1]): self.grids.append( dict( zip( ['orgx', 'orgy', 'ncol', 'nrow', 'xsize', 'ysize'], self.__rffile.read("iiiiii") ) ) ) self.spcnames = [] for spc in range(self.__rffile.read("i")[-1]): self.spcnames.append(char.decode( self.__rffile.read("10s")[-1]).strip()) self.nspec = len(self.spcnames) self.padomains = [] for padomain in range(self.__rffile.read("i")[-1]): self.padomains.append( dict( zip( ['grid', 'istart', 'iend', 'jstart', 'jend', 'blay', 'tlay'], self.__rffile.read("iiiiiii") ) ) ) self.activedomain = self.padomains[0] self.prcnames = [] for i in range(self.__rffile.read('i')[-1]): self.prcnames.append(self.__rffile.read('25s')[-1].strip()) self.data_start_byte = self.__rffile.record_start self.record_fmt = self.id_fmt + str(len(self.prcnames)) + self.data_fmt self.record_size = self.__rffile.record_size self.SDATE, self.STIME, dummy, dummy, dummy, dummy, dummy, dummy = \ self.__rffile.read(self.id_fmt) self.__rffile.previous() self.TSTEP = 100. self.padded_size = self.record_size + 8 domain = self.padomains[0] domi = (domain['iend'] - domain['istart'] + 1) domj = (domain['jend'] - domain['jstart'] + 1) doml = (domain['tlay'] - domain['blay'] + 1) self.records_per_time = self.nspec * domi * domj * doml self.time_data_block = self.padded_size * self.records_per_time self.time_step = 100.
[docs] def timerange(self): t1 = (self.start_date, self.start_time + self.time_step) t2 = timeadd((self.end_date, self.end_time), (0, self.time_step)) return timerange(t1, t2, self.time_step)
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testIPR(self): warn('Test not implemented; data too big for distribution') if __name__ == '__main__': unittest.main()