Data Handler Class
The class provides helper methods for moving data between Microsoft SQL Server and pandas DataFrames. Database access is delegated to the PandasSqlHandler class.
Methods
__init__
```python
def __init__(self, db_handler: PandasSqlHandler):
    self.db_handler = db_handler
```
Store a reference to the PandasSqlHandler instance.
_adjust_dataframe_types
```python
def _adjust_dataframe_types(self, df):
    """
    Adjusts the DataFrame's data types to be compatible with SQL Server
    """
    for column in df.columns:
        match df[column].dtype:
            case 'float64':
                df[column] = df[column].astype('float32')  # Adjust precision if needed
            case 'int64':
                df[column] = df[column].astype('int32')    # Adjust for SQL Server int handling
            case 'object':
                df[column] = df[column].astype('str')      # Ensure text data type compatibility
    return df
```
Loop through the columns and convert each pandas data type to one that Microsoft SQL Server supports.
__sql_to_pandas_dtype
```python
def __sql_to_pandas_dtype(self, sql_type: str):
    """Converts a SQL data type to the appropriate pandas data type

    Parameters
    ----------
    sql_type : str
    """
    match sql_type.lower():
        case 'int':
            return np.int32
        case 'varchar':
            return str
        case 'nvarchar':
            return str
        case 'date' | 'datetime' | 'datetime2':
            return 'datetime64[ns]'
        case 'bit':
            return 'bool'
        case _:
            return object
```
Convert Microsoft SQL Server data types to the applicable pandas data types.
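The values returned by this mapping are ordinary pandas/NumPy type specifiers, so a dictionary of column names to these values can be applied with DataFrame.astype. A hypothetical usage sketch (the column names and mapping here are made up for illustration):

```python
import numpy as np
import pandas as pd

# A mapping like the one __sql_to_pandas_dtype helps build
column_info = {'id': np.int32, 'name': str, 'created': 'datetime64[ns]'}

df = pd.DataFrame({'id': ['1', '2'],
                   'name': [3, 4],
                   'created': ['2024-01-01', '2024-01-02']})
df = df.astype(column_info)
print(df.dtypes.astype(str).to_dict())
```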
get_table_columns
```python
def get_table_columns(self, query: str, filter_columns: list, query_params: tuple = None):
    """
    Get the metadata about the table, including the column name and SQL data type.
    The SQL data type will be converted to a pandas data type.

    Parameters
    ----------
    query : str
        query used to find the list of columns and data types
    filter_columns : list
        used to filter which columns to show to the user
    query_params : tuple, optional
        parameters to be inserted into the query
    """
    # convert filter_columns to a string
    filter_columns = "','".join(filter_columns)

    # get the data frame and apply the filter to it
    metadata_df = self.db_handler.read_table(query=query, params=query_params)
    metadata_df = self._adjust_dataframe_types(metadata_df).query(
        f"COLUMN_NAME in ['{filter_columns}']"
    )

    column_info = dict(zip(metadata_df["COLUMN_NAME"],
                           metadata_df["DATA_TYPE"].apply(self.__sql_to_pandas_dtype)))
    return column_info
```
Filter columns are stored in the config/sql_column_config.yaml file; we may only want to show certain columns to the user.
Next we query the database and read the column metadata, then drop any rows whose column names are not in the filter list.
Lastly we return a dictionary mapping each column name to its data type, converted from the Microsoft SQL data type to a pandas data type.
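The filter-and-zip steps can be sketched against an in-memory metadata frame (a minimal sketch; the sample rows and the type_map dictionary stand in for the database query and the __sql_to_pandas_dtype method):

```python
import numpy as np
import pandas as pd

# Simulated result of the metadata query (e.g. INFORMATION_SCHEMA-style columns)
metadata_df = pd.DataFrame({
    'COLUMN_NAME': ['id', 'name', 'created', 'internal_flag'],
    'DATA_TYPE':   ['int', 'nvarchar', 'datetime2', 'bit'],
})

# Stand-in for __sql_to_pandas_dtype
type_map = {'int': np.int32, 'nvarchar': str, 'datetime2': 'datetime64[ns]'}

filter_columns = ['id', 'name', 'created']
quoted = "','".join(filter_columns)

# Drop metadata rows for columns the user should not see
filtered = metadata_df.query(f"COLUMN_NAME in ['{quoted}']")

column_info = dict(zip(filtered['COLUMN_NAME'],
                       filtered['DATA_TYPE'].map(lambda t: type_map.get(t, object))))
print(column_info)
```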
add_row
```python
def add_row(self, table_name: str, data: pd.DataFrame):
    """
    Ensure compatible data types for SQL Server before inserting

    Parameters
    ----------
    table_name : str
        The name of the table to add a row to. The table name should include
        the schema, e.g.
        - dbo.my_table_name
        - log.my_other_table
    data : pandas.DataFrame
        Data frame holding the data to be inserted into the source table
    """
    if not isinstance(data, pd.DataFrame) or len(data) != 1:
        raise ValueError("row data should be a pandas DataFrame with a single row")

    row_dict = data.iloc[0].to_dict()

    # Create an SQL INSERT statement with named placeholders
    columns = ', '.join(row_dict.keys())
    placeholders = ', '.join([f":{col}" for col in row_dict.keys()])
    query = f"INSERT INTO {table_name} ({columns}) VALUES ({placeholders})"

    # Pass the query and row values to the PandasSqlHandler for insertion
    self.db_handler.execute_query(query, row_dict)
```
We check that the data is a pandas DataFrame containing exactly one row; if not, we raise a ValueError. The method does not allow inserting multiple rows at once.
We use that single row to create a dictionary called row_dict, which supplies both the column names and the values for the placeholders.
Next we grab the column names and join them into the columns string.
Next we create the named placeholders, one :column entry per column, and join them into a string; the database driver substitutes the actual values at execution time.
Next we format the query string, inserting the table name, columns, and placeholders.
Lastly we execute the query.
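The steps above can be sketched on their own (the table name and row values are illustrative):

```python
import pandas as pd

# A single-row DataFrame to insert
data = pd.DataFrame([{'id': 7, 'name': 'widget'}])

# Build the column list and named placeholders from the row
row_dict = data.iloc[0].to_dict()
columns = ', '.join(row_dict.keys())
placeholders = ', '.join(f":{col}" for col in row_dict)
query = f"INSERT INTO dbo.my_table_name ({columns}) VALUES ({placeholders})"

print(query)
# INSERT INTO dbo.my_table_name (id, name) VALUES (:id, :name)
```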
update_row
```python
def update_row(self, table: str, update_data: dict, primary_key_column: str, row_id):
    """
    Build an UPDATE query dynamically based on changed column values

    Parameters
    ----------
    table : str
        The name of the table to update. The table name should include
        the schema, e.g.
        - dbo.my_table_name
        - log.my_other_table
    update_data : dict
        Dictionary of column name/value pairs to be updated in the source table
    primary_key_column : str
        Column name that represents the primary key.
    row_id : any
        The id of the row to be updated.
    """
    # Convert all values to native Python types (e.g., int, str)
    update_data = {key: (value.item() if hasattr(value, 'item') else value)
                   for key, value in update_data.items()}
    row_id = int(row_id)

    set_clause = ", ".join([f"{col} = :{col}" for col in update_data.keys()])
    query = f"UPDATE {table} SET {set_clause} WHERE {primary_key_column} = :row_id"

    # Execute the query with the values for each column and the row id
    params = {**update_data, 'row_id': row_id}
    self.db_handler.execute_query(query, params)
```
Similar to the add_row method, we prepare the query pieces, merge them into a string, and execute it.
First we rebuild the update_data dictionary, converting any NumPy scalar values to native Python types via .item().
Next we convert row_id to a native Python int (for example, when it arrives as a NumPy int64).
Next we create the SET clause string holding each column name and its named placeholder.
Next we merge the update_data dictionary and the row_id value into one params dictionary that can be passed to the execute_query method.
Lastly we execute the query.
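The UPDATE assembly can be sketched the same way (table and column names are illustrative):

```python
import numpy as np

update_data = {'name': 'gadget', 'qty': np.int64(5)}
row_id = np.int64(7)

# Convert NumPy scalars to native Python types; plain strings have no .item()
update_data = {k: (v.item() if hasattr(v, 'item') else v)
               for k, v in update_data.items()}
row_id = int(row_id)

set_clause = ", ".join(f"{col} = :{col}" for col in update_data)
query = f"UPDATE dbo.my_table_name SET {set_clause} WHERE id = :row_id"
params = {**update_data, 'row_id': row_id}

print(query)
# UPDATE dbo.my_table_name SET name = :name, qty = :qty WHERE id = :row_id
```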
delete_row
```python
def delete_row(self, table_name: str, primary_key_column: str, row_id):
    """
    Build a DELETE query dynamically for a single row

    Parameters
    ----------
    table_name : str
        The name of the table to delete from. The table name should include
        the schema, e.g.
        - dbo.my_table_name
        - log.my_other_table
    primary_key_column : str
        Column name that represents the primary key.
    row_id : any
        The id of the row to be deleted.
    """
    # Use a named placeholder so the driver escapes row_id instead of
    # interpolating it directly into the SQL string
    query = f"DELETE FROM {table_name} WHERE {primary_key_column} = :row_id"
    self.db_handler.execute_query(query, {'row_id': row_id})
```
Create a query string that identifies which row to delete from the table.
First we build the DELETE statement from the table name, the primary-key column, and the row id.
Lastly we execute the query using the execute_query method.
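A minimal sketch of the DELETE statement assembly (binding the id as a named parameter is a safer variant than interpolating it into the string; the table and column names are illustrative):

```python
table_name = 'dbo.my_table_name'
primary_key_column = 'id'

# Named placeholder keeps the id value out of the SQL string itself
query = f"DELETE FROM {table_name} WHERE {primary_key_column} = :row_id"
params = {'row_id': 7}

print(query)
# DELETE FROM dbo.my_table_name WHERE id = :row_id
```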
The next page will cover the PandasSQLHandler class.