Source code for PseudoNetCDF.camxfiles.cloud_rain.Memmap

__all__ = ['cloud_rain']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- cloud_rain Memmap interface
============================================

.. module:: Memmap
   :platform: Unix, Windows
   :synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
              cloud/rain files.  See PseudoNetCDF.sci_var.PseudoNetCDFFile
              for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""

# Distribution packages
import unittest
import struct
from warnings import warn

# Site-Packages
from numpy import zeros, array, memmap
import numpy as np

# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime

# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1, ), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]


[docs] class cloud_rain(PseudoNetCDFFile): """ cloud_rain provides a PseudoNetCDF interface for CAMx cloud_rain files. Where possible, the inteface follows IOAPI conventions (see www.baronams.com). ex: >>> cloud_rain_path = 'cloud_rain.bin' >>> rows, cols = 65, 83 >>> cloud_rainfile = cloud_rain(cloud_rain_path, rows, cols) >>> cloud_rainfile.variables.keys() ['CLOUD', 'RAIN', 'SNOW', 'GRAUPEL', 'COD', 'TFLAG'] >>> v = cloud_rainfile.variables['CLOUD'] >>> tflag = cloud_rainfile.variables['TFLAG'] >>> tflag.dimensions ('TSTEP', 'VAR', 'DATE-TIME') >>> tflag[0, 0, :] array([2005185, 0]) >>> tflag[-1, 0, :] array([2005185, 240000]) >>> v.dimensions ('TSTEP', 'LAY', 'ROW', 'COL') >>> v.shape (25, 28, 65, 83) >>> cloud_rainfile.dimensions {'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83} """ def __init__(self, rf, rows=None, cols=None): f = open(rf, 'rb') f.seek(0, 2) flen = f.tell() offset = struct.unpack('>i', open(rf, 'rb').read(4))[0] + 8 self.__memmap = memmap(rf, dtype='>f', mode='r', offset=offset) cldhdrlen = offset - 20 line1fmt = '>i%dciiii' % cldhdrlen ncols, nrows, nlays = struct.unpack(line1fmt, open(rf, 'rb').read(offset))[-4:-1] self.createDimension('COL', ncols) self.createDimension('ROW', nrows) self.createDimension('LAY', nlays) mydt = '>i4,S%d,>i4,>i4,>i4,>i4,>i4,>f4,>i4' % cldhdrlen header = np.fromfile(rf, dtype=mydt, count=1)[0] self.FILEDESC = ''.join(header[1].decode()) self.STIME, self.SDATE = header.tolist()[-2:] if self.SDATE < 10000: self.SDATE += 2000000 if (((ncols != cols and cols is not None) or (rows != rows and rows is not None))): warn(('Files says cols = %d, rows = %d, and lays = %d; ' + 'you said cols = %d and rows = %d') % ( ncols, nrows, nlays, cols, rows)) self.createDimension('DATE-TIME', 2) datasize = (flen - offset) # Try 5 first (contemporary) # Try 3 second (old) # end on 5 as a failsafe for nvars in [5, 3, 5]: timesize = (nvars * nlays * (nrows * ncols + 2) * 4 + 16) if (datasize % timesize) == 0: break else: warn( 'File appears incomplete using 3 (v4.2) or 5 ' + 'variables (>=v4.3); expected to fail' ) if nvars < 5: self.VERSION = '<4.3' varkeys = ['CLOUD', 'PRECIP', 'COD', 'TFLAG'] else: self.VERSION = '>=4.3' varkeys = ['CLOUD', 'RAIN', 'SNOW', 'GRAUPEL', 'COD', 'TFLAG'] ntimes = datasize // timesize self.createDimension('TSTEP', ntimes) self.createDimension('VAR', len(varkeys) - 1) self.NVARS = len(self.dimensions['VAR']) self.NLAYS = len(self.dimensions['LAY']) self.NROWS = len(self.dimensions['ROW']) self.NCOLS = len(self.dimensions['COL']) self.FTYPE = 1 self.variables = PseudoNetCDFVariables(self.__var_get, varkeys) self.SDATE, self.STIME = self.variables['TFLAG'][0, 0, :] def __set_var(self, key, vals_idx): times = len(self.dimensions['TSTEP']) lays = len(self.dimensions['LAY']) rows = len(self.dimensions['ROW']) cols = len(self.dimensions['COL']) vals = self.__memmap[vals_idx].reshape(times, lays, rows, cols) v = PseudoNetCDFVariable(self, key, 'f', ('TSTEP', 'LAY', 'ROW', 'COL'), values=vals) v.units = {'COD': 'None'}.get(key, 'g/m**3') v.long_name = key v.var_desc = key self.variables[key] = v def __var_get(self, key): times = len(self.dimensions['TSTEP']) rows = len(self.dimensions['ROW']) cols = len(self.dimensions['COL']) lays = len(self.dimensions['LAY']) vars = len(list(self.variables.keys())) - 1 hour = 1 date = 2 cloud = 3 rain = 4 snow = 5 graupel = 6 cod = 7 # stagger = 8 out_idx = zeros(self.__memmap.shape, dtype='b') out_idx.reshape(times, lays * vars * (rows * cols + 2) + 4)[:, 1] = hour out_idx.reshape(times, lays * vars * (rows * cols + 2) + 4)[:, 2] = date dateblock = self.__memmap[out_idx == date].view('>i') hourblock = self.__memmap[out_idx == hour] nvars = len(self.dimensions['VAR']) self.variables['TFLAG'] = ConvertCAMxTime(dateblock, hourblock, nvars) newshape1 = (times, lays * vars * (rows * cols + 2) + 4) newshape2 = (times, lays, vars, rows * cols + 2) newshape3 = (times, lays, vars, rows, cols) val_shape = out_idx.reshape(*newshape1)[:, 4:] val_shape = val_shape.reshape(*newshape2)[:, :, :, 1:-1] val_shape = val_shape.reshape(*newshape3) if self.VERSION == '<4.3': val_shape[:, :, 0, :, :] = cloud val_shape[:, :, 1, :, :] = rain val_shape[:, :, 2, :, :] = cod self.__set_var('CLOUD', out_idx == cloud) self.__set_var('PRECIP', out_idx == rain) self.__set_var('COD', out_idx == cod) else: val_shape[:, :, 0, :, :] = cloud val_shape[:, :, 1, :, :] = rain val_shape[:, :, 2, :, :] = snow val_shape[:, :, 3, :, :] = graupel val_shape[:, :, 4, :, :] = cod self.__set_var('CLOUD', out_idx == cloud) self.__set_var('RAIN', out_idx == rain) self.__set_var('SNOW', out_idx == snow) self.__set_var('GRAUPEL', out_idx == graupel) self.__set_var('COD', out_idx == cod) buf = self.__memmap[out_idx == 0].reshape( vars * times * lays + times, 2) if not (buf[:, 0] == buf[:, 1]).all(): raise ValueError("Buffer") return self.variables[key]
class TestMemmap(unittest.TestCase): def runTest(self): pass def setUp(self): pass def testCR(self): import PseudoNetCDF.testcase crfile = cloud_rain( PseudoNetCDF.testcase.camxfiles_paths['cloud_rain'], 4, 5) crfile.variables['TFLAG'] checkv = array([1.25412483e+01, 1.77024829e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.38041372e+01, 1.94885385e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.41415815e+01, 1.67501605e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.34467077e+01, 1.99459922e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.25412483e+01, 1.77024829e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.38041372e+01, 1.94885385e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.41415815e+01, 1.67501605e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.34467077e+01, 1.99459922e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.25412483e+01, 1.77024829e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.38041372e+01, 1.94885385e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.41415815e+01, 1.67501605e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.34467077e+01, 1.99459922e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.96655331e+01, 2.05677104e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.14273071e+01, 2.09934115e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.21391239e+01, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.26519203e+01, 4.96763992e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.96655331e+01, 2.05677104e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.14273071e+01, 2.09934115e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.21391239e+01, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.26519203e+01, 4.96763992e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.96655331e+01, 2.05677104e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.14273071e+01, 2.09934115e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.21391239e+01, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 2.26519203e+01, 4.96763992e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00], dtype='f').reshape(2, 3, 4, 5) self.assertTrue((crfile.variables['COD'] == checkv).all()) def testNCF2CR(self): import PseudoNetCDF.testcase from PseudoNetCDF.pncgen import pncgen import os inpath = PseudoNetCDF.testcase.camxfiles_paths['cloud_rain'] outpath = inpath + '.check' infile = cloud_rain(inpath, 4, 5) pncgen(infile, outpath, format='camxfiles.cloud_rain') orig = open(inpath, 'rb').read() new = open(outpath, 'rb').read() assert (orig == new) os.remove(outpath) if __name__ == '__main__': unittest.main()