# Source code for qualipy.run

import pandas as pd
import numpy as np

import os
import datetime
from typing import Any, Dict, Optional, Union, Dict, List, Callable
import warnings
import logging
import copy

from qualipy.backends.pandas_backend.generator import BackendPandas
from qualipy.backends.sql_backend.generator import BackendSQL
from qualipy.exceptions import FailException, NullableError
from qualipy.project import Project
from qualipy.util import setup_logging
from qualipy.backends.base import MetricResult

# Spark support is optional: pyspark may not be installed. Fall back to None
# so the GENERATORS mapping below can still be built; selecting the "spark"
# backend will then fail at instantiation time rather than at import time.
try:
    from qualipy.backends.spark_backend.generator import BackendSpark
except Exception:
    BackendSpark = None

# suppress numpy future warning for now
warnings.simplefilter(action="ignore", category=FutureWarning)


# User's home directory. Not referenced in this chunk — presumably used as a
# default location elsewhere in the package; verify against other modules.
HOME = os.path.expanduser("~")


# Maps the ``backend`` argument of Qualipy to its generator class.
# "spark" maps to None when pyspark is unavailable (see the guarded import above).
GENERATORS = {"pandas": BackendPandas, "spark": BackendSpark, "sql": BackendSQL}

# types
# A Measure is the list of metric-result mappings accumulated during a run.
Measure = List[Dict[str, Any]]


# TODO: dont really need this method now
# TODO: dont really need this method now
def _create_value(
    value: Any,
    metric: str,
    name: str,
    date: datetime.datetime,
    type: str,
    return_format: str,
    run_name,
):
    """Wrap one computed metric into a ``MetricResult`` record.

    Note that the ``name`` argument is stored under the ``column_name``
    field of the result.
    """
    fields = dict(
        value=value,
        metric=metric,
        date=date,
        column_name=name,
        return_format=return_format,
        type=type,
        run_name=run_name,
    )
    return MetricResult(**fields)


class Qualipy:
    """This is the main entrypoint to Qualipy.

    This is the object that will actually execute on your data.
    """

    def __init__(
        self,
        project: Project,
        backend: str = "pandas",
        time_of_run: Optional[datetime.datetime] = None,
        batch_name: Optional[str] = None,
        overwrite_arguments: Optional[dict] = None,
    ):
        """
        Args:
            project: Your defined qualipy.Project
            backend: Can be either "pandas", "sql", or "spark" depending on what
                kind of data you are tracking
            time_of_run: If None, this will be the current datetime. Note, this
                is very important for analysis, as time_of_run is essentially
                your x_axis in all time series analysis. Being able to set it
                to a specific date can be useful when generating retrospective
                statistics.
            batch_name: Useful for comparing specific time points by name during
                analysis. By default it will take the time_of_run as batch_name
            overwrite_arguments: Optional mapping of argument overrides, passed
                through to the generator when producing each metric.
        """
        self.project = project
        self.time_of_run = (
            datetime.datetime.now() if time_of_run is None else time_of_run
        )
        self.batch_name = batch_name if batch_name is not None else self.time_of_run
        self.current_data = None
        self.total_measures = []
        # GENERATORS maps the backend name to its generator class; instantiate
        # it with the project's configuration directory.
        self.generator = GENERATORS[backend](project.config_dir)
        self.chunk = False
        self.run_n = 0
        self.schema = {}
        self.from_step = None
        self.stratify = False
        self.backend = backend
        self.overwrite_arguments = overwrite_arguments
        self._setup_logger()
        self.logger = logging.getLogger(__name__)
        self.logger.info(f"Working on batch {self.batch_name}")

    def run(self, autocommit: bool = False, profile_batch=False) -> None:
        """The method that runs the execution

        Note: You must first set a dataset using either ``set_dataset`` or
        ``set_chunked_dataset``

        Args:
            autocommit: If set to True, qualipy will automatically write to its
                backend. If set to False, the user will have to manually run
                the ``commit`` function.
            profile_batch: If set to True, Qualipy will generate metadata used
                to construct a batch report by using the
                ``produce_batch_report`` CLI command.

        Returns:
            None
        """
        if not self.chunk:
            self._run_with_optional_stratify(autocommit, profile_batch=profile_batch)
            self.run_n += 1
        else:
            if profile_batch:
                raise Exception("Can only profile batch without chunking data")
            for chunk in self.time_chunks:
                self.logger.info(f"Running on chunk: {chunk['batch_name']}")
                self.current_data = chunk["chunk"]
                # An empty chunk falls back to the fallback dataset so a run is
                # still recorded for that time period.
                if self.current_data.shape[0] == 0:
                    self.current_data = self.fallback_data
                if self.current_data is not None:
                    self.batch_name = str(chunk["batch_name"])
                    self.time_of_run = chunk["batch_name"]
                    self._run_with_optional_stratify(autocommit)

    def _run_with_optional_stratify(self, autocommit, profile_batch=False):
        # Generate metrics once per stratification value, restoring the
        # original data and run name after each subset so successive subsets
        # never compound on one another.
        if self.stratify:
            self.original_data = self.current_data.copy()
            self.original_name = self.current_name
            for stratify_value in self.stratify_values:
                self.current_data = self.stratify_function(
                    self.current_data, stratify_value
                )
                self.current_name = f"{self.current_name}_{stratify_value}"
                self._generate_metrics(
                    autocommit=autocommit, profile_batch=profile_batch
                )
                # turn back name and data
                self.current_name = self.original_name
                self.current_data = self.original_data
        else:
            self._generate_metrics(autocommit=autocommit, profile_batch=profile_batch)

    def _setup_logger(self):
        # Delegates to the shared qualipy logging configuration.
        setup_logging()

    def set_dataset(
        self, df, columns: Optional[List[str]] = None, run_name: str = None
    ) -> None:
        """Specifies the exact subset of data you want to run on.

        Use this method when you don't have all of the data (a live process)
        and want to only run on one batch of data.

        Args:
            df: Can be either PandasData, SQLData, or SparkData
            columns: If you don't want to run all mappings on this specific
                subset of data, you can specify just the columns you want to
                run. Note - this corresponds to the ``name`` argument when
                adding a column to a project
            run_name: If you're running metrics from a project on many
                different subsets and iterations of the data, you might want to
                give each specific subset a name. This is especially necessary
                when running aggregates on a column where the column name
                itself stays the same, but the meaning changes based on the
                subset. By default, this will take the value of '0'

        Returns:
            None
        """
        # NOTE: if sqldata but pandas backend, should pull data and work on that!
        # also give option of query or taking last x rows
        self._set_data(
            df,
            allowed_dataclasses=["SQLData", "PandasData", "SparkData", "SparkSQLData"],
        )
        self.current_name = run_name if run_name is not None else self.run_n
        self._set_stratification(df)
        self.columns = self._set_columns(columns)
        self._set_schema(self.current_data)

    def set_chunked_dataset(
        self,
        df,
        columns: Optional[List[str]] = None,
        run_name: str = None,
        time_freq: str = "1D",
        time_column=None,
    ):
        """Specifies the exact subset of data you want to run on.

        Use this method when you already have all data available, and want to
        retrospectively analyze all historical data as if it was a live
        process.

        Note - There's nothing stopping you from first running this on the
        available data and then running on a batch-per-batch basis afterwards
        using regular ``set_dataset``.

        Args:
            df: Can be either PandasData, SQLData, or SparkData
            columns: If you don't want to run all mappings on this specific
                subset of data, you can specify just the columns you want to
                run. Note - this corresponds to the ``name`` argument when
                adding a column to a project
            run_name: If you're running metrics from a project on many
                different subsets and iterations of the data, you might want to
                give each specific subset a name. This is especially necessary
                when running aggregates on a column where the column name
                itself stays the same, but the meaning changes based on the
                subset. By default, this will take the value of '0'
            time_freq: A pandas-like timeseries frequency term. See
                https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#timeseries-offset-aliases
                for the accepted aliases.
            time_column: The time series column qualipy should use to chunk the
                data

        Returns:
            None
        """
        self._set_data(df, allowed_dataclasses=["SQLData", "PandasData", "SparkData"])
        self.current_name = run_name if run_name is not None else self.run_n
        self._set_stratification(df)
        self.columns = self._set_columns(columns)
        self._set_schema(self.current_data)
        self.chunk = True
        try:
            time_column = (
                time_column if time_column is not None else self.project.time_column
            )
        except AttributeError:
            raise Exception(
                "No time_column specified. Must be given if chunking dataset"
            )
        self.time_chunks = self.generator.get_chunks(
            self.current_data, time_freq, time_column
        )

    def _set_data(self, df, allowed_dataclasses):
        # Only accept the known data wrapper classes; anything else is an error.
        if df.__class__.__name__ in allowed_dataclasses:
            self.current_data = df.get_data()
            try:
                self.fallback_data = df.set_fallback_data()
            except Exception:
                # Best-effort: not every data class provides fallback data.
                pass
        else:
            raise Exception(f"{df.__class__.__name__} is not yet a supported datatype")

    def _set_stratification(self, df):
        # stratification only implemented in Pandas for now
        if self.backend == "pandas":
            if df.stratify:
                self.stratify = True
                self.stratify_values = df.stratify_values
                self.stratify_function = df.subset_function()

    def _set_schema(self, df):
        # Merge the newly derived schema into any schema gathered so far.
        schema = self.generator.set_schema(df, self.columns, self.current_name)
        self.schema = {**self.schema, **schema}

    def _set_columns(self, columns: Optional[List[str]]):
        """Return the project columns restricted to ``columns`` (or all of
        them when ``columns`` is None)."""
        if columns is None:
            ret_columns = self.project.columns
        else:
            ret_columns = {}
            for col, items in self.project.columns.items():
                stage_name = items.get("column_stage_collection_name")
                if stage_name in columns:
                    ret_columns[col] = items
        return ret_columns

    def commit(self, delete_existing_batch=False):
        """Write all accumulated measures to the project's database and
        persist the project configuration.

        Args:
            delete_existing_batch: When True, remove any previously stored
                measures for this batch before writing.
        """
        with self.project.engine.begin() as conn:
            if delete_existing_batch:
                self._delete_existing_batch(conn)
            self._write(conn=conn, measures=self.total_measures)
        self.project.write_functions_to_config()
        self.project.update_config_and_project_files()

    def _set_default_view(self):
        # The "view" is the working copy of the data (and run name) that the
        # per-column loop may replace with split subsets.
        self.data_view = self.generator.return_data_copy(self.current_data)
        self.current_name_view = self.current_name

    def _generate_metrics(
        self, autocommit: bool = True, profile_batch: bool = False
    ) -> None:
        measures = []
        self._set_default_view()
        for col, specs in self.project.columns.items():
            if col not in self.columns:
                continue
            self.logger.info(f"Analyzing column: {col}")
            if specs["split_on"] is not None:
                # Split columns are named "<col>||<value>"; analyze only the
                # matching subset of the data.
                column_name = specs["name"].split("||")[0]
                self.data_view = self.generator.return_split_subset(
                    self.current_data, specs["split_on"][0], specs["split_on"][1]
                )
                self.current_name_view = f"{self.current_name}-{specs['split_on'][1]}"
            else:
                column_name = specs["name"]
                self.data_view = self.generator.return_data_copy(self.current_data)
                self.current_name_view = self.current_name

            # enforce type for function
            # TODO: fix types when sql data is converted to pandas data
            try:
                if specs["type"] is not None:
                    self.generator.check_type(
                        data=self.data_view,
                        column=column_name,
                        desired_type=specs["type"],
                        force=specs["force_type"],
                    )
                    # NOTE(review): overwrite only applies when a type is
                    # declared — confirm against the original intent.
                    overwrite_type = specs["overwrite_type"]
                    if overwrite_type:
                        self.data_view = self.generator.overwrite_type(
                            self.data_view, column_name, specs["type"]
                        )
            except AttributeError:
                # Backends without type enforcement simply skip this step.
                pass

            # get default column info
            measures = self._get_column_specific_general_info(specs, measures)

            for function_name, function in (
                specs["functions"] + specs["extra_functions"]
            ):
                should_fail = function.fail
                arguments = function.arguments
                return_format = function.return_format
                viz_type = self._set_viz_type(function, function_name)

                # generate result row
                result = self.generator.generate_description(
                    function=function,
                    data=self.data_view,
                    column=column_name,
                    function_name=function_name,
                    date=self.time_of_run,
                    viz_type=viz_type,
                    return_format=return_format,
                    run_name=self.current_name_view,
                    kwargs=arguments,
                    overwrite_kwargs=self.overwrite_arguments,
                )

                # set value type
                result.set_return_value_type()

                if should_fail and not result["value"]:
                    raise FailException(
                        "Program halted by function '{}' for variable '{}' with "
                        "parameter 'fail=True'".format(function_name, col)
                    )

                if return_format == "custom":
                    # A "custom" result carries multiple sub-values; expand it
                    # into one measure per sub-value, cloned from a template
                    # with its value cleared.
                    copy_of_result = copy.deepcopy(result)
                    copy_of_result.update_keys(value=None)
                    for sub_value in result.value:
                        new_result = copy.deepcopy(copy_of_result)
                        new_result.update_keys(
                            value=sub_value["value"], run_name=sub_value["run_name"]
                        )
                        new_result.update_keys(
                            return_format=function.custom_value_return_format
                        )
                        if "metric_name" in sub_value:
                            new_result.update_keys(metric=sub_value["metric_name"])
                        if "meta" in sub_value:
                            new_result.update_keys(meta=sub_value["meta"])
                        measures.append(new_result)
                else:
                    measures.append(result)

        measures = self._get_general_info(measures)
        self._add_to_total_measures(measures)

        if profile_batch:
            self.generator.profile_batch(
                self.data_view,
                self.batch_name,
                self.current_name_view,
                self.columns,
                self.project.config_dir,
                self.project.project_name,
            )
        if autocommit:
            self.commit()

    def _add_to_total_measures(self, measures: List[Dict]):
        self.total_measures.extend(measures)

    def _get_column_specific_general_info(self, specs, measures: Measure):
        """Append the generic per-column measures (uniqueness, missingness,
        value proportions, distinct count, dtype) to ``measures``.

        Raises:
            NullableError: when a non-nullable column with ``force_null`` set
                has any missing values.
        """
        col_name = specs["name"]
        (
            unique,
            perc_missing,
            value_props,
            distinct,
        ) = self.generator.generate_column_general_info(
            specs, self.data_view, self.time_of_run, self.current_name_view
        )
        if unique is not None:
            measures.append(unique)
        if value_props is not None:
            measures.append(value_props)
        if distinct is not None:
            measures.append(distinct)
        measures.append(perc_missing)
        if perc_missing.value > 0 and specs["force_null"] and not specs["null"]:
            raise NullableError(
                "Column {} has {} percent missing even"
                " though it is not nullable".format(col_name, perc_missing["value"])
            )
        measures.append(
            _create_value(
                str(self.generator.get_dtype(self.data_view, col_name)),
                "dtype",
                col_name,
                self.time_of_run,
                "data-characteristic",
                str,
                self.current_name_view,
            )
        )
        return measures

    def _get_general_info(self, measures: Measure) -> Measure:
        """Append dataset-level row/column counts to ``measures``."""
        rows, cols = self.generator.get_shape(self.data_view)
        # NOTE(review): these use self.current_name rather than
        # self.current_name_view, unlike the per-column measures — confirm
        # whether that asymmetry is intentional.
        measures.append(
            _create_value(
                rows,
                "count",
                "rows",
                self.time_of_run,
                "data-characteristic",
                int,
                self.current_name,
            )
        )
        measures.append(
            _create_value(
                cols,
                "count",
                "columns",
                self.time_of_run,
                "data-characteristic",
                int,
                self.current_name,
            )
        )
        return measures

    def _set_viz_type(self, function: Callable, function_name: str) -> str:
        """Map a function's return format (a Python type) to the name of the
        visualization category used downstream."""
        return_format = function.return_format
        if return_format == "custom":
            return_format = function.custom_value_return_format
        types = {
            float: "numerical",
            int: "numerical",
            bool: "boolean",
            dict: "categorical",
            str: "not_sure",
        }
        return types[return_format]

    def _write(self, conn, measures: Measure) -> None:
        # TODO: move this to sql class - not generator!!!!
        # Chunked runs are stored under a sentinel batch name because each
        # chunk carries its own batch name in its rows.
        batch_name = "from_chunked" if self.chunk else self.batch_name
        self.generator.write(
            conn, measures, self.project, batch_name, schema=self.project.db_schema
        )

    def _delete_existing_batch(self, conn):
        batch_name = "from_chunked" if self.chunk else self.batch_name
        self.project.delete_existing_batch(conn, batch_name=batch_name)