# Source code for qualipy.project

import json
import os
import copy
import codecs
import datetime
import pandas as pd
from typing import List, Union, Dict

try:
    from collections.abc import Callable
except ImportError:
    from collections import Callable

from sqlalchemy import create_engine
import dill

from qualipy.util import HOME, copy_function_spec, get_latest_insert_only
from qualipy._sql import DB_ENGINES
from qualipy.reflect.column import Column
from qualipy.reflect.table import Table
from qualipy._schema import config_schema
from qualipy.backends.data_types import PANDAS_TYPES
from qualipy.config import QualipyConfig


# Backend name -> mapping of stored type names to type constructors.
# load_project uses this to rebuild column "type" entries; the "sql" backend
# has no entries.
DATA_TYPES = dict(pandas=PANDAS_TYPES, sql=dict())


def _validate_project_name(project_name):
    assert "-" not in project_name


def inspect_db_connection(url, engines=None):
    """Return the key of ``engines`` that identifies the backend of ``url``.

    The dialect is first read from the URL scheme (``dialect+driver://...``).
    If that is not a known key, this falls back to the previous behaviour:
    it returns the first key that appears anywhere in the URL string.

    Args:
        url: Database URL, typically ``str(engine.url)``.
        engines: Mapping whose keys are backend names. Defaults to
            ``DB_ENGINES``.

    Returns:
        The matching key, or ``None`` if nothing matches.
    """
    if engines is None:
        engines = DB_ENGINES
    if isinstance(url, str):
        # Prefer an exact dialect match so that e.g. a database named
        # "mysql_archive" on a postgresql server is not mistaken for mysql.
        dialect = url.split("://", 1)[0].split("+", 1)[0]
        if dialect in engines:
            return dialect
    for backend in engines:
        if backend in url:
            return backend
    return None


class Project(object):
    """The project class points to a specific configuration, and holds all mappings.

    It also includes a lot of useful utility functions for working with the
    management of projects.
    """

    def __init__(self, project_name: str, config_dir: str, re_init: bool = False):
        """
        Args:
            project_name: The name of the project. This will be important for
                referencing in report generation later. The project_name can
                not be changed - as it is used internally when storing data.
            config_dir: A path to the configuration directory, as created using
                the CLI command ``qualipy generate-config``. See the ``config``
                section for more information.
            re_init: If True, attach to an already existing project: the
                project table must exist and no tables are created. Mappings
                already held by this object are kept.
        """
        self._initialize(
            project_name=project_name, config_dir=config_dir, re_init=re_init
        )

    def _initialize(
        self,
        project_name: str,
        config_dir: str,
        re_init: bool = False,
    ):
        """Bind this project to ``config_dir`` and its database.

        With ``re_init=False`` the column mappings and registered functions
        are reset, and the schema, project table and anomaly table are
        created. With ``re_init=True`` existing mappings are kept, and an
        exception is raised if the project table lookup returns ``None``.

        Raises:
            Exception: if ``config_dir`` is not a directory, or (re_init) the
                project does not exist.
        """
        # TODO: find a way for a project not to need to be redefined and re-imported.
        self.project_name = project_name
        self.value_table = "{}_values".format(self.project_name)
        self.value_custom_table = "{}_values_custom".format(self.project_name)
        self.anomaly_table = "{}_anomaly".format(self.project_name)
        # A fresh object created with re_init=True has no mappings yet.
        if not re_init or not hasattr(self, "columns"):
            self.columns = {}

        config_dir = os.path.expanduser(config_dir)
        self.config_dir = config_dir
        if not os.path.isdir(config_dir):
            raise Exception(
                f"""Directory {config_dir} does not exist. \nRun 'qualipy generate-config' before instantiating a project."""
            )
        self.config = QualipyConfig(
            config_dir=self.config_dir, project_name=project_name
        )
        self.config.set_default_project_config(project_name)
        self.projects = self.config.get_projects()

        self.engine = create_engine(self.config["QUALIPY_DB"])
        self.db_schema = self.config.get("SCHEMA")
        self.sql_helper = DB_ENGINES[inspect_db_connection(str(self.engine.url))](
            self.engine, self.db_schema
        )
        if re_init:
            exists = self.sql_helper.does_table_exist(self.project_name)
            if exists is None:
                raise Exception(f"Project {project_name} does not exist.")
        else:
            self.sql_helper.create_schema_if_not_exists()
            self.sql_helper.create_table(self.project_name)
            self.sql_helper.create_anomaly_table(self.anomaly_table)
        # NOTE(review): the original layout was lost when this source was
        # scraped; reflection is assumed to run in both modes - confirm.
        self.sql_helper.reflect_tables(self.project_name, self.anomaly_table)

        if not re_init or not hasattr(self, "_functions_used_in_project"):
            self._functions_used_in_project = {}

    def change_config_dir(self, config_dir):
        """Re-point this existing project at ``config_dir``, keeping its mappings."""
        self._initialize(self.project_name, config_dir, True)

    def add_column(
        self, column: Column, name: str = None, column_stage_collection_name: str = None
    ) -> None:
        """Add a mapping to this project.

        This is the method to use when adding a column mapping to the project.
        Once added, it will automatically be executed when running the pipeline.

        Args:
            column: The column object. It can be created through the function
                method or the class method. It can also be a list of such
                columns, each keyed by its own ``column_name``.
            name: This is useful when you don't want to run all mappings at
                once. Often, you'll do analysis on different subsets of the
                same dataset. Use name to reference the mapping later on and
                execute it only for a specific subset. The name is also
                essential if you want to analyze the same column in a
                different subset of the data. It may be a list of names. It
                is ignored when ``column`` is a list.
            column_stage_collection_name: Currently unused.

        Returns:
            None
        """
        if isinstance(column, list):
            for col in column:
                self._add_one_column(col, None)
            # Every list entry was added with name=None, which registers the
            # functions of all mapped columns (same as the single-column case).
            self._get_unique_functions(None)
        else:
            added = self._add_one_column(column, name)
            # Register by the keys actually inserted: split_on can rename the key.
            self._get_unique_functions(None if name is None else added)

    def _add_one_column(self, column, name=None):
        """Add one column through the callable or class path; return the keys added."""
        if isinstance(column, Callable):
            return self._add_column_func(column, name)
        return self._add_column_class(column, name)

    def _get_unique_functions(self, name):
        """Record the display name and description of each function used by the given columns.

        ``name`` may be None (every mapped column), a list of column keys, or a
        single column key. Functions that are already recorded are not overwritten.
        """
        if name is None:
            names = list(self.columns)
        elif isinstance(name, list):
            names = name
        else:
            names = [name]
        for column_name in names:
            column = self.columns[column_name]
            for function in column["functions"] + column["extra_functions"]:
                if function[0] not in self._functions_used_in_project:
                    self._functions_used_in_project[function[0]] = {
                        "display_name": function[1].display_name,
                        "description": function[1].description,
                    }

    def add_table(self, table: Table) -> None:
        """Add every column of ``table`` via ``add_column``."""
        for column in table.columns:
            self.add_column(column)

    def _add_column_class(
        self, column: Union[Column, List[Column]], name: str = None
    ) -> List[str]:
        """Store ``column._as_dict`` under ``name`` (or each name in a list).

        Returns:
            The keys written to ``self.columns``.
        """
        if name is None:
            name = column.column_name
        names = name if isinstance(name, list) else [name]
        for n in names:
            self.columns[n] = column._as_dict(name=n)
        return list(names)

    def _add_column_func(
        self, column: Union[Column, List[Column]], name: str = None
    ) -> List[str]:
        """Store the dict produced by calling the column function.

        For a single name whose spec has ``split_on`` set, the key becomes
        ``"{name}||{split_on[1]}"``.

        Returns:
            The keys written to ``self.columns``.
        """
        if name is None:
            name = column.column_name
        if isinstance(name, list):
            for n in name:
                self.columns[n] = column(name=n)
            return list(name)
        con_inst = column(name)
        if con_inst["split_on"] is not None:
            name = f'{name}||{con_inst["split_on"][1]}'
        self.columns[name] = column(name)
        return [name]

    def add_external_columns(self, columns):
        """Merge pre-built column mappings into this project (incoming keys win)."""
        self.columns = {**self.columns, **columns}

    def get_project_table(
        self, expand_meta=False, latest_insert_only=True
    ) -> pd.DataFrame:
        """Return the project's value table as a DataFrame.

        Args:
            expand_meta: If True, JSON-decode the ``meta`` column and spread
                its keys into separate columns (``None`` becomes ``{}``).
            latest_insert_only: If True, filter the table with
                ``get_latest_insert_only``.
        """
        table = self.sql_helper.get_project_table()
        if latest_insert_only:
            table = get_latest_insert_only(table)
        if expand_meta:
            table.meta = table.meta.apply(
                lambda x: json.loads(x) if x is not None else {}
            )
            table = pd.concat(
                [
                    table.drop("meta", axis=1).reset_index(drop=True),
                    pd.DataFrame(table.meta.values.tolist()).reset_index(drop=True),
                ],
                axis=1,
            )
        return table

    def get_anomaly_table(self) -> pd.DataFrame:
        """Return the project's anomaly table."""
        return self.sql_helper.get_anomaly_table()

    def delete_data(self, recreate=True):
        """Delete the project and anomaly data via the SQL helper."""
        self.sql_helper.delete_data(
            name=self.project_name,
            anomaly_name=self.anomaly_table,
            recreate=recreate,
        )

    def delete_existing_batch(self, trans, batch_name):
        """Delete a batch using the given transaction/connection ``trans``."""
        self.sql_helper.delete_existing_batch(trans, batch_name)

    def delete_from_project_config(self):
        """Drop this project from the in-memory projects mapping (not written to disk here)."""
        self.projects.pop(self.project_name, None)

    def write_functions_to_config(self):
        """Store the registered functions under ``DISPLAY_NAMES.DEFAULT`` in this project's config."""
        if "DISPLAY_NAMES" not in self.config[self.project_name]:
            self.config[self.project_name]["DISPLAY_NAMES"] = {}
        self.config[self.project_name]["DISPLAY_NAMES"][
            "DEFAULT"
        ] = self._functions_used_in_project

    @staticmethod
    def _encode_function(function_obj):
        """Return ``(function.__module__, base64(dill.dumps(function)))`` for JSON storage."""
        payload = codecs.encode(dill.dumps(function_obj), "base64").decode()
        return (function_obj.__module__, payload)

    def serialize_project(self, use_dill: bool = True) -> None:
        """Write this project's column mappings to ``projects.json``.

        Every function in ``functions`` / ``extra_functions`` is stored as a
        ``(module_name, payload)`` pair, and ``type`` is stored as ``str(type)``.
        ``self.columns`` itself is not modified.

        Args:
            use_dill: Currently unused; functions are always serialized with dill.
        """
        serialized_dict = {}
        for col_name, schema in copy.deepcopy(self.columns).items():
            for key in ("functions", "extra_functions"):
                schema[key] = [
                    self._encode_function(function[1]) for function in schema[key]
                ]
            schema["type"] = str(schema["type"])
            serialized_dict[col_name] = schema
        self.projects[self.project_name] = serialized_dict
        self._write_projects_file()

    def _load_from_dict(self, column_dict: dict):
        """Replace the column mappings with ``column_dict``."""
        self.columns = column_dict

    def _write_projects_file(self):
        """Dump ``self.projects`` to ``<config_dir>/projects.json``."""
        with open(os.path.join(self.config_dir, "projects.json"), "w") as f:
            json.dump(self.projects, f)

    def update_config_and_project_files(self):
        """Persist both the config and ``projects.json``."""
        self.config.dump()
        self._write_projects_file()
def _deserialize_functions(functions):
    """Turn stored ``(name, base64(dill))`` pairs back into ``(name, function)`` pairs."""
    # SECURITY NOTE: dill.loads can execute arbitrary code. Only load
    # projects.json files from a config directory you trust.
    return [
        (function[0], dill.loads(codecs.decode(function[1].encode(), "base64")))
        for function in functions
    ]


def _reload_functions(functions):
    """Rebuild ``(name, function)`` pairs by resolving each stored name with ``copy_function_spec``."""
    reloaded = []
    for function in functions:
        function_obj = copy_function_spec(function[0])
        reloaded.append((function_obj.__name__, function_obj))
    return reloaded


def load_project(
    config_dir: Union[str, QualipyConfig],
    project_name: str,
    backend: str = "sql",
    reload_functions: bool = None,
) -> Project:
    """Rebuild a serialized project from its config.

    Args:
        config_dir: Path to the config directory (``~`` is expanded), or an
            already constructed ``QualipyConfig``.
        project_name: Name of a project previously serialized into the config.
        backend: Key into ``DATA_TYPES``. For ``"pandas"``, each column's stored
            type name is turned back into a type instance.
        reload_functions: If truthy, functions are re-resolved by their stored
            name via ``copy_function_spec`` instead of being unpickled.

    Returns:
        A ``Project`` whose column mappings are the reconstructed schemas.

    Raises:
        TypeError: if ``config_dir`` is neither a ``str`` nor a ``QualipyConfig``.
        Exception: if the project is not present in the config.
    """
    if isinstance(config_dir, str):
        config = QualipyConfig(
            config_dir=os.path.expanduser(config_dir), project_name=project_name
        )
    elif isinstance(config_dir, QualipyConfig):
        config = config_dir
    else:
        raise TypeError(
            f"config_dir must be a str or QualipyConfig, got {type(config_dir).__name__}"
        )

    projects = config.get_projects()
    if project_name not in projects:
        raise Exception(f"{project_name} has not yet been created or serialized")

    data_types = DATA_TYPES[backend]
    restore = _reload_functions if reload_functions else _deserialize_functions
    reconstructed_dict = {}
    for col_name, schema in projects[project_name].items():
        # Work on a copy so the config's own entry keeps its serialized
        # (string) form. Mutating it in place would break a second load or
        # dump if get_projects() returns cached data.
        new_schema = dict(schema)
        new_schema["functions"] = restore(schema["functions"])
        new_schema["extra_functions"] = restore(schema["extra_functions"])
        if backend == "pandas":
            new_schema["type"] = data_types[new_schema["type"]]()
        reconstructed_dict[col_name] = new_schema

    project = Project(project_name=project_name, config_dir=config.config_dir)
    project._load_from_dict(reconstructed_dict)
    return project