# Copyright (C) 2022- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
## statistics-manager and some plugins
## @file
## @author Anders Henja, SMHI
## @date 2022-11-29
from abc import abstractmethod
import logging
import datetime
import json, re
from bexchange.db.sqldatabase import statistics, statentry
from bexchange import util
RE_DTFILTER_PATTERN=re.compile(r"^\s*(datetime|entrytime|optime|delay)\s*([<>!=]+)\s*([0-9:\-T\.]+)\s*$")
logger = logging.getLogger("bexchange.statistics.statistics")
class simple_stat_plugin(object):
    """Simple statistics plugin that forwards each call to the statistics
    manager's increment function, with save/count behavior fixed at
    construction time from the configured type.
    """
    def __init__(self, spid, sptype, statmgr):
        """Constructor
        :param spid: The statistics plugin id
        :param sptype: The type. Either add == only add entries to statentry,
            count == only update count or both == do both. Any other value
            behaves like count.
        :param statmgr: The statistics manager keeping track of the dbinstance
        """
        self._spid = spid
        self._sptype = sptype
        self._statmgr = statmgr
        # "add" and "both" store the individual statentry post;
        # everything except "add" (i.e. "count", "both", default) bumps the counter.
        self._save_post = sptype in ("add", "both")
        self._increment_counter = sptype != "add"

    def increment(self, origin, meta):
        """Calls the statistics managers increment method
        :param origin: The origin
        :param meta: The metadata
        """
        self._statmgr.increment(self._spid, origin, meta, self._save_post, self._increment_counter)
class statistics_manager:
    """The statistics manager. Will create and register the statistics plugins
    and provides query, increment and cleanup access to the statistics database.
    """
    def __init__(self, db):
        """Constructor
        :param db: the sqlalchemy database instance used for the statistics
        """
        self._sqldatabase = db

    def sqldatabase(self):
        """
        :return: the sqlalchemy database instance used
        """
        return self._sqldatabase

    def list_statistic_ids(self, nid):
        """
        :param nid: the node id (currently unused)
        :return: a json dump of the available statistics ids, totals first
        """
        result = [{"spid": sid, "totals": True}
                  for sid in self._sqldatabase.list_statistic_ids()]
        result += [{"spid": sid, "totals": False}
                   for sid in self._sqldatabase.list_statentry_ids()]
        return json.dumps(result)

    def parse_filter(self, parsestr):
        """Parses a filter string into a list of query criteria.
        :param parsestr: one or more conditions separated by '&&', each on the
            form name[OP]value where name is one of datetime, entrytime,
            optime or delay.
        :return: a list of [name, operator, value] criteria. Tokens that do not
            match the expected pattern are silently ignored (original behavior).
        :raises ValueError: if a datetime/entrytime value cannot be parsed
        """
        result = []
        for token in parsestr.split("&&"):
            m = RE_DTFILTER_PATTERN.match(token)
            if not m:
                continue
            name, op, value = m.group(1), m.group(2), m.group(3)
            if name in ("datetime", "entrytime"):
                # 12 characters => YYYYmmddHHMM, otherwise YYYYmmddHHMMSS.
                # (The old code retried the failing format in an except-clause,
                # which could never succeed; a single parse is equivalent.)
                fmt = '%Y%m%d%H%M' if len(value) == 12 else '%Y%m%d%H%M%S'
                result.append([name, op, datetime.datetime.strptime(value, fmt)])
            elif name in ("optime", "delay"):
                result.append([name, op, int(value)])
            else:
                # Unreachable with the current regex, kept as a safeguard.
                raise Exception("Format of dtfilter must be name[OP]value where name is one of datetime, entrytime, optime or delay")
        return result

    def get_statistics_entries(self, nid, querydata):
        """Returns the statistic entries for the specified post, origin and source_name
        :param nid: the node id (currently unused)
        :param querydata: a dict that may contain spid, origins (comma-separated),
            sources (comma-separated), hashid, totals, filter, object_type and method
        :return: a list of statistics / statentry database objects
        """
        logger.debug("querydata: %s", querydata)
        spid = querydata.get("spid")
        hashid = querydata.get("hashid")
        totals = querydata.get("totals", False)
        object_type = querydata.get("object_type")
        qmethod = querydata.get("method")

        origins = []
        sorigins = querydata.get("origins")
        if sorigins:
            origins = sorigins.split(",")

        sources = []
        ssources = querydata.get("sources")
        if ssources:
            sources = ssources.split(",")

        filters = None
        if querydata.get("filter"):
            filters = self.parse_filter(querydata["filter"].strip())

        if totals:
            return self._sqldatabase.find_statistics(spid, origins, sources)
        if qmethod == "average":
            return self._sqldatabase.get_average_statentries(spid, origins, sources)
        return self._sqldatabase.find_statentries(spid, origins, sources, hashid=hashid, filters=filters, object_type=object_type)

    def get_statistics(self, nid, querydata):
        """Returns the statistics for the specified post, origin and source_name
        :return: a json dump of the matching entries
        """
        entries = self.get_statistics_entries(nid, querydata)
        return json.dumps([e.json_repr() for e in entries])

    def cleanup_statentry(self, age):
        """Removes statentry posts older than age hours.
        :param age: the max age in hours
        """
        # NOTE(review): uses naive local time while increment() stores
        # UTC-aware timestamps - confirm how the database column handles
        # tzinfo before changing this.
        maxage = datetime.datetime.now() - datetime.timedelta(hours=age)
        self._sqldatabase.cleanup_statentries(maxage)

    def increment(self, spid, origin, meta, save_post=False, increment_counter=True, optime=0, optime_info=None):
        """ Increments a counter that is defined by:
        :param spid: The id used to identify this statistics post
        :param origin: the origin (may be null, indicating unknown origin)
        :param meta: the metadata
        :param save_post: if True an individual statentry row is stored
        :param increment_counter: if True the totals counter is incremented
        :param optime: operation time stored with the statentry
        :param optime_info: extra information about the operation time
        """
        try:
            source = meta.bdb_source_name
            if increment_counter:
                self._sqldatabase.increment_statistics(spid, origin, source)
        except Exception:  # narrowed from bare except; counting stays best-effort
            logger.exception("An error occured when incrementing statistics for spid:%s, origin:%s, ID:%s"%(spid, origin, util.create_fileid_from_meta(meta)))
        if save_post:
            file_datetime = datetime.datetime(meta.what_date.year, meta.what_date.month, meta.what_date.day, meta.what_time.hour, meta.what_time.minute, meta.what_time.second, 0, tzinfo=datetime.timezone.utc)
            file_object = meta.what_object
            file_elangle = None
            if file_object == "SCAN":
                mn = meta.find_node("/dataset1/where/elangle")
                if not mn:
                    mn = meta.find_node("/where/elangle")
                if mn:
                    file_elangle = mn.value
            entrytime = datetime.datetime.now(datetime.timezone.utc)
            # BUGFIX: .seconds discards whole days (and misbehaves for negative
            # deltas); total_seconds() gives the true delay.
            delay = int((entrytime - file_datetime).total_seconds())
            try:
                self._sqldatabase.add(statentry(spid, origin, source, meta.bdb_metadata_hash, datetime.datetime.now(datetime.timezone.utc), optime, optime_info, delay, file_datetime, file_object, file_elangle))
            except Exception:  # narrowed from bare except; storing stays best-effort
                logger.exception("An error occured when adding statentry for spid:%s, origin:%s, ID:%s"%(spid, origin, util.create_fileid_from_meta(meta)))

    @classmethod
    def plugin_from_conf(cls, conf, statmgr):
        """Creates a stat plugin from the configuration
        :param conf: The config as a dictionary. Must contain "id"; may contain
            "type" (add / count / both, default "count").
        :param statmgr: The statistics manager to use
        :returns a stat plugin
        :raises Exception: if the configuration does not contain an "id"
        """
        if "id" not in conf:
            raise Exception("Invalid configuration statistics: %s"%conf)
        spid = conf["id"]
        sptype = conf.get("type", "count")
        return simple_stat_plugin(spid, sptype, statmgr)

    @classmethod
    def plugins_from_conf(cls, conf, statmgr):
        """Creates stat plugins from the configuration
        :param conf: A config dictionary or a list of config dictionaries
        :param statmgr: The statistics manager to use
        :returns a list of stat plugins
        """
        # BUGFIX: list(conf) on a single config dict produced a list of its
        # *keys*; a single configuration must be wrapped in a list instead.
        if not isinstance(conf, list):
            conf = [conf]
        return [cls.plugin_from_conf(c, statmgr) for c in conf]