Source code for PseudoNetCDF.camxfiles.height_pressure.Memmap

__all__ = ['height_pressure']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- height/pressure Memmap interface
=================================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              height/pressure files.  See PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
import unittest
import struct

# Site-Packages
from numpy import zeros, array, memmap

# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class height_pressure(PseudoNetCDFFile): """ height_pressure provides a PseudoNetCDF interface for CAMx height_pressure files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> height_pressure_path = 'camx_height_pressure.bin' >>> rows,cols = 65,83 >>> hpf = height_pressure(height_pressure_path,rows,cols) >>> hpf.variables.keys() ['TFLAG', 'HGHT', 'PRES'] >>> v = hpf.variables['V'] >>> tflag = hpf.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> hpf.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ id_fmt = 'fi' data_fmt = 'f' def __init__(self, rf, rows=None, cols=None): self.__memmap = memmap(rf, '>f', 'r', offset=0) rowsXcols = self.__memmap[0].view('i') // 4 - 2 record_length = rowsXcols + 4 records = self.__memmap.size // record_length times = self.__memmap.reshape(records, record_length)[:, 1:3] self.STIME, self.SDATE = times[0] for i, (t, d) in enumerate(times): if (t, d) != (self.STIME, self.SDATE): break self.SDATE = self.SDATE.view('i') self.createDimension('LAY', i / 2) self.createDimension('TSTEP', times.shape[0] / i) if rows is None and cols is None: rows = rowsXcols cols = 1 elif rows is None: rows = rowsXcols / cols elif cols is None: cols = rowsXcols / rows else: if cols * rows != rowsXcols: raise ValueError(("The product of cols (%d) and rows (%d) " + "must equal cells (%d)") % ( cols, rows, rowsXcols)) self.createDimension('ROW', rows) self.createDimension('COL', cols) self.createDimension('DATE-TIME', 2) self.createDimension('VAR', 2) self.NROWS = rows self.NCOLS = cols self.NLAYS = len(self.dimensions['LAY']) self.NVARS = 2 self.NTHIK = 1 setattr(self, 'VAR-LIST', 'HGHT'.ljust(16) + 'PRES'.ljust(16)) self.variables = PseudoNetCDFVariables( self.__var_get, ['HGHT', 'PRES', 'TFLAG']) def __var_get(self, key): lays = len(self.dimensions['LAY']) times = len(self.dimensions['TSTEP']) rows = len(self.dimensions['ROW']) cols = len(self.dimensions['COL']) hght = 1 pres = 2 time = 3 date = 4 out_idx = zeros(self.__memmap.shape, dtype='b').reshape( times, lays, 2, rows * cols + 4) out_idx[:, :, 0, 3:-1] = hght out_idx[:, :, 1, 3:-1] = pres out_idx[:, :, :, 1] = time out_idx[:, :, :, 2] = date out_idx = out_idx.ravel() buf = self.__memmap[out_idx == 0].reshape(lays * 2 * times, 2) if not (buf[:, 0] == buf[:, 1]).all(): raise ValueError("Buffer") tmpvals = self.__memmap[out_idx == 1].reshape(times, lays, rows, cols) v = self.variables['HGHT'] = PseudoNetCDFVariable( self, 'HGHT', 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=tmpvals) v.units = 'm' v.long_name = 'HGHT'.ljust(16) v.var_desc = 'Top Height' tmpvals = self.__memmap[out_idx == 2].reshape(times, lays, rows, cols) v = self.variables['PRES'] = PseudoNetCDFVariable( self, 'PRES', 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=tmpvals) v.units = 'hPA' v.long_name = 'PRES'.ljust(16) v.var_desc = 'Pressure at center' tslice = slice(None, None, len(self.dimensions['LAY']) * 2) dateblock = self.__memmap[out_idx == 4][tslice].view('>i') hourblock = self.__memmap[out_idx == 3][tslice] self.variables['TFLAG'] = ConvertCAMxTime(dateblock, hourblock, len(self.dimensions['VAR'])) return self.variables[key]
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testHP(self): import PseudoNetCDF.testcase hpfile = height_pressure( PseudoNetCDF.testcase.camxfiles_paths['height_pressure'], 4, 5) hpfile.variables['TFLAG'] checkv = array([3.38721924e+01, 3.40657959e+01, 3.41392822e+01, 3.42358398e+01, 3.42543945e+01, 3.38868408e+01, 3.40622559e+01, 3.42358398e+01, 3.44768066e+01, 3.46112061e+01, 3.37558594e+01, 3.39323730e+01, 3.42663574e+01, 3.46854248e+01, 3.48144531e+01, 3.39472656e+01, 3.41900635e+01, 3.46160889e+01, 3.48209229e+01, 3.47874756e+01, 6.78652344e+01, 6.82532959e+01, 6.84020996e+01, 6.85950928e+01, 6.86304932e+01, 6.78945312e+01, 6.82465820e+01, 6.85941162e+01, 6.90783691e+01, 6.93474121e+01, 6.76313477e+01, 6.79859619e+01, 6.86558838e+01, 6.94960938e+01, 6.97552490e+01, 6.80159912e+01, 6.85028076e+01, 6.93570557e+01, 6.97674561e+01, 6.97009277e+01, 1.01980713e+02, 1.02563843e+02, 1.02787109e+02, 1.03077759e+02, 1.03131104e+02, 1.02022949e+02, 1.02553101e+02, 1.03076904e+02, 1.03804565e+02, 1.04208984e+02, 1.01628662e+02, 1.02162842e+02, 1.03169922e+02, 1.04433838e+02, 1.04823120e+02, 1.02206909e+02, 1.02940186e+02, 1.04224609e+02, 1.04841797e+02, 1.04740723e+02, 3.38721924e+01, 3.40657959e+01, 3.41392822e+01, 3.42358398e+01, 3.42543945e+01, 3.38868408e+01, 3.40622559e+01, 3.42358398e+01, 3.44768066e+01, 3.46112061e+01, 3.37558594e+01, 3.39323730e+01, 3.42663574e+01, 3.46854248e+01, 3.48144531e+01, 3.39472656e+01, 3.41900635e+01, 3.46160889e+01, 3.48209229e+01, 3.47874756e+01, 6.78652344e+01, 6.82532959e+01, 6.84020996e+01, 6.85950928e+01, 6.86304932e+01, 6.78945312e+01, 6.82465820e+01, 6.85941162e+01, 6.90783691e+01, 6.93474121e+01, 6.76313477e+01, 6.79859619e+01, 6.86558838e+01, 6.94960938e+01, 6.97552490e+01, 6.80159912e+01, 6.85028076e+01, 6.93570557e+01, 6.97674561e+01, 6.97009277e+01, 1.01980713e+02, 1.02563843e+02, 1.02787109e+02, 1.03077759e+02, 1.03131104e+02, 1.02022949e+02, 1.02553101e+02, 1.03076904e+02, 1.03804565e+02, 1.04208984e+02, 1.01628662e+02, 1.02162842e+02, 1.03169922e+02, 1.04433838e+02, 1.04823120e+02, 1.02206909e+02, 1.02940186e+02, 1.04224609e+02, 1.04841797e+02, 1.04740723e+02], dtype='f').reshape(2, 3, 4, 5) self.assertTrue((hpfile.variables['HGHT'] == checkv).all()) def testNCF2HP(self): import PseudoNetCDF.testcase from PseudoNetCDF.pncgen import pncgen import os inpath = PseudoNetCDF.testcase.camxfiles_paths['height_pressure'] outpath = inpath + '.check' infile = height_pressure(inpath, 4, 5) pncgen(infile, outpath, format='camxfiles.height_pressure') orig = open(inpath, 'rb').read() new = open(outpath, 'rb').read() assert (orig == new) os.remove(outpath) if __name__ == '__main__': unittest.main()