diff --git a/application/single_app/semantic_kernel_plugins/sql_schema_plugin.py b/application/single_app/semantic_kernel_plugins/sql_schema_plugin.py index 01d89aa2..db27b206 100644 --- a/application/single_app/semantic_kernel_plugins/sql_schema_plugin.py +++ b/application/single_app/semantic_kernel_plugins/sql_schema_plugin.py @@ -1,40 +1,52 @@ """ + SQL Schema Plugin for Semantic Kernel + - Connects to various SQL databases (SQL Server, PostgreSQL, MySQL, SQLite) + - Extracts schema information (tables, columns, data types, relationships) + - Provides structured schema data for query generation + """ import logging from typing import Dict, Any, List, Optional, Union + from semantic_kernel_plugins.base_plugin import BasePlugin from semantic_kernel.functions import kernel_function from functions_appinsights import log_event from semantic_kernel_plugins.plugin_invocation_logger import plugin_function_logger from functions_debug import debug_print + # Helper class to wrap results with metadata class ResultWithMetadata: def __init__(self, data, metadata): self.data = data self.metadata = metadata + def __str__(self): return str(self.data) + def __repr__(self): return f"ResultWithMetadata(data={self.data!r}, metadata={self.metadata!r})" + class SQLSchemaPlugin(BasePlugin): + def __init__(self, manifest: Dict[str, Any]): super().__init__(manifest) self.manifest = manifest - + # Extract parameters from additionalFields if present, otherwise use direct manifest additional_fields = manifest.get('additionalFields', {}) - self.connection_string = manifest.get('connection_string') or additional_fields.get('connection_string') raw_db_type = (manifest.get('database_type') or additional_fields.get('database_type', 'sqlserver')).lower() + # Map azure_sql to sqlserver for compatibility self.database_type = 'sqlserver' if raw_db_type in ['azure_sql', 'azuresql'] else raw_db_type + self.auth_type = manifest.get('auth', {}).get('type', 'connection_string') self.server = manifest.get('server') or additional_fields.get('server') self.database = manifest.get('database') or additional_fields.get('database') @@ -42,7 +54,7 @@ def __init__(self, manifest: Dict[str, Any]): self.password = manifest.get('password') or additional_fields.get('password') self.driver = manifest.get('driver') or additional_fields.get('driver') self._metadata = manifest.get('metadata', {}) - + # Add comprehensive logging log_event(f"[SQLSchemaPlugin] Initializing plugin", extra={ "database_type": self.database_type, @@ -53,21 +65,23 @@ def __init__(self, manifest: Dict[str, Any]): "has_username": bool(self.username), "manifest_keys": list(manifest.keys()) }) - print(f"[SQLSchemaPlugin] Initializing - DB Type: {self.database_type}, Auth: {self.auth_type}, Server: {self.server}, Database: {self.database}") - + + debug_print(f"[SQLSchemaPlugin] Initializing - DB Type: {self.database_type}, Auth: {self.auth_type}, Server: {self.server}, Database: {self.database}") + # Validate required configuration if not self.connection_string and not (self.server and self.database): error_msg = "SQLSchemaPlugin requires either 'connection_string' or 'server' and 'database' in the manifest." log_event(f"[SQLSchemaPlugin] Configuration error: {error_msg}", extra={"manifest": manifest}) - print(f"[SQLSchemaPlugin] ERROR: {error_msg}") + debug_print(f"[SQLSchemaPlugin] ERROR: {error_msg}") raise ValueError(error_msg) - + # Set up database-specific configurations self._setup_database_config() - + # Initialize connection (lazy loading) self._connection = None - print(f"[SQLSchemaPlugin] Initialization complete") + + debug_print(f"[SQLSchemaPlugin] Initialization complete") def _setup_database_config(self): """Setup database-specific configurations and import requirements""" @@ -93,7 +107,7 @@ def _setup_database_config(self): 'default_port': None } } - + if self.database_type not in self.supported_databases: raise ValueError(f"Unsupported database type: {self.database_type}. Supported types: {list(self.supported_databases.keys())}") @@ -118,7 +132,7 @@ def _create_connection(self): else: conn_str += ";Trusted_Connection=yes" return pyodbc.connect(conn_str) - + elif self.database_type == 'postgresql': import psycopg2 if self.connection_string: @@ -130,12 +144,10 @@ def _create_connection(self): user=self.username, password=self.password ) - + elif self.database_type == 'mysql': import pymysql if self.connection_string: - # Parse connection string for MySQL - # This is a simplified parser - you might want to use a proper URL parser return pymysql.connect(self.connection_string) else: return pymysql.connect( @@ -144,16 +156,16 @@ def _create_connection(self): user=self.username, password=self.password ) - + elif self.database_type == 'sqlite': import sqlite3 database_path = self.connection_string or self.database return sqlite3.connect(database_path) - + except ImportError as e: raise ImportError(f"Required database driver not installed for {self.database_type}: {e}") except Exception as e: - log_event(f"[SQLSchemaPlugin] Connection failed: {e}", extra={"database_type": self.database_type}) + log_event(f"[SQLSchemaPlugin] Connection failed", extra={"database_type": self.database_type, "error": str(e)}) raise @property @@ -172,7 +184,6 @@ def metadata(self) -> Dict[str, Any]: "The plugin handles database-specific SQL variations for schema extraction." ) full_desc = f"{user_desc}\n\n{api_desc}" - return { "name": self._metadata.get("name", "sql_schema_plugin"), "type": "sql_schema", @@ -221,38 +232,40 @@ def get_functions(self) -> List[str]: @plugin_function_logger("SQLSchemaPlugin") @kernel_function(description="Get complete database schema including all tables, columns, and relationships") def get_database_schema( - self, + self, include_system_tables: bool = False, table_filter: Optional[str] = None ) -> ResultWithMetadata: """Get complete database schema""" + log_event(f"[SQLSchemaPlugin] get_database_schema called", extra={ "database_type": self.database_type, "database": self.database, "include_system_tables": include_system_tables, "table_filter": table_filter }) - print(f"[SQLSchemaPlugin] Getting database schema - DB: {self.database}, Include System: {include_system_tables}") - + + debug_print(f"[SQLSchemaPlugin] Getting database schema - DB: {self.database}, Include System: {include_system_tables}") + try: conn = self._get_connection() cursor = conn.cursor() - + schema_data = { "database_type": self.database_type, "database_name": self.database, "tables": {}, "relationships": [] } - + # Get tables list tables_query = self._get_tables_query(include_system_tables, table_filter) debug_print(f"[SQLSchemaPlugin] Executing tables query: {tables_query}") cursor.execute(tables_query) tables = cursor.fetchall() - - print(f"[SQLSchemaPlugin] Found {len(tables)} tables") - + + debug_print(f"[SQLSchemaPlugin] Found {len(tables)} tables") + # Get schema for each table for table in tables: if isinstance(table, tuple) and len(table) >= 2: @@ -263,31 +276,33 @@ def get_database_schema( table_name = table[0] if isinstance(table, tuple) else table schema_name = None qualified_table_name = table_name - + try: table_schema = self._get_table_schema_data(cursor, table_name, schema_name) schema_data["tables"][table_name] = table_schema - print(f"[SQLSchemaPlugin] Got schema for table: {qualified_table_name}") - except Exception as e: - print(f"[SQLSchemaPlugin] Error getting schema for table {qualified_table_name}: {e}") + debug_print(f"[SQLSchemaPlugin] Got schema for table: {qualified_table_name}") + except Exception as table_err: + # Log per-table errors with full detail for admin — not exposed to caller + debug_print(f"[SQLSchemaPlugin] Error getting schema for table {qualified_table_name}: {str(table_err)}") log_event(f"[SQLSchemaPlugin] Error getting table schema", extra={ "table_name": qualified_table_name, - "error": str(e) + "error": str(table_err) }) - + # Get relationships try: relationships = self._get_relationships_data(cursor) schema_data["relationships"] = relationships - print(f"[SQLSchemaPlugin] Found {len(relationships)} relationships") - except Exception as e: - print(f"[SQLSchemaPlugin] Error getting relationships: {e}") - + debug_print(f"[SQLSchemaPlugin] Found {len(relationships)} relationships") + except Exception as rel_err: + debug_print(f"[SQLSchemaPlugin] Error getting relationships: {str(rel_err)}") + log_event(f"[SQLSchemaPlugin] Error getting relationships", extra={"error": str(rel_err)}) + log_event(f"[SQLSchemaPlugin] get_database_schema completed", extra={ "tables_count": len(schema_data["tables"]), "relationships_count": len(schema_data["relationships"]) }) - + return ResultWithMetadata( schema_data, { @@ -297,41 +312,21 @@ def get_database_schema( "relationship_count": len(schema_data["relationships"]) } ) - + except Exception as e: - error_msg = f"Failed to get database schema: {str(e)}" - print(f"[SQLSchemaPlugin] ERROR: {error_msg}") + # Log full exception detail for admin/App Insights — NEVER expose str(e) to user + # str(e) can contain hostnames, DB names, table names, connection info, SQL fragments + debug_print(f"[SQLSchemaPlugin] get_database_schema failed: {str(e)}") log_event(f"[SQLSchemaPlugin] get_database_schema failed", extra={ "error": str(e), "database_type": self.database_type, "database": self.database }) + # Sanitized generic message returned to user return ResultWithMetadata( - {"error": error_msg}, + {"error": "An error occurred while retrieving the database schema. Please contact your administrator."}, {"source": "sql_schema_plugin", "success": False} ) - - # Get tables - tables_query = self._get_tables_query(include_system_tables, table_filter) - cursor.execute(tables_query) - tables = cursor.fetchall() - - # Get schema for each table - for table_row in tables: - table_name = table_row[0] if isinstance(table_row, (list, tuple)) else table_row - table_schema = self._get_table_schema_data(cursor, table_name) - schema_data["tables"][table_name] = table_schema - - # Get relationships - relationships = self._get_relationships_data(cursor) - schema_data["relationships"] = relationships - - log_event(f"[SQLSchemaPlugin] Retrieved schema for {len(schema_data['tables'])} tables") - return ResultWithMetadata(schema_data, self.metadata) - - except Exception as e: - log_event(f"[SQLSchemaPlugin] Error getting database schema: {e}") - raise @kernel_function(description="Get detailed schema for a specific table") @plugin_function_logger("SQLSchemaPlugin") @@ -340,20 +335,28 @@ def get_table_schema(self, table_name: str) -> ResultWithMetadata: try: conn = self._get_connection() cursor = conn.cursor() - + table_schema = self._get_table_schema_data(cursor, table_name) - + log_event(f"[SQLSchemaPlugin] Retrieved schema for table: {table_name}") return ResultWithMetadata(table_schema, self.metadata) - + except Exception as e: - log_event(f"[SQLSchemaPlugin] Error getting table schema for {table_name}: {e}") - raise + # Log full detail for admin — sanitized message returned to caller + debug_print(f"[SQLSchemaPlugin] Error getting table schema for {table_name}: {str(e)}") + log_event(f"[SQLSchemaPlugin] get_table_schema failed", extra={ + "table_name": table_name, + "error": str(e) + }) + return ResultWithMetadata( + {"error": "An error occurred while retrieving the table schema. Please contact your administrator."}, + {"source": "sql_schema_plugin", "success": False} + ) @kernel_function(description="Get list of all tables in the database") @plugin_function_logger("SQLSchemaPlugin") def get_table_list( - self, + self, include_system_tables: bool = False, table_filter: Optional[str] = None ) -> ResultWithMetadata: @@ -361,11 +364,11 @@ def get_table_list( try: conn = self._get_connection() cursor = conn.cursor() - + tables_query = self._get_tables_query(include_system_tables, table_filter) cursor.execute(tables_query) tables = cursor.fetchall() - + table_list = [] for table_row in tables: if isinstance(table_row, (list, tuple)): @@ -377,13 +380,18 @@ def get_table_list( else: table_info = {"table_name": table_row, "schema": None, "table_type": "TABLE"} table_list.append(table_info) - + log_event(f"[SQLSchemaPlugin] Retrieved {len(table_list)} tables") return ResultWithMetadata(table_list, self.metadata) - + except Exception as e: - log_event(f"[SQLSchemaPlugin] Error getting table list: {e}") - raise + # Log full detail for admin — sanitized message returned to caller + debug_print(f"[SQLSchemaPlugin] Error getting table list: {str(e)}") + log_event(f"[SQLSchemaPlugin] get_table_list failed", extra={"error": str(e)}) + return ResultWithMetadata( + {"error": "An error occurred while retrieving the table list. Please contact your administrator."}, + {"source": "sql_schema_plugin", "success": False} + ) @kernel_function(description="Get foreign key relationships between tables") def get_relationships(self, table_name: Optional[str] = None) -> ResultWithMetadata: @@ -391,22 +399,27 @@ def get_relationships(self, table_name: Optional[str] = None) -> ResultWithMetad try: conn = self._get_connection() cursor = conn.cursor() - + relationships = self._get_relationships_data(cursor, table_name) - + log_event(f"[SQLSchemaPlugin] Retrieved {len(relationships)} relationships") return ResultWithMetadata(relationships, self.metadata) - + except Exception as e: - log_event(f"[SQLSchemaPlugin] Error getting relationships: {e}") - raise + # Log full detail for admin — sanitized message returned to caller + debug_print(f"[SQLSchemaPlugin] Error getting relationships: {str(e)}") + log_event(f"[SQLSchemaPlugin] get_relationships failed", extra={"error": str(e)}) + return ResultWithMetadata( + {"error": "An error occurred while retrieving relationships. Please contact your administrator."}, + {"source": "sql_schema_plugin", "success": False} + ) def _get_tables_query(self, include_system_tables: bool, table_filter: Optional[str]) -> str: """Get database-specific query for listing tables""" if self.database_type == 'sqlserver': base_query = """ - SELECT TABLE_NAME, TABLE_SCHEMA, TABLE_TYPE - FROM INFORMATION_SCHEMA.TABLES + SELECT TABLE_NAME, TABLE_SCHEMA, TABLE_TYPE + FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE' """ if not include_system_tables: @@ -414,7 +427,7 @@ def _get_tables_query(self, include_system_tables: bool, table_filter: Optional[ if table_filter: base_query += f" AND TABLE_NAME LIKE '{table_filter.replace('*', '%')}'" return base_query - + elif self.database_type == 'postgresql': base_query = """ SELECT tablename, schemaname, 'BASE TABLE' as table_type @@ -425,7 +438,7 @@ def _get_tables_query(self, include_system_tables: bool, table_filter: Optional[ if table_filter: base_query += f" {'AND' if not include_system_tables else 'WHERE'} tablename LIKE '{table_filter.replace('*', '%')}'" return base_query - + elif self.database_type == 'mysql': base_query = f"SHOW TABLES" if self.database: @@ -433,7 +446,7 @@ def _get_tables_query(self, include_system_tables: bool, table_filter: Optional[ if table_filter: base_query += f" LIKE '{table_filter.replace('*', '%')}'" return base_query - + elif self.database_type == 'sqlite': base_query = "SELECT name FROM sqlite_master WHERE type='table'" if not include_system_tables: @@ -452,23 +465,23 @@ def _get_table_schema_data(self, cursor, table_name: str, schema_name: str = Non "foreign_keys": [], "indexes": [] } - + # Get columns columns_query = self._get_columns_query(table_name, schema_name) cursor.execute(columns_query) columns = cursor.fetchall() - + for col in columns: column_info = self._parse_column_info(col) schema_data["columns"].append(column_info) - + # Get primary keys pk_query = self._get_primary_keys_query(table_name, schema_name) if pk_query: cursor.execute(pk_query) pks = cursor.fetchall() schema_data["primary_keys"] = [pk[0] if isinstance(pk, (list, tuple)) else pk for pk in pks] - + return schema_data def _get_columns_query(self, table_name: str, schema_name: str = None) -> str: @@ -478,12 +491,13 @@ def _get_columns_query(self, table_name: str, schema_name: str = None) -> str: if schema_name: where_clause += f" AND TABLE_SCHEMA = '{schema_name}'" return f""" - SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_DEFAULT, + SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_DEFAULT, CHARACTER_MAXIMUM_LENGTH, NUMERIC_PRECISION, NUMERIC_SCALE - FROM INFORMATION_SCHEMA.COLUMNS + FROM INFORMATION_SCHEMA.COLUMNS {where_clause} ORDER BY ORDINAL_POSITION """ + elif self.database_type == 'postgresql': return f""" SELECT column_name, data_type, is_nullable, column_default, @@ -492,8 +506,10 @@ def _get_columns_query(self, table_name: str, schema_name: str = None) -> str: WHERE table_name = '{table_name}' ORDER BY ordinal_position """ + elif self.database_type == 'mysql': return f"DESCRIBE {table_name}" + elif self.database_type == 'sqlite': return f"PRAGMA table_info({table_name})" @@ -509,6 +525,7 @@ def _get_primary_keys_query(self, table_name: str, schema_name: str = None) -> O {where_clause} AND CONSTRAINT_NAME LIKE 'PK_%' """ + elif self.database_type == 'postgresql': return f""" SELECT a.attname @@ -516,12 +533,14 @@ def _get_primary_keys_query(self, table_name: str, schema_name: str = None) -> O JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = '{table_name}'::regclass AND i.indisprimary """ + elif self.database_type == 'mysql': return f""" - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE WHERE TABLE_NAME = '{table_name}' AND CONSTRAINT_NAME = 'PRIMARY' """ + # SQLite primary keys are handled in the table_info query return None @@ -557,10 +576,10 @@ def _parse_column_info(self, col) -> Dict[str, Any]: def _get_relationships_data(self, cursor, table_name: Optional[str] = None) -> List[Dict[str, Any]]: """Get foreign key relationships""" relationships = [] - + if self.database_type == 'sqlserver': query = """ - SELECT + SELECT fk.name AS constraint_name, tp.name AS parent_table, cp.name AS parent_column, @@ -575,7 +594,7 @@ def _get_relationships_data(self, cursor, table_name: Optional[str] = None) -> L """ if table_name: query += f" WHERE tp.name = '{table_name}' OR tr.name = '{table_name}'" - + elif self.database_type == 'postgresql': query = """ SELECT @@ -586,18 +605,18 @@ def _get_relationships_data(self, cursor, table_name: Optional[str] = None) -> L ccu.column_name AS referenced_column FROM information_schema.table_constraints AS tc JOIN information_schema.key_column_usage AS kcu - ON tc.constraint_name = kcu.constraint_name + ON tc.constraint_name = kcu.constraint_name JOIN information_schema.constraint_column_usage AS ccu - ON ccu.constraint_name = tc.constraint_name + ON ccu.constraint_name = tc.constraint_name WHERE tc.constraint_type = 'FOREIGN KEY' """ if table_name: query += f" AND (tc.table_name = '{table_name}' OR ccu.table_name = '{table_name}')" - + else: # MySQL and SQLite have different approaches for foreign keys return relationships - + try: cursor.execute(query) fks = cursor.fetchall() @@ -610,8 +629,9 @@ def _get_relationships_data(self, cursor, table_name: Optional[str] = None) -> L "referenced_column": fk[4] }) except Exception as e: - log_event(f"[SQLSchemaPlugin] Error getting relationships: {e}") - + debug_print(f"[SQLSchemaPlugin] Error executing relationships query: {str(e)}") + log_event(f"[SQLSchemaPlugin] Error getting relationships", extra={"error": str(e)}) + return relationships def __del__(self):