Source code for pyrsig

__all__ = ['RsigApi', 'RsigGui', 'open_ioapi', 'open_mfioapi', 'cmaq']
__version__ = '0.10.0'

from . import cmaq
from .cmaq import open_ioapi, open_mfioapi
import pandas as pd
from .utils import customize_grid, def_grid_kw as _def_grid_kw
from .utils import coverages_from_xml, legacy_get


_corner_prefixes = (
    'gasp', 'goes', 'modis', 'omibehr', 'tempo', 'tropomi', 'viirs'
)
_nolonlats_prefixes = ('cmaq', 'regridded')
_noregrid_prefixes = ('cmaq', 'regridded')
_shpxdrprefixes = ['hms.']
_shpbinprefixes = [
    'landuse.atlantic.population_iclus',
    'landuse.gulf.population_iclus',
    'landuse.pacific.population_iclus',
]


def _actionf(msg, action, ErrorTyp=None):
    """
    Convenience function for warning or raising an error.

    Arguments
    ---------
    msg : str
        Message to raise or warn.
    action : str
        If 'error', raise ErrorTyp(msg)
        If 'warn', warnings.warn using msg
        Else do nothing.
    ErrorTyp : Exception
        Defaults to ErrorTyp

    Returns
    -------
    None
    """
    import warnings

    if ErrorTyp is None:
        ErrorTyp = ValueError
    if action == 'error':
        raise ErrorTyp(msg)
    elif action == 'warn':
        warnings.warn(msg)


[docs]class RsigApi: def __init__( self, key=None, bdate=None, edate=None, bbox=None, grid_kw=None, tropomi_kw=None, purpleair_kw=None, viirsnoaa_kw=None, tempo_kw=None, pandora_kw=None, calipso_kw=None, server='ofmpub.epa.gov', compress=1, corners=1, encoding=None, overwrite=False, workdir='.', gridfit=False ): """ RsigApi is a python-based interface to RSIG's web-based API Arguments --------- key : str Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected', or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column') bdate : str or pd.Datetime beginning date (inclusive) defaults to yesterday at 0Z edate : str or pd.Datetime ending date (inclusive) defaults to bdate + 23:59:59 bbox : tuple wlon, slat, elon, nlat in decimal degrees (-180 to 180) grid_kw : str or dict If str, must be 12US1, 1US1, 12US2, 1US2, 36US3, 108NHEMI2, 36NHEMI2 and will be used to set parameters based on EPA domains. If dict, IOAPI mapping parameters. For details, look at the defaults: import pyrsig; print(pyrsig.RsigApi().grid_kw) The REGRID_AGGREGATE defines how the regridded values are aggregated in time. Options are None (default), daily, or all. viirsnoaa_kw : dict Dictionary of VIIRS NOAA filter parameters default {'minimum_quality': 'high'} other options 'medium' or 'low' tropomi_kw : dict Dictionary of TropOMI filter parameters default {'minimum_quality': 75, 'maximum_cloud_fraction': 1.0} options are 0-100 and 0-1. purpleair_kw : dict Dictionary of purpleair filter parameters and api_key. 'out_in_flag': 0, # options 0, 2, '' 'freq': 'hourly', # options hourly, daily, monthly, yearly, none 'maximum_difference': 5, # integer 'maximum_ratio': 0.70, # float 'agg_pct': 75, # 0-100 'default_humidity': 50, 'api_key': 'your_key_here' tempo_kw : dict Dictionary of TEMPO filter parameters default 'api_key': 'your_key_here' # 'password' 'minimum_quality': 'normal' 'maximum_cloud_fraction': 1.0 'maximum_solar_zenith_angle': 70. pandora_kw : dict Dictionary of Pandora filter parameters default {'minimum_quality': 'high'} other options 'medium' or 'low' calipso_kw : dict Dictionary of Calipso filter parameters default {'MINIMUM_CAD': 20, 'MAXIMUM_UNCERTAINTY': 99} server : str 'ofmpub.epa.gov' for external users 'maple.hesc.epa.gov' for on EPA VPN users compress : int 1 to transfer files with gzip compression 0 to transfer uncompressed files (slow) encoding : dict IF encoding is provided, netCDF files will be stored as NetCDF4 with encoding for all variables. If _FillValue is provided, it will not be applied to TFLAG and COUNT. overwrite : bool If True, overwrite downloaded files in workdir. If False, reuse downloaded files in workdir. workdir : str Working directory (must exist) defaults to '.' gridfit : bool Default (False) keep grid as supplied. If True, redefine grid to remove cells outside the bbox. Properties ---------- grid_kw : dict Dictionary of regridding IOAPI properties. Defaults to 12US1 viirsnoaa_kw : dict Dictionary of filter properties tropomi_kw : dict Dictionary of filter properties tempo_kw : dict Dictionary of filter properties purpleair_kw : dict Dictionary of filter properties and api_key. Unlike other options, purpleair_kw will not work with the defaults. The user *must* update teh api_key property to their own key. Contact PurpleAir for more details. """ self._description = {} self._capabilities = None self._coveragesdf = None self._capabilitiesdf = None self.server = server self.key = key self.compress = compress self.workdir = workdir self.encoding = encoding self.overwrite = overwrite if bbox is None: self.bbox = (-126, 24, -66, 50) else: self.bbox = bbox if bdate is None: bdate = ( pd.to_datetime('now', utc=True) - pd.to_timedelta('1day') ).replace(hour=0, minute=0, second=0, microsecond=0, nanosecond=0) self.bdate = pd.to_datetime(bdate) if edate is None: self.edate = edate else: self.edate = pd.to_datetime(edate) self.corners = corners if grid_kw is None: grid_kw = '12US1' if isinstance(grid_kw, str): if grid_kw not in _def_grid_kw: raise KeyError('unknown grid, you must specify properites') grid_kw = _def_grid_kw[grid_kw].copy() if gridfit: grid_kw = customize_grid(grid_kw, self.bbox) self.grid_kw = grid_kw if tropomi_kw is None: tropomi_kw = {} tropomi_kw.setdefault('minimum_quality', 75) tropomi_kw.setdefault('maximum_cloud_fraction', 1.0) self.tropomi_kw = tropomi_kw if tempo_kw is None: tempo_kw = {} tempo_kw.setdefault('minimum_quality', 'normal') tempo_kw.setdefault('maximum_cloud_fraction', 1.0) tempo_kw.setdefault('api_key', 'your_key_here') tempo_kw.setdefault('maximum_solar_zenith_angle', 70.) self.tempo_kw = tempo_kw if viirsnoaa_kw is None: viirsnoaa_kw = {} viirsnoaa_kw.setdefault('minimum_quality', 'high') self.viirsnoaa_kw = viirsnoaa_kw if pandora_kw is None: pandora_kw = {} pandora_kw.setdefault('minimum_quality', 'high') self.pandora_kw = pandora_kw if calipso_kw is None: calipso_kw = {} calipso_kw.setdefault('MINIMUM_CAD', 20) calipso_kw.setdefault('MAXIMUM_UNCERTAINTY', 99) self.calipso_kw = calipso_kw if purpleair_kw is None: purpleair_kw = {} defpurp_kw = { 'out_in_flag': 0, 'freq': 'hourly', 'maximum_difference': 5, 'maximum_ratio': 0.70, 'agg_pct': 75, 'api_key': 'your_key_here', 'default_humidity': 50.000000 } for k, v in defpurp_kw.items(): purpleair_kw.setdefault(k, v) self.purpleair_kw = purpleair_kw
[docs] def set_grid_kw(self, grid_kw): if isinstance(grid_kw, str): if grid_kw not in _def_grid_kw: raise KeyError('unknown grid, you must specify properites') grid_kw = _def_grid_kw[grid_kw].copy() self.grid_kw = grid_kw
[docs] def resize_grid(self, clip=True): """ Update grid_kw property so that it only covers the bbox by adjusting the XORIG, YORIG, NCOLS and NROWS. If clip is True, this has the affect of reducing the number of rows and columns. This is useful when the area of interest is much smaller than the grid defined in grid_kw. Arguments --------- clip : bool Returns ------- None """ self.grid_kw = customize_grid(self.grid_kw, self.bbox, clip=clip)
[docs] def describe(self, key, as_dataframe=True, raw=False): """ describe returns details about the coverage specified by key. Details include spatial bounding box, time coverage, time resolution, variable label, and a short description. DescribeCoverage with a COVERAGE should be faster than descriptions because it only returns a small xml chunk. Currently, DescribeCoverage with a COVERAGE specified is unreliable because of malformed xml. If this fails, describe will instead request all coverages and query the specific coverage. Arguments --------- as_dataframe : bool Defaults to True and descriptions are returned as a dataframe. If False, returns a list of elements. raw : bool Return raw xml instead of parsing. Useful for debugging. Returns ------- coverages : pandas.DataFrame or list dataframe or list of parsed descriptions Example ------- df = rsigapi.describe('airnow.no2') print(df.to_csv()) # ,name,label,description,bbox_str,beginPosition,timeResolution # 0,no2,no2(ppb),UTC hourly mean surface measured nitrogen ..., # ... -157 21 -51 64,2003-01-02T00:00:00Z,PT1H """ import warnings if key not in self._description: r = legacy_get( f'https://{self.server}/rsig/rsigserver?SERVICE=wcs&VERSION=' f'1.0.0&REQUEST=DescribeCoverage&COVERAGE={key}&compress=1' ) self._description[key] = r.text if raw: return self._description[key] try: coverages = coverages_from_xml(self._description[key]) except Exception as e: warnings.warn(str(e) + '; using descriptions') return self.descriptions().query(f'name == "{key}"') if as_dataframe: coverages = pd.DataFrame.from_records(coverages) coverages['prefix'] = coverages['name'].apply( lambda x: x.split('.')[0] ) coverages = coverages.drop('tag', axis=1) return coverages
[docs] def descriptions(self, refresh=False, verbose=0): """ Experimental and may change. descriptions returns details about all coverages. Details include spatial bounding box, time coverage, time resolution, variable label, and a short description. Currently, parses capabilities using xml.etree.ElementTree and returns coverages from details available in CoverageOffering elements from DescribeCoverage. Currently cleaning up data xml elements that are bad and doing a per-coverage parsing to increase fault tolerance in the xml. Arguments --------- refresh : bool If True, get new copy and save to ~/.pyrsig/descriptons.xml If False (default), reload from saved if available. verbose : int If verbose is greater than 0, show warnings from parsing. Returns ------- coverages : pandas.DataFrame or list dataframe or list of parsed descriptions Example ------- rsigapi = pyrsig.RsigApi() desc = rsigapi.descriptions() print(desc.query('prefix == "tropomi"').name.unique()) # ['tropomi.nrti.no2.nitrogendioxide_tropospheric_column' # ... 43 other name here # 'tropomi.rpro.ch4.methane_mixing_ratio_bias_corrected'] """ from .data import get_descriptions if self._coveragesdf is None: self._coveragesdf = get_descriptions( server=self.server, refresh=refresh ) return self._coveragesdf
[docs] def capabilities(self, as_dataframe=True, refresh=False, verbose=0): """ At this time, the capabilities does not list cmaq.* """ import re import pandas as pd import os import io cappath = os.path.expanduser('~/.pyrsig/GetCapabilities.csv') if not refresh and as_dataframe: if self._capabilitiesdf is not None: return self._capabilitiesdf elif os.path.exists(cappath): self._capabilitiesdf = pd.read_csv(cappath) return self._capabilitiesdf if refresh or self._capabilities is None: self._capabilities = legacy_get( f'https://{self.server}/rsig/rsigserver?SERVICE=wcs&VERSION=' '1.0.0&REQUEST=GetCapabilities&compress=1' ) if as_dataframe: os.makedirs(os.path.dirname(cappath), exist_ok=True) cre = re.compile( '<CoverageOfferingBrief>.+?</CoverageOfferingBrief>', re.DOTALL + re.M ) gre = re.compile( r'<lonLatEnvelope srsName="WGS84\(DD\)">\s*<gml:pos>(.+?)' + r'</gml:pos>\s*<gml:pos>(.+?)</gml:pos>\s*</lonLatEnvelope>', re.M ) tre = re.compile(r'>\s+<', re.M) ctext = self._capabilities.text ctext = '\n'.join(cre.findall(ctext)) ctext = gre.sub(r'<bbox_str>\1 \2</bbox_str>', ctext) ctext = tre.sub(r'><', ctext) # Cleanup... for known issues ctext = ctext.replace('>yyy', '>') ctext = ctext.replace('<=', 'less than or equal to ') ctext = ctext.replace('qa_value < 0', 'qa_value less than 0') ctext = ctext.replace('>0=', 'greater than 0 =') ctext = ctext.replace('<0=', 'less than 0 = ') # version 1.5 if hasattr(pd, 'read_xml'): ctext = f"""<?xml version="1.0" encoding="UTF-8" ?> <WCS_Capabilities> {ctext} </WCS_Capabilities>""" capabilitiesdf = pd.read_xml(io.StringIO(ctext)) else: ccsv = ctext.replace('"', '\'') ccsv = ccsv.replace('</name><label>', '","') ccsv = ccsv.replace('</label><description>', '","') ccsv = ccsv.replace('</description><bbox_str>', '","') ccsv = ccsv.replace( '</bbox_str></CoverageOfferingBrief>', '"\n' ) ccsv = ccsv.replace('<CoverageOfferingBrief><name>', '"') ccsv = 'name,label,description,bbox_str\n' + ccsv capabilitiesdf = pd.read_csv(io.StringIO(ccsv)) capabilitiesdf['prefix'] = capabilitiesdf['name'].apply( lambda x: x.split('.')[0] ) capabilitiesdf.to_csv(cappath, index=False) self._capabilitiesdf = capabilitiesdf return self._capabilitiesdf return self._capabilities
[docs] def keys(self, offline=True): """ Arguments --------- offline : bool If True, uses small cached set of tested coverages. If False, finds all coverages from capabilities service. """ descdf = self.descriptions(refresh=not offline) keys = tuple(sorted(descdf['name'].unique())) return keys
[docs] def get_file( self, formatstr, key=None, bdate=None, edate=None, bbox=None, grid=False, corners=None, request='GetCoverage', compress=0, overwrite=None, verbose=0 ): """ Build url, outpath, and download the file. Returns outpath """ from .utils import get_file if overwrite is None: overwrite = self.overwrite url, outpath = self._build_url( formatstr, key=key, bdate=bdate, edate=edate, bbox=bbox, grid=grid, request=request, compress=compress, corners=corners ) if verbose > 0: print(url) get_file(url, outpath, verbose=verbose, overwrite=overwrite) return outpath
def _build_url( self, formatstr, key=None, bdate=None, edate=None, bbox=None, grid=False, corners=None, request='GetCoverage', compress=1 ): """ Arguments --------- formatstr : str 'xdr', 'ascii', 'netcdf-ioapi', 'netcdf-coards' request : str 'GetCoverage' or 'GetMetadata' all other keywords see __init__ """ if key is None: key = self.key if key is None: raise ValueError('key must be specified') if bdate is None: bdate = self.bdate else: bdate = pd.to_datetime(bdate) if edate is None: if self.edate is None: edate = ( bdate + pd.to_timedelta('+1day') + pd.to_timedelta('-1s') ) else: edate = self.edate else: edate = pd.to_datetime(edate) if bbox is None: bbox = self.bbox if edate < bdate: raise ValueError('edate cannot be before bdate') if bbox[2] < bbox[0]: raise ValueError('elon cannot be less than wlon') if bbox[3] < bbox[1]: raise ValueError('nlat cannot be less than slat') if corners is None: corners = self.corners grid_kw = self.grid_kw purpleair_kw = self.purpleair_kw tropomi_kw = self.tropomi_kw tempo_kw = self.tempo_kw viirsnoaa_kw = self.viirsnoaa_kw pandora_kw = self.pandora_kw calipso_kw = self.calipso_kw if compress is None: compress = self.compress wlon, slat, elon, nlat = bbox # If already gridded, do not use grid keywords nogridkw = any([key.startswith(pre) for pre in _noregrid_prefixes]) if (grid and not nogridkw) and request == 'GetCoverage': gridstr = self._build_grid(grid_kw) else: gridstr = '' if key.startswith('viirsnoaa'): viirsnoaastr = '&MINIMUM_QUALITY={minimum_quality}'.format( **viirsnoaa_kw ) else: viirsnoaastr = '' if key.startswith('pandora'): pandorastr = '&MINIMUM_QUALITY={minimum_quality}'.format( **pandora_kw ) else: pandorastr = '' if key.startswith('calipso'): calipsostr = ( '&MINIMUM_CAD={MINIMUM_CAD}' + '&MAXIMUM_UNCERTAINTY={MAXIMUM_UNCERTAINTY}' ).format(**calipso_kw) else: calipsostr = '' if key.startswith('tropomi'): tropomistr = ( '&MINIMUM_QUALITY={minimum_quality}' '&MAXIMUM_CLOUD_FRACTION={maximum_cloud_fraction}' ).format(**tropomi_kw) else: tropomistr = '' if key.startswith('tempo.l2'): if tempo_kw['api_key'] == 'your_key_here': raise ValueError('''You must set the tempo_kw api_key (e.g., api.tempo_kw["api_key"] = "...") before submitting a query.''') tempostr = ( '&MAXIMUM_CLOUD_FRACTION={maximum_cloud_fraction}' '&MINIMUM_QUALITY={minimum_quality}&KEY={api_key}' '&MAXIMUM_SOLAR_ZENITH_ANGLE={maximum_solar_zenith_angle}' ).format(**tempo_kw) else: tempostr = '' if key.startswith('purpleair'): if purpleair_kw['api_key'] == 'your_key_here': raise ValueError('''You must set the purpleair_kw api_key (e.g., api.purpleair_kw["api_key"] = "9...") before submitting a query.''') purpleairstr = ( '&OUT_IN_FLAG={out_in_flag}&MAXIMUM_DIFFERENCE=' '{maximum_difference}&MAXIMUM_RATIO={maximum_ratio}' '&AGGREGATE={freq}&MINIMUM_AGGREGATION_COUNT_PERCENTAGE=' '{agg_pct}&DEFAULT_HUMIDITY={default_humidity}&KEY={api_key}' ).format(**purpleair_kw) else: purpleairstr = '' if corners == 1: if any([key.startswith(pre) for pre in _corner_prefixes]): cornerstr = f'&CORNERS={corners}' else: cornerstr = '' else: cornerstr = '' if any([key.startswith(pre) for pre in _nolonlats_prefixes]): nolonlatsstr = '&NOLONLATS=1' else: nolonlatsstr = '' url = ( f'https://{self.server}/rsig/rsigserver?SERVICE=wcs&VERSION=1.0.0' f'&REQUEST={request}&FORMAT={formatstr}' f'&TIME={bdate:%Y-%m-%dT%H:%M:%SZ}/{edate:%Y-%m-%dT%H:%M:%SZ}' f'&BBOX={wlon},{slat},{elon},{nlat}' f'&COVERAGE={key}' f'&COMPRESS={compress}' ) + ( purpleairstr + viirsnoaastr + tropomistr + tempostr + pandorastr + calipsostr + gridstr + cornerstr + nolonlatsstr ) outpath = ( f'{self.workdir}/{key}_{bdate:%Y-%m-%dT%H%M%SZ}' f'_{edate:%Y-%m-%dT%H%M%SZ}' ) if formatstr.lower() == 'ascii': outpath += '.csv' elif formatstr.lower() == 'netcdf-ioapi': outpath += '.nc' elif formatstr.lower() == 'netcdf-coards': outpath += '.nc' elif formatstr.lower() == 'xdr': outpath += '.xdr' if request == 'GetMetadata': outpath += '.txt' elif compress: outpath += '.gz' return url, outpath def _build_grid(self, grid_kw): """ Build the regrid portion of the URL """ grid_kw.setdefault('earth_radius', 6370000) GDTYP = grid_kw.get('GDTYP', 2) if GDTYP == 1: projstr = '&LONLAT=1' elif GDTYP == 2: projstr = '&LAMBERT={P_ALP},{P_BET},{XCENT},{YCENT}' elif GDTYP == 6: projstr = '&STEREOGRAPHIC={XCENT},{YCENT},{P_BET}' elif GDTYP == 7: projstr = '&MERCATOR={P_GAM}' else: raise KeyError('GDTYP only implemented for ') gridstr = ( '&REGRID=weighted' + projstr + '&ELLIPSOID={earth_radius},{earth_radius}' + '&GRID={NCOLS},{NROWS},{XORIG},{YORIG},{XCELL},{YCELL}' ) if grid_kw.get('REGRID_AGGREGATE', 'None').strip() != 'None': gridstr += "&REGRID_AGGREGATE={REGRID_AGGREGATE}" return gridstr.format(**grid_kw)
[docs] def to_dataframe( self, key=None, bdate=None, edate=None, bbox=None, unit_keys=True, parse_dates=False, corners=None, withmeta=False, verbose=0, backend='ascii', grid=False ): """ All arguments default to those provided during initialization. Arguments --------- key : str Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected', or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column') bdate : str or pd.Datetime beginning date (inclusive) defaults to yesterday at 0Z edate : str or pd.Datetime ending date (inclusive) defaults to bdate + 23:59:59 bbox : tuple wlon, slat, elon, nlat in decimal degrees (-180 to 180) unit_keys : bool If True, keep unit in column name. If False, move last parenthetical part of key to attrs of Series. parse_dates : bool If True, parse Timestamp(UTC) withmeta: bool If True, add 'GetMetadata' results as a "metadata" attribute of the dataframe. This is useful for understanding the underlying datasets used to create the result. verbose : int level of verbosity Returns ------- df : pandas.DataFrame Results from download """ from . import xdr from . import bin assert backend in {'ascii', 'xdr', 'bin'} if any([key.startswith(pfx) for pfx in _shpxdrprefixes]): backend = 'xdr' elif any([key.startswith(pfx) for pfx in _shpbinprefixes]): backend = 'bin' outpath = self.get_file( backend, key=key, bdate=bdate, edate=edate, bbox=bbox, grid=grid, verbose=verbose, corners=corners, compress=1 ) if backend == 'ascii': df = pd.read_csv(outpath, delimiter='\t', na_values=[-9999., -999]) elif backend == 'xdr': df = xdr.from_xdrfile(outpath, na_values=[-9999., -999]) elif backend == 'bin': df = bin.from_binfile(outpath) else: raise KeyError(f'format {backend} unknown; use xdr, bin or ascii') if withmeta: metapath = self.get_file( 'ascii', key=key, bdate=bdate, edate=edate, bbox=bbox, grid=grid, verbose=verbose, request='GetMetadata', compress=1 ) metatxt = open(metapath, 'r').read() df.attrs['metadata'] = metatxt if not unit_keys: columns = [k for k in df.columns] newcolumns = [] unit_dict = {} for k in columns: if '(' not in k: newk = k unit = 'unknown' else: idx = k.rfind('(') newk = k[:idx] unit = k[idx+1:-1] unit_dict[newk] = unit newcolumns.append(newk) df.columns = newcolumns for k in newcolumns: if hasattr(df[k], 'attrs'): df[k].attrs.update(dict(units=unit_dict.get(k, 'unknown'))) if parse_dates: if 'Timestamp(UTC)' in df: df['time'] = pd.to_datetime(df['Timestamp(UTC)']) if 'Timestamp' in df: df['time'] = pd.to_datetime(df['Timestamp']) return df
[docs] def to_ioapi( self, key=None, bdate=None, edate=None, bbox=None, withmeta=False, removegz=False, verbose=0 ): """ All arguments default to those provided during initialization. Arguments --------- key : str Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected', or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column') bdate : str or pd.Datetime beginning date (inclusive) defaults to yesterday at 0Z edate : str or pd.Datetime ending date (inclusive) defaults to bdate + 23:59:59 bbox : tuple wlon, slat, elon, nlat in decimal degrees (-180 to 180) withmeta : bool If True, add 'GetMetadata' results at an attribute "metadata" to the netcdf file. This is useful for understanding the underlying datasets used to create the result. removegz : bool If True, then remove the downloaded gz file. Bad for caching. Returns ------- ds : xarray.Dataset Results from download """ import gzip import shutil import os # always use compression for network speed. outpath = self.get_file( 'netcdf-ioapi', key=key, bdate=bdate, edate=edate, bbox=bbox, grid=True, compress=1, verbose=verbose ) # Uncompress the netcdf file. If encoding is available, apply it if not self.overwrite and os.path.exists(outpath[:-3]): print('Using cached:', outpath[:-3]) else: with gzip.open(outpath, 'rb') as f_in: with open(outpath[:-3], 'wb') as f_out: shutil.copyfileobj(f_in, f_out) f_out.flush() if self.encoding is not None: import xarray as xr with xr.open_dataset(outpath[:-3]) as tmpf: tmpf.load() for key in tmpf.data_vars: tvar = tmpf[key] tvar.encoding.update(self.encoding) if key in ('TFLAG', 'COUNT'): tvar.encoding.pop('_FillValue', '') tmpf.to_netcdf(outpath[:-3], format='NETCDF4_CLASSIC') if withmeta: metapath = self.get_file( 'netcdf-ioapi', key=key, bdate=bdate, edate=edate, bbox=bbox, grid=True, compress=1, request='GetMetadata', verbose=verbose ) else: metapath = None f = open_ioapi(outpath[:-3], metapath=metapath) if removegz: os.remove(outpath) return f
[docs] def to_netcdf( self, key=None, bdate=None, edate=None, bbox=None, grid=False, withmeta=False, removegz=False, verbose=0 ): """ All arguments default to those provided during initialization. Arguments --------- key : str Default key for query (e.g., 'aqs.o3', 'purpleair.pm25_corrected', or 'tropomi.offl.no2.nitrogendioxide_tropospheric_column') bdate : str or pd.Datetime beginning date (inclusive) defaults to yesterday at 0Z edate : str or pd.Datetime ending date (inclusive) defaults to bdate + 23:59:59 bbox : tuple wlon, slat, elon, nlat in decimal degrees (-180 to 180) grid : bool Add column and row variables with grid assignments. withmeta : bool If True, add 'GetMetadata' results at an attribute "metadata" to the netcdf file. removegz : bool If True, then remove the downloaded gz file. Bad for caching. Returns ------- ds : xarray.Dataset Results from download """ import gzip import shutil import os import xarray as xr # always use compression for network speed. outpath = self.get_file( 'netcdf-coards', key=key, bdate=bdate, edate=edate, bbox=bbox, grid=grid, compress=1, verbose=verbose ) # Uncompress the netcdf file. If encoding is available, apply it if not self.overwrite and os.path.exists(outpath[:-3]): print('Using cached:', outpath[:-3]) else: with gzip.open(outpath, 'rb') as f_in: with open(outpath[:-3], 'wb') as f_out: shutil.copyfileobj(f_in, f_out) f_out.flush() f = xr.open_dataset(outpath[:-3]) if withmeta: metapath = self.get_file( 'netcdf-coards', key=key, bdate=bdate, edate=edate, bbox=bbox, grid=grid, compress=1, request='GetMetadata', verbose=verbose ) with open(metapath, 'r') as metaf: metatxt = metaf.read() f.attrs['metadata'] = metatxt if removegz: os.remove(outpath) return f
[docs]class RsigGui:
[docs] @classmethod def from_api(cls, api): gui = cls() gui._bbwe.value = api.bbox[::2] gui._bbsn.value = api.bbox[1::2] if api.bdate is not None: bdate = pd.to_datetime(api.bdate) gui._dates.value = bdate.floor('1D') gui._hours.value = ( bdate - bdate.floor('1D') ).total_seconds() // 3600 if api.edate is not None: edate = pd.to_datetime(api.edate) gui._datee.value = edate.floor('1D') gui._houre.value = ( edate - edate.floor('1D') ).total_seconds() // 3600 if api.key is not None: gui._prodd.value = api.key if api.grid_kw is not None: gui._gridd.value = api.grid_kw['GDNAM'] if api.workdir is not None: gui._workd.value = api.workdir return gui
def __init__(self): """ RsigGui Object designed for IPython with ipywidgets in Jupyter Example: gui = RsigGui() gui.form # As last line in cell, displays controls for user gui.plotopts() # Plots current options gui.check() # Check bounding box and date options make sense rsigapi = gui.get_api() # Convert gui to standard api # proceed with normal RsigApi usage """ from datetime import date from ipywidgets import Layout, Box, Dropdown, Label, FloatRangeSlider from ipywidgets import DatePicker, Textarea, BoundedIntText, Output api = RsigApi() descdf = api.descriptions().copy() descdf['begin'] = descdf['beginPosition'] descdf['end'] = descdf['endPosition'] descdf['bbox'] = descdf['bbox_str'] descdf['opt_txt'] = descdf.apply( lambda x: '{name}\t({begin}-{end})\t({bbox})'.format(**x), axis=1 ) descdf['sort'] = ~descdf.name.isin(api.keys()) prodopts = descdf.sort_values(by=['sort', 'name'], ascending=True)[ ['opt_txt', 'name'] ].values.tolist() l100 = Layout(width='95%') l50 = Layout(width='30em') self._prodd = prodd = Dropdown( options=prodopts, description='Product', layout=l100, value='tropomi.offl.no2.nitrogendioxide_tropospheric_column' ) self._gridd = gridd = Dropdown( options=list(_def_grid_kw), value='12US1', description='grid', layout=l50 ) self._dates = datesa = DatePicker( description='Start Date', disabled=False, layout=l50, value=( date.today() - pd.to_timedelta('7d') ) ) self._datee = dateea = DatePicker( description='End Date', disabled=False, value=datesa.value, layout=l50 ) self._hours = hours = BoundedIntText( min=0, max=23, value=0, description='Start HR', layout=l50 ) self._houre = houre = BoundedIntText( min=0, max=23, value=23, description='End HR', layout=l50 ) self._bbsn = FloatRangeSlider( min=-90, max=90, value=(24, 50), description='South-North', layout=l100 ) self._bbwe = FloatRangeSlider( min=-180, max=180, value=(-126, -66), description='West-East', layout=l100 ) self._workd = workd = Textarea( value='.', description='Work Dir', layout=l100 ) self._out = Output(layout=l100) form_items = [ Label(value='RSIG Options'), prodd, self._bbsn, self._bbwe, Box([datesa, hours]), Box([dateea, houre]), gridd, workd, self._out ] [ fi.observe(self._update_out, names='value') for fi in form_items + [datesa, hours, dateea, houre] ] self._form = Box(form_items, layout=Layout( display='flex', flex_flow='column', border='solid 2px', align_items='stretch', width='100%' )) def _update_out(self, *args): from IPython.display import clear_output, display fig = self.plotopts() with self._out: clear_output(wait=True) display(fig)
[docs] def date_range(self): import pandas as pd return pd.date_range(self.bdate, self.edate)
@property def form(self): return self._form @property def key(self): return self._prodd.value @property def bdate(self): import pandas as pd return ( pd.to_datetime(self._dates.value) + pd.to_timedelta(self._hours.value, unit='H') ) @property def edate(self): import pandas as pd hms = self._houre.value * 3600 + 3599 return pd.to_datetime( self._datee.value ) + pd.to_timedelta(hms, unit='s') @property def grid_kw(self): return self._gridd.value @property def bbox(self): w, e = self._bbwe.value s, n = self._bbsn.value return (w, s, e, n) @property def workdir(self): return self._workd.value
[docs] def plotopts(self): import pycno import matplotlib.pyplot as plt fig, ax = plt.subplots() bbw, bbs, bbe, bbn = self.bbox c = {True: 'g', False: 'r'}.get(self.check(), 'r') ax.plot( [bbw, bbe, bbe, bbw, bbw], [bbs, bbs, bbn, bbn, bbs], color=c ) if c == 'r': ax.text( .5, .5, 'Invalid Options', horizontalalignment='center', transform=ax.transAxes, color='r', fontsize=30, bbox={'edgecolor': c, 'facecolor': 'white'} ) fig.suptitle(f'Query Options: {self.key}, {self.grid_kw}') ax.set(title=f'{self.bdate:%FT%H:%M:%S} {self.edate:%FT%H:%M:%S}') pycno.cno().drawstates(ax=ax) return fig
[docs] def get_api(self): rsigapi = RsigApi( key=self.key, bdate=self.bdate, edate=self.edate, bbox=self.bbox, grid_kw=self.grid_kw, workdir=self.workdir ) return rsigapi
[docs] def check(self, action='return'): bbw, bbs, bbe, bbn = self.bbox iswe = bbw < bbe issn = bbs < bbn isbe = self.bdate <= self.edate if not iswe: _actionf('West is East of East', action) if not issn: _actionf('South is North of North', action) if not isbe: _actionf('bdate is later than edate', action) return iswe & issn & isbe
# Add easy access defaults _defapi = RsigApi() descriptions = _defapi.descriptions to_dataframe = _defapi.to_dataframe to_ioapi = _defapi.to_ioapi to_netcdf = _defapi.to_netcdf