Source code for bexchange.server.sqlbackend

# Copyright (C) 2021- Swedish Meteorological and Hydrological Institute (SMHI)
#
# This file is part of baltrad-exchange.
#
# baltrad-exchange is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# baltrad-exchange is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with baltrad-exchange.  If not, see <http://www.gnu.org/licenses/>.
###############################################################################

## The SQL functionality that is used by the server. As default it uses a sqlite database
## for keeping track of sources and other miscellaneous information

## @file
## @author Anders Henja, SMHI
## @date 2021-08-18
from __future__ import absolute_import

import contextlib
import datetime
import logging
from bexchange.db import util as dbutil

from baltrad.bdbcommon import oh5

from sqlalchemy import engine, event, exc as sqlexc, sql

from sqlalchemy.types import (
    Integer,
    Text
)

from sqlalchemy import (
    Column,
    ForeignKey,
    MetaData,
    PrimaryKeyConstraint,
    Table,
)

# Metadata collection that the two source tables below are registered in.
# SqlAlchemySourceManager.init_tables() calls create_all() on it.
dbmeta = MetaData()

# One row per source: a numeric primary key, a unique non-null name and a
# nullable "parent" column. get_parent_source_id() matches "parent" against
# the "name" column, so parent holds the name of another source row.
sources = Table("sources", dbmeta,
                Column("id", Integer, primary_key=True),
                Column("name", Text, unique=True, nullable=False),
                Column("parent", Text)
)

# Key/value attributes of a source. A source has at most one value per key
# (composite primary key), and rows are declared to be removed together with
# the owning source (ON DELETE CASCADE).
source_kvs = Table("source_kvs", dbmeta,
                   Column("source_id", Integer,
                          ForeignKey(sources.c.id, ondelete="CASCADE"),
                          nullable=False),
                   Column("key", Text, nullable=False),
                   Column("value", Text, nullable=False),
                   PrimaryKeyConstraint("source_id", "key"),
)


# NOTE(review): second MetaData instance that no table in the visible part of
# this module is attached to; looks like a leftover duplicate of dbmeta.
# Confirm nothing imports it before removing.
db_meta = MetaData()


# Shared logger for the server package.
logger = logging.getLogger("bexchange.server")

def force_sqlite_foreign_keys(dbapi_con, con_record):
    """Turn on foreign key enforcement for a freshly opened sqlite connection.

    Intended to be registered as a "connect" event listener on an engine.
    Connections that are not ``sqlite3.Connection`` instances are left
    untouched, and so is everything when the interpreter lacks sqlite3.

    :param dbapi_con: the raw DBAPI connection that was just opened
    :param con_record: the pool's connection record (not used)
    """
    try:
        import sqlite3
    except ImportError:
        # built without sqlite support
        return

    if not isinstance(dbapi_con, sqlite3.Connection):
        return
    dbapi_con.execute("pragma foreign_keys=ON")
class SqlAlchemySourceManager(object):
    """The DB manager providing source handling

    :param uri: The uri to database where sources are stored
    :param poolsize: the size of the db connection pool. In the case of sqlite, this will not be used

    NOTE(review): the default uri uses three slashes, which SQLAlchemy
    interprets as a path relative to the current working directory
    (``tmp/baltrad-exchange-source.db``). An absolute ``/tmp`` path would need
    four slashes. Left unchanged since callers may rely on it - confirm.
    """
    def __init__(self, uri="sqlite:///tmp/baltrad-exchange-source.db", poolsize=10):
        self._engine = dbutil.create_engine_from_url(uri, poolsize)
        if self._engine.driver == "pysqlite":
            # sqlite only enforces foreign keys (and thereby the cascading
            # delete on source_kvs) when the pragma is set on each connection.
            event.listen(self._engine, "connect", force_sqlite_foreign_keys)
        self.init_tables()

    @property
    def driver(self):
        """
        :return: database driver name
        """
        return self._engine.driver

    def init_tables(self):
        """Initializes the database tables
        """
        dbmeta.create_all(self._engine)

    def add_sources(self, srclist):
        """Adds the sources to the source database

        Nothing is written if the sources table already contains rows. The
        whole load runs in one transaction so that a failure does not leave a
        partially filled table behind (which would otherwise make every later
        call return early on the "already populated" check).

        :param srclist: a list of bdbcommon.oh5.Sources
        :return: True if the table already was populated, otherwise None
        :raise RuntimeError: if a source could not be inserted due to an
          integrity violation (for example a duplicate name)
        """
        with self.get_connection() as conn:
            with conn.begin():
                # scalar() avoids depending on how the backend labels the
                # count(*) column ("count(*)" on sqlite, "count" on others).
                count = conn.execute("select count(*) from sources").scalar()
                if count > 0:
                    return True

                conn.execute("delete from source_kvs")
                conn.execute("delete from sources")
                for source in srclist:
                    logger.info("SourceManager. adding: %s", source)
                    try:
                        source_id = conn.execute(
                            sources.insert(),
                            name=source.name,
                            parent=source.parent
                        ).inserted_primary_key[0]
                    except sqlexc.IntegrityError as err:
                        raise RuntimeError("duplicate of source.name") from err
                    self.insert_source_values(conn, source_id, source)

    def get_source(self, meta, add_parent_object=False):
        """Looks up the stored source matching the source in the metadata.

        :param meta: The metadata containing source
        :param add_parent_object: This is adding a parent to the source. This will modify
          the bdb Source object by adding the member parent_object.
        :return: A complete source from the metadata source identifier
        :raise LookupError: if the metadata has no source, if no stored source
          (or no unique best match) can be found or if the parent is missing
        """
        # Validate before touching the database so no connection is opened
        # for metadata that cannot be looked up anyway.
        if meta.what_source is None:
            raise LookupError("no source in metadata")

        msources = meta.source()
        with self.get_connection() as conn:
            source_id = get_source_id(conn, msources)
            if not source_id:
                raise LookupError("failed to look up source for " + msources.to_string())

            source = get_source_by_id(conn, source_id)
            source.parent_object = None
            if add_parent_object:
                source.parent_object = get_source_by_id(conn, get_parent_source_id(conn, source.parent))

        # We must add file information to the metadata: identifiers present in
        # the file but not stored in the database are kept in the result.
        for k in msources.keys():
            if not source.has_key(k):
                source[k] = msources[k]
        return source

    def get_parent_source(self, parent):
        """
        :param parent: The identifier of the parent. It is matched against
          the name column of sources that themselves have no parent.
        :return: The parent source matching the string in parent.
        :raise LookupError: if no such parent source exists
        """
        with self.get_connection() as conn:
            return get_source_by_id(conn, get_parent_source_id(conn, parent))

    ##
    # Insert Key-values from a source
    #
    def insert_source_values(self, conn, source_id, source):
        """Inserts kvs values

        :param conn: connection
        :param source_id: the unique source id
        :param source: the source
        """
        kvs = [
            {"source_id": source_id, "key": k, "value": v}
            for k, v in source.items()
        ]
        if not kvs:
            # An insert executed with an empty parameter list would try to
            # store a row without values and violate the NOT NULL columns.
            return
        conn.execute(source_kvs.insert(), kvs)

    def get_connection(self):
        """get a context managed connection to the database
        """
        return contextlib.closing(self._engine.connect())
## # Copy-pasted from baltrad-db server functionality
def get_source_id(conn, source):
    """Find the id of the stored source that best matches the given source.

    Every key/value pair of ``source`` is compared with the rows in
    source_kvs and the source id with the highest number of matching pairs
    wins. "ORG" is left out of the comparison whenever the source also
    carries one of WMO, NOD, RAD, PLC or WIGOS.

    :param conn: the database connection
    :param source: mapping with the source identifiers to look for
    :return: the id of the best matching source, or None when nothing matches
      or when several sources are tied for the best match
    """
    keys = source.keys()
    ignoreORG = "ORG" in keys and any(
        k in keys for k in ("WMO", "NOD", "RAD", "PLC", "WIGOS")
    )

    # The key/value pairs that take part in both the query and the scoring.
    candidates = [
        (key, value)
        for key, value in source.items()
        if not (ignoreORG and key == "ORG")
    ]

    where = sql.literal(False)
    for key, value in candidates:
        where = sql.or_(
            where,
            sql.and_(
                source_kvs.c.key==key,
                source_kvs.c.value==value
            )
        )

    qry = sql.select(
        [source_kvs.c.source_id, source_kvs.c.key, source_kvs.c.value],
        where,
        distinct=True
    )

    source_id_matches = {}
    max_no_of_matches = 0
    best_match_id = None
    multiple_matches = False
    for source_id, row_key, row_value in conn.execute(qry):
        if source_id not in source_id_matches:
            source_id_matches[source_id] = 0
        source_id_matches[source_id] += sum(
            1 for key, value in candidates
            if key == row_key and value == row_value
        )

        # Track the leader while rows arrive; reaching the current maximum
        # without exceeding it means that the result is ambiguous.
        score = source_id_matches[source_id]
        if score > max_no_of_matches:
            max_no_of_matches = score
            best_match_id = source_id
            multiple_matches = False
        elif score == max_no_of_matches:
            multiple_matches = True

    if multiple_matches:
        logger.debug("Could not determine source due to multiple equally matching sources found for %s.", str(source))
        best_match_id = None
    return best_match_id
def get_source_by_id(conn, source_id):
    """Load a source and its key/value attributes from the database.

    :param conn: the database connection
    :param source_id: the id of the row in the sources table
    :return: an oh5.Source with name, parent and all stored key/values set
    :raise LookupError: if there is no source with the given id
    """
    header = conn.execute(
        sql.select(
            [sources.c.name, sources.c.parent],
            sources.c.id==source_id
        )
    ).first()
    if header is None:
        raise LookupError(f"Could not identify any source with id={source_id}")

    loaded = oh5.Source()
    loaded.name = header["name"]
    loaded.parent = header["parent"]

    attribute_rows = conn.execute(
        sql.select(
            [source_kvs],
            source_kvs.c.source_id==source_id
        )
    ).fetchall()
    for attribute in attribute_rows:
        loaded[attribute["key"]] = attribute["value"]
    return loaded
def get_parent_source_id(conn, parent):
    """Find the id of the parent source with the given name.

    :param conn: the database connection
    :param parent: the parent identifier, matched against the name column
    :return: the id of the matching parent source
    :raise LookupError: if no parent source with that name exists
    """
    # Parents should always have their parent = None otherwise we probably are
    # trying to identify a standard source and not a parent.
    # where() is used instead of filter() since filter() only exists on
    # select constructs from SQLAlchemy 1.4 and both build the same criteria.
    source_id_qry = sql.select([sources.c.id]).where(
        sql.and_(
            sources.c.parent.is_(None),
            sources.c.name == parent
        )
    )
    result = conn.execute(source_id_qry).scalar()
    if result is None:
        raise LookupError(f"Could not identify any parent source with id {parent}")
    return result