Source code for PseudoNetCDF.camxfiles.one3d.Memmap

__all__ = ['one3d']
__doc__ = """
.. _Memmap:
:mod:`Memmap` -- one3d Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx generic
              one 3d variable files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""
# Distribution packages
import unittest
import struct

# Site-Packages
from numpy import zeros, array, where, memmap, newaxis

# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
# Decode a big-endian 4-byte NaN bit pattern into a Python float.
(listnan,) = struct.unpack('>f', b'\xff\xc0\x00\x00')
# Store the same value in a one-element single-precision array so a numpy
# scalar form of the NaN is available alongside the Python float.
checkarray = zeros((1,), 'f')
checkarray[:] = listnan
array_nan = checkarray[0]


class one3d(PseudoNetCDFFile):
    """
    one3d provides a PseudoNetCDF interface for CAMx
    one3d files.  Where possible, the interface follows
    IOAPI conventions (see www.baronams.com).

    ex:
        >>> one3d_path = 'camx_one3d.bin'
        >>> rows,cols = 65,83
        >>> one3dfile = one3d(one3d_path,rows,cols)
        >>> one3dfile.variables.keys()
        ['TFLAG', 'UNKNOWN']
        >>> tflag = one3dfile.variables['TFLAG']
        >>> tflag.dimensions
        ('TSTEP', 'VAR', 'DATE-TIME')
        >>> tflag[0,0,:]
        array([2005185, 0])
        >>> tflag[-1,0,:]
        array([2005185, 240000])
        >>> v = one3dfile.variables['UNKNOWN']
        >>> v.dimensions
        ('TSTEP', 'LAY', 'ROW', 'COL')
        >>> v.shape
        (25, 28, 65, 83)
        >>> one3dfile.dimensions
        {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83}
    """

    id_fmt = "fi"
    data_fmt = "f"
    var_name = "UNKNOWN"
    units = "UNKNOWN"

    @classmethod
    def isMine(cls, path):
        """
        Return True if the file at path can be opened as a one3d file
        and passes the _checkfile consistency test.

        Any exception raised while opening the file is treated as
        "not mine" and yields False.
        """
        try:
            f = one3d(path)
        except Exception:
            return False
        return f._checkfile()

    def _checkfile(self):
        """
        Return True if the memory map divides evenly into records and
        the first and last item of every record are equal.
        """
        if self.__memmap.size != (self.__records * self.__record_items):
            return False
        # Reinterpret the 4-byte items as integers so the first and last
        # item of each record can be compared exactly.
        as_int = self.__memmap.reshape(
            self.__records, self.__record_items).view('i')
        pad0 = as_int[:, 0]
        pad1 = as_int[:, -1]
        return (pad0 == pad1).all()

    def __init__(self, rf, rows=None, cols=None):
        """
        Initialization included reading the header and learning
        about the format.

        rf - path of the file to memory map
        rows - number of rows in domain (defaults: 1)
        cols - number of columns in domain (defaults: size)

        The defaults are only applied when both rows and cols are
        omitted; cols is then derived from the last 4-byte item of the
        file.
        """
        self.rffile = rf

        self.__memmap = memmap(self.rffile, '>f', 'r', offset=0)

        if rows is None and cols is None:
            rows = 1
            # The last item of the file, read as an integer and divided by
            # the 4-byte item size, gives the record payload length; two of
            # those items are the time and date.
            cols = self.__memmap[[-1]].view('>i')[0] // 4 - 2

        # Each record holds: leading pad, time, date, rows * cols data
        # values, trailing pad.
        self.__record_items = rows * cols + 4

        self.__records = self.__memmap.shape[0] // self.__record_items

        time_date = array(self.__memmap.reshape(
            self.__records, self.__record_items)[:, 1:3])

        # Index of the first record whose (time, date) differs from the
        # first record; that is the number of layers per time step.
        # Raises IndexError when no record differs from the first.
        lays = where(time_date != time_date[newaxis, 0])[0][0]

        # One record per time step: every lays-th record.
        new_hour = slice(0, None, lays)

        dates = time_date[:, 1].view('>i')
        times = time_date[:, 0]

        self.__tflag = array(
            [dates[new_hour], times[new_hour]], dtype='>f').swapaxes(0, 1)

        # Floor division keeps the dimension length an integer; true
        # division would hand a float to createDimension on Python 3.
        time_steps = self.__records // lays

        self.createDimension('VAR', 1)
        self.createDimension('TSTEP', time_steps)
        self.createDimension('COL', cols)
        self.createDimension('ROW', rows)
        self.createDimension('LAY', lays)
        self.createDimension('DATE-TIME', 2)

        self.FTYPE = 1
        self.NVARS = 1
        self.NCOLS = cols
        self.NROWS = rows
        self.NLAYS = lays
        self.NTHIK = 1

        # The data variable is produced by self.__variables when requested.
        self.variables = PseudoNetCDFVariables(
            self.__variables, [self.var_name])
        v = self.variables['TFLAG'] = ConvertCAMxTime(
            self.__tflag[:, 0], self.__tflag[:, 1], 1)
        self.SDATE, self.STIME = v[0, 0, :]

    def __decorator(self, name, pncfv):
        """
        Set units, var_desc and long_name on pncfv and return it.

        name is accepted but not used.
        """
        props = dict(units=self.units,
                     var_desc=self.var_name.ljust(16),
                     long_name=self.var_name.ljust(16))
        for k, v in props.items():
            setattr(pncfv, k, v)
        return pncfv

    def __variables(self, k):
        """
        Build the variable named k from the data portion of every
        record, shaped (TSTEP, LAY, ROW, COL).
        """
        pncvar = PseudoNetCDFVariable
        tsteps = len(self.dimensions['TSTEP'])
        lays = len(self.dimensions['LAY'])
        rows = len(self.dimensions['ROW'])
        cols = len(self.dimensions['COL'])
        # Drop the leading pad, time, date and trailing pad of each record.
        tmpvals = self.__memmap\
            .reshape(self.__records, self.__record_items)[:, 3:-1]\
            .reshape(tsteps, lays, rows, cols)
        return self.__decorator(k, pncvar(self, k, 'f',
                                          ('TSTEP', 'LAY', 'ROW', 'COL'),
                                          values=tmpvals))
class TestMemmap(unittest.TestCase):
    """Tests for the one3d memory-map reader using the packaged test file."""

    def runTest(self):
        pass

    def setUp(self):
        pass

    def testKV(self):
        """Read the vertical diffusivity test file and compare all values."""
        import PseudoNetCDF.testcase
        inpath = PseudoNetCDF.testcase.camxfiles_paths['vertical_diffusivity']
        vdfile = one3d(inpath, 4, 5)
        # Access TFLAG to confirm the time variable is available.
        vdfile.variables['TFLAG']
        checkv = array([
            1.00000000e+00, 4.76359320e+00, 1.92715893e+01, 1.52158489e+01,
            7.20601225e+00, 1.84097159e+00, 2.63084507e+01, 1.27621298e+01,
            1.06348248e+01, 2.22587357e+01, 1.00000000e+00, 1.69009724e+01,
            1.00000000e+00, 2.23075104e+01, 1.27485418e+01, 1.45508013e+01,
            1.45637455e+01, 2.95294094e+01, 3.02676849e+01, 2.84974957e+01,
            1.00000000e+00, 6.93706131e+00, 3.07418957e+01, 3.41300621e+01,
            1.66266994e+01, 1.00000000e+00, 2.53920174e+01, 1.92539787e+01,
            2.32906532e+01, 5.96702042e+01, 1.00000000e+00, 2.24458847e+01,
            1.00000000e+00, 5.45038452e+01, 3.45825729e+01, 3.43578224e+01,
            3.76071548e+01, 6.76799850e+01, 7.33648529e+01, 7.35801239e+01,
            1.00000000e+00, 6.38178444e+00, 4.39327278e+01, 5.00754166e+01,
            2.44474106e+01, 1.00000000e+00, 3.51700935e+01, 2.71137428e+01,
            3.40312347e+01, 8.85775909e+01, 1.00000000e+00, 3.13994522e+01,
            1.00000000e+00, 8.07169266e+01, 5.12892876e+01, 5.05734329e+01,
            5.56603966e+01, 1.00394188e+02, 1.08980370e+02, 1.09251083e+02,
            1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.64916098e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.22205174e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.05408611e+01, 1.26687222e+01, 1.21386652e+01,
            1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.93040049e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.39350021e+00, 1.02697349e+00, 1.00000000e+00,
            1.00000000e+00, 1.82250175e+01, 2.90407104e+01, 2.83827496e+01,
            1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 2.02609706e+00, 1.00000000e+00, 1.00000000e+00,
            1.00000000e+00, 1.44422662e+00, 1.02998519e+00, 1.00000000e+00,
            1.00000000e+00, 2.60322971e+01, 4.26534195e+01, 4.17046585e+01],
            dtype='f').reshape(2, 3, 4, 5)
        self.assertTrue((vdfile.variables['UNKNOWN'] == checkv).all())

    def testNCF2KV(self):
        """Write the test file back out with pncgen and compare the bytes."""
        import PseudoNetCDF.testcase
        from PseudoNetCDF.pncgen import pncgen
        import os
        inpath = PseudoNetCDF.testcase.camxfiles_paths['vertical_diffusivity']
        outpath = inpath + '.check'
        infile = one3d(inpath, 4, 5)
        try:
            pncgen(infile, outpath, format='camxfiles.vertical_diffusivity')
            # Context managers close both handles even if a read fails.
            with open(inpath, 'rb') as origfile:
                orig = origfile.read()
            with open(outpath, 'rb') as newfile:
                new = newfile.read()
            # assertTrue is not stripped under -O and avoids dumping a
            # large byte diff on failure.
            self.assertTrue(orig == new)
        finally:
            # Always clean up the generated file, including on failure.
            if os.path.exists(outpath):
                os.remove(outpath)


if __name__ == '__main__':
    unittest.main()