# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
## Rest client API
## @file
## @author Anders Henja, SMHI
## @date 2021-08-18
import abc
import base64
import hashlib
import json
import os
import posixpath
import socket
import ssl
import urllib.parse as urlparse
import uuid
from abc import abstractmethod, ABCMeta
from datetime import datetime, timezone
from http import client as httplibclient

import pkg_resources

from baltradcrypto.crypto import keyczarcrypto
from baltradcrypto import crypto
from bexchange.net.exceptions import DuplicateException
try:
import tink
from tink import cleartext_keyset_handle
from tink import signature
except:
pass
[docs]
class Request(object):
    """A single REST request that is about to be sent to the server.

    :param method: the HTTP method, e.g. ``"GET"`` or ``"POST"``
    :param path: the request path relative to the server url
    :param data: optional request body
    :param headers: optional dictionary of HTTP headers. The dictionary is
      stored as-is (not copied); when omitted every request gets its own
      new, empty dictionary.
    """
    def __init__(self, method, path, data=None, headers=None):
        self.method = method
        self.path = path
        self.data = data
        # A ``{}`` default would be one dict shared by every Request created
        # without headers, so credentials written into it by one request
        # would show up in all the others.
        self.headers = headers if headers is not None else {}
[docs]
class RestfulServer(object):
    """Access the exchange server over the RESTful interface.

    :param server_url: the base url of the server, e.g. ``https://host:8089``
    :param auth: object with an ``add_credentials(req)`` method that is
      invoked on every request before it is sent
    """
    def __init__(self, server_url, auth):
        self._server_url_str = server_url
        self._server_url = urlparse.urlparse(server_url)
        self._auth = auth

    def server_url(self):
        """:return: the server url exactly as it was given to the constructor
        """
        return self._server_url_str

    @staticmethod
    def _create_headers(content_type):
        """Creates the headers that are common to all requests.

        :param content_type: value of the ``content-type`` header
        :return: a new dictionary with ``content-type``, a random
          ``message-id`` and the current UTC time as ``date``
        """
        return {
            "content-type": content_type,
            "message-id": str(uuid.uuid4()),
            # Aware UTC "now" formats to the same string as the deprecated
            # datetime.utcnow() since the format has no timezone field.
            "date": datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
        }

    def store(self, data):
        """Stores the data in the exchange server by POSTing it to ``/file/``.

        :param data: a file-like object; the complete result of
          ``data.read()`` is used as request body
        :return: True if the server answered with status OK
        :raises DuplicateException: if the server answered with status CONFLICT
        :raises RuntimeError: on any other response status
        """
        request = Request(
            "POST", "/file/", data.read(),
            headers=self._create_headers("application/x-hdf5")
        )
        response = self.execute_request(request)
        if response.status == httplibclient.OK:
            return True
        if response.status == httplibclient.CONFLICT:
            raise DuplicateException("Duplicate file, response status: %s"%response.status)
        raise RuntimeError(
            "Unhandled response code: %s" % response.status
        )

    def post_json_message(self, json_message):
        """Posts a json message to ``/json_message/`` on the exchange server.

        :param json_message: the message, used unmodified as request body
        :return: True if the server answered with status OK
        :raises RuntimeError: on any other response status
        """
        request = Request(
            "POST", "/json_message/", json_message,
            headers=self._create_headers("application/json")
        )
        response = self.execute_request(request)
        if response.status == httplibclient.OK:
            return True
        raise RuntimeError(
            "Unhandled response code: %s" % response.status
        )

    def get_statistics(self, spid, sources, totals=False, qmethod=None, qfilter=None, object_type=None, origins=None):
        """Sends a GET request to ``/statistics/`` with the query as json body.

        The arguments are forwarded to the server in a json dictionary with
        the keys ``spid``, ``sources``, ``totals``, ``method``, ``filter``,
        ``object_type`` and ``origins``.

        :param spid: forwarded as ``spid``
        :param sources: forwarded as ``sources``
        :param totals: forwarded as ``totals``
        :param qmethod: forwarded as ``method``
        :param qfilter: forwarded as ``filter``
        :param object_type: forwarded as ``object_type``
        :param origins: forwarded as ``origins``
        :return: the http response; the status is not inspected here
        """
        json_message_d = {
            "spid":spid,
            "sources":sources,
            "totals":totals,
            "method":qmethod,
            "filter":qfilter,
            "object_type":object_type,
            "origins":origins
        }
        request = Request(
            "GET", "/statistics/", json.dumps(json_message_d),
            headers=self._create_headers("application/json")
        )
        return self.execute_request(request)

    def list_statistics(self):
        """Sends a GET request without body to ``/statistics/ids``.

        :return: the http response; the status is not inspected here
        """
        request = Request(
            "GET", "/statistics/ids",
            headers=self._create_headers("application/json")
        )
        return self.execute_request(request)

    def get_server_info(self, subcommand, **kwargs):
        """Sends a GET request without body to ``/serverinfo/<subcommand>``.

        :param subcommand: appended to the ``/serverinfo/`` path
        :param kwargs: accepted but currently not used
        :return: the http response; the status is not inspected here
        """
        request = Request(
            "GET", "/serverinfo/%s"%subcommand,
            headers=self._create_headers("application/json")
        )
        return self.execute_request(request)

    def file_arrival(self, source, object_type, limit):
        """Sends a GET request to ``/filearrival/`` with the query as json body.

        :param source: forwarded as ``source``
        :param object_type: forwarded as ``object_type``
        :param limit: forwarded as ``limit``
        :return: the http response; the status is not inspected here
        """
        json_message_d = {
            "source":source,
            "object_type":object_type,
            "limit":limit
        }
        request = Request(
            "GET", "/filearrival/",json.dumps(json_message_d),
            headers=self._create_headers("application/json")
        )
        return self.execute_request(request)

    def supervise(self, infotype, source, origins, object_type, limit, entrylimit, delay, count):
        """Sends a GET request to ``/supervise/`` with the query as json body.

        :param infotype: forwarded as ``info_type``
        :param source: forwarded as ``source``
        :param origins: forwarded as ``origins``
        :param object_type: forwarded as ``object_type``
        :param limit: forwarded as ``limit``
        :param entrylimit: forwarded as ``entrylimit``
        :param delay: forwarded as ``delay``
        :param count: forwarded as ``count``
        :return: the http response; the status is not inspected here
        """
        json_message_d = {
            "info_type":infotype,
            "source":source,
            "origins":origins,
            "object_type":object_type,
            "limit":limit,
            "entrylimit":entrylimit,
            "delay":delay,
            "count":count
        }
        request = Request(
            "GET", "/supervise/",json.dumps(json_message_d),
            headers=self._create_headers("application/json")
        )
        return self.execute_request(request)

    def execute_request(self, req):
        """Executes the actual rest request over http or https. Will also add
        credentials to the request.

        :param req: The REST request
        :return: the response object of the connection. The connection is
          left open so that the caller can read the response.
        :raises RuntimeError: if a socket error occurs while sending the request
        """
        if self._server_url.scheme == "https":
            # NOTE(security): certificate verification is deliberately turned
            # off to ignore problems related to certificate chains etc. This
            # means that the identity of the server is NOT verified.
            conn = httplibclient.HTTPSConnection(
                self._server_url.hostname,
                self._server_url.port,
                context=ssl._create_unverified_context())
        else:
            conn = httplibclient.HTTPConnection(
                self._server_url.hostname,
                self._server_url.port)

        self._auth.add_credentials(req)

        basepath = self._server_url.path or "/"
        subpath = req.path
        if subpath.startswith("/"):
            # A leading slash would make the join discard the base path.
            subpath = subpath[1:]
        # URL paths always use "/" regardless of platform, hence posixpath
        # and not os.path.
        path = posixpath.join(basepath, subpath)

        try:
            conn.request(req.method, path, req.data, req.headers)
        except socket.error as e:
            conn.close()
            raise RuntimeError(
                "Could not send request to %s" % self._server_url_str
            ) from e

        try:
            return conn.getresponse()
        except Exception:
            # Don't leak the socket when no response can be returned.
            conn.close()
            raise
[docs]
class Auth(metaclass=abc.ABCMeta):
    """Abstract base class for everything that can add credentials to a
    request.

    Declared with ``abc.ABCMeta`` as metaclass so that
    :meth:`add_credentials` really is enforced: neither this class nor a
    subclass that has not overridden the method can be instantiated.
    """
    @abc.abstractmethod
    def add_credentials(self, req):
        """add authorization credentials to the request

        :param req: a :class:`Request` to update
        """
        raise NotImplementedError()
[docs]
class NoAuth(Auth):
    """Auth variant that performs no authentication at all.
    """
    def add_credentials(self, req):
        """Leaves the request untouched.

        :param req: the request, not modified
        """
        return None
[docs]
class CryptoAuth(Auth):
    """Authenticate by signing messages with the internal crypto functionality.

    :param signer: either a ``crypto.private_key`` instance or a string that
      is passed to ``crypto.load_key`` in order to get one
    :param nodename: the name that is put in front of the signature in the
      authorization header
    :raises Exception: if signer is neither of the above or if the loaded
      key is not a private key
    """
    def __init__(self, signer, nodename):
        # Reject anything that is neither a ready-made key nor a string.
        if not isinstance(signer, (crypto.private_key, str)):
            raise Exception("Unknown signer format")

        if isinstance(signer, crypto.private_key):
            self._signer = signer
        else:
            self._signer = crypto.load_key(signer)
            loaded_is_private = isinstance(self._signer, crypto.private_key)
            if not loaded_is_private:
                raise Exception("Can't use key: %s for signing"%signer)

        self._nodename = nodename

    def add_credentials(self, req):
        """Signs the request and stores the result in the ``authorization``
        header as ``exchange-crypto <nodename>:<signature>``.

        :param req: the request to sign, its headers are updated in place
        """
        signed = self._signer.sign(create_signable_string(req))
        req.headers["authorization"] = "exchange-crypto %s:%s" % (self._nodename, signed)
[docs]
class TinkAuth(Auth):
    """Authenticate by signing messages with Tink.

    :param key_path: path to a file containing a cleartext json keyset
    :param key_name: name used in the authorization header. When not given
      (or empty), the name of the directory containing key_path is used.
    """
    def __init__(self, key_path, key_name=None):
        signature.register()

        with open(key_path, "rt") as keyset_file:
            keyset_reader = tink.JsonKeysetReader(keyset_file.read())
            keyset_handle = cleartext_keyset_handle.read(keyset_reader)

        self._signer = keyset_handle.primitive(signature.PublicKeySign)
        self._key_name = key_name or os.path.basename(os.path.dirname(key_path))

    def add_credentials(self, req):
        """Signs the request and stores the result in the ``authorization``
        header as ``exchange-tink <key name>:<base64 encoded signature>``.

        :param req: the request to sign, its headers are updated in place
        """
        message = create_signable_string(req).encode("utf-8")
        encoded = base64.b64encode(self._signer.sign(message)).decode("utf-8")
        req.headers["authorization"] = "exchange-tink %s:%s" % (self._key_name, encoded)
[docs]
def create_signable_string(req):
    """Build the string that is signed for a :class:`.Request`.

    The request method comes first, followed by the stripped values of the
    ``content-md5``, ``content-type``, ``date`` and ``message-id`` headers,
    in that order. A header that is missing, or blank after stripping, is
    left out. The parts are joined with a newline.

    See :ref:`doc-rest-crypto-authentication` for details.

    :param req: object providing ``method`` and a ``headers`` mapping
    :return: the signable string
    """
    signed_headers = ("content-md5", "content-type", "date", "message-id")
    stripped = (
        req.headers[name].strip()
        for name in signed_headers
        if name in req.headers
    )
    return "\n".join([req.method] + [value for value in stripped if value])