__all__ = ['landuse']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- landuse Memmap interface
============================================
.. module:: Memmap
:platform: Unix, Windows
:synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
landuse files. See PseudoNetCDF.sci_var.PseudoNetCDFFile
for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""
# Distribution packages
import unittest
import struct
# Site-Packages
from numpy import zeros, array, memmap, dtype
# This Package modules
from PseudoNetCDF.camxfiles.FortranFileUtil import OpenRecordFile
from PseudoNetCDF.sci_var import PseudoNetCDFFile
# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1, ), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]
[docs]
class landuse(PseudoNetCDFFile):
"""
landuse provides a PseudoNetCDF interface for CAMx
landuse files. Where possible, the inteface follows
IOAPI conventions (see www.baronams.com).
ex:
>>> landuse_path = 'camx_landuse.bin'
>>> rows, cols = 65, 83
>>> landusefile = landuse(landuse_path, rows, cols)
>>> landusefile.variables.keys()
['TFLAG', 'FLAND', 'TOPO']
>>> tflag = landusefile.variables['TFLAG']
>>> tflag.dimensions
('TSTEP', 'VAR', 'DATE-TIME')
>>> tflag[0, 0, :]
array([2005185, 0])
>>> tflag[-1, 0, :]
array([2005185, 240000])
>>> v = landusefile.variables['FLAND']
>>> v.dimensions
('LANDUSE', 'ROW', 'COL')
>>> v.shape
(25, 28, 65, 83)
>>> landusefile.dimensions
{'LANDUSE': 11, 'ROW': 65, 'COL': 83, 'VAR': 2}
"""
def __init__(self, rf, rows, cols, mode='r'):
self.__mode = mode
self._rffile = OpenRecordFile(rf)
self._rffile.infile.seek(0, 2)
self.__rf = rf
self._rffile._newrecord(0)
self.createDimension('ROW', rows)
self.createDimension('COL', cols)
first_line, = self._rffile.read('8s')
if first_line == 'LUCAT11 ':
self.createDimension('LANDUSE', 11)
self._newstyle = True
elif first_line == 'LUCAT26 ':
self.createDimension('LANDUSE', 26)
self._newstyle = True
else:
self.createDimension('LANDUSE', 11)
self._newstyle = False
nland = len(self.dimensions['LANDUSE'])
nrows = len(self.dimensions['ROW'])
ncols = len(self.dimensions['COL'])
if self._newstyle:
self.__fland_dtype = dtype(dict(
names=['SPAD1', 'KEY', 'EPAD1', 'SPAD2', 'DATA', 'EPAD2'],
formats=['>i', '8>S', '>i', '>i',
'(%d, %d, %d)>f' % (nland, nrows, ncols), '>i']))
self.__other_dtype = dtype(dict(
names=['SPAD1', 'KEY', 'EPAD1', 'SPAD2', 'DATA', 'EPAD2'],
formats=['>i', '8>S', '>i', '>i',
'(%d, %d)>f' % (nrows, ncols), '>i']))
else:
self.__fland_dtype = dtype(dict(
names=['SPAD2', 'DATA', 'EPAD2'],
formats=['>i', '(%d, %d, %d)>f' % (nland, nrows, ncols),
'>i']))
self.__other_dtype = dtype(dict(
names=['SPAD2', 'DATA', 'EPAD2'],
formats=['>i', '(%d, %d)>f' % (nrows, ncols), '>i']))
self.__addvars()
if self._newstyle:
self.__keys = [first_line]
else:
self.__keys = ['LUCAT11']
def __addvars(self):
self._rffile.infile.seek(0, 2)
rflen = self._rffile.infile.tell()
fland_dtype = self.__fland_dtype
other_dtype = self.__other_dtype
nfland = fland_dtype.itemsize
nfland1opt = nfland + other_dtype.itemsize
nfland2opt = nfland + other_dtype.itemsize * 2
if rflen == nfland:
file_dtype = dtype(dict(names=['FLAND'], formats=[fland_dtype]))
elif rflen == nfland1opt:
file_dtype = dtype(dict(
names=['FLAND', 'VAR1'],
formats=[fland_dtype, other_dtype]))
elif rflen == nfland2opt:
file_dtype = dtype(dict(
names=['FLAND', 'LAI', 'TOPO'],
formats=[fland_dtype, other_dtype, other_dtype]))
else:
raise IOError('File size is expected to be ' +
'%d, %d, or %d; was %d' %
(nfland, nfland1opt, nfland2opt, rflen))
data = memmap(self.__rf, mode=self.__mode, dtype=file_dtype, offset=0)
if not self._newstyle:
varkeys = ['FLAND', 'TOPO']
else:
varkeys = [data[k]['KEY'][0].strip() for k in file_dtype.names]
varkeys = [k.decode() if hasattr(k, 'decode')
else k for k in varkeys]
for varkey, dkey in zip(varkeys, file_dtype.names):
var = self.createVariable(varkey, 'f', {'FLAND': (
'LANDUSE', 'ROW', 'COL')}.get(dkey, ('ROW', 'COL')))
var[:] = data[dkey]['DATA']
var.var_desc = varkey.ljust(16)
var.units = {'FLAND': 'Fraction'}.get(dkey, '')
var.long_name = varkey.ljust(16)
class TestMemmap(unittest.TestCase):
def runTest(self):
pass
def setUp(self):
pass
def testLU(self):
from numpy import testing
import PseudoNetCDF.testcase
aassert = testing.assert_array_almost_equal
lufile = landuse(
PseudoNetCDF.testcase.camxfiles_paths['landuse'], 4, 5)
checkv = array([
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 6.25000000e-02, 0.00000000e+00, 6.25000000e-02,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 6.25000000e-02, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
1.00000000e+00, 9.37500000e-01, 1.00000000e+00, 9.37500000e-01,
1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
1.00000000e+00, 9.37500000e-01, 6.87500000e-01, 1.00000000e+00,
1.00000000e+00, 1.00000000e+00, 1.00000000e+00, 1.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 6.25000000e-02, 2.50000000e-01, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 0.00000000e+00],
dtype='f').reshape(11, 4, 5)
aassert(lufile.variables['LUCAT11'], checkv)
def testNCF2LU(self):
import PseudoNetCDF.testcase
from PseudoNetCDF.pncgen import pncgen
import os
inpath = PseudoNetCDF.testcase.camxfiles_paths['landuse']
outpath = PseudoNetCDF.testcase.camxfiles_paths['landuse'] + '.check'
infile = landuse(inpath, 4, 5)
pncgen(infile, outpath, format='camxfiles.landuse')
orig = open(inpath, 'rb').read()
new = open(outpath, 'rb').read()
assert (orig == new)
os.remove(outpath)
if __name__ == '__main__':
unittest.main()