from __future__ import print_function
from PseudoNetCDF.sci_var import PseudoNetCDFFile
import numpy as np
from datetime import datetime, timedelta
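# The helpers below coerce user-supplied 'YYYY-MM-DD' strings into
# datetimes; datetime/np.datetime64 inputs and None pass through unchanged.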
def getbdate(x):
if x is None:
return x
elif isinstance(x, (datetime, np.datetime64)):
return x
else:
return datetime.strptime(x + ' 00:00', '%Y-%m-%d %H:%M')
def getedate(x):
if x is None:
return x
elif isinstance(x, (datetime, np.datetime64)):
return x
else:
return datetime.strptime(x + ' 23:59', '%Y-%m-%d %H:%M')
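# For example, getbdate('2015-01-01') returns datetime(2015, 1, 1, 0, 0) and
# getedate('2015-01-31') returns datetime(2015, 1, 31, 23, 59), so a string
# edate covers the whole final day.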
class aqsraw(PseudoNetCDFFile):
def __init__(self, yearpath, timeresolution='hourly', bdate=None,
edate=None, rdate=datetime(1900, 1, 1), wktpolygon=None,
sampleval=None, verbose=0):
"""
yearpath - path to csv file from AQS
timeresolution - choices = ['daily', 'hourly'] default = 'hourly',
defaults to hourly'
bdate - Limit output so that the date starts at YYYY-MM-DD
edate - Limit output so that the date end YYYY-MM-DD (inclusive)
rdate - datetime(1900, 1, 1), dest = 'rdate', type = getrdate,
help = 'Reference date YYYYMMDD HH:MM:SS')
wktpolygon - WKT Polygon (default: None) equivalent to
"POLYGON ((-180 -90, 180 -90, 180 90, -180 90, -180 -90))"
sampleval - Defaults to "Sample Measurement" for hourly and "Arithmetic
Mean" for daily
verbose - level of verbosity
"""
        try:
            import pandas
        except ImportError:
            raise ImportError('aqsraw requires pandas; install pandas '
                              '(e.g., pip install pandas)')
        if wktpolygon is not None:
            # Point is only needed when testing monitors against the
            # polygon; prep() pre-compiles the polygon so the repeated
            # contains() tests below are fast
            from shapely.wkt import loads
            from shapely.prepared import prep
            from shapely.geometry import Point
            bounds = loads(wktpolygon)
            pbounds = prep(bounds)
bdate = getbdate(bdate)
edate = getedate(edate)
nseconds = {'hourly': 3600, 'daily': 3600 * 24}[timeresolution]
tunit = {'hourly': 'hours', 'daily': 'days'}[timeresolution]
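        # nseconds converts time differences to step counts; tunit feeds
        # both the timedelta keyword and the output time units string.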
if sampleval is None:
if timeresolution == 'hourly':
sampleval = "Sample Measurement"
elif timeresolution == 'daily':
sampleval = "Arithmetic Mean"
            else:
                raise KeyError(repr(timeresolution) +
                               ' is not a supported timeresolution; '
                               "choose 'hourly' or 'daily'")
hourlys = []
        # The loop anticipates support for multiple input files; currently
        # exactly one path is processed.
        for yearpath in [yearpath]:
if verbose > 0:
print('Reading', yearpath)
if timeresolution == 'hourly':
parse_dates = [[11, 12]]
date_key = 'Date GMT_Time GMT'
else:
parse_dates = [11]
date_key = 'Date Local'
data = pandas.read_csv(yearpath,
index_col=False,
converters={'State Code': str,
'County Code': str,
'Site Num': str},
parse_dates=parse_dates)
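            # State/County/Site codes are zero-padded identifiers, so the
            # converters keep them as strings; parse_dates merges the date
            # and time columns of hourly files into one datetime column.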
            intimes = np.ones(len(data), dtype=bool)
# intimes = intimes & (data['Parameter Code'] == param)
if bdate is not None:
intimes = intimes & (data[date_key] >= bdate)
if edate is not None:
intimes = intimes & (data[date_key] < edate)
if wktpolygon is None:
inspace_and_time = intimes
else:
                inspace_and_time = np.zeros(len(data), dtype=bool)
zll = zip(intimes,
data['Longitude'].values,
data['Latitude'].values)
for idx, (intime, plon, plat) in enumerate(zll):
if intime:
inspace_and_time[idx] = pbounds.contains(
Point(plon, plat))
mask = inspace_and_time
groupkeys = ['Parameter Code', 'Parameter Name',
'Units of Measure', date_key,
'State Code', 'County Code',
'Site Num']
hourly = data[mask].groupby(groupkeys, as_index=False)\
.aggregate(np.mean)\
.sort_values(by=groupkeys)
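            # Averaging within groupkeys collapses duplicate measurements
            # (e.g., multiple POCs at one site and time) to a single value.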
hourlys.append(hourly)
if verbose > 0:
print('Concatenating files')
if len(hourlys) > 1:
hourly = pandas.concat(hourlys)
else:
hourly = hourlys[0]
sites = hourly.groupby(['State Code', 'County Code', 'Site Num'],
as_index=False).aggregate(np.mean)
nsites = len(sites)
rawdates = hourly[date_key]
if bdate is None:
bdate = rawdates.min()
if edate is None:
edate = rawdates.max()
ntimes = int((edate - bdate).total_seconds() // nseconds) + 1
alltimes = [timedelta(**{tunit: i}) + bdate for i in range(ntimes)]
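        # alltimes enumerates every expected step from bdate to edate;
        # observation times are later matched to indices in this list.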
if verbose > 0:
print('Creating output file')
tdim = self.createDimension('time', ntimes)
tdim.setunlimited(True)
self.createDimension('LAY', 1)
sitelist = [row['State Code'] + row['County Code'] +
row['Site Num'] for idx, row in sites.iterrows()]
self.SITENAMES = ';'.join(sitelist)
self.createDimension('points', len(sitelist))
lonlatcoordstrs = ['%f,%f' % (row['Longitude'], row['Latitude'])
for idx, row in sites.iterrows()]
self.lonlatcoords = '/'.join(lonlatcoordstrs)
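        # lonlatcoords stores '/'-separated 'lon,lat' pairs, one per entry
        # in the points dimension.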
        # Start as None so tidx is computed on the very first row; seeding
        # with the minimum date could skip that first lookup and leave tidx
        # undefined.
        last_time = None
        # end_time = hourly[date_key].values.max()
temp = {}
lat = self.createVariable('latitude', 'f', ('points',))
lat.units = 'degrees_north'
lat.standard_name = 'latitude'
lat[:] = sites['Latitude'].values
lon = self.createVariable('longitude', 'f', ('points',))
lon.units = 'degrees_east'
lon.standard_name = 'longitude'
lon[:] = sites['Longitude'].values
if verbose > 0:
print('Processing data rows')
for idx, row in hourly.iterrows():
this_time = row[date_key]
val = row[sampleval]
unit = row['Units of Measure'].strip()
aqsid = row['State Code'] + row['County Code'] + row['Site Num']
sidx = sitelist.index(aqsid)
var_name = row['Parameter Name']
if var_name not in self.variables.keys():
var = self.createVariable(
var_name, 'f', ('time', 'LAY', 'points'), fill_value=-999)
var.units = unit
var.standard_name = var_name
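                # Accumulate into an in-memory masked array; slots with no
                # observation stay masked and write out as the fill value.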
temp[var_name] = np.ma.masked_all(
(ntimes, 1, nsites), dtype='f')
temp[var_name].set_fill_value(-999)
tmpvar = temp[var_name]
var = self.variables[var_name]
assert (var.units == unit)
            if last_time != this_time:
                last_time = this_time
                tidx = alltimes.index(this_time.to_pydatetime())
                if verbose > 0:
                    # the backspaces erase the previous percentage for an
                    # in-place progress readout
                    print('\b\b\b\b\b\b %3d%%' %
                          int(tidx / float(ntimes) * 100), end='', flush=True)
            if tidx >= ntimes:
                raise ValueError(
                    'Times (%d) exceed expected (%d)' % (tidx, ntimes))
            tmpvar[tidx, 0, sidx] = val
if verbose > 0:
print('')
if verbose > 0:
print('Writing to file')
for varkey, tempvals in temp.items():
self.variables[varkey][:] = tempvals
outtimes = np.array(
[(t - rdate).total_seconds() / nseconds for t in alltimes])
time = self.createVariable('time', outtimes.dtype.char, ('time',))
        # %F is platform-dependent in strftime, so spell the date out
        time.units = rdate.strftime(tunit + ' since %Y-%m-%d')
time.standard_name = 'time'
time[:] = outtimes
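        # With the defaults, the units read like 'hours since 1900-01-01'.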
if __name__ == '__main__':
import sys
f = aqsraw(sys.argv[1], bdate=datetime(
2015, 1, 1), edate=datetime(2015, 2, 1))
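    # A sketch of saving the result (pncwrite is PseudoNetCDF's generic
    # writer; the output file name is hypothetical):
    #   from PseudoNetCDF import pncwrite
    #   pncwrite(f, 'aqs_2015_jan.nc')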