First, back up the current working files so you can roll back if the refactor goes wrong:
# In project root
mkdir -p backups/$(date +%Y%m%d)
cp scripts/utils/db_utils.py backups/$(date +%Y%m%d)/
cp scripts/pipeline/text_processor.py backups/$(date +%Y%m%d)/
cp scripts/utils/preprocessing_logger.py backups/$(date +%Y%m%d)/
# scripts/utils/central_logger.py
import logging
from pathlib import Path
from typing import Optional
from datetime import datetime
class LoggerManager:
"""Centralized logging management system"""
_instance = None
_loggers = {}
def __new__(cls):
if cls._instance is None:
cls._instance = super(LoggerManager, cls).__new__(cls)
cls._instance._setup_base_config()
return cls._instance
def _setup_base_config(self):
"""Configure base logging settings"""
self.log_dir = Path("logs")
self.log_dir.mkdir(exist_ok=True)
# Base configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
def get_logger(self, name: str) -> logging.Logger:
"""Get or create a logger with specified name"""
if name not in self._loggers:
logger = logging.getLogger(name)
self._setup_logger(logger, name)
self._loggers[name] = logger
return self._loggers[name]
def _setup_logger(self, logger: logging.Logger, name: str):
"""Setup individual logger configuration"""
# Create file handler
log_file = self.log_dir / f"{name}_{datetime.now():%Y%m%d}.log"
handler = logging.FileHandler(log_file)
# Create formatter
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
# Global instance
logger_manager = LoggerManager()
# scripts/utils/db_manager.py
import sqlite3
from typing import Optional, Dict, Any
from contextlib import contextmanager
from central_logger import logger_manager
class DatabaseManager:
"""Enhanced database management with improved error handling"""
def __init__(self, db_path: str):
self.db_path = db_path
self.logger = logger_manager.get_logger('database')
@contextmanager
def get_connection(self):
"""Context manager for database connections"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
yield conn
except Exception as e:
self.logger.error(f"Database connection error: {str(e)}")
raise
finally:
if conn:
conn.close()
def execute_query(self, query: str, params: tuple = ()) -> list:
"""Execute a query and return results"""
try:
with self.get_connection() as conn:
cursor = conn.execute(query, params)
return [dict(row) for row in cursor.fetchall()]
except Exception as e:
self.logger.error(f"Query execution failed: {str(e)}")
raise
# scripts/pipeline/text_processor.py
from typing import Dict, Any, List
from db_manager import DatabaseManager
from central_logger import logger_manager
import re
class TextProcessor:
"""Enhanced text processing pipeline"""
def __init__(self, db_manager: DatabaseManager):
self.db_manager = db_manager
self.logger = logger_manager.get_logger('text_processor')
def process_document(self, text: str) -> Dict[str, Any]:
"""Process a single document"""
try:
# Extract features
features = self._extract_features(text)
# Calculate metrics
metrics = self._calculate_metrics(features)
# Log processing completion
self.logger.info(f"Document processed successfully: {len(features)} features extracted")
return {
'features': features,
'metrics': metrics
}
except Exception as e:
self.logger.error(f"Document processing failed: {str(e)}")
raise
def _extract_features(self, text: str) -> Dict[str, Any]:
"""Extract features from text"""
return {
'length': len(text),
'word_count': len(text.split()),
'sentence_count': len(re.findall(r'[.!?]+', text))
}
def _calculate_metrics(self, features: Dict[str, Any]) -> Dict[str, float]:
"""Calculate metrics from features"""
try:
return {
'avg_word_length': features['length'] / features['word_count'] if features['word_count'] > 0 else 0,
'avg_sentence_length': features['word_count'] / features['sentence_count'] if features['sentence_count'] > 0 else 0
}
except Exception as e:
self.logger.error(f"Metrics calculation failed: {str(e)}")
raise
# tests/test_text_processor.py
import unittest
from scripts.pipeline.text_processor import TextProcessor
from scripts.utils.db_manager import DatabaseManager
class TestTextProcessor(unittest.TestCase):
def setUp(self):
self.db_manager = DatabaseManager(':memory:')
self.processor = TextProcessor(self.db_manager)
def test_process_document(self):
text = "This is a test document. It has two sentences."
result = self.processor.process_document(text)
self.assertIn('features', result)
self.assertIn('metrics', result)
self.assertEqual(result['features']['sentence_count'], 2)
# current/analysis_notebook.ipynb
from scripts.utils.db_manager import DatabaseManager
from scripts.pipeline.text_processor import TextProcessor
from scripts.utils.central_logger import logger_manager
# Initialize components
db_manager = DatabaseManager('path/to/database.db')
processor = TextProcessor(db_manager)
logger = logger_manager.get_logger('notebook')
# Process documents
try:
# Your analysis code here
results = processor.process_document(text)
# Log results
logger.info(f"Analysis completed successfully")
except Exception as e:
logger.error(f"Analysis failed: {str(e)}")