Source code for bexchange.server.subscription
# Copyright (C) 2022- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
## The subscription class
## @file
## @author Anders Henja, SMHI
## @date 2022-10-25
from bexchange.matching import filters
from bexchange.matching.filters import filter_manager
from bexchange.matching import filters, metadata_matcher
from bexchange.statistics.statistics import statistics_manager
import logging
logger = logging.getLogger("bexchange.server.subscription")
[docs]
class subscription(object):
def __init__(self, storages, subscription_id=None, active=True, ifilter=None, allow_duplicates=False, allowed_ids=[]):
"""Constructor
:param storages: List of storage names
:param subscription_id: Id this subscription should be identified if tunneling
:param active: If this subscription is active or not
:param ifilter: A filter instance
:param allow_duplicates: If duplicates should be allowed or not
:param allowed_ids: A list of nodenames that should be allowed.
"""
self._subscription_id = subscription_id
self._storages = storages
self._active = active
self._filter = ifilter
self._allow_duplicates = allow_duplicates
self._allowed_ids = allowed_ids
self._statistics_plugins = []
[docs]
def storages(self):
"""
:return the storage names
"""
return self._storages
[docs]
def id(self):
"""
:return the subscription id (or None if there is none)
"""
return self._subscription_id
[docs]
def isactive(self):
"""
:return if this subscription is active or not
"""
return self._active
[docs]
def setactive(self, active):
""" Sets this subscription active or not
:param active: If subscription should be set to active or not
"""
self._active = active
[docs]
def filter(self):
"""
:return the filter
"""
return self._filter
[docs]
def allow_duplicates(self):
"""
:return if duplicates are handled by this subscription or not
"""
return self._allow_duplicates
[docs]
def allowed_ids(self):
"""
:return the nodenames
"""
return self._allowed_ids
[docs]
def filter_matching(self, meta):
"""Matches the meta against the filter.
:param meta: The metadata
:return True if filter is None or if metadata matches the filter.
"""
if self._filter:
matcher = metadata_matcher.metadata_matcher()
return matcher.match(meta, self._filter.to_xpr())
return True
[docs]
def set_statistics_plugins(self, plugins):
"""
"""
self._statistics_plugins = plugins
[docs]
def get_statistics_plugins(self):
"""
:returns the stat plugins belonging to this subscription
"""
return self._statistics_plugins
[docs]
class subscription_manager:
def __init__(self):
pass
[docs]
@classmethod
def create_subscription(self, storages, subscription_id, active, ifilter, allow_duplicates, allowed_ids):
"""Creates a subscription instance
:param storages: List of storage names
:param subscription_id: Subscription id
:param active: If subscription should be set to active or not
:param ifilter: A filter instances
:param allow_duplicate: If duplicates should be allowed or not
:param allowed_ids: A list of ids that should be allowed for this subscription
"""
return subscription(storages, subscription_id, active, ifilter, allow_duplicates, allowed_ids)
[docs]
@classmethod
def from_conf(self, config, backend):
filter_manager = filters.filter_manager()
storages=[]
subscription_id=None
active=True
ifilter = None
allow_duplicates=False
statplugins=[]
allowed_ids = []
if "storage" in config:
storages = config["storage"]
if not isinstance(storages, list):
storages = [storages]
if "id" in config:
subscription_id = config["id"]
if "active" in config:
active = config["active"]
if not active:
return None
if "statdef" in config:
statplugins = statistics_manager.plugins_from_conf(config["statdef"], backend.get_statistics_manager())
ifilter = filter_manager.from_value({"filter_type":"always_filter", "value":{}})
if "filter" in config:
ifilter = filter_manager.from_value(config["filter"])
if "allow_duplicates" in config:
allow_duplicates = config["allow_duplicates"]
if "allowed_ids" in config:
allowed_ids.extend(config["allowed_ids"])
if "cryptos" in config:
for crypto in config["cryptos"]:
nodename = backend.get_auth_manager().add_key_config(crypto)
if nodename not in allowed_ids:
allowed_ids.append(nodename)
s = self.create_subscription(storages, subscription_id, active, ifilter, allow_duplicates, allowed_ids)
s.set_statistics_plugins(statplugins)
return s