from __future__ import print_function
__all__ = ['cdtoms', 'tomsl3']
from PseudoNetCDF import PseudoNetCDFFile
from PseudoNetCDF.pncwarn import warn
from re import compile
import numpy as np
from datetime import datetime
import unittest
# Header line 1 of an EP/TOMS file, e.g.
# " Day: 91 Apr  1, 2000 EP/TOMS CORRECTED OZONE GEN:1.0 V8 ALECT: 12:00 PM "
# Captures the julian day and a fixed 12-character date string.
# Raw strings: '\s'/'\d' in plain strings are invalid escape sequences
# (SyntaxWarning on modern Python).
dayre = compile(
    r' Day:\s+(?P<jday>\d+) (?P<daystring>.{12})\s+EP/'
    r'TOMS CORRECTED OZONE GEN:\d+\.\d+\sV\d ALECT:\s+\d+'
    r':\d+ [AP]M '
)
# Grid-edge header line (lines 2 and 3), capturing the bin count, the
# start/end edge values with their hemisphere letters, and the step size.
# The decimal point in the step group is now escaped (was '\d+.\d+',
# which matched any character between the digit runs).
edgere = compile(
    r'.+(?P<bins>\d+)\s+bins.*'
    r'\s+[-+]?(?P<start>\d+\.\d+)\s*(?P<startdir>[WENS]).*'
    r'\s+[-+]?(?P<end>\d+\.\d+)\s*(?P<enddir>[WENS])\s+.*'
    r'\((?P<step>\d+\.\d+)\s*degree steps.+'
)
def _groupdict(reo, line):
result = reo.match(line)
if result is None:
return {}
else:
return result.groupdict()
def cdtoms(path, outfile=None):
    """
    Read a gridded TOMS Level-3 ASCII ozone file into a PseudoNetCDF file.

    Parameters
    ----------
    path : str or file-like
        Path to a TOMS text file, or any object with a ``readlines`` method.
    outfile : PseudoNetCDFFile or None
        Destination file object; a new PseudoNetCDFFile is created when None.

    Returns
    -------
    outfile : PseudoNetCDFFile
        File with time/latitude/longitude coordinate variables (plus bounds)
        and an ``ozone`` variable in matm-cm; 999 marks missing data.
    """
    if outfile is None:
        outfile = PseudoNetCDFFile()
    # Accept either an already-open file-like object or a path.
    if hasattr(path, 'readlines'):
        inlines = path.readlines()
    else:
        # Context manager so the handle is closed deterministically
        # (the original left the file open).
        with open(path, 'r') as infile:
            inlines = infile.readlines()

    dayline = inlines[0]
    daygrp = _groupdict(dayre, dayline)
    sdate = 0
    if 'daystring' in daygrp:
        # Header matched the EP/TOMS layout; convert the 12-character date
        # to seconds since the 1970-01-01 epoch.
        date = datetime.strptime(daygrp['daystring'], '%b %d, %Y')
        rdate = datetime(1970, 1, 1)
        sdate = (date - rdate).total_seconds()
    else:
        # Fallback: try to parse a date from progressively shorter
        # space-delimited prefixes of the first line.
        import pandas as pd
        dayparts = dayline.split(' ')
        for i in [3, 2, 1]:
            daystr = ' '.join(dayparts[:i])
            try:
                date = pd.to_datetime(daystr).to_numpy()
                break
            except Exception as e:
                warn(e)
        else:
            # Nothing parsed; default to the epoch (sdate becomes 0).
            date = np.datetime64('1970-01-01')
        rdate = np.datetime64('1970-01-01')
        # datetime64 difference is in nanoseconds; convert to seconds.
        sdate = (date - rdate).astype('d') / 1e9

    longrp = _groupdict(edgere, inlines[1])
    latgrp = _groupdict(edgere, inlines[2])
    # Promote header strings to global attributes, converting numeric
    # strings to numbers where possible.
    for propdict in [daygrp, longrp, latgrp]:
        for k, v in propdict.items():
            try:
                # SECURITY NOTE(review): eval of file-supplied header text;
                # values are expected to be simple numbers. Prefer
                # ast.literal_eval if untrusted files are possible.
                v = eval(v)
            except Exception:
                pass
            setattr(outfile, k, v)
    outfile.HISTORY = ''.join(inlines[:3])

    # Grid edges come from the header when it matched; otherwise fall back
    # to the standard EP/TOMS 1 x 1.25 degree global grid.
    blat = latgrp.get('start', '59.5')
    bsn = latgrp.get('startdir', 'S')
    elat = latgrp.get('end', '59.5')
    esn = latgrp.get('enddir', 'N')
    latstep = float(latgrp.get('step', '1'))
    # Hemisphere letters give the sign of the edge values.
    blat = {'N': 1, 'S': -1}[bsn] * float(blat)
    elat = {'N': 1, 'S': -1}[esn] * float(elat)
    blon = longrp.get('start', '179.375')
    bwe = longrp.get('startdir', 'W')
    elon = longrp.get('end', '179.375')
    ewe = longrp.get('enddir', 'E')
    lonstep = float(longrp.get('step', '1.25'))
    blon = {'E': 1, 'W': -1}[bwe] * float(blon)
    elon = {'E': 1, 'W': -1}[ewe] * float(elon)

    datalines = inlines[3:]
    lats = []
    for i, line in enumerate(datalines):
        if 'lat' not in line:
            # Continuation line: drop the leading pad character and newline.
            datalines[i] = line[1:-1].rstrip()
        else:
            # Row-terminating line carries the latitude label after 'lat ='.
            data, lat = line.split('lat =')
            datalines[i] = data[1:-1].rstrip()
            lats.append(lat.strip())
    nlats = len(lats)
    # Values are fixed-width 3-character integers: blanks act as zero
    # padding and '***' marks missing data (mapped to 999).
    datablock = ''.join(datalines).replace(' ', '0').replace('***', '999')
    nlons = len(datablock) // 3 // nlats

    outfile.createDimension('time', 1)
    outfile.createDimension('latitude', nlats)
    outfile.createDimension('longitude', nlons)
    outfile.createDimension('nv', 2)
    var = outfile.createVariable('time', 'f', ('time',))
    var.units = 'seconds since 1970-01-01 00:00:00+0000'
    var[:] = sdate
    var = outfile.createVariable('latitude', 'f', ('latitude',))
    var.units = 'degrees N'
    var[:] = np.arange(blat, elat + latstep, latstep)
    lat = var
    # The per-row labels are authoritative; warn if the header disagrees.
    linelat = np.array(lats, dtype='f')
    if not (lat[:] == linelat).all():
        warn('Header metadata does not match lats')
    lat[:] = linelat
    var = outfile.createVariable('latitude_bounds', 'f', ('latitude', 'nv'))
    var.units = 'degrees N'
    var[:, 0] = lat - latstep / 2.
    var[:, 1] = lat + latstep / 2.
    var = outfile.createVariable('longitude', 'f', ('longitude',))
    var.units = 'degrees E'
    lon = var[:] = np.arange(blon, elon + lonstep, lonstep)
    var = outfile.createVariable('longitude_bounds', 'f', ('longitude', 'nv'))
    var.units = 'degrees E'
    var[:, 0] = lon - lonstep / 2.
    var[:, 1] = lon + lonstep / 2.
    var = outfile.createVariable(
        'ozone', 'f', ('time', 'latitude', 'longitude'),
        missing_value=999
    )
    var.units = 'matm-cm'
    var.long_name = var.var_desc = 'ozone'.ljust(16)
    # Reinterpret the flat character block as 3-character integer cells,
    # masking the 999 missing-value sentinel.
    var[0] = np.ma.masked_values(
        np.array(
            [i for i in datablock],
            dtype='S1'
        ).view('S3').astype('i'),
        var.missing_value
    ).reshape(nlats, nlons)
    return outfile
class tomsl3(PseudoNetCDFFile):
    """PseudoNetCDF reader for gridded TOMS Level-3 ASCII ozone files.

    Thin class wrapper around :func:`cdtoms` that populates ``self``
    in place from the given path.
    """

    def __init__(self, path):
        """
        Parameters
        ----------
        path : str or file-like
            Path to (or open handle of) a TOMS text file.
        """
        # cdtoms fills this instance with dimensions/variables/attributes.
        cdtoms(path, self)
class test_tomsl3(unittest.TestCase):
    """Round-trip test: TOMS text -> tomsl3 -> saved netCDF -> compare."""

    def setUp(self):
        # Test fixture path shipped with PseudoNetCDF.
        from PseudoNetCDF.testcase import toms_paths
        self.tomspath = toms_paths['tomsl3']

    def testTOMS2NCF(self):
        """Saving to netCDF must preserve every variable's values."""
        import os
        vfile = tomsl3(self.tomspath)
        outpath = self.tomspath + '.nc'
        ncfile = vfile.save(outpath)
        try:
            for k, ncv in ncfile.variables.items():
                vpv = vfile.variables[k]
                np.testing.assert_allclose(ncv[...], vpv[...])
        finally:
            # Remove the temporary file even when an assertion fails
            # (the original leaked it on failure).
            os.remove(outpath)
if __name__ == '__main__':
    from PseudoNetCDF.pncdump import pncdump
    # Manual smoke test: parse a local sample file, dump its contents,
    # and print one ozone cell.
    tomsfile = cdtoms('test.txt')
    pncdump(tomsfile)
    print(tomsfile.variables['ozone'][2, 3])