# Source code for qualipy.backends.sql_backend.dataset

from qualipy.backends.base import BaseData
from qualipy.backends.pandas_backend.dataset import PandasData

import sqlalchemy as sa
import pandas as pd

try:
    import pyspark
except ImportError:
    pass

import uuid
import logging


# Module-level logger; SQLData.get_data logs the query it runs through it.
logger = logging.getLogger(__name__)


class SQLData(BaseData):
    """This is used when tracking a relational table"""

    def __init__(
        self,
        engine: sa.engine.base.Engine = None,
        table_name: str = None,
        schema: str = None,
        conn_string: str = None,
        custom_select_sql: str = None,
        create_temp: bool = False,
        backend="sql",
    ):
        """
        Args:
            engine: A sqlalchemy engine to the database containing the table we
                want to track
            table_name: The name of the table we want to track
            schema: The schema the table is in
            conn_string: If engine is None, you can just pass the sqlalchemy
                database connection string and an engine is created from it
            custom_select_sql: Must be proper SQL for whatever DB you are using.
                When ``create_temp`` is True, this will instantiate a temporary
                table that Qualipy will run against. This is useful if you don't
                need the entire table, or need to run any joins before running
                Qualipy. However, often it might be better to just create a view
                of what you need. With ``backend="pandas"`` it is also the query
                ``get_data`` reads.
            create_temp: Materialize ``custom_select_sql`` into a temporary
                table (only has an effect when ``custom_select_sql`` is given).
                If ``table_name`` is None a random 8-character name is used.
            backend: ``"pandas"`` makes ``get_data`` return a DataFrame; any
                other value makes it return this object.

        Raises:
            ValueError: If neither ``engine`` nor ``conn_string`` is given.
        """
        if engine is not None:
            self.engine = engine
        elif conn_string is not None:
            self.engine = sa.create_engine(conn_string)
        else:
            raise ValueError("Either engine or conn_string must be provided")
        self.dialect = self.engine.dialect.name.lower()
        self.custom_select_sql = custom_select_sql

        if custom_select_sql is not None and create_temp:
            if table_name is None:
                table_name = str(uuid.uuid4())[:8]
            # SQL Server marks session-local temp tables with a leading "#".
            if self.dialect == "mssql":
                table_name = "#" + table_name
            else:
                table_name = f"tmp_{table_name}"
            self._create_temp_table(table_name, custom_select_sql, schema)
            # Only the Oracle branch of _create_temp_table creates the table
            # inside ``schema``; other dialects create it unqualified.
            if self.dialect != "oracle":
                schema = None

        self.table_name = table_name
        self.schema = schema
        self._table = sa.Table(table_name, sa.MetaData(), schema=schema)
        insp = sa.engine.reflection.Inspector.from_engine(self.engine)
        self.table_reflection = insp.get_columns(table_name, schema=schema)
        self.custom_where = None
        # TODO: implement stratify logic, including when converted to pandas
        self.stratify = False
        self.backend = backend

    def get_data(self):
        """Return the data Qualipy should run against.

        Returns:
            A pandas DataFrame when ``backend == "pandas"``: the result of
            ``custom_select_sql`` if it was given, otherwise ``SELECT *`` over
            the tracked table. For any other backend, this object itself.
        """
        if self.backend != "pandas":
            return self
        if self.custom_select_sql is None:
            # List form + text("*") works on SQLAlchemy 1.3 and 1.4; a bare
            # "*" string is rejected by 1.4's column coercion.
            query = sa.select([sa.text("*")]).select_from(self._table)
        else:
            query = self.custom_select_sql
        logger.info("%s", query)
        return pd.read_sql(query, self.engine)

    def _create_temp_table(self, table_name, sql_statement, schema=None, truncate=True):
        """Materialize ``sql_statement`` into the temporary table ``table_name``.

        If the table does not exist it is created from the query. If it exists,
        it is optionally truncated and the query's rows are inserted into it.

        Args:
            table_name: Name of the temp table to create or fill.
            sql_statement: SELECT statement whose rows populate the table.
            schema: Schema used to qualify the table name (the Oracle create
                path and the truncate/insert path use it).
            truncate: When the table already exists, empty it before inserting.
        """
        # TODO: definitely not tested with all databases
        try:
            exists = self.engine.dialect.has_table(
                self.engine, table_name, schema=schema
            )
        except Exception:
            logger.debug(
                "dialect.has_table failed for %s; using fallback probe",
                table_name,
                exc_info=True,
            )
            exists = self._table_exists_fallback(table_name)

        prefix = schema + "." if schema is not None else ""
        if not exists:
            if self.dialect == "oracle":
                # NOTE(review): Oracle global temporary table rows are private
                # to the inserting session; reads through other pooled
                # connections may not see them -- confirm.
                create_query = f"""
                CREATE GLOBAL TEMPORARY TABLE {prefix}{table_name}
                ON COMMIT PRESERVE ROWS
                AS ({sql_statement})
                """
            else:
                # NOTE(review): PostgreSQL-style syntax; probably not valid on
                # SQL Server even though __init__ uses "#" names for mssql.
                create_query = f"""
                CREATE TEMPORARY TABLE IF NOT EXISTS {table_name}
                ON COMMIT PRESERVE ROWS
                AS ({sql_statement})
                """
            statements = [create_query]
        else:
            statements = []
            if truncate:
                statements.append(f"truncate table {prefix}{table_name}")
            statements.append(f"INSERT INTO {prefix}{table_name} ({sql_statement})")

        # One explicit transaction, committed on success. SQLAlchemy 1.x only
        # autocommits text that looks like INSERT/UPDATE/CREATE/..., so a bare
        # TRUNCATE via engine.execute could be rolled back on pool return, and
        # it previously ran on a different connection than the INSERT.
        with self.engine.begin() as conn:
            for statement in statements:
                conn.execute(statement)

    def _drop_temp_table(self):
        """Drop the table this object tracks (e.g. one made by _create_temp_table)."""
        # self._table belongs to an unbound MetaData, so the engine must be
        # passed explicitly or drop() has nothing to execute against.
        self._table.drop(bind=self.engine)

    def set_custom_where(self, custom_where: str):
        """Set this when you want a function to run on a subset of the table

        Args:
            custom_where: The where portion of a sql statement. This can then
                be used in a function. See example in the documentation for
                more information
        """
        self.custom_where = custom_where

    def _table_exists_fallback(self, table_name):
        """Return True if a SELECT against ``table_name`` succeeds, else False.

        The probe has a ``WHERE 1 = 0`` filter so no rows are returned. The
        connection is released as soon as the probe finishes.
        """
        probe = (
            sa.select([sa.text("*")])
            .select_from(sa.text(table_name))
            .where(sa.text("1 = 0"))
        )
        try:
            with self.engine.connect() as conn:
                conn.execute(probe)
        except Exception:
            return False
        return True
class SparkSQLData(BaseData):
    """This is used when tracking a relational table through Spark's JDBC reader.

    The table (or ``custom_select_sql``) is loaded into a Spark DataFrame over
    JDBC, while column metadata is reflected through SQLAlchemy.
    """

    def __init__(
        self,
        spark_config: dict = None,
        engine: sa.engine.base.Engine = None,
        table_name: str = None,
        schema: str = None,
        conn_string: str = None,
        custom_select_sql: str = None,
        backend="spark-sql",
    ):
        """
        Args:
            spark_config: JDBC/Spark settings. Required keys: ``jdbc_url``,
                ``username``, ``password``. Optional keys: ``driver`` (default
                ``"oracle.jdbc.driver.OracleDriver"``), ``master`` (default
                ``"local[32]"``), ``ui_port`` (default ``8081``).
            engine: A sqlalchemy engine used to reflect the table's columns
            table_name: The name of the table we want to track
            schema: The schema the table is in; also qualifies the default
                ``select *`` query sent over JDBC
            conn_string: If engine is None, a sqlalchemy connection string used
                to create one
            custom_select_sql: Query to load instead of the whole table
            backend: ``"pandas"`` makes ``get_data`` return a pandas DataFrame;
                any other value makes it return this object.

        Raises:
            TypeError: If ``spark_config`` is not given.
            KeyError: If a required ``spark_config`` key is missing.
            ValueError: If neither ``engine`` nor ``conn_string`` is given.
        """
        # Validate inputs before starting a (costly) Spark session.
        if spark_config is None:
            raise TypeError("spark_config is required for SparkSQLData")
        for key in ("jdbc_url", "username", "password"):
            if key not in spark_config:
                raise KeyError(f"spark_config is missing required key {key!r}")
        if engine is None and conn_string is None:
            raise ValueError("Either engine or conn_string must be provided")

        conf = pyspark.SparkConf()
        conf.setAll(pairs=[("spark.ui.port", spark_config.get("ui_port", 8081))])
        self.spark = (
            pyspark.sql.SparkSession.builder.appName("qualipy")
            .master(spark_config.get("master", "local[32]"))
            .config(conf=conf)
            .getOrCreate()
        )

        self.engine = engine if engine is not None else sa.create_engine(conn_string)
        self.table_name = table_name
        self.schema = schema
        self._table = sa.Table(table_name, sa.MetaData(), schema=schema)
        insp = sa.engine.reflection.Inspector.from_engine(self.engine)
        self.table_reflection = insp.get_columns(table_name, schema=schema)
        self.custom_where = None
        # TODO: implement stratify logic, including when converted to pandas
        self.stratify = False
        self.backend = backend

        if custom_select_sql is not None:
            query_string = custom_select_sql
        elif schema is not None:
            # Qualify with the schema so we read the same table we reflected.
            query_string = f"select * from {schema}.{self.table_name}"
        else:
            query_string = f"select * from {self.table_name}"
        # temp: backslashes are stripped from the query (kept from the original
        # implementation; reason not documented here -- confirm still needed).
        query_string = query_string.replace("\\", "")

        table_df = (
            self.spark.read.format("jdbc")
            .option("url", spark_config["jdbc_url"])
            .option("user", spark_config["username"])
            .option("password", spark_config["password"])
            .option("driver", spark_config.get("driver", "oracle.jdbc.driver.OracleDriver"))
            .option("dbtable", f"({query_string}) t")
        )
        self.table_df = table_df.load()

    def get_data(self):
        """Return a pandas DataFrame when ``backend == "pandas"``, else this object."""
        if self.backend == "pandas":
            return self.table_df.toPandas()
        return self

    def set_custom_where(self, custom_where: str):
        """Set this when you want a function to run on a subset of the table

        Args:
            custom_where: The where portion of a sql statement. This can then
                be used in a function. See example in the documentation for
                more information
        """
        self.custom_where = custom_where

    def _table_exists_fallback(self, table_name):
        """Return True if a SELECT against ``table_name`` succeeds, else False.

        The probe has a ``WHERE 1 = 0`` filter so no rows are returned. The
        connection is released as soon as the probe finishes.
        """
        probe = (
            sa.select([sa.text("*")])
            .select_from(sa.text(table_name))
            .where(sa.text("1 = 0"))
        )
        try:
            with self.engine.connect() as conn:
                conn.execute(probe)
        except Exception:
            return False
        return True