Source code for bexchange.auth.coreauth

# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

## Authentication handling within baltrad-exchange

## @file
## @author Anders Henja, SMHI
## @date 2022-05-02

import abc
import logging
import pkg_resources
import os

from bexchange import util
from bexchange import config
from baltradcrypto import crypto

logger = logging.getLogger("bexchange.auth")

[docs] class AuthError(Exception): """expected authentication errors """ pass
[docs] class auth_manager(object): def __init__(self): """Constructor """ self._providers = {}
[docs] def authenticate(self, req): """authenticate a :class:`~.util.Request` """ try: provider_key, credentials = self.get_credentials(req) except AuthError as e: logger.error("failed to parse authorization credentials: %s" % e) return False, None logger.debug("provider_key=%s, credentials=%s"%(provider_key, credentials)) try: provider = self._providers[provider_key] except LookupError: logger.error("auth provider not available: %s" % provider_key) return False, None logger.debug("authenticating with %s: %s", provider_key, credentials) try: return provider.authenticate(req, credentials), provider_key except AuthError as e: logger.warning("authentication failed: %s", e) except Exception as e: logger.exception("unhandled error while authenticating %s", e) return False, None
[docs] def get_credentials(self, req): """get authorization credentials from a :class:`~.util.Request` :raise: :class:`~.AuthError` if the authorization header is illegally formed (for the purposes of Baltrad-BDB) :return: a 2-tuple of provider and credential strings extracted from the header See :ref:`doc-rest-crypto-authentication` for details. Note that ("noauth", None) is returned if authorization header is missing. """ try: authstr = req.headers["authorization"] except LookupError: return ("noauth", None) try: provider, credentials = authstr.split(" ") except ValueError: if "Node-Name" in req.headers and req.base_url.endswith("post_file.htm"): provider = "exchange-keyczar" # Backward compatibility credentials = authstr else: raise AuthError("invalid Authorization header: %s" % authstr) provider = provider.strip() if not provider.startswith("exchange-"): raise AuthError( "invalid auth provider in Authorization header: %s" % provider ) else: provider = provider[9:] return provider, credentials.strip()
[docs] def get_nodename(self, req): """Returns the node name from the credentials / request :param req: The request :return the nodename if found, otherwise None """ provider, credentials = self.get_credentials(req) # Currently we know that node name always exist in the credentials so take it from there if credentials and credentials.find(":") > 0: return credentials.split(":")[0] return None
[docs] @classmethod def from_conf(cls, conf): """ :param app: the WSGI application receiving the request if authenication is successful. :param conf: a :class:`~.config.Properties` instance to configure from """ providers_key = "baltrad.exchange.auth.providers" try: providers = conf.get_list(providers_key, sep=",") except config.PropertyLookupError: logger.warning("No authorization provider(s) supplied, defaulting to 'noauth'") providers = ["noauth"] result = auth_manager() for provider in providers: try: result.add_provider_from_conf(provider, conf) except LookupError: raise config.Error("could not create provider: %s" % provider) return result
[docs] def add_provider_from_conf(self, name, conf): """load an :class:`Auth` implementation and add it as a provider :param name: the name of the :class:`Auth` implementation to load :param conf: a :class:`~.config.Properties` instance to configure from :raise: `LookupError` if an implementation with this name doesn't exist """ provider_cls = Auth.get_impl(name) provider = provider_cls.from_conf(conf) self._providers[name] = provider logger.info("Added provider: %s as %s"%(provider, name))
[docs] def add_key_config(self, conf): """Adds a key from key config :param conf: The key config :return: the node name this key should be associated with """ if conf["auth"] in self._providers: return self.get_provider(conf["auth"]).add_key_config(conf["conf"]) return None
[docs] def get_provider(self, name): """Returns the specified provider :param name: The name of the provider :return: the provider for specified name """ return self._providers[name]
[docs] def get_private_key(self, _type): """Returns the private key for the provided type :param _type: The encryption type :return: the private key """ return self._privatekeys[_type]
[docs] def get_providers(self): """ :return: all providers """ return self._providers
[docs] class Auth(object): """interface for authentication providers """ __metaclass__ = abc.ABCMeta
[docs] @abc.abstractmethod def authenticate(self, req, credentials): """authenticate the request with provided crendentials :param req: the request to authenticate :type req: :class:`~.util.Request` :param credentials: implementation specific credential string :return: True if success, otherwise False """ raise NotImplementedError()
[docs] @abc.abstractmethod def add_key_config(self, jsonstr): """Adds a key config to this provider :param jsonstr: THe key config :return the nodename this key should be associated with """ raise NotImplementedError()
[docs] @util.abstractclassmethod def from_conf(cls, conf): """construct an instance from configuration :param conf: a :class:`~.config.Properties` instance """ raise NotImplementedError()
[docs] @classmethod def get_impl(cls, name): """ :param name: The name of the entrypoint as defined in setup.py, e.g. crypto, keyczar... :return: the class represented by name. Used to create an instance of correct type """ return pkg_resources.load_entry_point( "bexchange", "bexchange.auth", name )
[docs] class NoAuth(Auth): """No authentication, allow everyone registered as *noauth* in *baltrad.bdbserver.web.auth* entry-point """
[docs] def authenticate(self, req, credentials): """Validates the content in the request against the credentials. :param req: the request to authenticate :type req: :class:`~.util.Request` :param credentials: implementation specific credential string :return: always True """ return True
[docs] @classmethod def from_conf(cls, conf): """Creates a NoAuth instance from configuration. :param conf: the configuration entry :return: a NoAuth instance. """ return NoAuth()
[docs] class CryptoAuth(Auth): """Provide authentication through the internal crypto registered as *exchange-crypto* in *baltrad.bdbserver.web.auth* entry-point """ def __init__(self, key_root): """ :param key_root: default path to search keys from """ if not os.path.isabs(key_root): raise ValueError("key_root must be an absolute path") self._key_root = key_root self._private_key = None self._verifiers = {}
[docs] def setPrivateKey(self, privkey, nodename=None): """Sets the private key (and associates it with a nodename) so that it is possible to setup the private key from the properties-file. :param privkey: Path to the private key :param nodename: The nodename that should be associated with the private key """ if privkey and not os.path.exists(privkey): raise Exception("If providing a private key it must point at a valid file") self._private_key = privkey if self._private_key and nodename: privkey = crypto.load_key(self._private_key) if nodename not in self._verifiers: logger.info("adding public key from private for %s", nodename) self._verifiers[nodename] = privkey.publickey()
[docs] def getPublicKey(self, nodename): """Returns the public key associated with the nodename :param nodename: Node name :return: the public key """ return self._verifiers[nodename]
[docs] def add_key_config(self, conf): """Adding key from a json-conf. Will ensure that creator is baltradcrypto.crypto. :param conf: The json configuration """ if "creator" in conf and conf["creator"] == "baltradcrypto.crypto": if conf["type"] == "public": if "pubkey" in conf: if not os.path.isabs(conf["pubkey"]): key = crypto.load_key("%s/%s"%(self._key_root, conf["pubkey"])) else: key = crypto.load_key(conf["pubkey"]) else: key = crypto.import_key(conf["key"]) if conf["nodename"] not in self._verifiers: logger.info("adding key config %s", conf["nodename"]) self._verifiers[conf["nodename"]] = key return conf["nodename"] else: raise AuthError("Exchange auth expects public keys") else: raise AuthError("Could not handle config")
[docs] def add_key(self, name, path): """ :param name: the name to associate the key with for lookups :param path: an absolute or relative path to the key. :raise: :class:`Exception` if the key can not be read creates a :class:`public key verifier` from the key located at *path* """ if not os.path.isabs(path): path = os.path.join(self._key_root, path) logger.info("adding key %s from %s", name, path) key = crypto.load_key(path) if not isinstance(key, crypto.public_key): raise AuthError("Exchange auth expects public keys") self._verifiers[name] = key
[docs] def authenticate(self, req, credentials): """Authenticates the request against the credentials. :param req: The http request :param credentials: The credentials that should be verified against :return: True if authenticated, False otherwise. """ logger.debug("CryptoAuth - authenticate: %s"%credentials) try: keyname, sig = credentials.rsplit(":") except ValueError: raise AuthError("invalid credentials: %s" % credentials) try: verifier = self._verifiers[keyname] except Exception: raise AuthError("no verifier for key: %s" % keyname) try: result = verifier.verify(self.create_signable_string(req), sig) return result except Exception as e: logger.exception("Failed to verify message") return False
[docs] def create_signable_string(self, req): """construct a signable string from a :class:`~.util.Request` See :ref:`doc-rest-crypto-authentication` for details. """ fragments = [req.method] for key in ("content-md5", "content-type", "date", "message-id"): if key in req.headers: value = req.headers[key].strip() if value: fragments.append(value) return "\n".join(fragments)
[docs] @classmethod def from_conf(cls, conf): """Create from configuration. :param conf: a :class:`~.config.Properties` instance :raise: :class:`LookupError` if a required configuration parameter is missing. All keys are accessed with prefix *baltrad.bdb.server.auth.keyczar.*. The value of `keystore_root` is passed to the constructor. All values under `keys` are passed to :meth:`add_key` where the configuration key is used as a name and the value is used as the path for the key lookup. """ cconf = conf.filter("baltrad.exchange.auth.crypto.") result = CryptoAuth(cconf.get("root")) keyconf = cconf.filter("keys.") for key in keyconf.get_keys(): result.add_key(key, keyconf.get(key)) result.setPrivateKey(cconf.get("private.key"), conf.get("baltrad.exchange.node.name")) return result