from __future__ import annotations
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any
import numpy as np
from xarray.backends.common import (
BACKEND_ENTRYPOINTS,
AbstractDataStore,
BackendArray,
BackendEntrypoint,
_normalize_path,
)
from xarray.backends.file_manager import CachingFileManager
from xarray.backends.locks import HDF5_LOCK, NETCDFC_LOCK, combine_locks
from xarray.backends.locks import ensure_lock
from xarray.backends.store import StoreBackendEntrypoint
from xarray.core import indexing
from xarray.core.utils import Frozen, FrozenDict, close_on_error
from xarray.core.variable import Variable
if TYPE_CHECKING:
    # Imports needed only for static type annotations; skipped at runtime
    # (annotations are lazy via ``from __future__ import annotations``).
    import os
    from io import BufferedIOBase

    from xarray.core.dataset import Dataset

# PseudoNetCDF can invoke netCDF libraries internally, so serialize access
# with both the HDF5 and netCDF-C locks combined.
PNETCDF_LOCK = combine_locks([HDF5_LOCK, NETCDFC_LOCK])
class PncArrayWrapper(BackendArray):
    """Lazy array adapter exposing one PseudoNetCDF variable to xarray indexing."""

    def __init__(self, variable_name, datastore):
        self.datastore = datastore
        self.variable_name = variable_name
        # Probe the underlying variable once so shape/dtype are cached up front.
        var = self.get_array()
        self.shape = var.shape
        self.dtype = np.dtype(var.dtype)

    def get_array(self, needs_lock=True):
        """Acquire the (possibly cached) file handle and return the named variable."""
        handle = self.datastore._manager.acquire(needs_lock)
        return handle.variables[self.variable_name]

    def __getitem__(self, key):
        # Let xarray normalize arbitrary keys down to outer/1-vector indexing,
        # which then calls back into ``_getitem`` with a supported key form.
        return indexing.explicit_indexing_adapter(
            key,
            self.shape,
            indexing.IndexingSupport.OUTER_1VECTOR,
            self._getitem,
        )

    def _getitem(self, key):
        # Hold the datastore lock across the whole read; ``get_array`` must not
        # try to re-acquire it (``needs_lock=False``) or we would deadlock.
        with self.datastore.lock:
            return self.get_array(needs_lock=False)[key]
class PseudoNetCDFDataStore(AbstractDataStore):
    """Store for accessing datasets via PseudoNetCDF"""

    @classmethod
    def open(cls, filename, lock=None, mode=None, **format_kwargs):
        """Open *filename* with ``PseudoNetCDF.pncopen`` behind a caching file manager.

        Format-specific options are bundled under the ``kwargs`` keyword that
        ``pncopen`` expects; ``mode`` is forwarded only when the caller passed
        one explicitly.
        """
        from PseudoNetCDF import pncopen

        if lock is None:
            lock = PNETCDF_LOCK

        open_kwargs = {"kwargs": format_kwargs}
        if mode is not None:
            open_kwargs["mode"] = mode

        manager = CachingFileManager(pncopen, filename, lock=lock, **open_kwargs)
        return cls(manager, lock)

    def __init__(self, manager, lock=None):
        self._manager = manager
        self.lock = ensure_lock(lock)

    @property
    def ds(self):
        # Re-acquiring through the manager transparently reopens the file
        # if it was evicted from the cache.
        return self._manager.acquire()

    def open_store_variable(self, name, var):
        """Wrap one PseudoNetCDF variable as a lazily-indexed xarray Variable."""
        wrapped = indexing.LazilyIndexedArray(PncArrayWrapper(name, self))
        attributes = {attr: getattr(var, attr) for attr in var.ncattrs()}
        return Variable(var.dimensions, wrapped, attributes)

    def get_variables(self):
        variables = self.ds.variables
        return FrozenDict(
            (name, self.open_store_variable(name, variables[name]))
            for name in variables
        )

    def get_attrs(self):
        handle = self.ds
        return Frozen({attr: getattr(handle, attr) for attr in handle.ncattrs()})

    def get_dimensions(self):
        return Frozen(self.ds.dimensions)

    def get_encoding(self):
        dims = self.ds.dimensions
        unlimited = {name for name in dims if dims[name].isunlimited()}
        return {"unlimited_dims": unlimited}

    def close(self):
        self._manager.close()
class PseudoNetCDFBackend(BackendEntrypoint):
    """
    Backend for netCDF-like data formats in the air quality field
    based on the PseudoNetCDF package.

    It can open:
    - CAMx
    - RACM2 box-model outputs
    - Kinetic Pre-Processor outputs
    - ICARTT Data files (ffi1001)
    - CMAQ Files
    - GEOS-Chem Binary Punch/NetCDF files
    - and many more

    This backend is not selected by default for any files, so make
    sure to specify ``engine="pseudonetcdf"`` in ``open_dataset``.

    For more information about the underlying library, visit:
    https://pseudonetcdf.readthedocs.io

    See Also
    --------
    backends.PseudoNetCDFDataStore
    """

    description = (
        "Open many atmospheric science data formats using PseudoNetCDF in Xarray"
    )

    # *args and **kwargs are not allowed in open_backend_dataset_ kwargs,
    # unless the open_dataset_parameters are explicitly defined like this:
    open_dataset_parameters = (
        "filename_or_obj",
        "mask_and_scale",
        "decode_times",
        "concat_characters",
        "decode_coords",
        "drop_variables",
        "use_cftime",
        "decode_timedelta",
        "mode",
        "lock",
    )

    def open_dataset(
        self,
        filename_or_obj: (
            str | os.PathLike[Any] | BufferedIOBase | AbstractDataStore
        ),
        mask_and_scale=False,
        decode_times=True,
        concat_characters=True,
        decode_coords=True,
        drop_variables: str | Iterable[str] | None = None,
        use_cftime=None,
        decode_timedelta=None,
        mode=None,
        lock=None,
        **format_kwargs,
    ) -> Dataset:
        """Open *filename_or_obj* through PseudoNetCDF and decode it as a Dataset.

        Any extra ``format_kwargs`` are forwarded verbatim to ``pncopen``.
        """
        # Bundle the CF-decoding switches so they can be forwarded in one go.
        decoders = dict(
            mask_and_scale=mask_and_scale,
            decode_times=decode_times,
            concat_characters=concat_characters,
            decode_coords=decode_coords,
            drop_variables=drop_variables,
            use_cftime=use_cftime,
            decode_timedelta=decode_timedelta,
        )
        store = PseudoNetCDFDataStore.open(
            _normalize_path(filename_or_obj),
            lock=lock,
            mode=mode,
            **format_kwargs,
        )
        # Ensure the underlying file is released if decoding fails partway.
        with close_on_error(store):
            return StoreBackendEntrypoint().open_dataset(store, **decoders)
# Register this backend under the "pseudonetcdf" engine name; the first tuple
# element names the distribution that must be installed for it to be usable.
BACKEND_ENTRYPOINTS["pseudonetcdf"] = ("PseudoNetCDF", PseudoNetCDFBackend)