# Source code for bexchange.matching.metadata_matcher

# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

## Functionality to match metadata against a filter

## @file
## @author Anders Henja, SMHI
## @date 2021-08-18
import operator
import threading
import json
import re, datetime

from baltrad.bdbcommon import expr

from baltrad.bdbcommon.oh5 import (
    Source,
)

##
# Used for matching metadata against an expression
#
class metadata_matcher:
    """Used for matching metadata against an expression.

    The matcher owns one expression evaluator that is configured once (see
    :meth:`init_evaluator`). The metadata being matched is kept in
    ``self.meta`` only for the duration of a :meth:`match` call, which is
    serialized with ``self.lock``.
    """

    def __init__(self):
        """Constructor. Sets up the evaluator and the lock."""
        # No metadata is attached until match() is called.
        self.meta = None
        self.init_evaluator()

    def init_evaluator(self):
        """Sets up all operations available for the filter.

        Creates ``self.evaluator`` with the arithmetic, comparison, logical,
        pattern and date/time procedures, and creates ``self.lock``. The
        ``attr`` procedure is not registered here; :meth:`match` installs it
        for the duration of each evaluation.
        """
        evaluator = expr.Evaluator()

        # Arithmetic. Note that "/" is registered as floor division.
        evaluator.add_procedure("+", operator.add)
        evaluator.add_procedure("-", operator.sub)
        evaluator.add_procedure("*", operator.mul)
        evaluator.add_procedure("/", operator.floordiv)

        # Comparison
        evaluator.add_procedure("=", operator.eq)
        evaluator.add_procedure("!=", operator.ne)
        evaluator.add_procedure("<", operator.lt)
        evaluator.add_procedure(">", operator.gt)
        evaluator.add_procedure(">=", operator.ge)
        evaluator.add_procedure("<=", operator.le)

        # Logical. and/or are the bitwise operators from the operator module.
        evaluator.add_procedure("and", operator.and_)
        evaluator.add_procedure("or", operator.or_)
        evaluator.add_procedure("not", operator.not_)

        # Pattern and membership matching implemented by this class
        evaluator.add_procedure("like", self.like)
        evaluator.add_procedure("in", self.in_)

        # Date/time constructors
        evaluator.add_procedure("date", lambda *args: datetime.date(*args))
        evaluator.add_procedure("time", lambda *args: datetime.time(*args))
        evaluator.add_procedure(
            "datetime", lambda *args: datetime.datetime(*args)
        )
        evaluator.add_procedure(
            "interval", lambda *args: datetime.timedelta(*args)
        )

        self.evaluator = evaluator
        self.lock = threading.Lock()

    def find_value(self, name, ttype):
        """Finds a value within the metadata with specified name.

        The following name prefixes are handled specially, anything else is
        looked up with :meth:`find_plain`:

        * ``what/source:<key>`` - identifier from the metadata source
        * ``_bdb/source:<key>`` - identifier from the parsed ``bdb_source``
        * ``_bdb/source_name`` - the ``bdb_source_name`` as a one item list
        * ``_exchange/what_age`` - age in whole seconds of the what date/time
          (interpreted as UTC) compared to the current UTC time

        :param name: The name that is requested
        :param ttype: The type we are looking for
        :return: the value if found. A list of values for all names except
                 ``_exchange/what_age`` which gives an integer.
        """
        if name.startswith("what/source:"):
            return self.find_source(name, self.meta.source())
        elif name.startswith("_bdb/source:"):
            return self.find_source(
                name, Source.from_string(self.meta.bdb_source)
            )
        elif name.startswith("_bdb/source_name"):
            return [self.meta.bdb_source_name]
        elif name.startswith("_exchange/what_age"):
            what_date = self.meta.what_date
            what_time = self.meta.what_time
            whatdt = datetime.datetime(
                what_date.year,
                what_date.month,
                what_date.day,
                what_time.hour,
                what_time.minute,
                what_time.second,
                tzinfo=datetime.timezone.utc,
            )
            nowdt = datetime.datetime.now(datetime.timezone.utc)
            # timedelta.seconds is only the seconds part (0..86399) and drops
            # whole days, so total_seconds() must be used to get the real age.
            return int((nowdt - whatdt).total_seconds())
        return self.find_plain(name, ttype)

    def find_source(self, name, source):
        """Finds a source identifier within the source.

        :param name: The source identifier. The key is the text after the
                     last ``:`` in the name.
        :param source: The data source
        :return: a list with the found value or an empty list if the key
                 does not exist in the source
        """
        key = name[name.rfind(":") + 1:]
        if key in source:
            return [source[key]]
        return []

    def find_plain(self, name, ttype):
        """Finds any name within the metadata.

        All nodes in the metadata whose path ends with the path components
        of ``name`` are included in the result.

        :param name: The name of the attribute
        :param ttype: Not used
        :return: A list with the string value of each matching node wrapped
                 as an expression literal
        """
        split_path = name.split("/")
        return [
            expr.literal(node.value_str())
            for node in self.meta.iternodes()
            if self.match_path(split_path, node.path())
        ]

    def match_path(self, split_path, nodepath):
        """Matches the paths.

        :param split_path: The requested path as a list of path components
        :param nodepath: The ``/``-separated path of a node
        :return: True if the trailing components of nodepath are equal to
                 split_path, otherwise False
        """
        node_split_path = nodepath.split("/")
        from_index = len(node_split_path) - len(split_path)
        if from_index < 0:
            # The requested path is longer than the node path
            return False
        return split_path == node_split_path[from_index:]

    def in_(self, lhs, rhs):
        """Matches if items in lhs exists in the rhs.

        :param lhs: Left hand side which is a list of values
        :param rhs: Right hand side which is matched against
        :return: True or False
        """
        return any(item in rhs for item in lhs)

    def like(self, lhs, rhs):
        """Matches against a ``*``-pattern.

        Each ``*`` in the pattern is translated to the regular expression
        ``.*``. Other characters are passed to the regular expression as
        is, and the pattern is only anchored at the start of the value.

        :param lhs: Left hand side which is a list of value
        :param rhs: Right hand side which is pattern. If it is a non-empty
                    list the first item is used as pattern.
        :return: True if any value in lhs matches the pattern, else False
        """
        if isinstance(rhs, list) and len(rhs) > 0:
            rhs = rhs[0]
        compiled = re.compile(rhs.replace("*", ".*"))
        return any(compiled.match(value) for value in lhs)

    # Synchronized: self.meta and the "attr" procedure are per-call state
    # shared through the instance, so only one match may run at a time.
    def match(self, metadata, xpr):
        """Matches the metadata against the expression. Synchronized.

        :param metadata: The metadata to be matched against
        :param xpr: The expression matched against
        :return: True or False
        """
        with self.lock:
            self.evaluator.add_procedure("attr", self.find_value)
            self.meta = metadata
            try:
                return self.evaluator.evaluate(xpr)
            finally:
                # Always detach the metadata, also when evaluation fails
                self.evaluator.add_procedure("attr", None)
                self.meta = None