Introduction
The manage_database.py
script provides functionality for interacting with and managing the persistence
layer in the G.O.D Framework. This includes operations for creating, reading, updating, and deleting data, as well
as advanced query execution and connection pooling.
It abstracts away low-level database interactions and provides a high-level API for seamless integration with the application’s modules.
Purpose
The primary goals of this script are:
- To provide an abstraction layer for database operations, making it easier to interact with the application’s persistence layer.
- To manage database connections efficiently, ensuring scalability and robust handling of concurrent operations.
- To execute CRUD (Create, Read, Update, Delete) operations and custom queries.
- To maintain data integrity and consistency in line with business rules.
- To offer logging support to track all database interactions.
Key Features
- Connection Management: Manages a pool of database connections for efficient use of resources.
- CRUD Operations: Provides a simple yet powerful API for basic database operations.
- Transaction Support: Ensures atomicity and consistency by wrapping operations in transactions.
- Custom Query Execution: Supports running raw SQL queries or parameterized queries securely.
- Error Handling: Robust mechanisms for catching and logging database-related errors.
Logic and Implementation
The script follows a modular design to ensure easy adaptation to different types of databases (e.g., MySQL, PostgreSQL, MongoDB). Below is an outline of the implementation:
import logging
import psycopg2
from psycopg2 import pool
class DatabaseManager:
"""
Handles all database operations for the G.O.D Framework.
"""
def __init__(self, db_config):
"""
Initialize the database manager with connection pool.
Args:
db_config (dict): A dictionary containing database configuration.
"""
self.db_config = db_config
self.connection_pool = None
self.logger = logging.getLogger("DatabaseManager")
self.setup_logger()
self.initialize_connection_pool()
def setup_logger(self):
"""
Configures the logger for tracking database operations.
"""
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def initialize_connection_pool(self):
"""
Initializes the connection pool for the database.
"""
try:
self.connection_pool = psycopg2.pool.SimpleConnectionPool(
minconn=1,
maxconn=10,
user=self.db_config['user'],
password=self.db_config['password'],
host=self.db_config['host'],
port=self.db_config['port'],
database=self.db_config['database']
)
self.logger.info("Database connection pool initialized successfully.")
except Exception as e:
self.logger.error(f"Failed to initialize connection pool: {str(e)}")
raise
def execute_query(self, query, params=None):
"""
Executes a query and returns the resultset.
Args:
query (str): The SQL query to execute.
params (tuple): Parameters for the query, if any.
Returns:
list: A list of rows returned by the query.
"""
connection = None
try:
connection = self.connection_pool.getconn()
with connection.cursor() as cursor:
cursor.execute(query, params)
result = cursor.fetchall()
connection.commit()
return result
except Exception as e:
self.logger.error(f"Error executing query: {query}, Error: {str(e)}")
raise
finally:
if connection:
self.connection_pool.putconn(connection)
def close_pool(self):
"""
Closes all connections in the pool.
"""
try:
self.connection_pool.closeall()
self.logger.info("Database connection pool closed.")
except Exception as e:
self.logger.error(f"Error closing connection pool: {str(e)}")
# Example Usage
if __name__ == "__main__":
db_config = {
"user": "username",
"password": "password",
"host": "localhost",
"port": 5432,
"database": "god_db"
}
db_manager = DatabaseManager(db_config)
try:
query = "SELECT * FROM some_table WHERE id = %s"
results = db_manager.execute_query(query, (1,))
print(results)
finally:
db_manager.close_pool()
This example uses PostgreSQL, but the implementation is adaptable to other databases.
Dependencies
- psycopg2: A Python adapter for PostgreSQL.
- logging: For tracking database operations and error handling.
Integration with the G.O.D Framework
The manage_database.py
module is utilized by many other components that depend on database connectivity, including:
- ai_data_registry.py: For storing new datasets or annotations.
- ai_pipeline_audit_logger.py: Logs activities and outputs to the database.
- error_handler.py: Captures and logs database-related exceptions.
Future Enhancements
- Add support for multiple databases for a distributed design.
- Implement automatic synchronization for schema changes.
- Provide integration for NoSQL databases (e.g., MongoDB).
- Perform query optimization and caching for enhanced performance.
- Introduce a connection health check mechanism.