Source code for bexchange.processor.processors
# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange. If not, see <http://www.gnu.org/licenses/>.
###############################################################################
## Various types of processors.
## @file
## @author Anders Henja, SMHI
## @date 2022-10-05
import json
import logging
import os
import shutil
import importlib
from bexchange.naming import namer
logger = logging.getLogger("bexchange.processor")
[docs]
class processor:
    """Base class to be used by all processor implementations.

    Subclasses override :meth:`process` (which raises ``NotImplementedError``
    here) and, when they need it, :meth:`start` and :meth:`stop`, which are
    no-ops in this base class.
    """
    def __init__(self, backend, name, active, extra_arguments=None):
        """Constructor
        :param backend: The backend that this processor should have access to
        :param name: Name that identifies this processor
        :param active: If this processor should be active or not
        :param extra_arguments: The extra arguments used when creating the processor.
          ``None`` is stored as an empty dictionary.
        """
        self._backend = backend
        self._name = name
        self._active = active
        # Keep the extra arguments instead of silently discarding them so that
        # subclasses (and callers) can read them back. A fresh dict is created
        # for the None case so that instances never share a mutable default.
        self._extra_arguments = {} if extra_arguments is None else extra_arguments

    def backend(self):
        """
        :return: the backend instance
        """
        return self._backend

    def name(self):
        """
        :return: the name of this processor
        """
        return self._name

    def active(self):
        """
        :return: if this processor is active or not
        """
        return self._active

    def extra_arguments(self):
        """
        :return: the extra arguments given to the constructor (empty dictionary if none were given)
        """
        return self._extra_arguments

    def process(self, path, metadata):
        """ The process function. Must be non-blocking. Otherwise this will lock up threads.
        This means that the path/metadata should be put on a queue or in some other way passed
        on to the actual processing-part without locking up the resources.
        :param path: The path of the file to process
        :param metadata: The metadata belonging to the file
        :raises NotImplementedError: always, subclasses must override this method
        """
        raise NotImplementedError()

    def start(self):
        """Starts this processor. Typically by starting a thread or consumer pool.
        The base implementation does nothing.
        """
        pass

    def stop(self):
        """Stops this processor. Typically by joining a number of threads.
        The base implementation does nothing.
        """
        pass
[docs]
class example_processor(processor):
    """Minimal example of a processor implementation.

    It does no real work: :meth:`process` only writes a debug message to the
    module logger.
    """
    def __init__(self, backend, name, active, extra_arguments=None):
        """Constructor
        :param backend: The backend that this processor should have access to
        :param name: Name that identifies this processor
        :param active: If this processor should be active or not
        :param extra_arguments: The extra arguments used when creating the processor
        """
        # The base class already stores the name, so it is not assigned again
        # here. The extra arguments are forwarded rather than dropped.
        super(example_processor, self).__init__(backend, name, active, extra_arguments)

    def process(self, path, meta):
        """Logs that this processor was invoked. The file itself is not touched.
        :param path: The path of the file to process (unused)
        :param meta: The metadata belonging to the file (unused)
        """
        # Arguments are handed to the logger so that the message is only
        # formatted when debug logging is actually enabled.
        logger.debug("Running processor %s of type %s", self.name(), type(self))
[docs]
class processor_manager:
    """ The processor manager. Will ensure that files are passed on to the processors.
    """
    def __init__(self):
        """Constructor
        """
        # Registered processors, keyed by the value returned from processor.name()
        self.processors = {}

    def add_processor(self, processor):
        """Adds a processor to the manager. A processor that is already registered
        under the same name is replaced.
        :param processor: The processor that should be added
        """
        self.processors[processor.name()] = processor

    def remove_processor(self, name):
        """Removes a processor from the manager. The processor is stopped before
        it is removed. Unknown names are ignored.
        :param name: The name of the processor that should be removed
        """
        if name not in self.processors:
            return
        try:
            self.processors[name].stop()
        except Exception:
            # Best effort: a processor that fails to stop is still removed.
            logger.exception("Failed to stop processor: %s", name)
        finally:
            # Done in finally so that the entry is dropped even if stop() is
            # aborted by something that is not an Exception.
            self.processors.pop(name, None)

    def process(self, file, meta):
        """ Passes on the file to all registered processors. It is up to the processor to determine how
        the provided file should be handled. A failure in one processor is logged and does not
        prevent the remaining processors from being invoked.
        :param file: The file that should be processed
        :param meta: The metadata belonging to the file
        """
        for key, processor in self.processors.items():
            try:
                processor.process(file, meta)
            except Exception:
                # Only ordinary errors are swallowed. KeyboardInterrupt and
                # SystemExit must propagate so that the process can shut down.
                logger.exception("Processor: %s could not process file %s", key, file)

    @classmethod
    def create_processor(cls, name, clz, backend, active, extra_arguments):
        """Creates an instance of clz with specified arguments
        :param name: Name that identifies the processor
        :param clz: class name specified as <module>.<classname>
        :param backend: The backend the processor should have access to
        :param active: If the processor should be active or not
        :param extra_arguments: The extra arguments passed on to the processor constructor
        :return: the instance, created as clz(backend, name, active, extra_arguments)
        :raises ValueError: if clz is not specified as <module>.<classname>
        """
        module_name, _, class_name = clz.rpartition(".")
        # Reject "classname", ".classname" and "module." before importing anything.
        if not module_name or not class_name or clz.startswith("."):
            raise ValueError("Must specify class as module.class")
        logger.info("Creating processor '%s'", clz)
        module = importlib.import_module(module_name)
        return getattr(module, class_name)(backend, name, active, extra_arguments)

    @classmethod
    def from_conf(cls, config, backend):
        """Creates a processor instance from provided json config. The created
        processor is started before it is returned.
        :param config: The json config as a dictionary. "name" and "class" are required,
          "extra_arguments" (default {}) and "active" (default False) are optional.
        :param backend: The backend this processor should have access to
        :return: the started processor instance or None if the processor is not active
        :raises KeyError: if "name" or "class" is missing in the config
        """
        name = config["name"]
        processor_clazz = config["class"]
        extra_arguments = config.get("extra_arguments", {})
        active = config.get("active", False)

        if not active:
            logger.info("Processor with name %s is not active", name)
            return None

        p = cls.create_processor(name, processor_clazz, backend, active, extra_arguments)
        p.start()
        return p