Source code for PseudoNetCDF.camxfiles.point_source.Memmap

from __future__ import unicode_literals
__all__ = ['point_source']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- point_source Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              point_source files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@gmail.com>
.. modulecontributor:: Yoann Long (little endian support)
"""

# Distribution packages
import unittest
import struct

# Site-Packages
from numpy import zeros, array, memmap, dtype
import numpy as np
# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class point_source(PseudoNetCDFFile): """ point_source provides a PseudoNetCDF interface for CAMx point_source files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> point_source_path = 'camx_point_source.bin' >>> rows,cols = 65,83 >>> point_sourcefile = point_source(point_source_path,rows,cols) >>> point_sourcefile.variables.keys() ['TFLAG', 'ETFLAG', 'TFLAG', 'XSTK', 'YSTK', 'HSTK', 'DSTK', 'TSTK', 'VSTK', 'KCELL', 'FLOW', 'PLMHT', 'NSTKS', 'NO', 'NO2', ...] >>> tflag = point_sourcefile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v = point_sourcefile.variables['XSTK'] >>> v.dimensions ('NSTK',) >>> v.shape (38452,) >>> v = point_sourcefile.variables['NO2'] >>> v.dimensions ('TSTEP', 'NSTK') >>> v.shape (25, 38452) >>> point_sourcefile.dimensions {'TSTEP': 25, 'NSTK': 38452} """
[docs] @classmethod def isMine(cls, path): try: point_source(path) return True except Exception: return False
def __init__(self, rf, endian='big', mode='r', **kwds): """ Initialization included reading the header and learning about the format. see __readheader and __gettimestep() for more info """ self.__rffile = rf ep = self.__endianprefix = dict(big='>', little='<')[endian] self.variables = {} for key, val in kwds.items(): setattr(self, key, val) self.__memmap = memmap(self.__rffile, ep + 'f', mode) self.__emiss_hdr_fmt = dtype(dict( names=['SPAD', 'name', 'note', 'itzon', 'nspec', 'ibdate', 'btime', 'iedate', 'etime', 'EPAD'], formats=['i', '(10,4)S1', '(60,4)S1', 'i', 'i', 'i', 'f', 'i', 'f', 'i'])).newbyteorder(ep) self.__grid_hdr_fmt = dtype(dict( names=['SPAD', 'plon', 'plat', 'iutm', 'xorg', 'yorg', 'delx', 'dely', 'nx', 'ny', 'nz', 'iproj', 'istag', 'tlat1', 'tlat2', 'rdum5', 'EPAD'], formats=['i', 'f', 'f', 'i', 'f', 'f', 'f', 'f', 'i', 'i', 'i', 'i', 'i', 'f', 'f', 'f', 'i'])).newbyteorder(ep) self.__cell_hdr_fmt = dtype(dict( names=['SPAD', 'ione1', 'ione2', 'nx', 'ny', 'EPAD'], formats=['i', 'i', 'i', 'i', 'i', 'i'])).newbyteorder(ep) self.__spc_hdr_fmt = dtype('(10,4)' + 'S1').newbyteorder(ep) self.__time_hdr_fmt = dtype(dict( names=['SPAD', 'ibdate', 'btime', 'iedate', 'etime', 'EPAD'], formats=['i', 'i', 'f', 'i', 'f', 'i'])).newbyteorder(ep) self.__nstk_hdr_fmt = dtype(dict( names=['SPAD', 'ione', 'nstk', 'EPAD'], formats=['i', 'i', 'i', 'i'])).newbyteorder(ep) self.__stk_prop_fmt = dtype(dict( names=['XSTK', 'YSTK', 'HSTK', 'DSTK', 'TSTK', 'VSTK'], formats=['f', 'f', 'f', 'f', 'f', 'f'])).newbyteorder(ep) self.__stk_time_prop_fmt = dtype(dict( names=['IONE', 'ITWO', 'KCELL', 'FLOW', 'PLMHT'], formats=['i', 'i', 'f', 'f', 'f'])).newbyteorder(ep) self.__globalheader() varkeys = ['ETFLAG', 'TFLAG', 'XSTK', 'YSTK', 'HSTK', 'DSTK', 'TSTK', 'VSTK', 'IONE', 'ITWO', 'KCELL', 'FLOW', 'PLMHT', 'NSTKS'] + [i.strip() for i in self.__spc_names] self.SPC_NAMES = ''.join([s.ljust(16) for s in self.__spc_names]) setattr(self, 'VAR-LIST', "".join([i.ljust(16) for i in varkeys])) self.variables = PseudoNetCDFVariables(self.__variables, varkeys) self.__time_stks() self.createDimension('NSTK', self.__nstk_hdr['nstk']) def __globalheader(self): """ __readheader reads the header section of the ipr file it initializes each header field (see CAMx Users Manual for a list) as properties of the ipr class """ ep = self.__endianprefix offset = 0 stop = offset + self.__emiss_hdr_fmt.itemsize // 4 self.__emiss_hdr = self.__memmap[offset:stop]\ .view(self.__emiss_hdr_fmt) offset += self.__emiss_hdr.nbytes // 4 assert ((self.__emiss_hdr['SPAD'] == self.__emiss_hdr['EPAD']).all()) stop = offset + self.__grid_hdr_fmt.itemsize // 4 self.__grid_hdr = self.__memmap[offset:stop].view(self.__grid_hdr_fmt) offset += self.__grid_hdr.nbytes // 4 assert ((self.__grid_hdr['SPAD'] == self.__grid_hdr['EPAD']).all()) self.NAME = self.__emiss_hdr['name'][0, :, 0].copy().view('S10')[ 0].decode() self.NOTE = self.__emiss_hdr['note'][0, :, 0].copy().view('S60')[ 0].decode() self.XORIG = self.__grid_hdr['xorg'][0] self.YORIG = self.__grid_hdr['yorg'][0] self.XCELL = self.__grid_hdr['delx'][0] self.YCELL = self.__grid_hdr['dely'][0] self.NCOLS = self.__grid_hdr['nx'][0] self.NROWS = self.__grid_hdr['ny'][0] self.NLAYS = self.__grid_hdr['nz'][0] self.PLON = self.__grid_hdr['plon'][0] self.PLAT = self.__grid_hdr['plat'][0] self.TLAT1 = self.__grid_hdr['tlat1'][0] self.TLAT2 = self.__grid_hdr['tlat2'][0] self.IUTM = self.__grid_hdr['iutm'][0] self.ISTAG = self.__grid_hdr['istag'][0] self.CPROJ = self.__grid_hdr['iproj'][0] stop = offset + self.__cell_hdr_fmt.itemsize // 4 self.__cell_hdr = self.__memmap[offset:stop].view(self.__cell_hdr_fmt) offset += self.__cell_hdr.nbytes // 4 + 1 assert ((self.__cell_hdr['SPAD'] == self.__cell_hdr['EPAD']).all()) nspec = self.__emiss_hdr['nspec'][0] self.ITZON = self.__emiss_hdr['itzon'][0] stop = offset + self.__spc_hdr_fmt.itemsize // 4 * nspec self.__spc_hdr = self.__memmap[offset:stop]\ .view(ep + 'S1').reshape(nspec, 10, 4) offset += self.__spc_hdr.nbytes // 4 + 1 spc_names = [np.char.strip(spc[:, 0].copy().view('S10'))[ 0] for spc in self.__spc_hdr] spc_names = [spc.decode() if hasattr(spc, 'decode') else spc for spc in spc_names] self.__spc_names = spc_names stop = offset + self.__nstk_hdr_fmt.itemsize // 4 self.__nstk_hdr = self.__memmap[offset:stop].view(self.__nstk_hdr_fmt) offset += self.__nstk_hdr.nbytes // 4 + 1 assert ((self.__nstk_hdr['SPAD'] == self.__nstk_hdr['EPAD']).all()) nstk = self.__nstk_hdr['nstk'][0] self.createDimension('NSTK', nstk) stop = offset + self.__stk_prop_fmt.itemsize // 4 * nstk self.__stk_props = self.__memmap[offset:stop].reshape( nstk, self.__stk_prop_fmt.itemsize // 4).view(self.__stk_prop_fmt) offset += self.__stk_props.nbytes // 4 + 1 self.__data_start = offset self.SDATE = self.__emiss_hdr['ibdate'] self.STIME = self.__emiss_hdr['btime'] self.EDATE = self.__emiss_hdr['iedate'] self.ETIME = self.__emiss_hdr['etime'] def __getspcidx(self, spc): return self.__spc_names.index(spc) def __time_stks(self): ep = self.__endianprefix nspcs = len(self.__spc_names) nstks = len(self.dimensions['NSTK']) date_block_size = 6 stk_block_size = 4 stk_props_size = 2 + nstks * 5 emiss_block_size = nspcs * (nstks + 13) hour_block_size = (date_block_size + stk_block_size + stk_props_size + emiss_block_size) data = self.__memmap[self.__data_start:] data = data.reshape(data.size // hour_block_size, hour_block_size) ntimes = data.shape[0] self.createDimension('TSTEP', ntimes) self.createDimension('DATE-TIME', 2) start = 0 end = date_block_size date_times = data[:, start:end] dates = date_times[:, [1, 3]].view(ep + 'i') times = date_times[:, [2, 4]] start = end end = start + stk_block_size nstk_hdr = data[:, start:end].view(ep + 'i') if not (nstks == nstk_hdr[:, 2:3]).all(): raise ValueError("Number of stacks varies with time") start = end end = start + stk_props_size self.__hourly_stk_props = \ data[:, start:end][:, 1:-1].reshape(ntimes, nstks, 5) start = end end = start + emiss_block_size if not end == data.shape[1]: raise ValueError("Incorrect shape") self.__emiss_data = data[:, start:].reshape( ntimes, nspcs, 13 + nstks)[:, :, 12:-1] bdates = dates[:, 0] btimes = times[:, 0] edates = dates[:, 1] etimes = times[:, 1] self.NSTEPS = ntimes self.createDimension('VAR', len(self.__spc_names) + 3) self.variables['TFLAG'] = ConvertCAMxTime( bdates, btimes, len(self.dimensions['VAR'])) self.variables['ETFLAG'] = ConvertCAMxTime( edates, etimes, len(self.dimensions['VAR'])) v = self.variables['NSTKS'] = PseudoNetCDFVariable( self, 'NSTKS', 'i', ('TSTEP',), values=array(nstk_hdr[:, 2])) v.units = '#'.ljust(16) v.long_name = 'NSTKS'.ljust(16) v.var_desc = v.long_name def __variables(self, k): if k in ['TFLAG', 'ETFLAG', 'NSTKS']: return self.variables[k] elif k in ['XSTK', 'YSTK', 'HSTK', 'DSTK', 'TSTK', 'VSTK']: v = PseudoNetCDFVariable( self, k, 'f', ('NSTK',), values=self.__stk_props[k].ravel()) v.units = {'XSTK': 'm', 'YSTK': 'm', 'HSTK': 'm', 'DSTK': 'm', 'TSTK': 'K', 'VSTK': 'm/h'}[k] v.long_name = k.ljust(16) v.var_desc = k.ljust(16) return v elif k in ['IONE', 'ITWO', 'KCELL', 'FLOW', 'PLMHT']: data_type = {'IONE': 'i', 'ITWO': 'i', 'KCELL': 'i', 'FLOW': 'f', 'PLMHT': 'f'}[k] v = self.createVariable(k, data_type, ('TSTEP', 'NSTK')) v.units = {'IONE': '#', 'ITWO': '#', 'KCELL': '#', 'FLOW': 'm**3/hr', 'PLMHT': 'm'}[k] v.long_name = k.ljust(16) v.var_desc = k.ljust(16) vals = self.__hourly_stk_props[:, :, [ 'IONE', 'ITWO', 'KCELL', 'FLOW', 'PLMHT'].index(k)] v[:] = vals.view('>' + data_type) return v elif k in self.__spc_names: vals = self.__emiss_data[:, self.__getspcidx(k), :] v = PseudoNetCDFVariable(self, k, 'f', ('TSTEP', 'NSTK'), values=vals) v.units = 'mole/hr'.ljust(16) v.long_name = k.ljust(16) v.var_desc = k.ljust(16) return v else: raise KeyError("Unknown key %s" % k)
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testPT(self): import PseudoNetCDF.testcase emissfile = point_source( PseudoNetCDF.testcase.camxfiles_paths['point_source']) v = emissfile.variables['NO2'] self.assertTrue((v[:] == np.array( [0.00000000e+00, 3.12931000e+02, 1.23599997e+01, 0.00000000e+00, 5.27999992e+01, 0.00000000e+00, 3.12931000e+02, 1.23599997e+01, 0.00000000e+00, 5.27999992e+01], dtype='f').reshape(2, 5)).all()) def testNCF2PT(self): import PseudoNetCDF.testcase from PseudoNetCDF.pncgen import pncgen import os inpath = PseudoNetCDF.testcase.camxfiles_paths['point_source'] outpath = inpath + '.check' infile = point_source(inpath) pncgen(infile, outpath, format='camxfiles.point_source') orig = open(inpath, 'rb').read() new = open(outpath, 'rb').read() # outfile=point_source(outpath) # for k, v0 in infile.variables.items(): # v1 = outfile.variables[k] # print(k, (v0[:] == v1[:]).all()) assert (orig == new) os.remove(outpath) if __name__ == '__main__': unittest.main()