Source code for bexchange.server.sqlbackend

# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

## The SQL functionality that is used by the server. As default it uses a sqlite database
## for keeping track of sources and other miscellaneous information

## @file
## @author Anders Henja, SMHI
## @date 2021-08-18
from __future__ import absolute_import

import contextlib
import datetime
import logging
from bexchange.db import util as dbutil

from baltrad.bdbcommon import oh5

from sqlalchemy import engine, event, exc as sqlexc, sql, literal, or_, and_, insert, update, select

from sqlalchemy.types import (
    Integer,
    Text
)

from sqlalchemy import (
    Column,
    ForeignKey,
    MetaData,
    PrimaryKeyConstraint,
    Table,
)

dbmeta = MetaData()

sources = Table("sources", dbmeta,
                Column("id", Integer, primary_key=True),
                Column("name", Text, unique=True, nullable=False),
                Column("parent", Text)
)

source_kvs = Table("source_kvs", dbmeta,
                   Column("source_id", Integer,
                          ForeignKey(sources.c.id, ondelete="CASCADE"),
                          nullable=False),
                   Column("key", Text, nullable=False),
                   Column("value", Text, nullable=False),
                   PrimaryKeyConstraint("source_id", "key"),
)


db_meta = MetaData()


logger = logging.getLogger("bexchange.server")

[docs] def force_sqlite_foreign_keys(dbapi_con, con_record): try: import sqlite3 except ImportError: # built without sqlite support return if isinstance(dbapi_con, sqlite3.Connection): dbapi_con.execute("pragma foreign_keys=ON")
[docs] class SqlAlchemySourceManager(object): """The DB manager providing source handling :param uri: The uri to database where sources are stored :param poolsize: the size of the db connection pool. In the case of sqlite, this will not be used """ def __init__(self, uri="sqlite:///tmp/baltrad-exchange-source.db", poolsize=10): self._engine = dbutil.create_engine_from_url(uri, poolsize) if self._engine.driver == "pysqlite": event.listen(self._engine, "connect", force_sqlite_foreign_keys) self.init_tables() @property def driver(self): """ :return: database driver name """ return self._engine.driver
[docs] def init_tables(self): """Initializes the database tables """ dbmeta.create_all(self._engine)
[docs] def add_sources(self, srclist): """Adds the sources to the source database :param srclist: a list of bdbcommon.oh5.Sources """ with self.get_connection() as conn: count = conn.execute(sql.select(sql.func.count()).select_from(sources)).scalar() if count > 0: return True conn.execute(source_kvs.delete()) conn.execute(sources.delete()) for source in srclist: logger.info("SourceManager. adding: %s"%str(source)) try: source_id = conn.execute( sources.insert(), {"name":source.name, "parent":source.parent} ).inserted_primary_key[0] except sqlexc.IntegrityError: raise RuntimeError("duplicate of source.name") self.insert_source_values(conn, source_id, source) conn.commit()
[docs] def get_source(self, meta, add_parent_object=False): """ :param meta: The metadata containing source :param add_parent_object: This is adding a parent to the source. This will modify the bdb Source object by adding the member parent_object. :return: A complete source from the metadata source identifier """ with self.get_connection() as conn: if meta.what_source == None: raise LookupError("no source in metadata") source_id = get_source_id(conn, meta.source()) if not source_id: raise LookupError("failed to look up source for " + meta.source().to_string()) with self.get_connection() as conn: source = get_source_by_id(conn, source_id) source.parent_object = None if add_parent_object: source.parent_object = get_source_by_id(conn, get_parent_source_id(conn, source.parent)) # We must add file information to the metadata msources = meta.source() for k in msources.keys(): if not source.has_key(k): source[k] = msources[k] return source
[docs] def get_parent_source(self, parent): """ :param parent: The id of the parent. :return: The parent source matching the string in parent. """ with self.get_connection() as conn: return get_source_by_id(conn, get_parent_source_id(conn, parent))
## # Insert Key-values from a source #
[docs] def insert_source_values(self, conn, source_id, source): """Inserts kvs values :param conn: connection :param source_id: the unique source id :param source: the source """ kvs = [] for k, v in source.items(): kvs.append({ "source_id": source_id, "key": k, "value": v, }) conn.execute( source_kvs.insert(), kvs )
[docs] def get_connection(self): """get a context managed connection to the database """ return contextlib.closing(self._engine.connect())
## # Copy-pasted from baltrad-db server functionality
[docs] def get_source_id(conn, source): keys = source.keys() ignoreORG = False if "ORG" in keys: if any(k in keys for k in ["WMO", "NOD", "RAD", "PLC", "WIGOS"]): ignoreORG = True where_clause = literal(False) for key, value in source.items(): if ignoreORG and key == "ORG": continue where_clause = or_( where_clause, and_( source_kvs.c.key == key, source_kvs.c.value == value ) ) qry = ( select( source_kvs.c.source_id, source_kvs.c.key, source_kvs.c.value ) .where(where_clause) .distinct() ) result = conn.execute(qry) source_id_matches = {} max_no_of_matches = 0 best_match_id = None multiple_matches = False for row in result: source_id = row.source_id if source_id not in source_id_matches: source_id_matches[source_id] = 0 row_key = row.key row_value = row.value for key, value in source.items(): if ignoreORG and key == "ORG": continue elif key == row_key and value == row_value: source_id_matches[source_id] += 1 current_matches = source_id_matches[source_id] if current_matches > max_no_of_matches: max_no_of_matches = current_matches best_match_id = source_id multiple_matches = False elif current_matches == max_no_of_matches: multiple_matches = True if multiple_matches: logger.debug(f"Could not determine source due to multiple equally matching sources found for {source}.") best_match_id = None return best_match_id
[docs] def get_source_by_id(conn, source_id): name_qry = select(sources.c.name, sources.c.parent).where(sources.c.id == source_id) kv_qry = select(source_kvs).where(source_kvs.c.source_id == source_id) source = oh5.Source() sourceresult = conn.execute(name_qry).first() if sourceresult is None: raise LookupError(f"Could not identify any source with id={source_id}") source.name = sourceresult.name source.parent = sourceresult.parent result = conn.execute(kv_qry) for row in result: source[row.key] = row.value return source
[docs] def get_parent_source_id(conn, parent): # Parents should always have their parent = None otherwise we probably are trying to identify a standard source and not a parent source_id_qry = sql.select(sources.c.id).where(sources.c.parent==None).where(sources.c.name==parent) result = conn.execute(source_id_qry).scalar() if result is None: raise LookupError(f"Could not identify any parent source with id {parent}") return result