first commit

This commit is contained in:
ghostersk
2025-05-25 20:26:18 +01:00
commit 5375ef6121
77 changed files with 9073 additions and 0 deletions
+23
View File
@@ -0,0 +1,23 @@
"""
Utils package for the Domain Logons application.
This package contains utility modules for database logging, timezone handling,
rate limiting, security headers, and health checks.
"""
# Import commonly used functions for easy access
from .toolbox import (
get_app_timezone,
get_current_timestamp,
get_utc_timestamp,
convert_to_app_timezone,
format_timestamp_for_display
)
__all__ = [
'get_app_timezone',
'get_current_timestamp',
'get_utc_timestamp',
'convert_to_app_timezone',
'format_timestamp_for_display'
]
+267
View File
@@ -0,0 +1,267 @@
import logging
import traceback
import uuid
from datetime import datetime, timezone
from flask import request, g, has_request_context
from extensions import db
from api.models import ErrorLog
import threading
import pytz
from .toolbox import get_app_timezone, get_current_timestamp
class DatabaseLogHandler(logging.Handler):
"""
Custom logging handler that stores log records in the database.
Configurable logging level via database settings.
"""
def __init__(self):
super().__init__()
self.setLevel(logging.WARNING) # Default level, will be updated from config
self._app = None # Store app reference
self._processed_records = set() # Track processed records to avoid duplicates
self._max_cache_size = 1000 # Limit cache size to prevent memory issues
def emit(self, record):
"""
Emit a log record to the database.
This runs in a separate thread to avoid blocking the main application.
"""
# Import filter functions
from .toolbox import get_filtered_loggers, get_filtered_message_patterns
# Get configured filters
filtered_loggers = get_filtered_loggers()
filtered_patterns = get_filtered_message_patterns()
# Skip database logging for filtered loggers to prevent feedback loops
if record.name in filtered_loggers:
return
# Also filter out messages containing specific patterns
message = record.getMessage().lower()
for pattern in filtered_patterns:
if pattern.lower() in message:
return
# Create a unique identifier for this record to prevent duplicates
record_id = (
record.name,
record.levelname,
record.getMessage(),
record.created,
getattr(record, 'pathname', ''),
getattr(record, 'lineno', 0)
)
# Check if we've already processed this exact record
if record_id in self._processed_records:
return
# Add to processed records cache
self._processed_records.add(record_id)
# Clean cache if it gets too large
if len(self._processed_records) > self._max_cache_size:
# Remove oldest half of entries (simple cleanup)
self._processed_records = set(list(self._processed_records)[self._max_cache_size//2:])
# Store the app reference if we have an application context
if not self._app:
try:
from flask import current_app
self._app = current_app._get_current_object()
except RuntimeError:
# No application context available, try to import app
try:
from app import app
self._app = app
except ImportError:
pass
# Use a thread to avoid blocking the main application
threading.Thread(target=self._emit_to_db, args=(record,), daemon=True).start()
def _emit_to_db(self, record):
"""
Actually write the log record to the database.
This method runs in a separate thread.
"""
try:
# Use the stored app reference or try to get it
app = self._app
if not app:
try:
from flask import current_app
app = current_app._get_current_object()
except RuntimeError:
# No application context, try to import app
try:
from app import app
except ImportError:
print("Could not import app for database logging")
return
with app.app_context():
# Extract request information if available
request_id = None
user_id = None
remote_addr = None
if has_request_context():
try:
request_id = getattr(g, 'request_id', str(uuid.uuid4())[:8])
user_id = getattr(g, 'user_id', None)
remote_addr = request.remote_addr
except Exception:
# If we can't get request context, continue without it
pass
# Format exception info if present
exception_text = None
if record.exc_info:
exception_text = ''.join(traceback.format_exception(*record.exc_info))
# Create error log entry
error_log = ErrorLog(
level=record.levelname,
logger_name=record.name,
message=self.format(record),
timestamp=get_current_timestamp(),
pathname=record.pathname if hasattr(record, 'pathname') else None,
lineno=record.lineno if hasattr(record, 'lineno') else None,
request_id=request_id,
user_id=user_id,
remote_addr=remote_addr,
exception=exception_text
)
db.session.add(error_log)
db.session.commit()
except Exception as e:
# If database logging fails, fall back to console logging
# Don't raise the exception to avoid breaking the application
print(f"Failed to log to database: {e}")
def setup_database_logging(app):
"""
Set up database logging for the Flask application.
"""
# Create and configure the database handler
db_handler = DatabaseLogHandler()
db_handler._app = app # Store app reference
# Set initial logging level from settings (will be updated dynamically)
update_logging_level(app, db_handler)
# Set a formatter for the database logs
formatter = logging.Formatter(
'%(name)s - %(levelname)s - %(message)s'
)
db_handler.setFormatter(formatter)
# Store handler reference in app for dynamic updates
app.db_handler = db_handler
# Only add to root logger to avoid duplicate logging
# (app.logger propagates to root logger by default)
root_logger = logging.getLogger()
# Check if we already have this handler to avoid duplicates
handler_exists = False
for handler in root_logger.handlers:
if isinstance(handler, DatabaseLogHandler):
handler_exists = True
break
if not handler_exists:
root_logger.addHandler(db_handler)
# Add request ID to flask request context
@app.before_request
def add_request_id():
g.request_id = str(uuid.uuid4())[:8]
if hasattr(g, 'api_key') and g.api_key and g.api_key.user_id:
g.user_id = g.api_key.user_id
elif hasattr(g, 'current_user') and g.current_user and hasattr(g.current_user, 'id'):
g.user_id = g.current_user.id
app.logger.info("Database logging initialized")
def update_logging_level(app, db_handler=None):
"""
Update the database logging level from settings.
"""
try:
with app.app_context():
from auth.models import Settings
settings = Settings.query.first()
if settings and hasattr(settings, 'log_level'):
# Convert string to logging level
level_map = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
new_level = level_map.get(settings.log_level, logging.WARNING)
# Update the handler if provided
if db_handler:
db_handler.setLevel(new_level)
# Or get it from app if stored
elif hasattr(app, 'db_handler'):
app.db_handler.setLevel(new_level)
app.logger.info(f"Database logging level updated to: {settings.log_level}")
else:
# Default to WARNING if no setting found
if db_handler:
db_handler.setLevel(logging.WARNING)
elif hasattr(app, 'db_handler'):
app.db_handler.setLevel(logging.WARNING)
except Exception as e:
print(f"Failed to update logging level: {e}")
def get_available_log_levels():
"""
Get list of available logging levels for admin configuration.
"""
return [
('DEBUG', 'Debug - All messages'),
('INFO', 'Info - General information and above'),
('WARNING', 'Warning - Warnings and above (default)'),
('ERROR', 'Error - Errors and above'),
('CRITICAL', 'Critical - Only critical errors')
]
def log_error(message, exception=None, level=logging.ERROR):
"""
Convenience function to log errors to the database.
Args:
message: Error message
exception: Exception object (optional)
level: Logging level (default: ERROR)
"""
logger = logging.getLogger(__name__)
if exception:
logger.log(level, message, exc_info=True)
else:
logger.log(level, message)
def log_warning(message):
"""Convenience function to log warnings."""
log_error(message, level=logging.WARNING)
def log_critical(message, exception=None):
"""Convenience function to log critical errors."""
log_error(message, exception, level=logging.CRITICAL)
View File
+228
View File
@@ -0,0 +1,228 @@
import time
import logging
from functools import wraps
from flask import request, jsonify, g
import redis
from werkzeug.exceptions import TooManyRequests
import os
logger = logging.getLogger(__name__)
# Configure Redis connection (if available)
def configure_redis(config=None):
"""Configure Redis connection from config"""
global redis_client
redis_url = None
if config and config.has_option('rate_limiting', 'REDIS_URL'):
redis_url = config.get('rate_limiting', 'REDIS_URL')
if not redis_url:
redis_url = os.environ.get('REDIS_URL', None)
if redis_url:
try:
redis_client = redis.from_url(redis_url)
redis_client.ping() # Test connection
logger.info("Redis connected for rate limiting")
except Exception as e:
logger.warning(f"Redis connection failed for rate limiting: {str(e)}")
redis_client = None
else:
logger.info("No Redis URL configured, using in-memory rate limiting")
# Initialize Redis to None
redis_client = None
# In-memory rate limit storage (fallback if Redis is not available)
rate_limit_storage = {}
def rate_limit(limit=60, per=60, scope_func=None):
"""
Rate limiting decorator for routes.
Args:
limit (int): Maximum number of requests allowed in the time period
per (int): Time period in seconds
scope_func (callable): Function to determine rate limit scope (default: by IP)
Example usage:
@app.route('/api/endpoint')
@rate_limit(limit=10, per=60) # 10 requests per minute
def api_endpoint():
return jsonify({"status": "success"})
"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
# Get scope key (default to IP address)
if scope_func:
scope = scope_func()
else:
# Default to client IP
scope = get_remote_address()
# Create a unique key for this route and scope
key = f"rate_limit:{request.path}:{scope}"
# Check rate limit
current = get_rate_limit_value(key)
# If this is a new key or expired, initialize it
if current is None:
set_rate_limit_value(key, 1, per)
current = 1
else:
# Increment counter
current = increment_rate_limit_value(key)
# Add rate limit headers
g.rate_limit_headers = {
'X-RateLimit-Limit': str(limit),
'X-RateLimit-Remaining': str(max(0, limit - current)),
'X-RateLimit-Reset': str(int(time.time() + get_ttl(key)))
}
# If over limit, return 429 Too Many Requests
if current > limit:
logger.warning(f"Rate limit exceeded for {scope} on {request.path}")
response = jsonify({
'error': 'Too many requests',
'message': f'Rate limit of {limit} requests per {per} seconds exceeded'
})
response.status_code = 429
# Add rate limit headers to response
for key, value in g.rate_limit_headers.items():
response.headers[key] = value
return response
# Execute the original route function
response = f(*args, **kwargs)
# Convert string response to Response object if needed
from flask import make_response
if isinstance(response, str):
response = make_response(response)
# Add rate limit headers to response
if hasattr(response, 'headers'):
for key, value in g.rate_limit_headers.items():
response.headers[key] = value
return response
return wrapped
return decorator
def get_remote_address():
"""Get the client's IP address, respecting proxy headers"""
# Check X-Forwarded-For header (used by Traefik and other proxies)
if request.headers.get('X-Forwarded-For'):
# Get the first IP in the chain (original client IP)
forwarded_for = request.headers.get('X-Forwarded-For').split(',')[0].strip()
logger.debug(f"Using X-Forwarded-For IP: {forwarded_for} (from: {request.headers.get('X-Forwarded-For')})")
return forwarded_for
# Check X-Real-IP header (alternative proxy header)
if request.headers.get('X-Real-IP'):
real_ip = request.headers.get('X-Real-IP').strip()
logger.debug(f"Using X-Real-IP: {real_ip}")
return real_ip
# Fallback to direct connection IP
remote_addr = request.remote_addr
logger.debug(f"Using direct remote_addr: {remote_addr}")
return remote_addr
# Redis implementations
def get_rate_limit_value(key):
"""Get current rate limit counter value"""
if redis_client:
value = redis_client.get(key)
return int(value) if value else None
else:
# Fallback to in-memory storage
if key in rate_limit_storage:
# Check if expired
if time.time() > rate_limit_storage[key]['expires']:
del rate_limit_storage[key]
return None
return rate_limit_storage[key]['value']
return None
def set_rate_limit_value(key, value, ttl):
"""Set rate limit counter with TTL"""
if redis_client:
redis_client.setex(key, ttl, value)
else:
# Fallback to in-memory storage
rate_limit_storage[key] = {
'value': value,
'expires': time.time() + ttl
}
def increment_rate_limit_value(key):
"""Increment rate limit counter"""
if redis_client:
return redis_client.incr(key)
else:
# Fallback to in-memory storage
if key in rate_limit_storage:
rate_limit_storage[key]['value'] += 1
return rate_limit_storage[key]['value']
return 1
def get_ttl(key):
"""Get remaining TTL for a key"""
if redis_client:
ttl = redis_client.ttl(key)
return max(0, ttl)
else:
# Fallback to in-memory storage
if key in rate_limit_storage:
return max(0, rate_limit_storage[key]['expires'] - time.time())
return 0
# Utility to apply rate limits to entire blueprints
def apply_rate_limits(app, config=None):
"""Apply rate limits to sensitive routes"""
if not config:
return
# Check if rate limiting is enabled
if not config.getboolean('rate_limiting', 'ENABLE_RATE_LIMITING', fallback=True):
return
# Configure Redis connection
configure_redis(config)
# Get rate limiting configuration
login_limit = config.getint('rate_limiting', 'LOGIN_LIMIT', fallback=10)
login_period = config.getint('rate_limiting', 'LOGIN_PERIOD', fallback=60)
register_limit = config.getint('rate_limiting', 'REGISTER_LIMIT', fallback=5)
register_period = config.getint('rate_limiting', 'REGISTER_PERIOD', fallback=300)
api_limit = config.getint('rate_limiting', 'API_LIMIT', fallback=60)
api_period = config.getint('rate_limiting', 'API_PERIOD', fallback=60)
# Login endpoint
if 'auth.login' in app.view_functions:
app.view_functions['auth.login'] = rate_limit(
limit=login_limit, per=login_period
)(app.view_functions['auth.login'])
# Register endpoint
if 'auth.register' in app.view_functions:
app.view_functions['auth.register'] = rate_limit(
limit=register_limit, per=register_period
)(app.view_functions['auth.register'])
# API endpoints
api_routes = [route for route in app.url_map.iter_rules()
if route.rule.startswith('/api/')]
for route in api_routes:
if route.endpoint in app.view_functions:
app.view_functions[route.endpoint] = rate_limit(
limit=api_limit, per=api_period
)(app.view_functions[route.endpoint])
+67
View File
@@ -0,0 +1,67 @@
import os
import logging
from flask import request, current_app
logger = logging.getLogger(__name__)
def add_security_headers(response):
"""
Add security headers to HTTP responses
Headers added:
- Content-Security-Policy: Prevents XSS attacks by specifying content sources
- X-Content-Type-Options: Prevents MIME type sniffing
- X-Frame-Options: Prevents clickjacking
- X-XSS-Protection: Additional XSS protection for older browsers
- Referrer-Policy: Controls referrer information
- Strict-Transport-Security: Enforces HTTPS
- Permissions-Policy: Controls browser features
"""
# Content Security Policy - restricts sources of content
csp = current_app.config.get('CONTENT_SECURITY_POLICY',
"default-src 'self'; "
"script-src 'self' 'unsafe-inline'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data:; "
"font-src 'self'; "
"connect-src 'self'; "
"frame-ancestors 'self'; "
"form-action 'self'; "
"base-uri 'self'"
)
response.headers['Content-Security-Policy'] = csp
# Prevent MIME type sniffing
response.headers['X-Content-Type-Options'] = 'nosniff'
# Prevent clickjacking
response.headers['X-Frame-Options'] = 'SAMEORIGIN'
# Additional XSS protection for older browsers
response.headers['X-XSS-Protection'] = '1; mode=block'
# Control referrer information
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
# HSTS - force HTTPS (only in production)
if not current_app.debug and not current_app.testing:
hsts_enabled = current_app.config.get('HSTS_ENABLED', True)
if hsts_enabled:
hsts_max_age = current_app.config.get('HSTS_MAX_AGE', 31536000)
response.headers['Strict-Transport-Security'] = f'max-age={hsts_max_age}; includeSubDomains'
# Permissions Policy (formerly Feature-Policy)
response.headers['Permissions-Policy'] = (
'camera=(), microphone=(), geolocation=(), interest-cohort=()'
)
return response
def setup_security_headers(app, config=None):
"""Register security headers middleware with Flask app"""
if config and config.getboolean('security', 'ENABLE_SECURITY_HEADERS', fallback=True):
# Set CSP from config if available
if config.has_option('security', 'CONTENT_SECURITY_POLICY'):
app.config['CONTENT_SECURITY_POLICY'] = config.get('security', 'CONTENT_SECURITY_POLICY')
app.after_request(add_security_headers)
+179
View File
@@ -0,0 +1,179 @@
"""
Timezone utility functions for the Domain Logons application.
This module provides timezone-related utilities that can be used throughout
the application without causing circular import issues.
"""
from datetime import datetime, timezone
import pytz
def get_app_timezone():
"""Get the application timezone from config."""
try:
# Try to get app from Flask context first
from flask import current_app
app = current_app._get_current_object()
if app and hasattr(app, 'config'):
tz_name = app.config.get('TIMEZONE', 'Europe/London')
return pytz.timezone(tz_name)
except RuntimeError:
# No application context, try to get config from os.environ or config file
try:
import configparser
import os
config = configparser.ConfigParser()
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
if os.path.exists(config_path):
config.read(config_path)
tz_name = config.get('app', 'TIMEZONE', fallback='Europe/London')
return pytz.timezone(tz_name)
except Exception:
pass
except Exception:
pass
# Fallback to UTC
return pytz.UTC
def get_current_timestamp():
"""Get current timestamp in the application's configured timezone."""
app_tz = get_app_timezone()
# Use timezone.utc instead of deprecated utcnow()
utc_now = datetime.now(timezone.utc)
# Convert to application timezone
return utc_now.astimezone(app_tz).replace(tzinfo=None) # Store as naive datetime in app timezone
def get_utc_timestamp():
"""Get current UTC timestamp as timezone-aware datetime."""
return datetime.now(timezone.utc)
def convert_to_app_timezone(dt):
"""
Convert a datetime to the application's configured timezone.
Args:
dt: datetime object (can be naive or timezone-aware)
Returns:
datetime: timezone-aware datetime in application timezone
"""
app_tz = get_app_timezone()
if dt.tzinfo is None:
# If naive, assume it's already in the application timezone
return app_tz.localize(dt)
else:
# If timezone-aware, convert to application timezone
return dt.astimezone(app_tz)
def format_timestamp_for_display(dt):
"""
Format a datetime for display with timezone information.
Args:
dt: datetime object
Returns:
str: formatted timestamp string
"""
if dt is None:
return ""
# Convert to app timezone if needed
if dt.tzinfo is None:
# Assume naive datetime is already in app timezone
app_tz = get_app_timezone()
localized_dt = app_tz.localize(dt)
else:
# Convert timezone-aware datetime to app timezone
localized_dt = convert_to_app_timezone(dt)
return localized_dt.strftime('%Y-%m-%d %H:%M:%S %Z')
def get_filtered_loggers():
"""
Get list of logger names that should be filtered out of database logging.
This helps prevent feedback loops and reduces noise.
"""
default_filters = []
try:
# Try to get app from Flask context first
from flask import current_app
app = current_app._get_current_object()
if app and hasattr(app, 'config'):
# Get filters from app config (which loads from config.ini)
filter_string = app.config.get('DB_LOGGING_FILTERED_LOGGERS', '')
if filter_string:
return [logger.strip() for logger in filter_string.split(',') if logger.strip()]
except RuntimeError:
# No application context, try to read config directly
try:
import configparser
import os
config = configparser.ConfigParser()
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
if os.path.exists(config_path):
config.read(config_path)
filter_string = config.get('logging', 'DB_LOGGING_FILTERED_LOGGERS', fallback='')
if filter_string:
return [logger.strip() for logger in filter_string.split(',') if logger.strip()]
except Exception:
pass
except Exception:
pass
# Fallback defaults if config not available
return [
'watchfiles.main',
'watchfiles.watcher',
'watchdog',
'uvicorn.access'
]
def get_filtered_message_patterns():
"""
Get list of message patterns that should be filtered out of database logging.
"""
default_patterns = []
try:
# Try to get app from Flask context first
from flask import current_app
app = current_app._get_current_object()
if app and hasattr(app, 'config'):
# Get patterns from app config (which loads from config.ini)
pattern_string = app.config.get('DB_LOGGING_FILTERED_PATTERNS', '')
if pattern_string:
return [pattern.strip() for pattern in pattern_string.split(',') if pattern.strip()]
except RuntimeError:
# No application context, try to read config directly
try:
import configparser
import os
config = configparser.ConfigParser()
config_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'config.ini')
if os.path.exists(config_path):
config.read(config_path)
pattern_string = config.get('logging', 'DB_LOGGING_FILTERED_PATTERNS', fallback='')
if pattern_string:
return [pattern.strip() for pattern in pattern_string.split(',') if pattern.strip()]
except Exception:
pass
except Exception:
pass
# Fallback defaults if config not available
return [
'database.db',
'instance/',
'file changed',
'reloading'
]