# Source code for expectmine.storage.stores.sqlite3_store

import atexit
import os
import pickle
import sqlite3
from pathlib import Path
from typing import Any, Dict, Optional, Type

from expectmine.storage.base_storage import BaseStore, T
from expectmine.storage.utils import validate_key, validate_storage_init, validate_value


[docs] class Sqlite3Store(BaseStore):
[docs] def __init__( self, step_name: str, persistent_path: Path, working_directory: Path, **kwargs: Dict[Any, Any], ): validate_storage_init(step_name, persistent_path, working_directory) self.step_name = step_name self.persistent_path = persistent_path self.working_directory = working_directory self.kwargs = kwargs os.makedirs(persistent_path, exist_ok=True) self.conn = sqlite3.connect(persistent_path / "sqlite.db", isolation_level=None) self._setup() atexit.register(self._cleanup)
[docs] def put(self, key: str, value: object | Path): validate_key(key) validate_value(value) cur = self.conn.cursor() match value: case bool(): value_type = "boolean" case str(): value_type = "string" case int(): value_type = "int" case float(): value_type = "float" case Path(): value_type = "file" case _: value_type = "blob" cur.execute( """ INSERT OR REPLACE INTO kv_table (stepid, key, type, boolean_value, string_value, int_value, float_value, blob_value) SELECT step.id, ?, ?, ?, ?, ?, ?, ? FROM steps_table step WHERE step.name = ?; """, ( key, value_type, value if value_type == "boolean" else None, value if value_type == "string" else (value.suffix if isinstance(value, Path) else None), value if value_type == "int" else None, value if value_type == "float" else None, pickle.dumps(value) if value_type == "blob" else (value.read_bytes() if isinstance(value, Path) else None), self.step_name, ), )
[docs] def get(self, key: str, returning: Type[T]) -> Optional[T]: validate_key(key) cur = self.conn.cursor() res = cur.execute( """ SELECT type, int_value, float_value, string_value, boolean_value, blob_value FROM step_table JOIN kv_table ON step_table.id = kv_table.stepid WHERE step_table.name = ? AND kv_table.key = ?; """, (self.step_name, key), ).fetchone() if res is None: return None match res[0]: case "int": return_object = int(res[1]) case "float": return_object = float(res[2]) case "string": return_object = str(res[3]) case "boolean": return_object = bool(res[4]) case "file": return_object = ( self.working_directory / f"{self.step_name}-{key}{res[3]}" ) with open(return_object, "wb") as f: f.write(res[5]) case "blob": return_object = pickle.loads(bytes(res[5])) case _: raise ValueError("Value and return type do not match.") if not isinstance(return_object, returning | None): print(type(return_object), return_object) print(type(returning), returning) raise ValueError("Value and return type do not match.") return return_object
[docs] def delete(self, key: str): validate_key(key) cur = self.conn.cursor() cur.execute( """ DELETE FROM kv_table WHERE key = ? AND stepid IN (SELECT id FROM step_table WHERE name = ?); """, (key, self.step_name), )
[docs] def list(self) -> list[str]: cur = self.conn.cursor() res = cur.execute( """ SELECT key FROM kv_table WHERE stepid IN (SELECT id FROM step_table WHERE name = ?); """, (self.step_name,), ) return [k[0] for k in res]
[docs] def exists(self, key: str) -> bool: cur = self.conn.cursor() res = cur.execute( """ SELECT 1 FROM kv_table WHERE key = ? AND stepid IN (SELECT id from step_table WHERE name = ?) """, (key, self.step_name), ).fetchone() return res is not None
def _setup(self): """ Sets up the database, creates the necessary tables (if they don't exist) and checks that the necessary namespace (given by the step) exists. """ cur = self.conn.cursor() cur.executescript( """ BEGIN; PRAGMA foreign_keys = ON; CREATE TABLE IF NOT EXISTS step_table ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE ); CREATE TABLE IF NOT EXISTS kv_table ( stepid INTEGER, key TEXT NOT NULL, type TEXT NOT NULL CHECK (type IN ('int', 'float', 'string', 'boolean', 'blob', 'file')), int_value INTEGER, float_value REAL, string_value TEXT, boolean_value BOOLEAN, blob_value BLOB, PRIMARY KEY (stepid, key), FOREIGN KEY (stepid) REFERENCES step_table(id) ); COMMIT; """ ) cur.execute( """ INSERT OR IGNORE INTO step_table(name) VALUES (?); """, (self.step_name,), ) def _cleanup(self): """ On exit, closes the sqlite3 connection. """ self.conn.close()