__all__ = [
'RsigApi', 'RsigGui', 'open_ioapi', 'open_mfioapi', 'cmaq', 'emiss',
'grids'
]
__version__ = '0.11.0'
from . import cmaq
from .cmaq import open_ioapi, open_mfioapi
import pandas as pd
from .utils import customize_grid, def_grid_kw as _def_grid_kw
from .utils import coverages_from_xml, legacy_get
from . import grids
from . import emiss
from .data import loadrc as _loadrc
rcParams = _loadrc()
# This is the list of prefixes for WCS that support CORNERS=1 and will return
# Longitude_SW(deg), Longitude_SE(deg), Longitude_NE(deg), Longitude_NW(deg),
# Latitude_SW(deg), Latitude_SE(deg), Latitude_NE(deg), Latitude_NW(deg),
_corner_prefixes = (
'gasp', 'goes', 'modis', 'omibehr', 'tempo', 'tropomi', 'viirs', 'omi'
)
# Projected raster datasets do not need lon/lat when retrieved as ioapi or
# coards(?). When returned as CSV, lon/lat are the only coordinate info.
_nolonlats_prefixes = ('cmaq', 'regridded')
# Pre-ioapi datasets do not support regridded
_noregrid_prefixes = ('cmaq', 'regridded', 'hrrr', 'tempo.l3')
# Requesting polygon data supports one of two binary formats:
# xdr : binary chunks described by a 10-line ascii header
# bin : binary chunks described by a 1-line ascii header
_shpxdrprefixes = ['hms.']
_shpbinprefixes = [
'landuse.atlantic.population_iclus',
'landuse.gulf.population_iclus',
'landuse.pacific.population_iclus',
]
def _actionf(msg, action, ErrorTyp=None):
"""
Convenience function for warning or raising an error.
Arguments
---------
msg : str
Message to raise or warn.
action : str
If 'error', raise ErrorTyp(msg)
If 'warn', warnings.warn using msg
Else do nothing.
ErrorTyp : Exception
        Defaults to ValueError.
Returns
-------
None
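
    Example
    -------
    # Sketch; warns with UserWarning, while action='error' raises instead
    _actionf('bdate is later than edate', 'warn')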
"""
import warnings
if ErrorTyp is None:
ErrorTyp = ValueError
if action == 'error':
raise ErrorTyp(msg)
elif action == 'warn':
warnings.warn(msg)
class RsigApi:
def __init__(
self, key=None, bdate=None, edate=None, bbox=None, grid_kw=None,
tropomi_kw=None, purpleair_kw=None, viirsnoaa_kw=None, tempo_kw=None,
pandora_kw=None, calipso_kw=None, hrrr_kw=None,
server=None, compress=1, corners=1, encoding=None,
overwrite=False, workdir='.', gridfit=False
):
"""
RsigApi is a python-based interface to RSIG's web-based API
Arguments
---------
key : str
Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected',
or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column')
bdate : str or pd.Datetime
beginning date (inclusive) defaults to yesterday at 0Z
edate : str or pd.Datetime
ending date (inclusive) defaults to bdate + 23:59:59
bbox : tuple
wlon, slat, elon, nlat in decimal degrees (-180 to 180)
grid_kw : str or dict
If str, must be 12US1, 1US1, 12US2, 1US2, 36US3, 108NHEMI2, 36NHEMI2,
global_1pt0, global_0pt1, global_0pt01, TEMPOL3_0pt02, HRRR3K
and will be used to set parameters based on EPA domains. If dict,
IOAPI mapping parameters. For details, look at the help on grids or
at the values in 12US1:
import pyrsig
help(pyrsig.grids)
print(pyrsig.grids.def_grid_kw['12US1'])
There are two special RSIG regrid keywords that control regridding:
- REGRID_AGGREGATE defines how the regridded values are aggregated
in time. Options are None (default), daily, or all.
- REGRID defines how RSIG regrids data. Options are weighted
(default) or mean. weighted uses fractional area contribution,
while mean uses simple mean of pixels whose centroid is in a cell
viirsnoaa_kw : dict
            Dictionary of VIIRS NOAA filter parameters; defaults to
            {'minimum_quality': 'high'}. Other options: 'medium' or 'low'.
tropomi_kw : dict
            Dictionary of TropOMI filter parameters; defaults to
            {'minimum_quality': 75, 'maximum_cloud_fraction': 1.0}. Valid
            ranges are 0-100 and 0-1, respectively.
purpleair_kw : dict
Dictionary of purpleair filter parameters and api_key.
'out_in_flag': 0, # options 0, 2, ''
'freq': 'hourly', # options hourly, daily, monthly, yearly, none
'maximum_difference': 5, # integer
'maximum_ratio': 0.70, # float
'agg_pct': 75, # 0-100
'default_humidity': 50,
'api_key': 'your_key_here'
tempo_kw : dict
            Dictionary of TEMPO filter parameters; defaults:
'api_key': 'your_key_here' # 'password'
'minimum_quality': 'normal'
'maximum_cloud_fraction': 1.0
'maximum_solar_zenith_angle': 70.
pandora_kw : dict
            Dictionary of Pandora filter parameters; defaults to
            {'minimum_quality': 'high'}. Other options: 'medium' or 'low'.
calipso_kw : dict
            Dictionary of Calipso filter parameters; defaults to
            {'MINIMUM_CAD': 20, 'MAXIMUM_UNCERTAINTY': 99}
        hrrr_kw : dict
            Dictionary of HRRR filter parameters; defaults to {'CMAQ': 1}.
server : str
'ofmpub.epa.gov' for external users
'maple.hesc.epa.gov' for on EPA VPN users
None to test maple (preferred) and ofmpub
compress : int
1 to transfer files with gzip compression
0 to transfer uncompressed files (slow)
encoding : dict
            If encoding is provided, netCDF files will be stored as NetCDF4
with encoding for all variables. If _FillValue is provided, it will
not be applied to TFLAG and COUNT.
overwrite : bool
If True, overwrite downloaded files in workdir.
If False, reuse downloaded files in workdir.
workdir : str
Working directory (must exist) defaults to '.'
gridfit : bool
Default (False) keep grid as supplied.
If True, redefine grid to remove cells outside the bbox.
Properties
----------
grid_kw : dict
Dictionary of regridding IOAPI properties. Defaults to 12US1
viirsnoaa_kw : dict
Dictionary of filter properties
tropomi_kw : dict
Dictionary of filter properties
tempo_kw : dict
Dictionary of filter properties
purpleair_kw : dict
            Dictionary of filter properties and api_key. Unlike other
            options, purpleair_kw will not work with the defaults. The user
            *must* update the api_key property to their own key. Contact
            PurpleAir for more details.
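
        Example
        -------
        # A minimal sketch; the key, date, and bbox are illustrative
        import pyrsig
        rsigapi = pyrsig.RsigApi(
            key='aqs.o3', bdate='2023-07-01', bbox=(-85., 30., -80., 35.)
        )
        df = rsigapi.to_dataframe()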
"""
if server is None:
from .utils import get_server
server = get_server()
self._description = {}
self._capabilities = None
self._coveragesdf = None
self._capabilitiesdf = None
self.server = server
self.key = key
self.compress = compress
self.workdir = workdir
self.encoding = encoding
self.overwrite = overwrite
if bbox is None:
self.bbox = (-126, 24, -66, 50)
else:
self.bbox = bbox
if bdate is None:
bdate = (
pd.to_datetime('now', utc=True) - pd.to_timedelta('1day')
).replace(hour=0, minute=0, second=0, microsecond=0, nanosecond=0)
self.bdate = pd.to_datetime(bdate)
if edate is None:
self.edate = edate
else:
self.edate = pd.to_datetime(edate)
self.corners = corners
if grid_kw is None:
grid_kw = '12US1'
if isinstance(grid_kw, str):
if grid_kw not in grids.def_grid_kw:
                raise KeyError('unknown grid, you must specify properties')
grid_kw = grids.def_grid_kw[grid_kw].copy()
# if the user has not provided a regular parameter, add it for them
for k, v in grids.shared_grid_kw.items():
grid_kw.setdefault(k, v)
if gridfit:
grid_kw = customize_grid(grid_kw, self.bbox)
self.grid_kw = grid_kw
if tropomi_kw is None:
tropomi_kw = {}
tropomi_kw.setdefault('minimum_quality', 75)
tropomi_kw.setdefault('maximum_cloud_fraction', 1.0)
self.tropomi_kw = tropomi_kw
if tempo_kw is None:
tempo_kw = {}
tempo_kw.setdefault('minimum_quality', 'normal')
tempo_kw.setdefault('maximum_cloud_fraction', 1.0)
tempo_kw.setdefault('api_key', 'your_key_here')
tempo_kw.setdefault('maximum_solar_zenith_angle', 70.)
self.tempo_kw = tempo_kw
if viirsnoaa_kw is None:
viirsnoaa_kw = {}
viirsnoaa_kw.setdefault('minimum_quality', 'high')
self.viirsnoaa_kw = viirsnoaa_kw
if pandora_kw is None:
pandora_kw = {}
pandora_kw.setdefault('minimum_quality', 'high')
self.pandora_kw = pandora_kw
if calipso_kw is None:
calipso_kw = {}
calipso_kw.setdefault('MINIMUM_CAD', 20)
calipso_kw.setdefault('MAXIMUM_UNCERTAINTY', 99)
self.calipso_kw = calipso_kw
if purpleair_kw is None:
purpleair_kw = {}
defpurp_kw = {
'out_in_flag': 0, 'freq': 'hourly',
'maximum_difference': 5, 'maximum_ratio': 0.70,
'agg_pct': 75, 'api_key': 'your_key_here',
'default_humidity': 50.000000
}
for k, v in defpurp_kw.items():
purpleair_kw.setdefault(k, v)
self.purpleair_kw = purpleair_kw
if hrrr_kw is None:
hrrr_kw = {}
hrrr_kw.setdefault('CMAQ', 1)
self.hrrr_kw = hrrr_kw
    def set_grid_kw(self, grid_kw):
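        """
        Set grid_kw from a named EPA domain (e.g., '12US1') or a dict of
        IOAPI mapping parameters. See pyrsig.grids for named options.
        """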
if isinstance(grid_kw, str):
if grid_kw not in _def_grid_kw:
                raise KeyError('unknown grid, you must specify properties')
grid_kw = _def_grid_kw[grid_kw].copy()
self.grid_kw = grid_kw
    def resize_grid(self, clip=True):
"""
        Update grid_kw property so that it only covers the bbox by adjusting
        the XORIG, YORIG, NCOLS and NROWS. If clip is True, this has the
        effect of reducing the number of rows and columns. This is useful
        when the area of interest is much smaller than the grid defined in
        grid_kw.
Arguments
---------
        clip : bool
            If True (default), remove rows and columns outside the bbox.
Returns
-------
None
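
        Example
        -------
        # Sketch: shrink the default 12US1 grid to a small bbox
        rsigapi = pyrsig.RsigApi(bbox=(-85., 30., -80., 35.))
        rsigapi.resize_grid()
        print(rsigapi.grid_kw['NCOLS'], rsigapi.grid_kw['NROWS'])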
"""
self.grid_kw = customize_grid(self.grid_kw, self.bbox, clip=clip)
    def describe(self, key, as_dataframe=True, raw=False):
"""
describe returns details about the coverage specified by key. Details
include spatial bounding box, time coverage, time resolution, variable
label, and a short description.
DescribeCoverage with a COVERAGE should be faster than descriptions
because it only returns a small xml chunk. Currently, DescribeCoverage
with a COVERAGE specified is unreliable because of malformed xml. If
this fails, describe will instead request all coverages and query the
specific coverage.
        Arguments
        ---------
        key : str
            Coverage name (e.g., 'airnow.no2')
        as_dataframe : bool
            Defaults to True and descriptions are returned as a dataframe.
            If False, returns a list of elements.
raw : bool
Return raw xml instead of parsing. Useful for debugging.
Returns
-------
coverages : pandas.DataFrame or list
dataframe or list of parsed descriptions
Example
-------
df = rsigapi.describe('airnow.no2')
print(df.to_csv())
# ,name,label,description,bbox_str,beginPosition,timeResolution
# 0,no2,no2(ppb),UTC hourly mean surface measured nitrogen ...,
# ... -157 21 -51 64,2003-01-02T00:00:00Z,PT1H
"""
import warnings
if key not in self._description:
r = legacy_get(
f'https://{self.server}/rsig/rsigserver?SERVICE=wcs&VERSION='
f'1.0.0&REQUEST=DescribeCoverage&COVERAGE={key}&compress=1'
)
self._description[key] = r.text
if raw:
return self._description[key]
try:
coverages = coverages_from_xml(self._description[key])
except Exception as e:
warnings.warn(str(e) + '; using descriptions')
return self.descriptions().query(f'name == "{key}"')
if as_dataframe:
coverages = pd.DataFrame.from_records(coverages)
coverages['prefix'] = coverages['name'].apply(
lambda x: x.split('.')[0]
)
coverages = coverages.drop('tag', axis=1)
return coverages
    def descriptions(self, refresh=False, verbose=0):
"""
Experimental and may change.
descriptions returns details about all coverages. Details include
spatial bounding box, time coverage, time resolution, variable label,
and a short description.
        Currently parses DescribeCoverage output with xml.etree.ElementTree
        and builds coverages from the CoverageOffering elements. Known-bad
        xml elements are cleaned up, and each coverage is parsed separately
        to increase fault tolerance.
Arguments
---------
refresh : bool
            If True, get new copy and save to ~/.pyrsig/descriptions.xml
If False (default), reload from saved if available.
verbose : int
If verbose is greater than 0, show warnings from parsing.
Returns
-------
coverages : pandas.DataFrame or list
dataframe or list of parsed descriptions
Example
-------
rsigapi = pyrsig.RsigApi()
desc = rsigapi.descriptions()
print(desc.query('prefix == "tropomi"').name.unique())
# ['tropomi.nrti.no2.nitrogendioxide_tropospheric_column'
        # ... 43 other names here
# 'tropomi.rpro.ch4.methane_mixing_ratio_bias_corrected']
"""
from .data import get_descriptions
if self._coveragesdf is None:
self._coveragesdf = get_descriptions(
server=self.server, refresh=refresh
)
return self._coveragesdf
    def capabilities(self, as_dataframe=True, refresh=False, verbose=0):
"""
        At this time, the capabilities service does not list cmaq.*
        coverages.
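
        Example
        -------
        # Sketch: list the first few coverage names from GetCapabilities
        rsigapi = pyrsig.RsigApi()
        capdf = rsigapi.capabilities()
        print(capdf['name'].head())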
"""
import re
import pandas as pd
import os
import io
cappath = os.path.expanduser('~/.pyrsig/GetCapabilities.csv')
if not refresh and as_dataframe:
if self._capabilitiesdf is not None:
return self._capabilitiesdf
elif os.path.exists(cappath):
self._capabilitiesdf = pd.read_csv(cappath)
return self._capabilitiesdf
if refresh or self._capabilities is None:
self._capabilities = legacy_get(
f'https://{self.server}/rsig/rsigserver?SERVICE=wcs&VERSION='
'1.0.0&REQUEST=GetCapabilities&compress=1'
)
if as_dataframe:
os.makedirs(os.path.dirname(cappath), exist_ok=True)
cre = re.compile(
'<CoverageOfferingBrief>.+?</CoverageOfferingBrief>',
re.DOTALL + re.M
)
gre = re.compile(
r'<lonLatEnvelope srsName="WGS84\(DD\)">\s*<gml:pos>(.+?)'
+ r'</gml:pos>\s*<gml:pos>(.+?)</gml:pos>\s*</lonLatEnvelope>',
re.M
)
tre = re.compile(r'>\s+<', re.M)
ctext = self._capabilities.text
ctext = '\n'.join(cre.findall(ctext))
ctext = gre.sub(r'<bbox_str>\1 \2</bbox_str>', ctext)
ctext = tre.sub(r'><', ctext)
# Cleanup... for known issues
ctext = ctext.replace('>yyy', '>')
ctext = ctext.replace('<=', 'less than or equal to ')
ctext = ctext.replace('qa_value < 0', 'qa_value less than 0')
ctext = ctext.replace('>0=', 'greater than 0 =')
ctext = ctext.replace('<0=', 'less than 0 = ')
# version 1.5
if hasattr(pd, 'read_xml'):
ctext = f"""<?xml version="1.0" encoding="UTF-8" ?>
<WCS_Capabilities>
{ctext}
</WCS_Capabilities>"""
capabilitiesdf = pd.read_xml(io.StringIO(ctext))
else:
ccsv = ctext.replace('"', '\'')
ccsv = ccsv.replace('</name><label>', '","')
ccsv = ccsv.replace('</label><description>', '","')
ccsv = ccsv.replace('</description><bbox_str>', '","')
ccsv = ccsv.replace(
'</bbox_str></CoverageOfferingBrief>', '"\n'
)
ccsv = ccsv.replace('<CoverageOfferingBrief><name>', '"')
ccsv = 'name,label,description,bbox_str\n' + ccsv
capabilitiesdf = pd.read_csv(io.StringIO(ccsv))
capabilitiesdf['prefix'] = capabilitiesdf['name'].apply(
lambda x: x.split('.')[0]
)
capabilitiesdf.to_csv(cappath, index=False)
self._capabilitiesdf = capabilitiesdf
return self._capabilitiesdf
return self._capabilities
    def keys(self, offline=True):
"""
Arguments
---------
        offline : bool
            If True, uses a small cached set of tested coverages.
            If False, finds all coverages from the capabilities service.
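
        Returns
        -------
        keys : tuple
            Sorted coverage names.

        Example
        -------
        keys = rsigapi.keys()
        print(len(keys), keys[:2])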
"""
descdf = self.descriptions(refresh=not offline)
keys = tuple(sorted(descdf['name'].unique()))
return keys
    def get_file(
self, formatstr, key=None, bdate=None, edate=None, bbox=None,
grid=False, corners=None, request='GetCoverage', compress=0,
overwrite=None, verbose=0
):
"""
Build url, outpath, and download the file. Returns outpath
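
        Example
        -------
        # Sketch; 'ascii' with default compress=0 yields a CSV path
        outpath = rsigapi.get_file('ascii', key='aqs.o3')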
"""
from .utils import get_file
if overwrite is None:
overwrite = self.overwrite
url, outpath = self._build_url(
formatstr, key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, request=request, compress=compress, corners=corners
)
if verbose > 0:
print(url)
get_file(url, outpath, verbose=verbose, overwrite=overwrite)
return outpath
def _build_url(
self, formatstr, key=None, bdate=None, edate=None, bbox=None,
grid=False, corners=None, request='GetCoverage',
compress=1
):
"""
Arguments
---------
formatstr : str
'xdr', 'ascii', 'netcdf-ioapi', 'netcdf-coards'
key : str
Coverage name
bdate : date-like
edate : date-like
bbox : iterable
grid : bool
corners : bool
If True, return corners.
        request : str
            'GetCoverage' (default) or 'GetMetadata'
        compress : int
            1: gzip compression (default); 0: no compression

        For all other keywords, see __init__.
"""
if key is None:
key = self.key
if key is None:
raise ValueError('key must be specified')
if bdate is None:
bdate = self.bdate
else:
bdate = pd.to_datetime(bdate)
if edate is None:
if self.edate is None:
edate = (
bdate + pd.to_timedelta('+1day') + pd.to_timedelta('-1s')
)
else:
edate = self.edate
else:
edate = pd.to_datetime(edate)
if bbox is None:
bbox = self.bbox
if edate < bdate:
raise ValueError('edate cannot be before bdate')
if bbox[2] < bbox[0]:
raise ValueError('elon cannot be less than wlon')
if bbox[3] < bbox[1]:
raise ValueError('nlat cannot be less than slat')
if corners is None:
corners = self.corners
grid_kw = self.grid_kw
purpleair_kw = self.purpleair_kw
hrrr_kw = self.hrrr_kw
tropomi_kw = self.tropomi_kw
tempo_kw = self.tempo_kw
viirsnoaa_kw = self.viirsnoaa_kw
pandora_kw = self.pandora_kw
calipso_kw = self.calipso_kw
if compress is None:
compress = self.compress
wlon, slat, elon, nlat = bbox
# If already gridded, do not use grid keywords
nogridkw = any([key.startswith(pre) for pre in _noregrid_prefixes])
if (grid and not nogridkw) and request == 'GetCoverage':
gridstr = self._build_grid(grid_kw)
else:
gridstr = ''
if key.startswith('viirsnoaa'):
viirsnoaastr = '&MINIMUM_QUALITY={minimum_quality}'.format(
**viirsnoaa_kw
)
else:
viirsnoaastr = ''
if key.startswith('pandora'):
pandorastr = '&MINIMUM_QUALITY={minimum_quality}'.format(
**pandora_kw
)
else:
pandorastr = ''
if key.startswith('calipso'):
calipsostr = (
'&MINIMUM_CAD={MINIMUM_CAD}'
+ '&MAXIMUM_UNCERTAINTY={MAXIMUM_UNCERTAINTY}'
).format(**calipso_kw)
else:
calipsostr = ''
if key.startswith('tropomi'):
tropomistr = (
'&MINIMUM_QUALITY={minimum_quality}'
'&MAXIMUM_CLOUD_FRACTION={maximum_cloud_fraction}'
).format(**tropomi_kw)
else:
tropomistr = ''
if key.startswith('tempo.l2'):
tempostr = (
'&MAXIMUM_CLOUD_FRACTION={maximum_cloud_fraction}'
'&MINIMUM_QUALITY={minimum_quality}'
'&MAXIMUM_SOLAR_ZENITH_ANGLE={maximum_solar_zenith_angle}'
).format(**tempo_kw)
fake_keys = ('your_key_here', 'anonymous', 'none')
if tempo_kw['api_key'] not in fake_keys:
tempostr += '&KEY={api_key}'.format(**tempo_kw)
else:
tempostr = ''
if key.startswith('hrrr'):
hrrrstr = '&CMAQ={CMAQ}'.format(**hrrr_kw)
else:
hrrrstr = ''
if key.startswith('purpleair'):
if purpleair_kw['api_key'] == 'your_key_here':
                raise ValueError(
                    'You must set the purpleair_kw api_key (e.g., '
                    'api.purpleair_kw["api_key"] = "9...") before '
                    'submitting a query.'
                )
purpleairstr = (
'&OUT_IN_FLAG={out_in_flag}&MAXIMUM_DIFFERENCE='
'{maximum_difference}&MAXIMUM_RATIO={maximum_ratio}'
'&AGGREGATE={freq}&MINIMUM_AGGREGATION_COUNT_PERCENTAGE='
'{agg_pct}&DEFAULT_HUMIDITY={default_humidity}&KEY={api_key}'
).format(**purpleair_kw)
else:
purpleairstr = ''
if corners == 1:
if any([key.startswith(pre) for pre in _corner_prefixes]):
cornerstr = f'&CORNERS={corners}'
else:
cornerstr = ''
else:
cornerstr = ''
if (
any([key.startswith(pre) for pre in _nolonlats_prefixes])
and formatstr != 'ascii'
):
nolonlatsstr = '&NOLONLATS=1'
else:
nolonlatsstr = ''
url = (
f'https://{self.server}/rsig/rsigserver?SERVICE=wcs&VERSION=1.0.0'
f'&REQUEST={request}&FORMAT={formatstr}'
f'&TIME={bdate:%Y-%m-%dT%H:%M:%SZ}/{edate:%Y-%m-%dT%H:%M:%SZ}'
f'&BBOX={wlon},{slat},{elon},{nlat}'
f'&COVERAGE={key}'
f'&COMPRESS={compress}'
) + (
purpleairstr + viirsnoaastr + tropomistr + tempostr + pandorastr
+ calipsostr + hrrrstr + gridstr + cornerstr + nolonlatsstr
)
outpath = (
f'{self.workdir}/{key}_{bdate:%Y-%m-%dT%H%M%SZ}'
f'_{edate:%Y-%m-%dT%H%M%SZ}'
)
if formatstr.lower() == 'ascii':
outpath += '.csv'
elif formatstr.lower() == 'netcdf-ioapi':
outpath += '.nc'
elif formatstr.lower() == 'netcdf-coards':
outpath += '.nc'
elif formatstr.lower() == 'xdr':
outpath += '.xdr'
if request == 'GetMetadata':
outpath += '.txt'
elif compress:
outpath += '.gz'
return url, outpath
def _build_grid(self, grid_kw):
"""
Build the regrid portion of the URL
"""
grid_kw.setdefault('earth_radius', 6370000)
GDTYP = grid_kw.get('GDTYP', 2)
if GDTYP == 1:
projstr = '&LONLAT=1'
elif GDTYP == 2:
projstr = '&LAMBERT={P_ALP},{P_BET},{XCENT},{YCENT}'
elif GDTYP == 6:
projstr = '&STEREOGRAPHIC={XCENT},{YCENT},{P_BET}'
elif GDTYP == 7:
projstr = '&MERCATOR={P_GAM}'
else:
            raise KeyError('GDTYP only implemented for 1, 2, 6, and 7')
gridstr = (
            '&REGRID={REGRID}'
+ projstr
+ '&ELLIPSOID={earth_radius},{earth_radius}'
+ '&GRID={NCOLS},{NROWS},{XORIG},{YORIG},{XCELL},{YCELL}'
)
if grid_kw.get('REGRID_AGGREGATE', 'None').strip() != 'None':
            gridstr += "&REGRID_AGGREGATE={REGRID_AGGREGATE}"
return gridstr.format(**grid_kw)
    def to_dataset(
self, key=None, bdate=None, edate=None, bbox=None,
corners=None, withmeta=False, verbose=0, grid=False
):
"""
Only works with queries that return subset or grid.
All arguments default to those provided during initialization.
Arguments
---------
key : str
Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected',
or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column')
bdate : str or pd.Datetime
beginning date (inclusive) defaults to yesterday at 0Z
edate : str or pd.Datetime
ending date (inclusive) defaults to bdate + 23:59:59
bbox : tuple
wlon, slat, elon, nlat in decimal degrees (-180 to 180)
        withmeta : bool
            If True, add 'GetMetadata' results as a "metadata" attribute of
            the dataset. This is useful for understanding the underlying
            datasets used to create the result.
verbose : int
level of verbosity
Returns
-------
ds : xarray.Dataset
Results from download
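
        Example
        -------
        # Sketch; the key and date are illustrative
        ds = rsigapi.to_dataset(
            key='tropomi.offl.no2.nitrogendioxide_tropospheric_column',
            bdate='2023-07-01'
        )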
"""
from . import xdr
outpath = self.get_file(
'xdr', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, verbose=verbose, corners=corners,
compress=1
)
ds = xdr.from_xdrfile(outpath, as_dataframe=False)
if withmeta:
metapath = self.get_file(
'ascii', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, verbose=verbose, request='GetMetadata',
compress=1
)
metatxt = open(metapath, 'r').read()
ds.attrs['metadata'] = metatxt
return ds
def _datesfunc(
self, func, bdates, edates=None, **kwds
):
"""
Thin wrapper around to_* functions
Arguments
---------
func : method
to_dataframe, to_ioapi or to_dataset
bdates : list
Start dates
edates : list
End dates
kwds : mappable
See func for explanation of all other keywords.
Returns
-------
out : list
List of outputs from func
"""
        if edates is None:
            # default: end of the same day (bdate + 23:59:59)
            edates = bdates + pd.to_timedelta('86399s')
outs = []
for bdate, edate in zip(bdates, edates):
try:
out = func(
bdate=bdate, edate=edate, **kwds
)
except Exception:
print(f'Unable to retrieve {bdate} {edate}: {kwds}')
continue
outs.append(out)
return outs
    def to_dataframe(
self, key=None, bdate=None, edate=None, bbox=None, unit_keys=True,
parse_dates=False, corners=None, withmeta=False, verbose=0,
backend='ascii', grid=False
):
"""
All arguments default to those provided during initialization.
Arguments
---------
key : str
Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected',
or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column')
bdate : str or pd.Datetime
beginning date (inclusive) defaults to yesterday at 0Z
edate : str or pd.Datetime
ending date (inclusive) defaults to bdate + 23:59:59
bbox : tuple
wlon, slat, elon, nlat in decimal degrees (-180 to 180)
unit_keys : bool
If True, keep unit in column name.
If False, move last parenthetical part of key to attrs of Series.
parse_dates : bool
If True, parse Timestamp(UTC)
        withmeta : bool
If True, add 'GetMetadata' results as a "metadata" attribute of the
dataframe. This is useful for understanding the underlying datasets
used to create the result.
verbose : int
level of verbosity
Returns
-------
df : pandas.DataFrame
Results from download
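
        Example
        -------
        # Sketch; the key and date are illustrative
        df = rsigapi.to_dataframe(
            key='aqs.o3', bdate='2023-07-01', parse_dates=True
        )
        print(df.columns.tolist())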
"""
from collections.abc import Iterable
from . import xdr
from . import bin
if isinstance(bdate, Iterable) and not isinstance(bdate, str):
kwds = dict(
key=key, bbox=bbox, unit_keys=unit_keys,
parse_dates=parse_dates, corners=corners, withmeta=withmeta,
verbose=verbose, backend=backend, grid=grid
)
dfs = self._datesfunc(
self.to_dataframe, bdates=bdate, edates=edate, **kwds
)
df = pd.concat(dfs, ignore_index=True)
return df
assert backend in {'ascii', 'xdr', 'bin'}
if any([key.startswith(pfx) for pfx in _shpxdrprefixes]):
backend = 'xdr'
elif any([key.startswith(pfx) for pfx in _shpbinprefixes]):
backend = 'bin'
outpath = self.get_file(
backend, key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, verbose=verbose, corners=corners,
compress=1
)
if backend == 'ascii':
df = pd.read_csv(outpath, delimiter='\t', na_values=[-9999., -999])
elif backend == 'xdr':
df = xdr.from_xdrfile(outpath, na_values=[-9999., -999])
elif backend == 'bin':
df = bin.from_binfile(outpath)
else:
raise KeyError(f'format {backend} unknown; use xdr, bin or ascii')
if withmeta:
metapath = self.get_file(
'ascii', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, verbose=verbose, request='GetMetadata',
compress=1
)
metatxt = open(metapath, 'r').read()
df.attrs['metadata'] = metatxt
if not unit_keys:
columns = [k for k in df.columns]
newcolumns = []
unit_dict = {}
for k in columns:
if '(' not in k:
newk = k
unit = 'unknown'
else:
idx = k.rfind('(')
newk = k[:idx]
unit = k[idx+1:-1]
unit_dict[newk] = unit
newcolumns.append(newk)
df.columns = newcolumns
for k in newcolumns:
if hasattr(df[k], 'attrs'):
df[k].attrs.update(dict(units=unit_dict.get(k, 'unknown')))
if parse_dates:
if 'Timestamp(UTC)' in df:
df['time'] = pd.to_datetime(df['Timestamp(UTC)'])
if 'Timestamp' in df:
df['time'] = pd.to_datetime(df['Timestamp'])
return df
    def to_geodataframe(
self, *args, centroid=False, keepgeomcols=False, **kwds
):
"""
Same as to_dataframe, but returns a geopandas.GeoDataFrame where the
geometry represents either the centroid point or, where possible, the
polygon.
Arguments
---------
args : see to_dataframe
kwds : see to_dataframe
centroid : bool
If True, return a point at the centroid for polygons
keepgeomcols : bool
If True, do not drop coordinate columns
Returns
-------
gdf : geopandas.GeoDataFrame
            DataFrame with polygon or point geometry where appropriate.
            Polygons require corner columns; dataframes with only centroid
            coordinates yield points.
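
        Example
        -------
        # Sketch; products with corner coordinates yield polygons
        gdf = rsigapi.to_geodataframe(
            key='tropomi.offl.no2.nitrogendioxide_tropospheric_column',
            bdate='2023-07-01'
        )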
"""
from shapely import points, polygons
import geopandas as gpd
from warnings import warn
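        # Corner keys trace the polygon ring; SW repeats to close the ring.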
crnrkeys = [
'longitude_sw(deg)', 'latitude_sw(deg)',
'longitude_se(deg)', 'latitude_se(deg)',
'longitude_ne(deg)', 'latitude_ne(deg)',
'longitude_nw(deg)', 'latitude_nw(deg)',
'longitude_sw(deg)', 'latitude_sw(deg)',
]
centroidkeys = ['longitude(deg)', 'latitude(deg)']
if not kwds.get('unit_keys', True):
# if unit_keys is false, (deg) will not be part of the key
crnrkeys = [k.replace('(deg)', '') for k in crnrkeys]
centroidkeys = [k.replace('(deg)', '') for k in centroidkeys]
df = self.to_dataframe(*args, **kwds)
tmpdf = df.rename(columns=str.lower, copy=False)
lcols = list(tmpdf.columns)
# Drop coordinate keys
keepkeys = list(df.columns)
if not keepgeomcols:
keepkeys = [
k for k in keepkeys
if k.lower() not in (crnrkeys + centroidkeys)
]
if all([k in lcols for k in crnrkeys]) and not centroid:
geom = polygons(tmpdf[crnrkeys].values.reshape(-1, 5, 2))
elif all([k in lcols for k in centroidkeys]):
geom = points(tmpdf[centroidkeys].values)
else:
warn('no geom keys; returning original dataframe')
return df
gdf = gpd.GeoDataFrame(df[keepkeys], geometry=geom, crs=4326)
return gdf
    def to_ioapi(
self, key=None, bdate=None, edate=None, bbox=None, withmeta=False,
removegz=False, verbose=0
):
"""
All arguments default to those provided during initialization.
Arguments
---------
key : str
Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected',
or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column')
bdate : str or pd.Datetime
beginning date (inclusive) defaults to yesterday at 0Z
edate : str or pd.Datetime
ending date (inclusive) defaults to bdate + 23:59:59
bbox : tuple
wlon, slat, elon, nlat in decimal degrees (-180 to 180)
withmeta : bool
            If True, add 'GetMetadata' results as an attribute "metadata" of the
netcdf file. This is useful for understanding the underlying datasets
used to create the result.
removegz : bool
If True, then remove the downloaded gz file. Bad for caching.
Returns
-------
ds : xarray.Dataset
Results from download
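
        Example
        -------
        # Sketch; result is regridded to the current grid_kw (12US1 default)
        ds = rsigapi.to_ioapi(key='aqs.o3', bdate='2023-07-01')
        print(ds.data_vars)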
"""
import gzip
import shutil
import os
from collections.abc import Iterable
if isinstance(bdate, Iterable) and not isinstance(bdate, str):
import xarray as xr
kwds = dict(
key=key, bbox=bbox, withmeta=withmeta,
removegz=removegz, verbose=verbose
)
dss = self._datesfunc(
self.to_ioapi, bdates=bdate, edates=edate, **kwds
)
dss = xr.concat(dss, dim='TSTEP')
return dss
# always use compression for network speed.
outpath = self.get_file(
'netcdf-ioapi', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=True, compress=1, verbose=verbose
)
ncpath = outpath[:-3]
# Uncompress the netcdf file. If encoding is available, apply it
if not self.overwrite and os.path.exists(ncpath):
if verbose >= 0:
print('Using cached:', ncpath)
else:
with gzip.open(outpath, 'rb') as f_in:
with open(ncpath, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
f_out.flush()
if self.encoding is not None:
import xarray as xr
with xr.open_dataset(ncpath) as tmpf:
tmpf.load()
for key in tmpf.data_vars:
tvar = tmpf[key]
tvar.encoding.update(self.encoding)
if key in ('TFLAG', 'COUNT'):
tvar.encoding.pop('_FillValue', '')
tmpf.to_netcdf(ncpath, format='NETCDF4_CLASSIC')
if withmeta:
metapath = self.get_file(
'netcdf-ioapi', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=True, compress=1, request='GetMetadata', verbose=verbose
)
else:
metapath = None
f = open_ioapi(ncpath, metapath=metapath)
if removegz:
os.remove(outpath)
return f
    def to_netcdf(
self, key=None, bdate=None, edate=None, bbox=None, grid=False,
withmeta=False, removegz=False, verbose=0
):
"""
All arguments default to those provided during initialization.
Arguments
---------
key : str
Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected',
or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column')
bdate : str or pd.Datetime
beginning date (inclusive) defaults to yesterday at 0Z
edate : str or pd.Datetime
ending date (inclusive) defaults to bdate + 23:59:59
bbox : tuple
wlon, slat, elon, nlat in decimal degrees (-180 to 180)
grid : bool
Add column and row variables with grid assignments.
withmeta : bool
            If True, add 'GetMetadata' results as an attribute "metadata" of the
netcdf file.
removegz : bool
If True, then remove the downloaded gz file. Bad for caching.
Returns
-------
ds : xarray.Dataset
Results from download
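
        Example
        -------
        # Sketch; returns a COARDS-style xarray.Dataset
        ds = rsigapi.to_netcdf(key='aqs.o3', bdate='2023-07-01')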
"""
import gzip
import shutil
import os
import xarray as xr
from collections.abc import Iterable
if isinstance(bdate, Iterable) and not isinstance(bdate, str):
kwds = dict(
key=key, bbox=bbox, withmeta=withmeta,
removegz=removegz, verbose=verbose
)
dss = self._datesfunc(
self.to_netcdf, bdates=bdate, edates=edate, **kwds
)
dss = xr.concat(dss, dim='time')
return dss
# always use compression for network speed.
outpath = self.get_file(
'netcdf-coards', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, compress=1, verbose=verbose
)
ncpath = outpath[:-3]
# Uncompress the netcdf file. If encoding is available, apply it
if not self.overwrite and os.path.exists(ncpath):
if verbose > 0:
print('Using cached:', ncpath)
else:
with gzip.open(outpath, 'rb') as f_in:
with open(ncpath, 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
f_out.flush()
f = xr.open_dataset(ncpath)
if withmeta:
metapath = self.get_file(
'netcdf-coards', key=key, bdate=bdate, edate=edate, bbox=bbox,
grid=grid, compress=1, request='GetMetadata', verbose=verbose
)
with open(metapath, 'r') as metaf:
metatxt = metaf.read()
f.attrs['metadata'] = metatxt
if removegz:
os.remove(outpath)
return f
class RsigGui:
    @classmethod
def from_api(cls, api):
gui = cls()
gui._bbwe.value = api.bbox[::2]
gui._bbsn.value = api.bbox[1::2]
if api.bdate is not None:
bdate = pd.to_datetime(api.bdate)
gui._dates.value = bdate.floor('1D')
gui._hours.value = (
bdate - bdate.floor('1D')
).total_seconds() // 3600
if api.edate is not None:
edate = pd.to_datetime(api.edate)
gui._datee.value = edate.floor('1D')
gui._houre.value = (
edate - edate.floor('1D')
).total_seconds() // 3600
if api.key is not None:
gui._prodd.value = api.key
if api.grid_kw is not None:
gui._gridd.value = api.grid_kw['GDNAM']
if api.workdir is not None:
gui._workd.value = api.workdir
return gui
def __init__(self):
"""
RsigGui Object designed for IPython with ipywidgets in Jupyter
Example:
gui = RsigGui()
gui.form # As last line in cell, displays controls for user
gui.plotopts() # Plots current options
gui.check() # Check bounding box and date options make sense
rsigapi = gui.get_api() # Convert gui to standard api
# proceed with normal RsigApi usage
"""
from datetime import date
from ipywidgets import Layout, Box, Dropdown, Label, FloatRangeSlider
from ipywidgets import DatePicker, Textarea, BoundedIntText, Output
api = RsigApi()
descdf = api.descriptions().copy()
descdf['begin'] = descdf['beginPosition']
descdf['end'] = descdf['endPosition']
descdf['bbox'] = descdf['bbox_str']
descdf['opt_txt'] = descdf.apply(
lambda x: '{name}\t({begin}-{end})\t({bbox})'.format(**x), axis=1
)
descdf['sort'] = ~descdf.name.isin(api.keys())
prodopts = descdf.sort_values(by=['sort', 'name'], ascending=True)[
['opt_txt', 'name']
].values.tolist()
l100 = Layout(width='95%')
l50 = Layout(width='30em')
self._prodd = prodd = Dropdown(
options=prodopts, description='Product', layout=l100,
value='tropomi.offl.no2.nitrogendioxide_tropospheric_column'
)
self._gridd = gridd = Dropdown(
options=list(_def_grid_kw), value='12US1', description='grid',
layout=l50
)
self._dates = datesa = DatePicker(
description='Start Date', disabled=False, layout=l50,
value=(
date.today()
- pd.to_timedelta('7d')
)
)
self._datee = dateea = DatePicker(
description='End Date', disabled=False, value=datesa.value,
layout=l50
)
self._hours = hours = BoundedIntText(
min=0, max=23, value=0, description='Start HR', layout=l50
)
self._houre = houre = BoundedIntText(
min=0, max=23, value=23, description='End HR', layout=l50
)
self._bbsn = FloatRangeSlider(
min=-90, max=90, value=(24, 50), description='South-North',
layout=l100
)
self._bbwe = FloatRangeSlider(
min=-180, max=180, value=(-126, -66), description='West-East',
layout=l100
)
self._workd = workd = Textarea(
value='.', description='Work Dir', layout=l100
)
self._out = Output(layout=l100)
form_items = [
Label(value='RSIG Options'),
prodd, self._bbsn, self._bbwe,
Box([datesa, hours]), Box([dateea, houre]),
gridd, workd, self._out
]
        for fi in form_items + [datesa, hours, dateea, houre]:
            fi.observe(self._update_out, names='value')
self._form = Box(form_items, layout=Layout(
display='flex', flex_flow='column', border='solid 2px',
align_items='stretch', width='100%'
))
def _update_out(self, *args):
from IPython.display import clear_output, display
fig = self.plotopts()
with self._out:
clear_output(wait=True)
display(fig)
    def date_range(self):
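        """Return a daily pd.date_range from the gui bdate to edate."""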
import pandas as pd
return pd.date_range(self.bdate, self.edate)
@property
def form(self):
return self._form
@property
def key(self):
return self._prodd.value
@property
def bdate(self):
import pandas as pd
return (
pd.to_datetime(self._dates.value)
+ pd.to_timedelta(self._hours.value, unit='h')
)
@property
def edate(self):
import pandas as pd
        hms = self._houre.value * 3600 + 3599  # end of the selected hour
return pd.to_datetime(
self._datee.value
) + pd.to_timedelta(hms, unit='s')
@property
def grid_kw(self):
return self._gridd.value
@property
def bbox(self):
w, e = self._bbwe.value
s, n = self._bbsn.value
return (w, s, e, n)
@property
def workdir(self):
return self._workd.value
    def plotopts(self):
import pycno
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
bbw, bbs, bbe, bbn = self.bbox
c = {True: 'g', False: 'r'}.get(self.check(), 'r')
ax.plot(
[bbw, bbe, bbe, bbw, bbw],
[bbs, bbs, bbn, bbn, bbs],
color=c
)
if c == 'r':
ax.text(
.5, .5, 'Invalid Options', horizontalalignment='center',
transform=ax.transAxes, color='r', fontsize=30,
bbox={'edgecolor': c, 'facecolor': 'white'}
)
fig.suptitle(f'Query Options: {self.key}, {self.grid_kw}')
ax.set(title=f'{self.bdate:%FT%H:%M:%S} {self.edate:%FT%H:%M:%S}')
pycno.cno().drawstates(ax=ax)
return fig
    def get_api(self):
rsigapi = RsigApi(
key=self.key, bdate=self.bdate, edate=self.edate,
bbox=self.bbox, grid_kw=self.grid_kw, workdir=self.workdir
)
return rsigapi
    def check(self, action='return'):
bbw, bbs, bbe, bbn = self.bbox
iswe = bbw < bbe
issn = bbs < bbn
isbe = self.bdate <= self.edate
if not iswe:
_actionf('West is East of East', action)
if not issn:
_actionf('South is North of North', action)
if not isbe:
_actionf('bdate is later than edate', action)
return iswe & issn & isbe
def findkeys(
temporal=None, bbox=None, case=False, flags=0, verbose=1, **kwds
):
"""
    Find RSIG coverage descriptions matching regex, spatial, and temporal
    criteria.

    Arguments
---------
temporal : str
Date or date range (2023-01-01 or 2023-01-01/2023-12-31)
bbox : iterable
If provided (default: None), bounding box (e.g., -180, -90, 180, 90)
case : bool, default False
If True, case sensitive.
flags : int, default 0 (no flags)
Regex module flags, e.g. re.IGNORECASE.
    kwds : mappable
        Regular expressions for columns (e.g., name='tempo.*'); see the
        Returns section for available columns.
verbose : int
Level of verbosity
Returns
-------
descdf : pandas.DataFrame
Has columns: name, label, description, bbox_str, beginPosition,
timeResolution, endPosition, and prefix
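
    Example
    -------
    # Sketch; the pattern and date are illustrative
    df = pyrsig.findkeys(name='tempo.*', temporal='2023-08-01')
    print(df.name.unique())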
"""
api = RsigApi()
df = api.descriptions().fillna('')
for k, v in kwds.items():
if verbose > 1:
print(df.shape[0], f'before query ({k} {v})')
qstr = f'{k}.str.match("{v}", case={case}, flags={flags})'
df = df.query(qstr)
if verbose > 0:
print(df.shape[0], f'after query ({qstr})')
if bbox is not None:
from shapely import box
import numpy as np
if verbose > 1:
print(df.shape[0], f'before bbox ({bbox})')
bbox = box(*bbox)
keepids = []
for id, row in df.iterrows():
try:
dbbox = np.array(row['bbox_str'].split(), dtype='f')
except Exception as e:
if verbose > 2:
etmpl = '{name} {bbox_str} {e}'
print(etmpl.format(e=e, **row))
dbbox = np.array([-180, -90, 180, 90], dtype='f')
dbbox = box(*dbbox)
if bbox.intersects(dbbox):
keepids.append(id)
df = df.loc[keepids]
if verbose > 0:
print(df.shape[0], f'after bbox ({bbox})')
if temporal is not None:
if verbose > 1:
print(df.shape[0], f'before temporal ({temporal})')
parts = temporal.split('/')
start = pd.to_datetime(parts[0], utc=True)
if len(parts) == 1:
end = start
else:
if parts[1].startswith('PT'):
end = start + pd.to_timedelta(parts[1][2:])
else:
end = pd.to_datetime(parts[1], utc=True)
keepids = []
for id, row in df.iterrows():
try:
dstart = pd.to_datetime(row['beginPosition'], utc=True)
if row['endPosition'].startswith('PT'):
dend = dstart + pd.to_timedelta(row['endPosition'][2:])
else:
dend = pd.to_datetime(row['endPosition'], utc=True)
except Exception as e:
if verbose > 2:
etmpl = '{name} {beginPosition} {endPosition} {e}'
print(etmpl.format(e=e, **row))
keepids.append(id)
continue
if start <= dend and end >= dstart:
keepids.append(id)
df = df.loc[keepids]
if verbose > 0:
print(df.shape[0], f'after temporal ({start} {end})')
return df.copy()
# Add easy access defaults
_defapi = RsigApi()
descriptions = _defapi.descriptions
to_dataframe = _defapi.to_dataframe
to_ioapi = _defapi.to_ioapi
to_netcdf = _defapi.to_netcdf
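# Example (sketch): the module-level conveniences mirror a default RsigApi,
# e.g., pyrsig.to_dataframe(key='aqs.o3', bdate='2023-07-01')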