__all__ = ['temperature']
__doc__ = """
.. _Memmap
:mod:`Memmap` -- temperature Memmap interface
=============================================
.. module:: Memmap
:platform: Unix, Windows
:synopsis: Provides :ref:`PseudoNetCDF` memory map for CAMx
temperature files. See PseudoNetCDF.sci_var.PseudoNetCDFFile
for interface details
.. moduleauthor:: Barron Henderson <barronh@unc.edu>
"""
# Distribution packages
import unittest
import struct
# Site-Packages
from numpy import zeros, array, memmap
# This Package modules
from PseudoNetCDF.sci_var import PseudoNetCDFFile, PseudoNetCDFVariable
from PseudoNetCDF.sci_var import PseudoNetCDFVariables
from PseudoNetCDF.ArrayTransforms import ConvertCAMxTime
# for use in identifying uncaught nan
listnan = struct.unpack('>f', b'\xff\xc0\x00\x00')[0]
checkarray = zeros((1,), 'f')
checkarray[0] = listnan
array_nan = checkarray[0]
[docs]
class temperature(PseudoNetCDFFile):
"""
temperature provides a PseudoNetCDF interface for CAMx
temperature files. Where possible, the inteface follows
IOAPI conventions (see www.baronams.com).
ex:
>>> temperature_path = 'camx_temperature.bin'
>>> rows,cols = 65,83
>>> temperaturefile = temperature(temperature_path,rows,cols)
>>> temperaturefile.variables.keys()
['TFLAG', 'AIRTEMP', 'SURFTEMP']
>>> tflag = temperaturefile.variables['TFLAG']
>>> tflag.dimensions
('TSTEP', 'VAR', 'DATE-TIME')
>>> tflag[0,0,:]
array([2005185, 0])
>>> tflag[-1,0,:]
array([2005185, 240000])
>>> v = temperaturefile.variables['SURFTEMP']
>>> v.dimensions
('TSTEP', 'ROW', 'COL')
>>> v.shape
(25, 65, 83)
>>> v = temperaturefile.variables['AIRTEMP']
>>> v.dimensions
('TSTEP', 'LAY', 'ROW', 'COL')
>>> v.shape
(25, 28, 65, 83)
>>> temperaturefile.dimensions
{'TSTEP': 25, 'LAY': 28, 'ROW': 65, 'COL': 83}
"""
id_fmt = 'fi'
data_fmt = 'f'
def __init__(self, rf, rows=None, cols=None):
self.__memmap = memmap(rf, '>f', 'r', offset=0)
rowsXcols = self.__memmap[0].view('i') // 4 - 2
record_length = rowsXcols + 4
records = self.__memmap.size // record_length
times = self.__memmap.reshape(records, record_length)[:, 1:3]
self.STIME, self.SDATE = times[0]
for i, (t, d) in enumerate(times):
if (t, d) != (self.STIME, self.SDATE):
break
self.SDATE = self.SDATE.view('i')
self.createDimension('LAY', i - 1)
self.createDimension('TSTEP', times.shape[0] / i)
if rows is None and cols is None:
rows = rowsXcols
cols = 1
elif rows is None:
rows = rowsXcols / cols
elif cols is None:
cols = rowsXcols / rows
else:
if cols * rows != rowsXcols:
raise ValueError(("The product of cols (%d) and rows (%d) " +
"must equal cells (%d)") % (
cols, rows, rowsXcols))
self.createDimension('ROW', rows)
self.createDimension('COL', cols)
self.createDimension('DATE-TIME', 2)
self.createDimension('VAR', 2)
self.NVARS = len(self.dimensions['VAR'])
self.NLAYS = len(self.dimensions['LAY'])
self.NROWS = len(self.dimensions['ROW'])
self.NCOLS = len(self.dimensions['COL'])
self.FTYPE = 1
self.variables = PseudoNetCDFVariables(
self.__var_get, ['AIRTEMP', 'SURFTEMP', 'TFLAG'])
def __var_get(self, key):
lays = len(self.dimensions['LAY'])
times = len(self.dimensions['TSTEP'])
rows = len(self.dimensions['ROW'])
cols = len(self.dimensions['COL'])
surf = 1
air = 2
time = 3
date = 4
out_idx = zeros(self.__memmap.shape, dtype='b').reshape(
times, lays + 1, rows * cols + 4)
out_idx[:, 0, 3:-1] = surf
out_idx[:, 1:, 3:-1] = air
out_idx[:, :, 1] = time
out_idx[:, :, 2] = date
out_idx = out_idx.ravel()
buf = self.__memmap[out_idx == 0].reshape((lays + 1) * times, 2)
if not (buf[:, 0] == buf[:, 1]).all():
raise ValueError("Buffer")
tmpvals = self.__memmap[out_idx == 1].reshape(times, rows, cols)
v = PseudoNetCDFVariable(self, 'SURFTEMP', 'f',
('TSTEP', 'ROW', 'COL'),
values=tmpvals)
self.variables['SURFTEMP'] = v
v.units = 'K'
v.long_name = 'SURFTEMP'
v.var_desc = 'SURFTEMP'
tmpvals = self.__memmap[out_idx == 2].reshape(times, lays, rows, cols)
v = PseudoNetCDFVariable(self, 'AIRTEMP', 'f',
('TSTEP', 'LAY', 'ROW', 'COL'),
values=tmpvals)
self.variables['AIRTEMP'] = v
v.units = 'K'
v.long_name = 'AIRTEMP'
v.var_desc = 'AIRTEMP'
date = self.__memmap[out_idx == date].view('>i')[0:None:lays + 1]
time = self.__memmap[out_idx == time].view('>f')[0:None:lays + 1]
tmpvals = ConvertCAMxTime(date, time, 2)
v = PseudoNetCDFVariable(self, 'TFLAG', 'f',
('TSTEP', 'VAR', 'DATE-TIME'),
values=tmpvals)
self.variables['TFLAG'] = v
return self.variables[key]
class TestMemmap(unittest.TestCase):
def runTest(self):
pass
def setUp(self):
pass
def testTEMP(self):
import PseudoNetCDF.testcase
tempfile = temperature(
PseudoNetCDF.testcase.camxfiles_paths['temperature'], 4, 5)
tempfile.variables['TFLAG']
checkat = array([2.97762360e+02, 2.97261993e+02, 3.00761200e+02,
3.03811005e+02, 3.04561218e+02, 2.96350311e+02,
2.96676544e+02, 3.00992096e+02, 3.05474762e+02,
3.07840637e+02, 2.99522430e+02, 3.00271698e+02,
3.03738403e+02, 3.07201843e+02, 3.08288422e+02,
3.02957214e+02, 3.04927643e+02, 3.06630157e+02,
3.07726074e+02, 3.07380707e+02, 2.97516449e+02,
2.96920105e+02, 3.00340576e+02, 3.03413177e+02,
3.04202728e+02, 2.96074036e+02, 2.96250641e+02,
3.00632294e+02, 3.05113647e+02, 3.07390533e+02,
2.99310059e+02, 2.99901031e+02, 3.03344666e+02,
3.06782135e+02, 3.07819946e+02, 3.02657013e+02,
3.04522675e+02, 3.06167206e+02, 3.07235107e+02,
3.06883484e+02, 2.97677338e+02, 2.96919098e+02,
3.00031250e+02, 3.03082672e+02, 3.03850861e+02,
2.96460999e+02, 2.95947815e+02, 3.00303680e+02,
3.04781982e+02, 3.07048492e+02, 2.99246979e+02,
2.99508667e+02, 3.02997650e+02, 3.06450500e+02,
3.07478485e+02, 3.02246765e+02, 3.04192139e+02,
3.05832489e+02, 3.06897644e+02, 3.06546173e+02,
2.97428253e+02, 2.97174896e+02, 3.00208191e+02,
3.03096893e+02, 3.04174133e+02, 2.96558685e+02,
2.96706177e+02, 3.00862610e+02, 3.04807037e+02,
3.06937347e+02, 2.98850220e+02, 2.99482727e+02,
3.03085022e+02, 3.06456787e+02, 3.07406586e+02,
3.01888580e+02, 3.03996735e+02, 3.05916962e+02,
3.07113647e+02, 3.06539337e+02, 2.97645966e+02,
2.97326630e+02, 3.00117950e+02, 3.02804077e+02,
3.03801544e+02, 2.96783661e+02, 2.96694946e+02,
3.00722931e+02, 3.04501587e+02, 3.06560150e+02,
2.98854828e+02, 2.99314972e+02, 3.02861023e+02,
3.06150177e+02, 3.07073944e+02, 3.01700745e+02,
3.03746124e+02, 3.05626617e+02, 3.06770447e+02,
3.06172394e+02, 2.97927094e+02, 2.97691681e+02,
3.00104675e+02, 3.02464874e+02, 3.03398926e+02,
2.97336578e+02, 2.97074127e+02, 3.00716736e+02,
3.04132446e+02, 3.06129700e+02, 2.98817017e+02,
2.99221039e+02, 3.02649536e+02, 3.05787415e+02,
3.06698334e+02, 3.01333618e+02, 3.03411346e+02,
3.05317505e+02, 3.06446869e+02,
3.05815948e+02], dtype='f').reshape(2, 3, 4, 5)
self.assertTrue((tempfile.variables['AIRTEMP'] == checkat).all())
def testNCF2TEMP(self):
import PseudoNetCDF.testcase
from PseudoNetCDF.pncgen import pncgen
import os
inpath = PseudoNetCDF.testcase.camxfiles_paths['temperature']
outpath = inpath + '.check'
infile = temperature(inpath, 4, 5)
pncgen(infile, outpath, format='camxfiles.temperature')
orig = open(inpath, 'rb').read()
new = open(outpath, 'rb').read()
assert (orig == new)
os.remove(outpath)
if __name__ == '__main__':
unittest.main()