Source code for PseudoNetCDF.test.test_core

import unittest
import numpy as np
from PseudoNetCDF import PseudoNetCDFFile, PseudoNetCDFVariables
from PseudoNetCDF import PseudoNetCDFVariable, pncopen
from . import requires_basemap, requires_pyproj, requires_matplotlib
from . import compare_files
from PseudoNetCDF.pncwarn import warn


np_all_close = np.testing.assert_allclose


[docs] class PseudoNetCDFVariableTest(unittest.TestCase):
[docs] def setUp(self): self.myarray = np.array([ [1, 1, 1, 1, 1], [1, 2, 2, 2, 1], [1, 2, 3, 2, 1], [1, 2, 2, 2, 1], [1, 1, 1, 1, 1], ], dtype='f')
[docs] def testVar(self): var = PseudoNetCDFVariable( None, self.myarray, 'f', ('y', 'x'), values=self.myarray, units='unknown', long_name='unknown' ) assert ( var.getncatts() == { 'units': 'unknown', 'long_name': 'unknown', } ) assert (var.dtype.char == 'f') assert (var.dimensions == ('y', 'x')) np_all_close(var[:], self.myarray)
[docs] def testFromArray(self): var = PseudoNetCDFVariable.from_array( 'unknown', self.myarray, ('y', 'x'), units='unknown', long_name='unknown' ) assert ( var.getncatts() == { 'units': 'unknown', 'long_name': 'unknown', } ) assert (var.dtype.char == 'f') assert (var.dimensions == ('y', 'x')) np_all_close(var[:], self.myarray)
[docs] class PseudoNetCDFFileTest(unittest.TestCase):
[docs] def setUp(self): from datetime import datetime, timedelta self.testncf = self._makencf() self.mymeta = set([ 'time', 'time_bounds', 'latitude', 'longitude', 'latitude_bounds', 'longitude_bounds', 'lambert_conformal_conic' ]) self.myvars = self.mymeta.union(['O3']) self.mydims = ['TIME', 'LAY', 'ROW', 'COL', 'nv', 'tnv'] rtime = datetime.strptime( '1970-01-01 00:00:00+0000', '%Y-%m-%d %H:%M:%S%z') self.mytimes = np.array([rtime + timedelta(hours=i) for i in range(24)])
def _makencf(self): from numpy import arange tncf = PseudoNetCDFFile() tncf.createDimension('TSTEP', 24) tncf.createDimension('LAY', 4) tncf.createDimension('ROW', 5) tncf.createDimension('COL', 6) tncf.createDimension('nv', 4) tncf.createDimension('tnv', 2) tncf.str_one = '1' tncf.int_two = 2 tncf.float_threeptfive = 3.5 tncf.Conventions = 'CF-1.6' o3 = tncf.createVariable('O3', 'f', ('TSTEP', 'LAY', 'ROW', 'COL')) o3[:] = arange(24 * 4 * 5 * 6).reshape(24, 4, 5, 6) o3.units = 'ppbv' o3.grid_mapping = 'lambert_conformal_conic' time = tncf.createVariable('time', 'd', ('TSTEP',)) time.long_name = 'time' time.units = 'hours since 1970-01-01 00:00:00+0000' time[:] = np.arange(24) timeb = tncf.createVariable('time_bounds', 'd', ('TSTEP', 'tnv')) timeb.long_name = 'time_bounds' timeb.units = 'hours since 1970-01-01 00:00:00+0000' timeb[:, 0] = np.arange(0, 24) timeb[:, 1] = np.arange(1, 25) crs = tncf.createVariable('lambert_conformal_conic', 'i', ()) crs.grid_mapping_name = 'lambert_conformal_conic' crs.standard_parallel = np.array([30., 45.]) crs.longitude_of_central_meridian = -97. crs.latitude_of_projection_origin = 40. crs.false_northing = 1620000. crs.false_easting = 2412000. crs.semi_major_axis = 6371000. crs.semi_minor_axis = 6371000. lat = tncf.createVariable('latitude', 'f', ('ROW', 'COL')) lat.long_name = 'latitude' lat.units = 'degrees_north' lon = tncf.createVariable('longitude', 'f', ('ROW', 'COL')) lon.long_name = 'longitude' lon.units = 'degrees_east' latb = tncf.createVariable( 'latitude_bounds', 'f', ('ROW', 'COL', 'nv')) latb.long_name = 'latitude_bounds' latb.units = 'degrees_north' lonb = tncf.createVariable( 'longitude_bounds', 'f', ('ROW', 'COL', 'nv')) lonb.long_name = 'longitude_bounds' lonb.units = 'degrees_east' lon[:] = [ [-120.21161038333193, -120.21160114763147, -120.21159191193058, -120.21158267622918, -120.21157344052737, -120.21156420482505], [-120.21161271536134, -120.21160347966001, -120.21159424395826, -120.21158500825604, -120.21157577255335, -120.21156653685021], [-120.21161504739118, -120.21160581168901, -120.21159657598642, -120.21158734028334, -120.2115781045798, -120.2115688688758], [-120.21161737942151, -120.21160814371851, -120.21159890801503, -120.21158967231109, -120.21158043660672, -120.21157120090189], [-120.21161971145229, -120.21161047574842, -120.21160124004409, -120.21159200433934, -120.21158276863409, -120.21157353292838]] lat[:] = [ [22.748507533242535, 22.748509683865187, 22.74851183448702, 22.74851398510802, 22.748516135728206, 22.748518286347593], [22.74851605050742, 22.748518201130356, 22.748520351752475, 22.74852250237377, 22.74852465299425, 22.748526803613903], [22.74852456777256, 22.748526718395773, 22.748528869018187, 22.748531019639763, 22.748533170260536, 22.74853532088048], [22.748533085037966, 22.74853523566145, 22.74853738628417, 22.748539536906023, 22.7485416875271, 22.748543838147327], [22.74854160230359, 22.74854375292739, 22.748545903550376, 22.748548054172538, 22.748550204793883, 22.7485523554144]] lonb[:] = [ [[-120.21161038333193, -120.21160114763147, -120.21160347966001, -120.21161271536134], [-120.21160114763147, -120.21159191193058, -120.21159424395826, -120.21160347966001], [-120.21159191193058, -120.21158267622918, -120.21158500825604, -120.21159424395826], [-120.21158267622918, -120.21157344052737, -120.21157577255335, -120.21158500825604], [-120.21157344052737, -120.21156420482505, -120.21156653685021, -120.21157577255335], [-120.21156420482505, -120.2115549691223, -120.2115573011466, -120.21156653685021]], [[-120.21161271536134, -120.21160347966001, -120.21160581168901, -120.21161504739118], [-120.21160347966001, -120.21159424395826, -120.21159657598642, -120.21160581168901], [-120.21159424395826, -120.21158500825604, -120.21158734028334, -120.21159657598642], [-120.21158500825604, -120.21157577255335, -120.2115781045798, -120.21158734028334], [-120.21157577255335, -120.21156653685021, -120.2115688688758, -120.2115781045798], [-120.21156653685021, -120.2115573011466, -120.21155963317135, -120.2115688688758]], [[-120.21161504739118, -120.21160581168901, -120.21160814371851, -120.21161737942151], [-120.21160581168901, -120.21159657598642, -120.21159890801503, -120.21160814371851], [-120.21159657598642, -120.21158734028334, -120.21158967231109, -120.21159890801503], [-120.21158734028334, -120.2115781045798, -120.21158043660672, -120.21158967231109], [-120.2115781045798, -120.2115688688758, -120.21157120090189, -120.21158043660672], [-120.2115688688758, -120.21155963317135, -120.21156196519657, -120.21157120090189]], [[-120.21161737942151, -120.21160814371851, -120.21161047574842, -120.21161971145229], [-120.21160814371851, -120.21159890801503, -120.21160124004409, -120.21161047574842], [-120.21159890801503, -120.21158967231109, -120.21159200433934, -120.21160124004409], [-120.21158967231109, -120.21158043660672, -120.21158276863409, -120.21159200433934], [-120.21158043660672, -120.21157120090189, -120.21157353292838, -120.21158276863409], [-120.21157120090189, -120.21156196519657, -120.21156429722222, -120.21157353292838]], [[-120.21161971145229, -120.21161047574842, -120.21161280777879, -120.2116220434835], [-120.21161047574842, -120.21160124004409, -120.21160357207363, -120.21161280777879], [-120.21160124004409, -120.21159200433934, -120.21159433636801, -120.21160357207363], [-120.21159200433934, -120.21158276863409, -120.21158510066192, -120.21159433636801], [-120.21158276863409, -120.21157353292838, -120.21157586495535, -120.21158510066192], [-120.21157353292838, -120.21156429722222, -120.21156662924835, -120.21157586495535]]] latb[:] = [ [[22.748507533242535, 22.748509683865187, 22.748518201130356, 22.74851605050742], [22.748509683865187, 22.74851183448702, 22.748520351752475, 22.748518201130356], [22.74851183448702, 22.74851398510802, 22.74852250237377, 22.748520351752475], [22.74851398510802, 22.748516135728206, 22.74852465299425, 22.74852250237377], [22.748516135728206, 22.748518286347593, 22.748526803613903, 22.74852465299425], [22.748518286347593, 22.748520436966125, 22.748528954232754, 22.748526803613903]], [[22.74851605050742, 22.748518201130356, 22.748526718395773, 22.74852456777256], [22.748518201130356, 22.748520351752475, 22.748528869018187, 22.748526718395773], [22.748520351752475, 22.74852250237377, 22.748531019639763, 22.748528869018187], [22.74852250237377, 22.74852465299425, 22.748533170260536, 22.748531019639763], [22.74852465299425, 22.748526803613903, 22.74853532088048, 22.748533170260536], [22.748526803613903, 22.748528954232754, 22.748537471499613, 22.74853532088048]], [[22.74852456777256, 22.748526718395773, 22.74853523566145, 22.748533085037966], [22.748526718395773, 22.748528869018187, 22.74853738628417, 22.74853523566145], [22.748528869018187, 22.748531019639763, 22.748539536906023, 22.74853738628417], [22.748531019639763, 22.748533170260536, 22.7485416875271, 22.748539536906023], [22.748533170260536, 22.74853532088048, 22.748543838147327, 22.7485416875271], [22.74853532088048, 22.748537471499613, 22.748545988766764, 22.748543838147327]], [[22.748533085037966, 22.74853523566145, 22.74854375292739, 22.74854160230359], [22.74853523566145, 22.74853738628417, 22.748545903550376, 22.74854375292739], [22.74853738628417, 22.748539536906023, 22.748548054172538, 22.748545903550376], [22.748539536906023, 22.7485416875271, 22.748550204793883, 22.748548054172538], [22.7485416875271, 22.748543838147327, 22.7485523554144, 22.748550204793883], [22.748543838147327, 22.748545988766764, 22.748554506034104, 22.7485523554144]], [[22.74854160230359, 22.74854375292739, 22.74855227019359, 22.748550119569494], [22.74854375292739, 22.748545903550376, 22.748554420816852, 22.74855227019359], [22.748545903550376, 22.748548054172538, 22.74855657143929, 22.748554420816852], [22.748548054172538, 22.748550204793883, 22.74855872206093, 22.74855657143929], [22.748550204793883, 22.7485523554144, 22.748560872681754, 22.74855872206093], [22.7485523554144, 22.748554506034104, 22.748563023301763, 22.748560872681754]]] return tncf
[docs] def testVal2idxBounds(self): lon = np.linspace(-179.5, 179.5, 360) f = PseudoNetCDFFile.from_arrays( lon=lon, dims=('lon',), attrs=dict(bounds='lon_bounds') ) with self.assertRaises(ValueError): f.val2idx('lon', -179.9, bounds='error') with self.assertRaises(ValueError): f.val2idx('lon', 179.9, bounds='error') iidx = f.val2idx('lon', -179.9, bounds='ignore') assert (iidx == 0) f.createVariable( 'lon_bounds', 'd', ('lon', 'nv'), values=np.array([lon - 0.5, lon + 0.5]).T ) iidx = f.val2idx('lon', -179.9, bounds='error') assert (iidx == 0) f.createVariable( 'lon_bounds', 'd', ('lonedges'), values=np.linspace(-180, 180, 361) ) iidx = f.val2idx('lon', -179.9, bounds='error') assert (iidx == 0)
[docs] def testVal2idx(self): ncf = PseudoNetCDFFile() coorde = np.arange(9, dtype='f') coordc = (coorde[:-1] + coorde[1:]) / 2. ncf.createDimension('coord', coordc.size) ncf.createDimension('nv', 2) ncf.createVariable( 'coord', 'f', ('coord',), values=coordc ) bncf = ncf.copy() bncf.createVariable( 'coord_bounds', 'f', ('coord', 'nv'), values=coorde.repeat(2, 0)[1:-1].reshape(-1, 2) ) bncf.variables['coord'].bounds = 'coord_bounds' cvals = [-1, .25, 4.5, 4.99, 5, 6.75, 10] expectedb = np.array([0, 0, 4, 4, 5, 6, 7]) expectedn = np.array([0, 0, 4, 4, 4, 6, 7]) def checkvals(ncf, method, clean, compare): withbnds = str('coord_bounds' in ncf.variables) prefix = withbnds + '&' + method + '&' + clean idx = ncf.val2idx( 'coord', cvals, method=method, clean=clean, bounds='warn' ) warn(prefix + ' got: ' + repr(idx)) warn(prefix + ' chk: ' + repr(compare)) assert (np.ma.allclose(compare, idx)) mw = np.ma.masked_where nn_mask = [0] * 7 bn_mask = [0] * 7 nm_mask = [1, 1, 0, 0, 0, 0, 1] bm_mask = [1, 0, 0, 0, 0, 0, 1] em_mask = [1, 1, 0, 1, 1, 1, 1] checkvals(ncf, 'nearest', 'none', mw(nn_mask, expectedn)) checkvals(ncf, 'bounds', 'none', mw(bn_mask, expectedb)) checkvals(ncf, 'nearest', 'mask', mw(nm_mask, expectedn)) checkvals(ncf, 'bounds', 'mask', mw(bm_mask, expectedb)) checkvals(bncf, 'nearest', 'none', mw(nn_mask, expectedn)) checkvals(bncf, 'bounds', 'none', mw(bn_mask, expectedb)) checkvals(bncf, 'nearest', 'mask', mw(nm_mask, expectedn)) checkvals(bncf, 'bounds', 'mask', mw(bm_mask, expectedb)) checkvals(bncf, 'exact', 'mask', mw(em_mask, expectedb))
[docs] def testCopyVariable(self): tncf = self.testncf var = tncf.copyVariable( tncf.variables['O3'], key='O3_PPB', withdata=True) self.assertEqual(True, (var[:] == tncf.variables['O3_PPB']).all())
[docs] def testSubsetVariables(self): tncf = self.testncf.copy() var = tncf.copyVariable( tncf.variables['O3'], key='O3_PPB', withdata=True) var[:] *= 1e3 var = tncf.copyVariable( tncf.variables['O3'], key='O3_PPT', withdata=True) var[:] *= 1e6 sncf = tncf.subsetVariables(['O3_PPT']) self.assertEqual(len(sncf.variables), 1) self.assertEqual(set(sncf.variables), set(['O3_PPT'])) sncf = tncf.subsetVariables(['O3_PPT'], exclude=True) self.assertEqual(len(sncf.variables), len( self.myvars.union(['O3_PPB']))) self.assertEqual(set(sncf.variables), self.myvars.union(['O3_PPB']))
[docs] def testRenameVariables(self): tncf = self.testncf sncf = tncf.renameVariables(O3='O3_PPM') self.assertEqual(len(sncf.variables), len(self.myvars)) self.assertEqual(set(sncf.variables), self.mymeta.union(['O3_PPM']))
[docs] def testRenameDimensions(self): tncf = self.testncf sncf = tncf.renameDimensions(TSTEP='TIME') self.assertEqual(len(sncf.dimensions), len(tncf.dimensions)) self.assertEqual(set(sncf.dimensions), set(self.mydims))
[docs] def testSliceDimensionInt(self): tncf = self.testncf o3 = tncf.variables['O3'][:] sncf = tncf.sliceDimensions(TSTEP=0) self.assertEqual(len(sncf.dimensions['TSTEP']), 1) self.assertEqual( True, (sncf.variables['O3'][:] == o3[0]).all())
[docs] def testSliceDimensionList(self): tncf = self.testncf o3 = tncf.variables['O3'][:] sncf = tncf.sliceDimensions(TSTEP=[0]) self.assertEqual(len(sncf.dimensions['TSTEP']), 1) self.assertEqual( True, (sncf.variables['O3'][:] == o3[0]).all()) sncf = tncf.sliceDimensions(TSTEP=[0, 8]) self.assertEqual(len(sncf.dimensions['TSTEP']), 2) self.assertEqual( True, (sncf.variables['O3'][:] == o3[[0, 8]]).all())
[docs] def testSliceDimensionComboListInt(self): tncf = self.testncf o3 = tncf.variables['O3'][:] sncf = tncf.sliceDimensions(TSTEP=[0, 8], ROW=2, COL=3) self.assertEqual(len(sncf.dimensions['TSTEP']), 2) self.assertEqual(len(sncf.dimensions['ROW']), 1) self.assertEqual(len(sncf.dimensions['COL']), 1) self.assertEqual(True, (sncf.variables['O3'][:] == o3[[ 0, 8], :, 2, 3][:, :, None, None]).all())
[docs] def testSliceDimensionMultiArray(self): tncf = self.testncf o3 = tncf.variables['O3'][:] i = np.arange(4) sncf = tncf.sliceDimensions(TSTEP=i, LAY=i, ROW=i, COL=i) self.assertEqual(len(sncf.dimensions['POINTS']), 4) self.assertEqual( True, (sncf.variables['O3'][:] == o3[i, i, i, i]).all())
[docs] def testSliceDimensionSlice(self): tncf = self.testncf o3 = tncf.variables['O3'][:] sncf = tncf.sliceDimensions(ROW=slice(0, 3)) self.assertEqual(len(sncf.dimensions['ROW']), 3) self.assertEqual( True, (sncf.variables['O3'][:] == o3[:, :, 0:3, :]).all())
[docs] def testSliceDimensionMultiSlice(self): tncf = self.testncf o3 = tncf.variables['O3'][:] sncf = tncf.sliceDimensions( LAY=slice(0, 3), ROW=slice(0, 3), COL=slice(0, 3)) self.assertEqual(len(sncf.dimensions['LAY']), 3) self.assertEqual(len(sncf.dimensions['ROW']), 3) self.assertEqual(len(sncf.dimensions['COL']), 3) self.assertEqual( True, (sncf.variables['O3'][:] == o3[:, 0:3, 0:3, 0:3]).all())
[docs] def testApplyAlongDimensionsNamed(self): tncf = self.testncf o3 = tncf.variables['O3'][:] ancf = tncf.applyAlongDimensions(LAY='min') self.assertEqual( True, (ancf.variables['O3'][:] == o3.min(1, keepdims=True)).all())
[docs] def testApplyAlongDimensionsMultiNamed(self): tncf = self.testncf o3 = tncf.variables['O3'][:] ancf = tncf.applyAlongDimensions(LAY='min', ROW='max') self.assertEqual(True, (ancf.variables['O3'][:] == o3.min( 1, keepdims=True).max(2, keepdims=True)).all())
[docs] def testApplyAlongDimensionsConvolve(self): tncf = self.testncf o3 = tncf.variables['O3'][:] ancf = tncf.applyAlongDimensions(TSTEP=lambda x: np.convolve( x, np.ones(2, dtype='f') / 2., mode='valid')) co3 = (o3[1:] + o3[:-1]) / 2 self.assertEqual(True, (ancf.variables['O3'][:] == co3).all())
[docs] def testApplyAlongDimensionsConvolveWithMax(self): tncf = self.testncf o3 = tncf.variables['O3'][:] co3 = (o3[1:] + o3[:-1]) / 2 # Testing convolution; useful for mda8 def convf(x): return np.convolve(x, np.ones(2, dtype='f') / 2., mode='valid') ancf = tncf.applyAlongDimensions(TSTEP=convf)\ .applyAlongDimensions(TSTEP=np.max) mco3 = co3.max(0, keepdims=True) self.assertEqual(True, (ancf.variables['O3'][:] == mco3).all())
[docs] def testAdd(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() tmpf = tncf + tncf np_all_close(tmpf.variables['O3'][:], 2 * o3)
[docs] def testSub(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() tmpf = tncf - tncf np_all_close(tmpf.variables['O3'][:], o3 - o3)
[docs] def testDiv(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() np.seterr(invalid='ignore') tmpf = tncf / tncf np_all_close(tmpf.variables['O3'][:], o3 / o3) np.seterr(invalid='warn')
[docs] def testMul(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() tmpf = tncf * tncf np_all_close(tmpf.variables['O3'][:], o3**2)
[docs] def testPow(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() np.seterr(over='ignore', invalid='ignore') tmpf = tncf ** tncf np_all_close(tmpf.variables['O3'][:], o3**o3) np.seterr(over='warn', invalid='warn')
[docs] def testMod(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() np.seterr(divide='ignore') tmpf = tncf % tncf np_all_close(tmpf.variables['O3'][:], o3 % o3) np.seterr(divide='warn')
[docs] def testXAdd(self): tncf = self.testncf o3 = tncf.variables['O3'][:].copy() tmpf = tncf.copy() print(tmpf._operator_exclude_vars) tmpf.setCoords(['O3']) print(tmpf._operator_exclude_vars) tmpf = tmpf + tncf np_all_close(tmpf.variables['O3'][:], o3)
[docs] @requires_basemap def testGetMap(self): tncf = self.testncf tncf.getMap(maptype='basemap_auto')
[docs] @requires_matplotlib def testPlot(self): tncf = self.testncf _ = tncf.plot('O3')
[docs] @requires_pyproj def testGetproj(self): tncf = self.testncf tncf.getproj(withgrid=False, projformat='pyproj')
[docs] @requires_pyproj def testLl2xy(self): tncf = self.testncf crs = tncf.variables['lambert_conformal_conic'] y0 = crs.false_northing x0 = crs.false_easting lon0, lat0 = tncf.xy2ll(x0, y0) x0t, y0t = tncf.ll2xy(lon0, lat0) self.assertEqual((x0, y0), (x0t, y0t))
[docs] @requires_pyproj def testLl2ij(self): tncf = self.testncf y0 = 0 x0 = 0 lon0, lat0 = tncf.xy2ll(x0, y0) i0, j0 = tncf.ll2ij(lon0, lat0) self.assertEqual(0, i0) self.assertEqual(0, j0)
[docs] def testTime2t(self): from datetime import datetime time = np.array([ datetime.strptime('1970-01-{}+0000'.format(t), '%Y-%m-%d %H:%M%z') for t in [ '01 00:20', '01 01:20', '01 01:40', '01 06:00', '02 01:00' ] ]) tncf = self.testncf t = tncf.time2t(time[:]) np_all_close(t, [0, 1, 2, 6, 23]) t = tncf.time2t(time[:-1], ttype='bounds') np_all_close(t, [0, 1, 1, 6]) t = tncf.time2t(time[:], ttype='bounds_close') self.assertEqual(True, np.allclose(t, [0, 1, 1, 6, 23])) t = tncf.time2t(time[:], ttype='bounds') np_all_close(t, np.ma.masked_invalid([0, 1, 1, 6, 23])) self.assertEqual(t.mask[-1], True)
[docs] def testTime2idx(self): from datetime import datetime time = np.array([ datetime.strptime('1970-01-{}+0000'.format(t), '%Y-%m-%d %H:%M%z') for t in [ '01 00:20', '01 01:20', '01 01:40', '01 06:00', '02 01:00' ] ]) tncf = self.testncf t = tncf.time2idx(time[:]) np_all_close(t, [0, 1, 2, 6, 23]) t = tncf.time2idx( time, method='bounds', clean='none', left=-1, right=999, ) np_all_close(t, [0, 1, 1, 6, 999]) t = tncf.time2idx(time[:], method='bounds') self.assertEqual(True, np.allclose(t, [0, 1, 1, 6, 23])) t = tncf.time2idx(time[:], method='bounds', clean='mask', right=np.nan) np_all_close(t, np.ma.masked_invalid([0, 1, 1, 6, np.nan])) self.assertEqual(t.mask[-1], True)
[docs] @requires_pyproj def testXy2ll(self): tncf = self.testncf crs = tncf.variables['lambert_conformal_conic'] y0 = crs.false_northing x0 = crs.false_easting lonmid, latmid = tncf.xy2ll(x0, y0) self.assertEqual(True, np.allclose( lonmid, crs.longitude_of_central_meridian)) self.assertEqual(True, np.allclose( latmid, crs.latitude_of_projection_origin)) lon0, lat0 = tncf.xy2ll(0, 0) self.assertEqual(True, np.allclose( lon0, tncf.variables['longitude_bounds'][0, 0, 0])) self.assertEqual(True, np.allclose( lat0, tncf.variables['latitude_bounds'][0, 0, 0]))
[docs] @requires_pyproj def testIj2ll(self): tncf = self.testncf lon0, lat0 = tncf.ij2ll(0, 0) self.assertEqual(True, np.allclose( lon0, tncf.variables['longitude'][0, 0])) self.assertEqual(True, np.allclose( lat0, tncf.variables['latitude'][0, 0]))
[docs] def testEval(self): tncf = self.testncf.copy() tncf.eval('O3_PPB = O3 * 1000.', inplace=True, copyall=False) o3ppmv = tncf.variables['O3'] o3ppbv = tncf.variables['O3_PPB'] self.assertEqual(True, ((o3ppmv * 1000.) == o3ppbv).all())
[docs] def testSetncatts(self): tncf = self.testncf.copy() tncf.setncatts({'test_new1': 1, 'test_new2': 'five'}) self.assertEqual(tncf.test_new1, 1) self.assertEqual(tncf.test_new2, 'five')
[docs] def testGetncatts(self): tncf = self.testncf t = tncf.getncatts() self.assertEqual(t['str_one'], '1') self.assertEqual(t['int_two'], 2) self.assertEqual(t['float_threeptfive'], 3.5)
[docs] def testCopy(self): tncf = self.testncf nncf = tncf.copy(props=True, dimensions=True, variables=True, data=True) for pk in tncf.ncattrs(): self.assertEqual(getattr(tncf, pk), getattr(nncf, pk, None)) for dk, dv in tncf.dimensions.items(): dvl = len(dv) ndvl = len(nncf.dimensions[dk]) self.assertEqual(dvl, ndvl) for vk, vv in tncf.variables.items(): nvv = nncf.variables[vk] self.assertEqual(True, np.allclose(vv[...], nvv[...])) self.assertEqual(vv.dimensions, nvv.dimensions) self.assertEqual(vv.dtype.char, nvv.dtype.char) for pk in vv.ncattrs(): pv = getattr(vv, pk) npv = getattr(nvv, pk, None) testv = pv == npv self.assertEqual(True, np.any(testv))
[docs] def testGetTimes(self): tncf = self.testncf t = tncf.getTimes() self.assertEqual(True, (t == self.mytimes).all())
[docs] def testStack(self): tncf = self.testncf sncf = tncf.stack(tncf, 'TSTEP') to3 = tncf.variables['O3'][:] no3 = sncf.variables['O3'][:] origlen = len(tncf.dimensions['TSTEP']) self.assertEqual(origlen * 2, len(sncf.dimensions['TSTEP'])) self.assertEqual(True, np.allclose(to3, no3[:origlen])) self.assertEqual(True, np.allclose(to3, no3[origlen:]))
[docs] def testRemoveSingleton(self): tncf = self.testncf nncf = tncf.copy() nncf.createDimension('test', 1) nncf = nncf.removeSingleton(dimkey='test') self.assertEqual(set(tncf.dimensions), set(nncf.dimensions)) nncf.createDimension('test', 1) nncf = nncf.removeSingleton(dimkey=None) self.assertEqual(set(tncf.dimensions), set(nncf.dimensions))
[docs] def testCreateDimension(self): tncf = self.testncf.copy() ndims = len(tncf.dimensions) olddims = set(tncf.dimensions) newdims = olddims.union(['newd']) tncf.createDimension('newd', 5) self.assertEqual(len(tncf.dimensions), ndims + 1) self.assertEqual(set(tncf.dimensions), newdims)
[docs] def testCreateVariable(self): tncf = self.testncf.copy() nvars = len(tncf.variables) var = tncf.createVariable('test', 'f', ('TSTEP',)) self.assertEqual(len(tncf.variables), nvars + 1) self.assertEqual(var.dimensions, ('TSTEP',)) self.assertEqual(var.dtype.char, 'f')
[docs] def testSave(self): import tempfile import os tncf = self.testncf.copy() to3 = tncf.variables['O3'][:] with tempfile.TemporaryDirectory() as tmpdirname: tmppath = os.path.join(tmpdirname, 'test.nc') tncf.save(tmppath).close() cncf = pncopen(tmppath) np_all_close(to3, cncf.variables['O3'][:]) sncf = cncf.subset(['O3']) np_all_close(to3, sncf.variables['O3'][:]) os.remove(tmppath)
[docs] def testNcattrs(self): tncf = self.testncf tncf.ncattrs()
[docs] def testSetncattr(self): tncf = self.testncf tncf.setncattr('test', 1) self.assertEqual(tncf.test, 1)
[docs] def testDelncattr(self): tncf = self.testncf.copy() tncf.setncattr('test', 1) tncf.delncattr('test') self.assertEqual(False, hasattr(tncf, 'test'))
[docs] def testInsertDimension(self): tncf = self.testncf.copy() nncf = tncf.insertDimension(TEST=2, before='LAY') checkv = (nncf.variables['O3'][:, 0] == nncf.variables['O3'][:, 1]) self.assertEqual(True, checkv.all()) nncf = tncf.insertDimension(TEST=2) checkv = (nncf.variables['O3'][0, :] == nncf.variables['O3'][1, :]) self.assertEqual(True, checkv.all())
[docs] def testInterpDimension(self): f1 = PseudoNetCDFFile() f1.createDimension('time', 2) f1.createDimension('layer', 3) f1.createDimension('latitude', 4) f1.createDimension('longitude', 5) lay = f1.createVariable('layer', 'f', ('layer',)) lay[:] = np.arange(0, 3) simple = f1.createVariable( 'simple', 'f', ('time', 'layer', 'latitude', 'longitude')) simple[0] = np.arange(3 * 4 * 5).reshape(3, 4, 5) simple[1] = np.arange(3 * 4 * 5).reshape(3, 4, 5) f2 = f1.applyAlongDimensions(layer=lambda x: (x[1:] + x[:-1]) * .5) f3 = f1.interpDimension('layer', f2.variables['layer']) self.assertEqual(True, np.allclose( f2.variables['simple'][:], f3.variables['simple'][:])) f4 = PseudoNetCDFFile() f4.createDimension('time', 2) f4.createDimension('layer', 3) f4.createDimension('latitude', 4) f4.createDimension('longitude', 5) lay = f4.createVariable( 'layer', 'f', ('time', 'layer', 'latitude', 'longitude')) lay[:] = np.arange(0, 3)[None, :, None, None] simple = f4.createVariable( 'simple', 'f', ('time', 'layer', 'latitude', 'longitude')) simple[0] = np.arange(3 * 4 * 5).reshape(3, 4, 5) simple[1] = np.arange(3 * 4 * 5).reshape(3, 4, 5) f5 = f4.applyAlongDimensions(layer=lambda x: (x[1:] + x[:-1]) * .5) lay[1] += .25 f6 = f4.interpDimension('layer', f5.variables['layer']) self.assertEqual(True, np.allclose( f5.variables['simple'][0], f6.variables['simple'][0])) self.assertEqual(False, np.allclose( f5.variables['simple'][1], f6.variables['simple'][1]))
[docs] def testNetCDFFileNew(self): t = PseudoNetCDFFile.__new__(PseudoNetCDFFile) self.assertEqual(t.variables, {}) self.assertEqual(t.dimensions, {}) self.assertEqual(t.ncattrs(), ())
[docs] def testNetCDFFileInit(self): from numpy import arange self._makencf() tncf = self.testncf self.assertEqual(len(tncf.dimensions['TSTEP']), 24) self.assertEqual(len(tncf.dimensions['LAY']), 4) self.assertEqual(len(tncf.dimensions['ROW']), 5) self.assertEqual(len(tncf.dimensions['COL']), 6) self.assertEqual(len(tncf.dimensions['nv']), 4) tncf.fish = 2 setattr(tncf, 'FROG-DOG', 'HAPPY') self.assertEqual(set(tncf.variables.keys()), self.myvars) o3 = tncf.variables['O3'] self.assertEqual( True, (o3 == arange(24 * 4 * 5 * 6).reshape(24, 4, 5, 6)).all()) self.assertEqual(o3.typecode(), 'f') filedims = list(tncf.dimensions) filedims.sort() vardims = list(o3.dimensions) vardims.sort() filedims.remove('nv') filedims.remove('tnv') self.assertEqual(filedims, vardims) from PseudoNetCDF.pncgen import Pseudo2NetCDF n = Pseudo2NetCDF().convert(tncf) self.assertEqual(set(n.variables.keys()), self.myvars) self.assertEqual( dict([(k, len(v)) for k, v in n.dimensions.items()]), dict([(k, len(v)) for k, v in self.testncf.dimensions.items()]) ) self.assertEqual( True, (n.variables['O3'][...] == tncf.variables['O3'][...]).all()) self.assertEqual(n.variables['O3'].units, 'ppbv') self.assertEqual(n.fish, 2) self.assertEqual(getattr(n, 'FROG-DOG'), 'HAPPY')
[docs] def testNetCDFVariables(self): from numpy import arange tncf = PseudoNetCDFFile() tncf.createDimension('TSTEP', 24) tncf.createDimension('LAY', 4) tncf.createDimension('ROW', 5) tncf.createDimension('COL', 6) def const(*args, **kwds): vals = arange(24 * 4 * 5 * 6).reshape((24, 4, 5, 6)) dims = ('TSTEP', 'LAY', 'ROW', 'COL') return PseudoNetCDFVariable(tncf, args[0], 'f', dims, values=vals) tncf.variables = PseudoNetCDFVariables(const, ['NO', 'O3']) self.assertEqual(True, (tncf.variables['O3'] == arange( 24 * 4 * 5 * 6).reshape(24, 4, 5, 6)).all())
[docs] def testFromArrays(self): var = self.testncf.variables['O3'] arr = var.array().copy() checkncf = PseudoNetCDFFile.from_arrays( dims=('TSTEP', 'LAY', 'ROW', 'COL'), fileattrs={'a': 1}, **{'O3': arr}, nameattr='long_name' ) checkvar = checkncf.variables['O3'] np_all_close( checkvar[:], var[:] ) assert (checkvar.getncattr('long_name') == 'O3') assert (checkncf.a == 1)
[docs] def testMask(self): o3 = self.testncf.variables['O3'][:] # median is the average of 1439 and 1440 # 1439.5, so masks half. Using 1440 median = 1440 chkf = self.testncf.mask(less=median) assert (chkf.variables['O3'].mask.sum() == 1440) chkf = self.testncf.mask(less_equal=median) assert (chkf.variables['O3'].mask.sum() == 1441) chkf = self.testncf.mask(where=o3 < median) assert (chkf.variables['O3'].mask.sum() == 1440) chkf = self.testncf.mask(greater=median) assert (chkf.variables['O3'].mask.sum() == 1439) chkf = self.testncf.mask(greater_equal=median) assert (chkf.variables['O3'].mask.sum() == 1440)
[docs] def testIsMine(self): import tempfile import os from .. import PseudoNetCDFFile from ..core import netcdf tncf = self.testncf.copy() with tempfile.TemporaryDirectory() as tmpdirname: tmppath = os.path.join(tmpdirname, 'test.nc') tncf.save(tmppath).close() assert ~PseudoNetCDFFile.isMine(tmppath) assert netcdf.isMine(tmppath) os.remove(tmppath) with open(tmppath, 'w') as dummy: dummy.write('hi') assert ~netcdf.isMine(tmppath) assert ~PseudoNetCDFFile.isMine(tmppath) os.remove(tmppath)
[docs] def testFromNCF(self): from .. import PseudoNetCDFFile from ..core import netcdf chkncf = PseudoNetCDFFile.from_ncf(self.testncf) compare_files(chkncf, self.testncf) chkncf = netcdf.from_ncf(self.testncf) compare_files(chkncf, self.testncf)
[docs] def testNetcdf(self): import tempfile import os from ..core import netcdf tncf = self.testncf.copy() with tempfile.TemporaryDirectory() as tmpdirname: tmppath = os.path.join(tmpdirname, 'test.nc') cncf = netcdf(tmppath, mode='w') cncf.setncatts(tncf.getncatts()) for dk, d in tncf.dimensions.items(): cncf.copyDimension(d, key=dk) for vk, v in tncf.variables.items(): cncf.copyVariable(v, key=vk) compare_files(tncf, cncf) os.remove(tmppath)
[docs] def testOpenMFDataset(self): import tempfile import os from ..core import netcdf tncf = self.testncf.copy() to3 = tncf.variables['O3'][:] with tempfile.TemporaryDirectory() as tmpdirname: tmppath1 = os.path.join(tmpdirname, 'test1.nc') tmppath2 = os.path.join(tmpdirname, 'test2.nc') tncf.save(tmppath1).close() tncf.save(tmppath2).close() nt = len(tncf.dimensions['TSTEP']) cncf = netcdf.open_mfdataset(tmppath1, tmppath2, stackdim='TSTEP') np_all_close(cncf.variables['O3'][:nt], to3) np_all_close(cncf.variables['O3'][nt:], to3) os.remove(tmppath1) os.remove(tmppath2)
[docs] def runTest(self): pass