Source code for PseudoNetCDF.camxfiles.wind.Memmap

__all__ = ['wind']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- wind Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              wind files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
import unittest
import struct

# Site-Packages
from numpy import zeros, array, memmap, nan

# This Package modules
from PseudoNetCDF.camxfiles.FortranFileUtil import OpenRecordFile
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from collections import OrderedDict
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class wind(PseudoNetCDFFile): """ wind provides a PseudoNetCDF interface for CAMx wind files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> wind_path = 'camx_wind.bin' >>> rows,cols = 65,83 >>> windfile = wind(wind_path,rows,cols) >>> windfile.variables.keys() ['TFLAG', 'U', 'V'] >>> v = windfile.variables['V'] >>> tflag = windfile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0,0,:] array([2005185, 0]) >>> tflag[-1,0,:] array([2005185, 240000]) >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> windfile.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ __units = 'm/s' def __init__(self, rffile, rows, cols): rf = OpenRecordFile(rffile) self.__time_hdr_fmts = {12: "fii", 8: "fi"}[rf.record_size] self.__time_hdr_fmts_size = rf.record_size self.STIME, self.SDATE = rf.unpack("fi") rf.next() lays = 1 record_size = rf.record_size while rf.record_size == record_size: lays += 1 rf.next() self.__dummy_length = (rf.record_size + 8) // 4 lays //= 2 record = rows * cols * 4 + 8 total_size = self.__dummy_length times = 0 while total_size < rf.length: times += 1 total_size += record * 2 * lays + self.__time_hdr_fmts_size + 8 times -= 1 self.variables = OrderedDict del rf self.createDimension('TSTEP', times) self.createDimension('DATE-TIME', 2) self.createDimension('LAY', lays) self.createDimension('ROW', rows) self.createDimension('COL', cols) self.createDimension('VAR', 2) self.NVARS = len(self.dimensions['VAR']) self.NLAYS = len(self.dimensions['LAY']) self.NROWS = len(self.dimensions['ROW']) self.NCOLS = len(self.dimensions['COL']) self.FTYPE = 1 self.__memmap = memmap(rffile, '>f', 'r', offset=0) if self.__time_hdr_fmts_size == 12: self.LSTAGGER = self.__memmap[3].view('i') else: self.LSTAGGER = nan self.variables = PseudoNetCDFVariables( self.__variables, ['TFLAG', 'U', 'V']) def __variables(self, k): self.__add_variables() return self.variables[k] def __decorator(self, k, pncfv): decor = {'TFLAG': {'units': 'DATE-TIME', 'long_name': 'TFLAG', 'var_desc': 'Time flag'}} for k, v in decor.get(k, {'units': 'm/s', 'long_name': k, 'var_desc': k}).items(): setattr(pncfv, k, v) return pncfv def __add_variables(self): tsteps = len(self.dimensions['TSTEP']) lays = len(self.dimensions['LAY']) rows = len(self.dimensions['ROW']) cols = len(self.dimensions['COL']) offset = len(self.__time_hdr_fmts) + 2 block = (rows * cols + 2) * 2 * lays out_idx = zeros(self.__memmap.shape, 'b') for t in range(tsteps): start = (t + 1) * offset + t * block + t * self.__dummy_length stop = start + block nlay2 = lays * 2 nmatrix = rows * cols out_idx[start:stop].reshape(nlay2, nmatrix + 2)[:, 1:-1] = 1 out_idx[start:stop].reshape(nlay2, nmatrix + 2)[:, [0, -1]] = 2 out_idx[start - offset:start] = 3 buffer = self.__memmap[out_idx == 2].reshape(tsteps, lays, 2, 2) if not (buffer[:, :, :, 0] == buffer[:, :, :, 1]).all(): raise ValueError('Fortran unformatted record start and end ' + 'padding do not match.') date = self.__memmap[out_idx == 3].reshape( tsteps, (out_idx == 3).sum() // tsteps)[:, 2].view('>i') time = self.__memmap[out_idx == 3].reshape( tsteps, (out_idx == 3).sum() // tsteps)[:, 1] self.variables['TFLAG'] = ConvertCAMxTime(date, time, 2) uvals = self.__memmap[out_idx == 1].reshape(tsteps, lays, 2, rows, cols)[:, :, 0, :, :] self.variables['U'] = self.__decorator('U', PseudoNetCDFVariable( self, 'U', 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=uvals)) vvals = self.__memmap[out_idx == 1].reshape(tsteps, lays, 2, rows, cols)[:, :, 1, :, :] self.variables['V'] = self.__decorator('V', PseudoNetCDFVariable( self, 'V', 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=vvals))
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testWD(self): import PseudoNetCDF.testcase wdfile = wind(PseudoNetCDF.testcase.camxfiles_paths['wind'], 4, 5) wdfile.variables['TFLAG'] checkv = array([ -1.73236704e+00, -1.99612117e+00, -3.00912833e+00, -3.92667437e+00, -3.49521232e+00, 2.04542422e+00, 8.57666790e-01, -1.71201074e+00, -4.24386787e+00, -5.37704515e+00, 1.85697508e+00, 6.34313405e-01, -1.21529281e+00, -3.03180861e+00, -4.36278439e+00, -1.90753967e-01, -1.08261776e+00, -1.73634803e+00, -2.10829663e+00, -2.28424144e+00, -1.88443780e+00, -2.02582169e+00, -3.09955978e+00, -4.14587784e+00, -3.72402787e+00, 2.16277528e+00, 8.94082963e-01, -1.86343944e+00, -4.58147812e+00, -5.81837606e+00, 1.97949493e+00, 6.12511635e-01, -1.35096896e+00, -3.25313163e+00, -4.67790413e+00, -1.89851984e-01, -1.16381800e+00, -1.84269297e+00, -2.21348834e+00, -2.40952253e+00, -2.04972148e+00, -2.11795568e+00, -3.06094027e+00, -4.11207581e+00, -3.74964952e+00, 2.09780049e+00, 8.01259458e-01, -1.90404522e+00, -4.59170580e+00, -5.83114100e+00, 1.97475648e+00, 5.54396451e-01, -1.41695607e+00, -3.28227353e+00, -4.67724609e+00, -1.94723800e-01, -1.18353117e+00, -1.86556363e+00, -2.22842574e+00, -2.42080784e+00, -1.65720737e+00, -1.58054411e+00, -2.25336742e+00, -3.06462526e+00, -2.47261453e+00, 1.37642264e+00, 1.16142654e+00, -6.82058990e-01, -2.68112469e+00, -3.38680530e+00, 1.80796599e+00, 1.48641026e+00, -1.71508826e-02, -1.68607295e+00, -2.89399385e+00, 3.40398103e-01, 3.25049832e-02, -5.91206312e-01, -1.19038010e+00, -1.52301860e+00, -1.83006203e+00, -1.74505961e+00, -2.50190806e+00, -3.29507184e+00, -2.65367699e+00, 1.55719578e+00, 1.25234461e+00, -9.14537191e-01, -3.16307521e+00, -4.00584650e+00, 2.07018161e+00, 1.60957754e+00, -1.46312386e-01, -2.04018188e+00, -3.40377665e+00, 4.11731720e-01, -2.29119677e-02, -7.27373540e-01, -1.35116744e+00, -1.70711970e+00, -1.72859466e+00, -1.73683071e+00, -2.65253377e+00, -3.43689489e+00, -2.75470304e+00, 1.58366191e+00, 1.19656324e+00, -1.09935236e+00, -3.38544369e+00, -4.26436615e+00, 2.04826832e+00, 1.53576791e+00, -2.60809243e-01, -2.18679833e+00, -3.59082842e+00, 3.77060443e-01, -1.05680525e-01, -8.10511589e-01, -1.40993130e+00, -1.76300752e+00], dtype='f').reshape(2, 3, 4, 5) self.assertTrue((wdfile.variables['V'][:] == checkv).all()) def testNCF2WD(self): import PseudoNetCDF.testcase from PseudoNetCDF.pncgen import pncgen import os inpath = PseudoNetCDF.testcase.camxfiles_paths['wind'] outpath = PseudoNetCDF.testcase.camxfiles_paths['wind'] + '.check' infile = wind(inpath, 4, 5) pncgen(infile, outpath, format='camxfiles.wind') orig = open(inpath, 'rb').read() new = open(outpath, 'rb').read() os.remove(outpath) assert (orig == new) # TestSuite = unittest.makeSuite(TestMemmap, 'test') if __name__ == '__main__': unittest.main()