Source code for bexchange.storage.storages

# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

## Various types of storages.

## @file
## @author Anders Henja, SMHI
## @date 2021-08-18
import json
import logging
import os
import shutil
import stat
import uuid
import threading
from abc import abstractmethod
import importlib
from pathlib import Path

from bexchange.naming import namer
logger = logging.getLogger("bexchange.server.backend")

[docs] class storage(object): """Base class for all storages """ def __init__(self): """Constructor """ super(storage, self).__init__()
[docs] @abstractmethod def name(self): """ :return the name identifying this storage """ raise NotImplementedError()
[docs] @abstractmethod def store(self, path, meta): """ Passes on the file to the storage :param path: The full path to the file to be stored :param meta: The meta data for this file. """ raise NotImplementedError()
[docs] class none_storage(storage): """Simple storage that does nothing """ def __init__(self, name, backend, **kwargs): """Constructor :param name: The name identifying this storage """ super(none_storage, self).__init__() self._name = name
[docs] def store(self, path, meta): """ Does nothing but prints that a file would have been stored :param path: The full path to the file to be stored :param meta: The meta data for this file. """ logger.debug("[none_storage - %s]: %s - %s - %s"%(self.name(), meta.what_source, meta.what_date, meta.what_time))
[docs] def name(self): """ :return the name identifying this storage """ return self._name
[docs] class file_store: def __init__(self, path, name_pattern, naming_operations=[], simulate=False, keep_same_name=False): self.path = path self.name_pattern = name_pattern self.namer = namer.metadata_namer(self.name_pattern) for no in naming_operations: self.namer.register_operation(no.tag(), no) self._simulate = simulate self._keep_same_name = keep_same_name
[docs] def store(self, path, meta): oname = "%s/%s"%(self.path, self.namer.name(meta)) if self._simulate: logger.info("[SIMULATE]: Stored file: %s"%oname) return dname = os.path.dirname(oname) if not os.path.exists(dname): os.makedirs(dname, exist_ok=True) if self._keep_same_name and os.path.exists(oname): oname = oname + "_" + str(uuid.uuid4())[:8] shutil.copy(path, oname) os.chmod(oname, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH)
[docs] class file_storage(storage): """A basic file storage that allows separation of files based on object types. A typical structure passed to kwargs would be "structure": [ { "object":"SCAN", "path":"/tmp/baltrad_bdb", "name_pattern":"${_baltrad/datetime_l:15:%Y/%m/%d/%H/%M}/${_bdb/source:NOD}_${/what/object}.tolower()_${/what/date}T${/what/time}Z_${/dataset1/where/elangle}.h5" }, { "path":"/tmp/baltrad_bdb", "name_pattern":"${_baltrad/datetime_l:15:%Y/%m/%d/%H/%M}/${_bdb/source:NOD}_${/what/object}.tolower()_${/what/date}T${/what/time}Z.h5" }] """ def __init__(self, name, backend, **kwargs): """Constructor :param name: The name of this storage :param backend: The backend (NOT USED) :param kwargs: the configuration required for the file storage. """ super(file_storage, self).__init__() self._name = name self._backend = backend self._simulate = False self.structures = {} if not "structure" in kwargs: raise Exception("Missing key 'structure' in configuration") self.structure_d = kwargs["structure"] if "simulate" in kwargs and isinstance(kwargs["simulate"], bool): self._simulate = kwargs["simulate"] naming_operations = [] if "naming_operations" in kwargs: for op in kwargs["naming_operations"]: naming_operations.append(namer.metadata_namer_manager.from_conf(op, backend)) for s in self.structure_d: if ("object" in s and not s["object"]) or "object" not in s: self.structures["default"]=file_store(s["path"],s["name_pattern"], naming_operations, self._simulate) else: self.structures[s["object"]]=file_store(s["path"],s["name_pattern"], naming_operations, self._simulate)
[docs] def get_attribute_value(self, name, meta): """ :param name: Name of attribute :param meta: Metadata from where value for name should be taken :return: the value for the name or None if not found """ try: return meta.node(name).value except LookupError: return None
[docs] def store(self, path, meta): """ Stores a file in the correct directory structure :param path: The path to the file to be stored :param meta: The meta data for the file """ q = self.get_attribute_value("/what/object", meta) logger.debug("Using storage %s"%str(self.structures)) if q in self.structures: self.structures[q].store(path, meta) elif "default" in self.structures: self.structures["default"].store(path, meta) else: logger.info("Ignoring %s object of type: " % q)
[docs] def name(self): """ :return the name of this storage """ return self._name
[docs] class simple_rotating_file_storage(storage): """A very simple rotating file storage that keeps a limited number of files in a single folder. The files will get two different names. Either: ${_baltrad/source_name}_scan_${/dataset1/where/elangle}_${/what/date}T${/what/time}.h5 - for scans ${_baltrad/source_name}_${/what/object}.tolower()_${/what/date}T${/what/time}.h5 - for any other file type NOTE! The number of files can be set to a maximum of 500 """ def __init__(self, name, backend, **kwargs): """Constructor :param name: The name of this storage :param backend: The backend (NOT USED) :param kwargs: the configuration required for the file storage. """ super(simple_rotating_file_storage, self).__init__() self._name = name self._backend = backend self._number_of_files = 100 self._folder = kwargs["folder"] self._scanstore = file_store(self._folder, "${_baltrad/source_name}_scan_${/dataset1/where/elangle}_${/what/date}T${/what/time}.h5", [], False, True) self._otherstore = file_store(self._folder, "${_baltrad/source_name}_${/what/object}.tolower()_${/what/date}T${/what/time}.h5", [], False, True) self.lock = threading.Lock() if "number_of_files" in kwargs and isinstance(kwargs["number_of_files"], int): if kwargs["number_of_files"] <= 500: self._number_of_files = kwargs["number_of_files"]
[docs] def get_attribute_value(self, name, meta): """ :param name: Name of attribute :param meta: Metadata from where value for name should be taken :return: the value for the name or None if not found """ try: return meta.node(name).value except LookupError: return None
[docs] def store(self, path, meta): """ Stores a file in the correct directory structure :param path: The path to the file to be stored :param meta: The meta data for the file """ q = self.get_attribute_value("/what/object", meta) if q == "SCAN": self._scanstore.store(path, meta) else: self._otherstore.store(path, meta) self.trim_folder(self._folder)
[docs] def name(self): """ :return the name of this storage """ return self._name
[docs] def trim_folder(self, path): with self.lock: paths = sorted(Path(path).iterdir(), key=os.path.getmtime) while len(paths) >= self._number_of_files and len(paths) > 0: fpath = paths.pop(0) os.unlink(fpath)
[docs] class storage_manager: """ The storage manager """ def __init__(self): """Constructor """ self.storage={}
[docs] def add_storage(self, storage): """ Adds a storage instance to the internal list :param storage: The created storage that is a subclass of bexchange.storage.storages.storage """ self.storage[storage.name()] = storage
[docs] def get_storage(self, name): """ :param name: Name of the storage :return: the storage with provided name """ return self.storage[name]
[docs] def has_storage(self, name): """ :param name: Name of the storage :return: if there is a storage with specified name """ return name in self.storage
[docs] def remove_storage(self, name): """ :param name: Name of the storage """ try: del self.storage[name] except: logger.exception("Failed to remove storage: %s"%name)
[docs] def store(self, name, path, meta): """Stores a file in the specified storage. :param name: Name in which the file should be stored :param path: The file name that should be stored :param meta: Metadata about the file that should be stored """ self.storage[name].store(path, meta)
[docs] @classmethod def create_storage(self, clz, name, backend, extra_arguments): """Creates an instance of clz with specified arguments :param clz: class name specified as <module>.<classname> :param arguments: a list of arguments that should be used to initialize the class """ if clz.find(".") > 0: logger.info("Creating storage '%s' with name='%s'"%(clz, name)) lastdot = clz.rfind(".") module = importlib.import_module(clz[:lastdot]) classname = clz[lastdot+1:] return getattr(module, classname)(name, backend, **extra_arguments) else: raise Exception("Must specify class as module.class")
[docs] @classmethod def from_conf(self, config, backend): """Creates a storage from the specified configuration if it is possible :param config: A runner config pattern. Should at least contain the following { "class":"<packagename>.<classname>", "name":<name of storage>, "arguments":{}" } """ arguments = {} storage_clazz = config["class"] name = config["name"] if "arguments" in config: arguments = config["arguments"] p = self.create_storage(storage_clazz, name, backend, arguments) return p