#!/usr/bin/env python3
# encoding: utf-8
# Copyright (C) 2014 Space Science and Engineering Center (SSEC),
# University of Wisconsin-Madison.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# This file is part of the polar2grid software package. Polar2grid takes
# satellite observation data, remaps it, and writes it to a file format for
# input into another program.
# Documentation: http://www.ssec.wisc.edu/software/polar2grid/
#
# Written by David Hoese September 2014
# University of Wisconsin-Madison
# Space Science and Engineering Center
# 1225 West Dayton Street
# Madison, WI 53706
# david.hoese@ssec.wisc.edu
"""Helper functions and classes used by multiple polar2grid scripts.
:author: David Hoese (davidh)
:contact: david.hoese@ssec.wisc.edu
:organization: Space Science and Engineering Center (SSEC)
:copyright: Copyright (c) 2013 University of Wisconsin SSEC. All rights reserved.
:date: Sept 2014
:license: GNU GPLv3
"""
__docformat__ = "restructuredtext en"
import argparse
import logging
import os
import sys
from collections import defaultdict
LOG = logging.getLogger(__name__)
# class SatPyWarningFilter(logging.Formatter):
# def format(self, record):
# if record.name.startswith('satpy') and record.levelno == logging.WARNING:
# record.levelno = logging.DEBUG
# record.levelname = 'DEBUG'
# return super(SatPyWarningFilter, self).format(record)
[docs]
class SatPyWarningFilter(logging.Filter):
[docs]
def filter(self, record):
is_satpy = record.name.startswith("satpy")
is_msg = "The following datasets were not created and may require resampling" in record.msg
return 0 if is_satpy and is_msg else 1
[docs]
class ThirdPartyFilter(logging.Filter):
def __init__(self, ignored_packages, level=logging.WARNING, name=""):
self.ignored_packages = ignored_packages
self.level_filter = level
super(ThirdPartyFilter, self).__init__(name)
[docs]
def filter(self, record):
for pkg in self.ignored_packages:
if record.name.startswith(pkg) and record.levelno < self.level_filter:
return 0
return 1
[docs]
def setup_logging(console_level=logging.INFO, log_filename="polar2grid.log", log_numpy=True):
"""Set up the logger to the console to the logging level defined in the command line (default INFO).
Sets up a file logging for everything,
regardless of command line level specified. Adds extra logger for
tracebacks to go to the log file if the exception is caught. See
`exc_handler` for more information.
:param console_level: Python logging level integer (ex. logging.INFO).
:param log_filename: Log messages to console and specified log_filename (None for no file log)
:param log_numpy: Tell numpy to log invalid values encountered
"""
# set the root logger to DEBUG so that handlers can have all possible messages to filter
root_logger = logging.getLogger("")
root_logger.setLevel(min(console_level, logging.DEBUG))
# Console output is minimal
console = logging.StreamHandler(sys.stderr)
console_format = "%(levelname)-8s : %(message)s"
console.setFormatter(logging.Formatter(console_format))
console.setLevel(console_level)
console.addFilter(SatPyWarningFilter())
if console_level > logging.DEBUG:
# if we are only showing INFO/WARNING/ERROR messages for P2G then
# filter out messages from these packages
console.addFilter(
ThirdPartyFilter(["satpy", "pyresample", "pyspectral", "trollimage", "pyorbital", "trollsift"])
)
root_logger.addHandler(console)
# Log file messages have a lot more information
if log_filename:
file_handler = logging.FileHandler(log_filename)
file_format = "[%(asctime)s] : %(levelname)-8s : %(name)s : %(funcName)s : %(message)s"
file_handler.setFormatter(logging.Formatter(file_format))
file_handler.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
# Make a traceback logger specifically for adding tracebacks to log file
traceback_log = logging.getLogger("traceback")
traceback_log.propagate = False
traceback_log.setLevel(logging.ERROR)
traceback_log.addHandler(file_handler)
if log_numpy:
import numpy
class TempLog(object):
def write(self, msg):
logging.getLogger("numpy").debug(msg)
numpy.seterr(invalid="log")
numpy.seterrcall(TempLog())
[docs]
def rename_log_file(new_filename):
"""Rename the file handler for the root logger and the traceback logger."""
# the traceback logger only has 1 handler so let's get that
traceback_log = logging.getLogger("traceback")
if not traceback_log.handlers:
LOG.error("Tried to change the log filename, but no log file was configured")
raise RuntimeError("Tried to change the log filename, but no log file was configured")
h = traceback_log.handlers[0]
fn = h.baseFilename
h.stream.close()
root_logger = logging.getLogger("")
root_logger.removeHandler(h)
traceback_log.removeHandler(h)
# move the old file
if os.path.isfile(new_filename):
with open(new_filename, "a") as new_file:
with open(fn, "r") as old_file:
new_file.write(old_file.read())
os.remove(fn)
else:
os.rename(fn, new_filename)
# create the new handler
file_handler = logging.FileHandler(new_filename)
file_format = "[%(asctime)s] : PID %(process)6d : %(levelname)-8s : %(name)s : %(funcName)s : %(message)s"
file_handler.setFormatter(logging.Formatter(file_format))
file_handler.setLevel(logging.DEBUG)
root_logger.addHandler(file_handler)
traceback_log.addHandler(file_handler)
LOG.debug("Log renamed from '%s' to '%s'", fn, new_filename)
[docs]
def create_exc_handler(glue_name):
def exc_handler(exc_type, exc_value, traceback):
"""Handle logging/printing an exception if it occurs.
This will save us from print tracebacks or unrecognizable errors to the user's console.
Note, however, that this doesn't effect code in a separate process as the
exception never gets raised in the parent.
"""
logging.getLogger(glue_name).error(
"Unexpected error. Enable debug messages (-vvv) or see log file for details."
)
logging.getLogger(glue_name).debug("Unexpected error exception: ", exc_info=(exc_type, exc_value, traceback))
tb_log = logging.getLogger("traceback")
if tb_log.handlers:
tb_log.error(exc_value, exc_info=(exc_type, exc_value, traceback))
return exc_handler
[docs]
class NumpyDtypeList(list):
"""Magic list to allow dtype objects to match string versions of themselves."""
def __contains__(self, item):
if super(NumpyDtypeList, self).__contains__(item):
return True
try:
return super(NumpyDtypeList, self).__contains__(item().dtype.name)
except AttributeError:
return False
[docs]
class ExtendAction(argparse.Action):
"""Similar to the 'append' action, but expects multiple elements instead of just one."""
def __call__(self, parser, namespace, values, option_string=None):
current_values = getattr(namespace, self.dest, []) or []
current_values.extend(values)
setattr(namespace, self.dest, current_values)
[docs]
class ExtendConstAction(argparse.Action):
"""Similar to the 'append' action, but expects multiple elements instead of just one."""
def __init__(self, option_strings, dest, nargs=0, **kwargs):
if nargs:
raise ValueError("nargs is not allowed")
super(ExtendConstAction, self).__init__(option_strings, dest, nargs=0, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
current_values = getattr(namespace, self.dest, []) or []
current_values.extend(self.const)
setattr(namespace, self.dest, current_values)
[docs]
class ArgumentParser(argparse.ArgumentParser):
[docs]
def _get_group_actions(self, group):
"""Get all the options/actions in a group including those from subgroups of the provided group.
.. note::
This does not group the subgroup options as their own dictionaries.
"""
these_actions = [action for action in group._group_actions]
# get actions if this group has even more subgroups
for subgroup in group._action_groups:
these_actions += self._get_group_actions(subgroup)
return these_actions
[docs]
def parse_args(self, *args, **kwargs):
"""Parse arguments to support custom polar2grid 'subgroup' behavior.
:param subgroup_titles: Groups and their arguments that will be put
in a separate dictionary in the 'subgroup_args' attribute
:param global_keywords: Keywords/arguments that should be added to all subgroup dictionaries
"""
subgroup_titles = kwargs.pop("subgroup_titles", [])
global_keywords = kwargs.pop("global_keywords", [])
args = super(ArgumentParser, self).parse_args(*args, **kwargs)
args.global_kwargs = {kw: getattr(args, kw) for kw in global_keywords}
# a dictionary that holds arguments from the specified subgroups (defaultdict for easier user by caller)
args.subgroup_args = defaultdict(dict)
for subgroup_title in subgroup_titles:
try:
subgroup = [x for x in self._action_groups if x.title == subgroup_title][0]
except IndexError:
# we don't have any loggers configured at this point
print("WARNING: Couldn't find argument group '%s' in configured parser" % (subgroup_title,))
continue
subgroup_args = {}
for action in self._get_group_actions(subgroup):
if hasattr(args, action.dest):
subgroup_args[action.dest] = getattr(args, action.dest)
delattr(args, action.dest)
# Add 'global' arguments
for kw in global_keywords:
if hasattr(args, kw):
subgroup_args[kw] = getattr(args, kw)
args.subgroup_args[subgroup_title] = subgroup_args
return args
[docs]
def create_basic_parser(*args, **kwargs):
parser = ArgumentParser(*args, **kwargs)
parser.add_argument(
"-v",
"--verbose",
dest="verbosity",
action="count",
default=0,
help="each occurrence increases verbosity 1 level through ERROR-WARNING-INFO-DEBUG-TRACE (default INFO)",
)
parser.add_argument("-l", "--log", dest="log_fn", default=None, help="specify the log filename")
parser.add_argument(
"--debug",
dest="keep_intermediate",
default=False,
action="store_true",
help="Keep intermediate files for future use.",
)
parser.add_argument(
"--overwrite",
dest="overwrite_existing",
action="store_true",
help="Overwrite intermediate or output files if they exist already",
)
parser.add_argument(
"--exit-on-error",
dest="exit_on_error",
action="store_true",
help="exit on first error including non-fatal errors",
)
return parser