Source code for PseudoNetCDF.camxfiles.irr.Read

__all__ = ['irr']
__doc__ = """
.. _Read
:mod:`Read` -- irr Read interface
============================================

.. module:: Read
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` random access read for CAMx
              irr files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
from datetime import datetime
import unittest
import struct
from warnings import warn
from math import ceil

# Site-Packages
from numpy import zeros, array, dtype, fromfile, arange

# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
pncvar = PseudoNetCDFVariable

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


class irr(PseudoNetCDFFile):
    """
    irr provides a PseudoNetCDF interface for CAMx irr files.
    Where possible, the interface follows IOAPI conventions
    (see www.baronams.com).

    ex:
        >>> irr_path = 'camx_irr.bin'
        >>> irrfile = irr(irr_path)
        >>> irrfile.variables.keys()
        ['TFLAG', 'RXN_01', 'RXN_02', 'RXN_03', ...]
        >>> v = irrfile.variables['RXN_01']
        >>> tflag = irrfile.variables['TFLAG']
        >>> tflag.dimensions
        ('TSTEP', 'VAR', 'DATE-TIME')
        >>> tflag[0,0,:]
        array([2005185, 0])
        >>> tflag[-1,0,:]
        array([2005185, 240000])
        >>> v.dimensions
        ('TSTEP', 'LAY', 'ROW', 'COL')
        >>> v.shape
        (25, 28, 65, 83)
        >>> irrfile.dimensions
        {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83}
    """

    id_fmt = "ifiiiii"
    data_fmt = "f"

    def __init__(self, rf, units='ppm/hr', conva=None, nvarcache=None):
        """
        Initialization includes reading the header and learning
        about the format.

        see __readheader and __gettimestep() for more info
        """
        rffile = self._rffile = open(rf, 'rb')
        rffile.seek(0, 2)
        if rffile.tell() < 2147483648:
            warn("For greater speed on files <2GB use irr_memmap")
        rffile.seek(0, 0)
        self.__readheader()
        self.__gettimestep()
        self.units = units
        # __conv is a conversion array that comes from ipr
        # if units is not ppm/hr, conv must be provided
        self.__conv = conva
        if self.units != 'ppm/hr' and self.__conv is None:
            raise ValueError("When units are provided, a conversion array " +
                             "dim(t,z,x,y) must also be provided")
        varkeys = ['RXN_%02d' % i for i in range(1, self.NRXNS + 1)]
        domain = self.padomains[0]
        tdim = self.createDimension('TSTEP', int(self.NSTEPS))
        tdim.setunlimited(True)
        self.createDimension(
            'COL', int(domain['iend'] - domain['istart'] + 1))
        self.createDimension(
            'ROW', int(domain['jend'] - domain['jstart'] + 1))
        self.createDimension(
            'LAY', int(domain['tlay'] - domain['blay'] + 1))
        self.createDimension('DATE-TIME', 2)
        self.createDimension('VAR', int(self.NRXNS))
        self.variables = PseudoNetCDFVariables(self.__var_get, varkeys)
        self._nvarcache = nvarcache or int(ceil(self.NRXNS / 4.))
        tflag = self.createVariable(
            'TFLAG', 'i', ('TSTEP', 'VAR', 'DATE-TIME'))
        tflag.units = '<YYYYJJJ, HHMMSS>'
        tflag.var_desc = tflag.long_name = 'TFLAG'.ljust(16)
        tflag[:] = array(self.timerange(), dtype='i').reshape(
            self.NSTEPS, 1, 2)

    def __var_get(self, key):
        rxni = int(key.split('_')[1])
        self.loadVars(rxni, self._nvarcache)
        tflag = self.createVariable(
            'TFLAG', 'i', ('TSTEP', 'VAR', 'DATE-TIME'))
        tflag.units = '<YYYYJJJ, HHMMSS>'
        tflag.var_desc = tflag.long_name = 'TFLAG'.ljust(16)
        tflag[:] = array(self.timerange(), dtype='i').reshape(
            self.NSTEPS, 1, 2)
        return self.variables[key]

    def __readheader(self):
        """
        __readheader reads the header section of the irr file
        it initializes each header field (see CAMx Users Manual for a list)
        as properties of the irr class
        """
        rffile = self._rffile
        notedt = dtype([('SPAD', '>i'), ('NOTE', 'S80'), ('EPAD', '>i')])
        self.runmessage = fromfile(rffile, dtype=notedt, count=1)[0]['NOTE']
        datedt = dtype([('SPAD', '>i'), ('SDATE', '>i'), ('STIME', '>f'),
                        ('EDATE', '>i'), ('ETIME', '>f'), ('EPAD', '>i')])
        self.SDATE, self.STIME, self.EDATE, self.ETIME = \
            fromfile(rffile, dtype=datedt, count=1)[0].tolist()[1:-1]

        self.grids = []
        ngdt = dtype([('SPAD', '>i'), ('NGRIDS', '>i'), ('EPAD', '>i')])
        gdefdt = dtype([('SPAD', '>i'), ('orgx', '>f'), ('orgy', '>f'),
                        ('ncol', '>i'), ('nrow', '>i'), ('xsize', '>f'),
                        ('ysize', '>f'), ('iutm', '>i'), ('EPAD', '>i')])
        for grid in range(fromfile(rffile, dtype=ngdt, count=1)[0]['NGRIDS']):
            gdefval = fromfile(rffile, dtype=gdefdt, count=1)[0].tolist()[1:-1]
            self.grids.append(dict(zip(gdefdt.names[1:-1], gdefval)))

        self.padomains = []
        paddt = dtype([('SPAD', '>i'), ('NPADOMAINS', '>i'), ('EPAD', '>i')])
        padefdt = dtype([('SPAD', '>i'), ('grid', '>i'), ('istart', '>i'),
                         ('iend', '>i'), ('jstart', '>i'), ('jend', '>i'),
                         ('blay', '>i'), ('tlay', '>i'), ('EPAD', '>i')])
        padn = fromfile(rffile, dtype=paddt, count=1)[0]['NPADOMAINS']
        for padomain in range(padn):
            padef = fromfile(rffile, dtype=padefdt, count=1)[0].tolist()[1:-1]
            self.padomains.append(dict(zip(padefdt.names[1:-1], padef)))

        nrxndt = dtype([('SPAD', '>i'), ('NRXNS', '>i'), ('EPAD', '>i')])
        self.NRXNS = fromfile(rffile, dtype=nrxndt, count=1)[0]['NRXNS']

        self._data_start_byte = rffile.tell()
        self._record_dtype = dtype([('SPAD', '>i'), ('DATE', '>i'),
                                    ('TIME', '>f'), ('PAGRID', '>i'),
                                    ('NEST', '>i'), ('I', '>i'), ('J', '>i'),
                                    ('K', '>i'), ('IRRS', '>f', self.NRXNS),
                                    ('EPAD', '>i')])
        self._padded_size = self._record_dtype.itemsize

    def __gettimestep(self):
        """
        Header information provides start and end date, but does not
        indicate the increment between.  This routine reads the first
        and second date/time and initializes variables indicating the
        timestep length and the anticipated number.
        """
        self._activedomain = self.padomains[0]
        self._rffile.seek(
            self._data_start_byte +
            (self.__jrecords(0, self.padomains[0]['jend']) *
             self._padded_size), 0)
        date, time = fromfile(self._rffile, dtype=self._record_dtype,
                              count=1)[0].tolist()[1:3]
        tstart = datetime.strptime('%05dT%04d' % (
            self.SDATE, self.STIME), '%y%jT%H%M')
        tone = datetime.strptime('%05dT%04d' % (date, time), '%y%jT%H%M')
        tstep = tone - tstart
        self.time_step = self.TSTEP = int(
            (datetime.strptime('0000', '%H%M') + tstep).strftime('%H%M'))
        tend = datetime.strptime('%05dT%04d' % (
            self.EDATE, self.ETIME), '%y%jT%H%M')
        tdiff = tend - tstart
        multiple = (tdiff.days * 24 * 3600. + tdiff.seconds) / \
            (tstep.days * 24 * 3600. + tstep.seconds)
        self.NSTEPS = self.time_step_count = int(multiple)
        assert (multiple == int(multiple))

    def __gridrecords(self, pagrid):
        """
        routine returns the number of records to increment from the
        data start byte to find the pagrid
        """
        ntime = self.__timerecords(
            pagrid, (self.EDATE, int(self.ETIME + self.TSTEP)))
        return ntime

    def __timerecords(self, pagrid, dt):
        """
        routine returns the number of records to increment from the
        data start byte to find the first time
        """
        d, t = dt
        nsteps = self.timerange().index((d, t))
        nj = self.__jrecords(pagrid, self.padomains[pagrid]['jend'] + 1)
        return nsteps * nj

    def __irecords(self, pagrid, i):
        """
        routine returns the number of records to increment from the
        data start byte to find the first icell
        """
        ni = i - self._activedomain['istart']
        nk = self.__krecords(pagrid, self._activedomain['tlay'] + 1)
        return ni * nk

    def __jrecords(self, pagrid, j):
        """
        routine returns the number of records to increment from the
        data start byte to find the first jcell
        """
        nj = j - self._activedomain['jstart']
        ni = self.__irecords(pagrid, self._activedomain['iend'] + 1)
        return nj * ni

    def __krecords(self, pagrid, k):
        """
        routine returns the number of records to increment from the
        data start byte to find the first kcell
        """
        return k - self._activedomain['blay']

    def __recordposition(self, pagrid, date, time, i, j, k):
        """
        routine uses gridrecords, timerecords, irecords, jrecords,
        and krecords multiplied by the fortran padded size to return
        the byte position of the specified record

        pagrid - integer
        date - integer
        time - float
        i - integer
        j - integer
        k - integer
        """
        records = 0
        for pag in range(pagrid):
            records += self.__gridrecords(pag)
        records += self.__timerecords(pagrid, (date, time))
        records += self.__jrecords(pagrid, j)
        records += self.__irecords(pagrid, i)
        records += self.__krecords(pagrid, k)
        return records * self._padded_size + self._data_start_byte

    def _seek(self, pagrid=1, date=None, time=None, i=1, j=1, k=1):
        """
        Move file cursor to beginning of specified record
        see __recordposition for a definition of variables
        """
        if date is None:
            date = self.SDATE
        if time is None:
            time = self.STIME + self.TSTEP
        self._activedomain = self.padomains[pagrid]
        self._rffile.seek(self.__recordposition(pagrid, date, time, i, j, k))
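
    # Worked example of the record-offset arithmetic above (illustrative
    # note, not part of the original source).  Within one process-analysis
    # domain and one output time, records are ordered with k varying
    # fastest, then i, then j.  For a domain with istart=1, iend=2,
    # blay=1, tlay=3 (ni=2, nk=3), the record for (i=2, j=jstart, k=1) at
    # the first output time is
    #     __irecords(0, 2) = (2 - 1) * 3 = 3
    # records past the data start, i.e. it begins at
    #     _data_start_byte + 3 * _padded_size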

    def loadVars(self, start, n, pagrid=0):
        domain = self.padomains[pagrid]
        istart = domain['istart']
        iend = domain['iend']
        jstart = domain['jstart']
        jend = domain['jend']
        kstart = domain['blay']
        kend = domain['tlay']
        variables = self.variables
        nk = kend + 1 - kstart
        nj = jend + 1 - jstart
        ni = iend + 1 - istart
        nrec = nk * ni * nj
        temp = zeros((nrec, self.NRXNS), 'f')
        shape = (self.NSTEPS, len(self.dimensions['LAY']),
                 len(self.dimensions['ROW']), len(self.dimensions['COL']))
        variables.clear()
        end = min(start + n, self.NRXNS + 1)
        start = max(1, end - n)
        for rxn in range(start, end):
            key = 'RXN_%02d' % rxn
            variables[key] = pncvar(self, key, 'f',
                                    ('TSTEP', 'LAY', 'ROW', 'COL'),
                                    values=zeros(shape, 'f'),
                                    units='ppm/hr',
                                    var_desc=key.ljust(16),
                                    long_name=key.ljust(16))
        self._seek(pagrid=0, i=istart, j=jstart, k=kstart)
        for ti, (d, t) in enumerate(self.timerange()):
            record = fromfile(
                self._rffile, dtype=self._record_dtype, count=nrec)
            temp[:] = record['IRRS']
            date = record['DATE']
            time = record['TIME']
            id = record['I']
            jd = record['J']
            kd = record['K']
            assert ((id == arange(istart, iend + 1)[None, :, None]
                     .repeat(nk, 2).repeat(nj, 0).ravel()).all())
            assert ((kd == arange(kstart, kend + 1)[None, None, :]
                     .repeat(ni, 1).repeat(nj, 0).ravel()).all())
            assert ((jd == arange(jstart, jend + 1).repeat(ni * nk, 0)).all())
            assert ((date == d).all() and (time == t).all())
            for rxn in range(start, end):
                variables['RXN_%02d' % rxn][ti, :, :, :] = \
                    temp[:, rxn - 1].reshape(nj, ni, nk).swapaxes(1, 2)\
                    .swapaxes(0, 1)
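
    # Note on variable caching (illustrative, not part of the original
    # source): __var_get calls loadVars(rxni, self._nvarcache), so asking
    # for one reaction loads a window of _nvarcache reactions in a single
    # pass and clears everything else from self.variables (TFLAG is then
    # recreated by __var_get).  For example, with _nvarcache=5, reading
    # RXN_10 also caches RXN_11 through RXN_14.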

    def timerange(self):
        tstart = datetime.strptime('%05dT%04d' % (
            self.SDATE, self.STIME), '%y%jT%H%M')
        tdiff = datetime.strptime(
            '%04d' % self.TSTEP, '%H%M') - datetime.strptime('0000', '%H%M')
        dates = [tstart + (tdiff * i) for i in range(1, self.NSTEPS + 1)]
        return [(int(d.strftime('%y%j')), float(d.strftime('%H%M')))
                for d in dates]
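

# Usage sketch (hypothetical; not part of the original module).  The file
# name, function name, and the column total below are only examples of how
# the reaction variables exposed by irr might be used.
def _example_column_total(irr_path='camx_irr.bin'):
    """
    Minimal sketch: open a CAMx irr file, pull one reaction's integrated
    rates (ppm/hr on TSTEP, LAY, ROW, COL), and total them over the
    vertical layers for the first output time.
    """
    irrfile = irr(irr_path)
    rxn01 = irrfile.variables['RXN_01']
    # sum over LAY for the first time step -> (ROW, COL) array
    return rxn01[0].sum(0)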


class TestRead(unittest.TestCase):
    def runTest(self):
        pass

    def setUp(self):
        pass

    def testIRR(self):
        warn('Test not implemented; data too big for distribution')


if __name__ == '__main__':
    unittest.main()