# Source code for bexchange.auth.tinkauth

# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

## Authentication handling within baltrad-exchange

## @file
## @author Anders Henja, SMHI
## @date 2022-05-02

from bexchange.auth import coreauth
import os
import base64
import json

import logging

# Module-wide logger shared by the authentication providers.
logger = logging.getLogger("bexchange.auth")

# Tink is an optional dependency: the module must still be importable without
# it, so a failed import is only reported. The handler is kept broad on purpose
# (the original swallowed every failure) but no longer traps
# KeyboardInterrupt/SystemExit the way a bare ``except:`` does.
try:
    from tink import signature
    from tink import core
    from tink import cleartext_keyset_handle
    import tink
except Exception:
    # Logger.warn is a deprecated alias; Logger.warning is the supported name.
    logger.warning("Tink not found. Can't enable support")

class TinkAuth(coreauth.Auth):
    """Provide authentication through Tink

    registered as *tink* in *baltrad.bdbserver.web.auth* entry-point
    """

    def __init__(self, tink_root):
        """
        :param tink_root: absolute default path to search keys from
        :raise: :class:`ValueError` if *tink_root* is not an absolute path
        """
        if not os.path.isabs(tink_root):
            raise ValueError("tink_root must be an absolute path")
        self._tink_root = tink_root
        self._private_key = None
        # Maps a key name to the public key verifier created for it.
        self._verifiers = {}

    def add_key_config(self, conf):
        """Adds a key from key config

        :param conf: The key config. ``conf["key"]`` is serialised to JSON
          and read as a cleartext keyset, ``conf["name"]`` is the name the
          resulting verifier is stored under.
        :return: the node name this key should be associated with
        """
        handle = cleartext_keyset_handle.read(
            tink.JsonKeysetReader(json.dumps(conf["key"]))
        )
        verifier = handle.primitive(signature.PublicKeyVerify)
        self._verifiers[conf["name"]] = verifier
        return conf["name"]

    def add_key(self, name, path):
        """Creates a public key verifier from the key located at *path*

        :param name: the name to associate the key with for lookups
        :param path: an absolute or relative path to the key. A relative
          path is resolved against the *tink_root* given to the constructor.
        :raise: :class:`tink.TinkError` if the key can not be read
        """
        if not os.path.isabs(path):
            path = os.path.join(self._tink_root, path)
        logger.info("adding key %s from %s", name, path)
        with open(path, "rt") as kf:
            handle = cleartext_keyset_handle.read(tink.JsonKeysetReader(kf.read()))
            verifier = handle.primitive(signature.PublicKeyVerify)
            self._verifiers[name] = verifier

    def authenticate(self, req, credentials):
        """Authenticates the request against the credentials.

        :param req: The http request
        :param credentials: The credentials that should be verified against,
          formatted as ``<keyname>:<base64 encoded signature>``
        :return: True if authenticated, False otherwise.
        :raise: :class:`coreauth.AuthError` if the credentials are malformed
          or no verifier is registered for the key name
        """
        try:
            # Split on the last colon only; a base64 signature never contains
            # a colon, so everything before it is the key name.
            keyname, sig = credentials.rsplit(":", 1)
        except ValueError:
            raise coreauth.AuthError("invalid credentials: %s" % credentials)

        try:
            verifier = self._verifiers[keyname]
        except KeyError:
            # A missing dict entry raises KeyError, not tink.TinkError.
            raise coreauth.AuthError("no verifier for key: %s" % keyname)

        signable = self.create_signable_string(req)
        signable = bytes(signable, "utf-8")

        try:
            # binascii.Error (bad padding etc.) is a ValueError subclass.
            decoded_sig = base64.b64decode(sig)
        except ValueError:
            raise coreauth.AuthError("invalid credentials: %s" % credentials)

        try:
            # verify() signals a mismatch by raising; its return value is unused.
            verifier.verify(decoded_sig, signable)
        except tink.TinkError as e:
            logger.exception("unhandled TinkError %s", e.__str__())
            return False
        return True

    def create_signable_string(self, req):
        """construct a signable string from a :class:`~.util.Request`

        The string consists of the request method and path followed by the
        stripped, non-empty values of the *content-md5*, *message-id*,
        *content-type* and *date* headers (in that order), joined by newlines.

        :param req: the request providing *method*, *path* and *headers*
        :return: the string that is signed/verified
        """
        fragments = [req.method, req.path]
        for key in ("content-md5", "message-id", "content-type", "date"):
            if key in req.headers:
                value = req.headers[key].strip()
                if value:
                    fragments.append(value)
        return "\n".join(fragments)

    @classmethod
    def from_conf(cls, conf):
        """Create from configuration.

        :param conf: a :class:`~.config.Properties` instance
        :raise: :class:`LookupError` if a required configuration parameter
          is missing.

        All keys are accessed with prefix *baltrad.exchange.auth.tink.*.
        The value of `root` is passed to the constructor. All values under
        `keys.` are passed to :meth:`add_key` where the configuration key is
        used as a name and the value is used as the path for the key lookup.
        The value of `private.key` is stored as the private key.
        """
        # Registers the tink signature primitives before any keyset is read.
        signature.register()

        conf = conf.filter("baltrad.exchange.auth.tink.")
        result = TinkAuth(conf.get("root"))

        keyconf = conf.filter("keys.")
        for key in keyconf.get_keys():
            result.add_key(key, keyconf.get(key))

        result._private_key = conf.get("private.key")
        return result