Source code for polar2grid.filters.resample_coverage
#!/usr/bin/env python
# encoding: utf-8
# Copyright (C) 2021 Space Science and Engineering Center (SSEC),
# University of Wisconsin-Madison.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This file is part of the polar2grid software package. Polar2grid takes
# satellite observation data, remaps it, and writes it to a file format for
# input into another program.
# Documentation: http://www.ssec.wisc.edu/software/polar2grid/
"""Filter classes dealing with resampling output coverage."""
import logging
try:
# Python 3.9+
from functools import cache
except ImportError:
from functools import lru_cache as cache
from pyresample.spherical import SphPolygon
from xarray import DataArray
from ._base import BaseFilter
from ._utils import PRGeometry, polygon_for_area
logger = logging.getLogger(__name__)
[docs]
@cache
def _get_intersection_coverage(source_polygon: SphPolygon, target_polygon: SphPolygon) -> float:
"""Get fraction of output grid that will be filled with input data."""
intersect_polygon = source_polygon.intersection(target_polygon)
if intersect_polygon is None:
same_shape = source_polygon.vertices.shape == target_polygon.vertices.shape
if same_shape and (source_polygon.vertices == target_polygon.vertices).all():
# they are exactly the same
return 1.0
return 0.0
return intersect_polygon.area() / target_polygon.area()
[docs]
class ResampleCoverageFilter(BaseFilter):
"""Remove any DataArrays that would not have any results if resampled to the target area.
This filter defaults to checking all products. Pass ``product_filter_critera=None`` to
disable checking for all products. See :class:`BaseFilter` for other options.
"""
def __init__(
self,
product_filter_criteria: dict = True,
target_area: PRGeometry = None,
coverage_fraction: float = None,
):
"""Initialize thresholds and default search criteria."""
super().__init__(product_filter_criteria)
if target_area is None:
raise ValueError("'target_area' is required.")
if coverage_fraction is None:
raise ValueError("'coverage_fraction' is required.")
self._target_area = target_area
self._target_polygon = polygon_for_area(self._target_area)
self._coverage_fraction = coverage_fraction
[docs]
def _filter_data_array(self, data_arr: DataArray, _cache: dict):
"""Check if this DataArray should be removed.
Returns:
True if it meets the condition and does not have enough
day data and should therefore be removed. False otherwise
meaning it should be kept.
"""
if not self._matches_criteria(data_arr):
return False
source_area = data_arr.attrs.get("area")
if source_area is None:
return False
if source_area.ndim == 1:
logger.debug("Source data is 1 dimensional, will not filter based on resampling coverage.")
return False
coverage_fraction = self._get_and_cache_coverage_fraction(data_arr, source_area, _cache)
if coverage_fraction >= self._coverage_fraction:
logger.debug(f"Resampling found {coverage_fraction * 100:0.02f}% coverage.")
return False
logger.warning(
f"Resampling found {coverage_fraction * 100:0.02f}% of the output grid "
f"{self._target_area.area_id!r} covered. Will skip producing this product: {data_arr.attrs['name']}",
)
return True
[docs]
def _get_and_cache_coverage_fraction(self, source_data: DataArray, source_area: PRGeometry, cache: dict) -> float:
platform_name = source_data.attrs.get("platform_name")
sensor_name = source_data.attrs.get("sensor")
if sensor_name and not isinstance(sensor_name, str):
sensor_name = "-".join(sorted(sensor_name))
key = (platform_name, sensor_name)
if key in cache:
logger.debug(f"Reusing cache coverage fraction for {key}")
return cache[key]
logger.debug(f"Computing coverage fraction for {key}")
source_polygon = polygon_for_area(source_area)
coverage_fraction = _get_intersection_coverage(source_polygon, self._target_polygon)
if platform_name is None or sensor_name is None:
logger.debug("Platform and sensor metadata missing from data. Won't cache coverage information.")
return coverage_fraction
cache[key] = coverage_fraction
return coverage_fraction