Files
PyMTA-server/email_frontend/blueprint.py
2025-06-07 10:48:03 +01:00

647 lines
23 KiB
Python

"""
Flask Blueprint for Email Server Management Frontend
This module provides a comprehensive web interface for managing the SMTP server:
- Domain management
- User authentication and authorization
- DKIM key management with DNS record verification
- Server settings configuration
- Email logs and monitoring
Security features:
- Authentication management per domain
- IP whitelisting capabilities
- SPF and DKIM DNS validation
"""
from flask import Blueprint, render_template, request, redirect, url_for, flash, jsonify
import socket
import requests
import dns.resolver
import re
from datetime import datetime
from typing import Optional, Dict, List, Tuple
# Import email server modules
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from email_server.models import (
Session, Domain, User, WhitelistedIP, DKIMKey, CustomHeader, EmailLog, AuthLog,
hash_password, create_tables, get_user_by_email, get_domain_by_name, get_whitelisted_ip
)
from email_server.dkim_manager import DKIMManager
from email_server.settings_loader import load_settings, generate_settings_ini, SETTINGS_PATH
from email_server.tool_box import get_logger
logger = get_logger()
# Create Blueprint
email_bp = Blueprint('email', __name__,
template_folder='templates',
static_folder='static',
url_prefix='/email')
def get_public_ip() -> str:
"""Get the public IP address of the server."""
try:
response = requests.get('https://api.ipify.org', timeout=5)
return response.text.strip()
except Exception:
try:
# Fallback method
response = requests.get('https://httpbin.org/ip', timeout=5)
return response.json()['origin'].split(',')[0].strip()
except Exception:
return 'unknown'
def check_dns_record(domain: str, record_type: str, expected_value: str = None) -> Dict:
"""Check DNS record for a domain."""
try:
answers = dns.resolver.resolve(domain, record_type)
records = [str(answer) for answer in answers]
if expected_value:
found = any(expected_value in record for record in records)
return {
'success': True,
'found': found,
'records': records,
'message': f"Record {'found' if found else 'not found'}"
}
else:
return {
'success': True,
'records': records,
'message': f"Found {len(records)} {record_type} record(s)"
}
except dns.resolver.NXDOMAIN:
return {'success': False, 'message': 'Domain not found'}
except dns.resolver.NoAnswer:
return {'success': False, 'message': f'No {record_type} records found'}
except Exception as e:
return {'success': False, 'message': f'DNS lookup error: {str(e)}'}
def generate_spf_record(domain: str, public_ip: str, existing_spf: str = None) -> str:
"""Generate SPF record including the current server IP."""
base_mechanisms = []
if existing_spf:
# Parse existing SPF record
spf_clean = existing_spf.replace('"', '').strip()
if spf_clean.startswith('v=spf1'):
parts = spf_clean.split()
base_mechanisms = [part for part in parts[1:] if not part.startswith('ip4:') and part != 'all' and part != '-all' and part != '~all']
# Add our server IP
our_ip = f"ip4:{public_ip}"
if our_ip not in base_mechanisms:
base_mechanisms.append(our_ip)
# Construct SPF record
spf_parts = ['v=spf1'] + base_mechanisms + ['~all']
return ' '.join(spf_parts)
@email_bp.route('/')
def dashboard():
"""Main dashboard showing overview of the email server."""
session = Session()
try:
# Get counts
domain_count = session.query(Domain).filter_by(is_active=True).count()
user_count = session.query(User).filter_by(is_active=True).count()
dkim_count = session.query(DKIMKey).filter_by(is_active=True).count()
# Get recent email logs
recent_emails = session.query(EmailLog).order_by(EmailLog.created_at.desc()).limit(10).all()
# Get recent auth logs
recent_auths = session.query(AuthLog).order_by(AuthLog.created_at.desc()).limit(10).all()
return render_template('dashboard.html',
domain_count=domain_count,
user_count=user_count,
dkim_count=dkim_count,
recent_emails=recent_emails,
recent_auths=recent_auths)
finally:
session.close()
@email_bp.route('/domains')
def domains_list():
"""List all domains."""
session = Session()
try:
domains = session.query(Domain).order_by(Domain.domain_name).all()
return render_template('domains.html', domains=domains)
finally:
session.close()
@email_bp.route('/domains/add', methods=['GET', 'POST'])
def add_domain():
"""Add new domain."""
if request.method == 'POST':
domain_name = request.form.get('domain_name', '').strip().lower()
if not domain_name:
flash('Domain name is required', 'error')
return redirect(url_for('email.add_domain'))
session = Session()
try:
# Check if domain already exists
existing = session.query(Domain).filter_by(domain_name=domain_name).first()
if existing:
flash(f'Domain {domain_name} already exists', 'error')
return redirect(url_for('email.domains_list'))
# Create domain
domain = Domain(domain_name=domain_name)
session.add(domain)
session.commit()
# Generate DKIM key for the domain
dkim_manager = DKIMManager()
dkim_manager.generate_dkim_keypair(domain_name)
flash(f'Domain {domain_name} added successfully with DKIM key', 'success')
return redirect(url_for('email.domains_list'))
except Exception as e:
session.rollback()
logger.error(f"Error adding domain: {e}")
flash(f'Error adding domain: {str(e)}', 'error')
return redirect(url_for('email.add_domain'))
finally:
session.close()
return render_template('add_domain.html')
@email_bp.route('/domains/<int:domain_id>/delete', methods=['POST'])
def delete_domain(domain_id: int):
"""Delete domain."""
session = Session()
try:
domain = session.query(Domain).get(domain_id)
if not domain:
flash('Domain not found', 'error')
return redirect(url_for('email.domains_list'))
domain_name = domain.domain_name
domain.is_active = False
session.commit()
flash(f'Domain {domain_name} deactivated', 'success')
return redirect(url_for('email.domains_list'))
except Exception as e:
session.rollback()
logger.error(f"Error deleting domain: {e}")
flash(f'Error deleting domain: {str(e)}', 'error')
return redirect(url_for('email.domains_list'))
finally:
session.close()
@email_bp.route('/users')
def users_list():
"""List all users."""
session = Session()
try:
users = session.query(User, Domain).join(Domain, User.domain_id == Domain.id).order_by(User.email).all()
return render_template('users.html', users=users)
finally:
session.close()
@email_bp.route('/users/add', methods=['GET', 'POST'])
def add_user():
"""Add new user."""
session = Session()
try:
domains = session.query(Domain).filter_by(is_active=True).order_by(Domain.domain_name).all()
if request.method == 'POST':
email = request.form.get('email', '').strip().lower()
password = request.form.get('password', '').strip()
domain_id = request.form.get('domain_id', type=int)
can_send_as_domain = request.form.get('can_send_as_domain') == 'on'
if not all([email, password, domain_id]):
flash('All fields are required', 'error')
return redirect(url_for('email.add_user'))
# Validate email format
if '@' not in email:
flash('Invalid email format', 'error')
return redirect(url_for('email.add_user'))
# Check if user already exists
existing = session.query(User).filter_by(email=email).first()
if existing:
flash(f'User {email} already exists', 'error')
return redirect(url_for('email.users_list'))
# Create user
user = User(
email=email,
password_hash=hash_password(password),
domain_id=domain_id,
can_send_as_domain=can_send_as_domain
)
session.add(user)
session.commit()
flash(f'User {email} added successfully', 'success')
return redirect(url_for('email.users_list'))
return render_template('add_user.html', domains=domains)
except Exception as e:
session.rollback()
logger.error(f"Error adding user: {e}")
flash(f'Error adding user: {str(e)}', 'error')
return redirect(url_for('email.add_user'))
finally:
session.close()
@email_bp.route('/users/<int:user_id>/delete', methods=['POST'])
def delete_user(user_id: int):
"""Delete user."""
session = Session()
try:
user = session.query(User).get(user_id)
if not user:
flash('User not found', 'error')
return redirect(url_for('email.users_list'))
user_email = user.email
user.is_active = False
session.commit()
flash(f'User {user_email} deactivated', 'success')
return redirect(url_for('email.users_list'))
except Exception as e:
session.rollback()
logger.error(f"Error deleting user: {e}")
flash(f'Error deleting user: {str(e)}', 'error')
return redirect(url_for('email.users_list'))
finally:
session.close()
@email_bp.route('/ips')
def ips_list():
"""List all whitelisted IPs."""
session = Session()
try:
ips = session.query(WhitelistedIP, Domain).join(Domain, WhitelistedIP.domain_id == Domain.id).order_by(WhitelistedIP.ip_address).all()
return render_template('ips.html', ips=ips)
finally:
session.close()
@email_bp.route('/ips/add', methods=['GET', 'POST'])
def add_ip():
"""Add new whitelisted IP."""
session = Session()
try:
domains = session.query(Domain).filter_by(is_active=True).order_by(Domain.domain_name).all()
if request.method == 'POST':
ip_address = request.form.get('ip_address', '').strip()
domain_id = request.form.get('domain_id', type=int)
if not all([ip_address, domain_id]):
flash('All fields are required', 'error')
return redirect(url_for('email.add_ip'))
# Basic IP validation
try:
socket.inet_aton(ip_address)
except socket.error:
flash('Invalid IP address format', 'error')
return redirect(url_for('email.add_ip'))
# Check if IP already exists for this domain
existing = session.query(WhitelistedIP).filter_by(ip_address=ip_address, domain_id=domain_id).first()
if existing:
flash(f'IP {ip_address} already whitelisted for this domain', 'error')
return redirect(url_for('email.ips_list'))
# Create whitelisted IP
whitelist = WhitelistedIP(
ip_address=ip_address,
domain_id=domain_id
)
session.add(whitelist)
session.commit()
flash(f'IP {ip_address} added to whitelist', 'success')
return redirect(url_for('email.ips_list'))
return render_template('add_ip.html', domains=domains)
except Exception as e:
session.rollback()
logger.error(f"Error adding IP: {e}")
flash(f'Error adding IP: {str(e)}', 'error')
return redirect(url_for('email.add_ip'))
finally:
session.close()
@email_bp.route('/ips/<int:ip_id>/delete', methods=['POST'])
def delete_ip(ip_id: int):
"""Delete whitelisted IP."""
session = Session()
try:
ip_record = session.query(WhitelistedIP).get(ip_id)
if not ip_record:
flash('IP record not found', 'error')
return redirect(url_for('email.ips_list'))
ip_address = ip_record.ip_address
ip_record.is_active = False
session.commit()
flash(f'IP {ip_address} removed from whitelist', 'success')
return redirect(url_for('email.ips_list'))
except Exception as e:
session.rollback()
logger.error(f"Error deleting IP: {e}")
flash(f'Error deleting IP: {str(e)}', 'error')
return redirect(url_for('email.ips_list'))
finally:
session.close()
@email_bp.route('/dkim')
def dkim_list():
"""List all DKIM keys and DNS records."""
session = Session()
try:
dkim_keys = session.query(DKIMKey, Domain).join(Domain, DKIMKey.domain_id == Domain.id).order_by(Domain.domain_name).all()
# Get public IP for SPF records
public_ip = get_public_ip()
# Prepare DKIM data with DNS information
dkim_data = []
for dkim_key, domain in dkim_keys:
# Get DKIM DNS record
dkim_manager = DKIMManager()
dns_record = dkim_manager.get_dkim_public_key_record(domain.domain_name)
# Check existing SPF record
spf_check = check_dns_record(domain.domain_name, 'TXT')
existing_spf = None
if spf_check['success']:
for record in spf_check['records']:
if 'v=spf1' in record:
existing_spf = record
break
# Generate recommended SPF
recommended_spf = generate_spf_record(domain.domain_name, public_ip, existing_spf)
dkim_data.append({
'dkim_key': dkim_key,
'domain': domain,
'dns_record': dns_record,
'existing_spf': existing_spf,
'recommended_spf': recommended_spf,
'public_ip': public_ip
})
return render_template('dkim.html', dkim_data=dkim_data)
finally:
session.close()
@email_bp.route('/dkim/<int:domain_id>/regenerate', methods=['POST'])
def regenerate_dkim(domain_id: int):
"""Regenerate DKIM key for domain."""
session = Session()
try:
domain = session.query(Domain).get(domain_id)
if not domain:
flash('Domain not found', 'error')
return redirect(url_for('email.dkim_list'))
# Deactivate existing keys
existing_keys = session.query(DKIMKey).filter_by(domain_id=domain_id, is_active=True).all()
for key in existing_keys:
key.is_active = False
# Generate new DKIM key
dkim_manager = DKIMManager()
if dkim_manager.generate_dkim_keypair(domain.domain_name):
session.commit()
flash(f'DKIM key regenerated for {domain.domain_name}', 'success')
else:
session.rollback()
flash(f'Failed to regenerate DKIM key for {domain.domain_name}', 'error')
return redirect(url_for('email.dkim_list'))
except Exception as e:
session.rollback()
logger.error(f"Error regenerating DKIM: {e}")
flash(f'Error regenerating DKIM: {str(e)}', 'error')
return redirect(url_for('email.dkim_list'))
finally:
session.close()
@email_bp.route('/dkim/<int:dkim_id>/update_selector', methods=['POST'])
def update_dkim_selector(dkim_id: int):
"""Update DKIM selector name."""
new_selector = request.form.get('selector', '').strip()
if not new_selector:
flash('Selector name is required', 'error')
return redirect(url_for('email.dkim_list'))
# Validate selector (alphanumeric only)
if not re.match(r'^[a-zA-Z0-9]+$', new_selector):
flash('Selector must contain only letters and numbers', 'error')
return redirect(url_for('email.dkim_list'))
session = Session()
try:
dkim_key = session.query(DKIMKey).get(dkim_id)
if not dkim_key:
flash('DKIM key not found', 'error')
return redirect(url_for('email.dkim_list'))
old_selector = dkim_key.selector
dkim_key.selector = new_selector
session.commit()
flash(f'DKIM selector updated from {old_selector} to {new_selector}', 'success')
return redirect(url_for('email.dkim_list'))
except Exception as e:
session.rollback()
logger.error(f"Error updating DKIM selector: {e}")
flash(f'Error updating DKIM selector: {str(e)}', 'error')
return redirect(url_for('email.dkim_list'))
finally:
session.close()
@email_bp.route('/dkim/check_dns', methods=['POST'])
def check_dkim_dns():
"""Check DKIM DNS record via AJAX."""
domain = request.form.get('domain')
selector = request.form.get('selector')
expected_value = request.form.get('expected_value')
if not all([domain, selector, expected_value]):
return jsonify({'success': False, 'message': 'Missing parameters'})
dns_name = f"{selector}._domainkey.{domain}"
result = check_dns_record(dns_name, 'TXT', expected_value)
return jsonify(result)
@email_bp.route('/dkim/check_spf', methods=['POST'])
def check_spf_dns():
"""Check SPF DNS record via AJAX."""
domain = request.form.get('domain')
if not domain:
return jsonify({'success': False, 'message': 'Domain is required'})
result = check_dns_record(domain, 'TXT')
# Look for SPF record
spf_record = None
if result['success']:
for record in result['records']:
if 'v=spf1' in record:
spf_record = record
break
if spf_record:
result['spf_record'] = spf_record
result['message'] = 'SPF record found'
else:
result['success'] = False
result['message'] = 'No SPF record found'
return jsonify(result)
@email_bp.route('/settings')
def settings():
"""Display and edit server settings."""
settings = load_settings()
return render_template('settings.html', settings=settings)
@email_bp.route('/settings/update', methods=['POST'])
def update_settings():
"""Update server settings."""
try:
# Load current settings
config = load_settings()
# Update settings from form
for section_name in config.sections():
for key in config[section_name]:
if not key.startswith(';'): # Skip comment lines
form_key = f"{section_name}.{key}"
if form_key in request.form:
config.set(section_name, key, request.form[form_key])
# Save settings
with open(SETTINGS_PATH, 'w') as f:
config.write(f)
flash('Settings updated successfully. Restart the server to apply changes.', 'success')
return redirect(url_for('email.settings'))
except Exception as e:
logger.error(f"Error updating settings: {e}")
flash(f'Error updating settings: {str(e)}', 'error')
return redirect(url_for('email.settings'))
@email_bp.route('/logs')
def logs():
"""Display email and authentication logs."""
session = Session()
try:
# Get filter parameters
filter_type = request.args.get('type', 'all')
page = request.args.get('page', 1, type=int)
per_page = 50
if filter_type == 'emails':
# Email logs only
total_query = session.query(EmailLog)
logs_query = session.query(EmailLog).order_by(EmailLog.created_at.desc())
elif filter_type == 'auth':
# Auth logs only
total_query = session.query(AuthLog)
logs_query = session.query(AuthLog).order_by(AuthLog.created_at.desc())
else:
# Combined view (default)
email_logs = session.query(EmailLog).order_by(EmailLog.created_at.desc()).limit(per_page//2).all()
auth_logs = session.query(AuthLog).order_by(AuthLog.created_at.desc()).limit(per_page//2).all()
# Convert to unified format
combined_logs = []
for log in email_logs:
combined_logs.append({
'type': 'email',
'timestamp': log.created_at,
'data': log
})
for log in auth_logs:
combined_logs.append({
'type': 'auth',
'timestamp': log.created_at,
'data': log
})
# Sort by timestamp
combined_logs.sort(key=lambda x: x['timestamp'], reverse=True)
return render_template('logs.html',
logs=combined_logs[:per_page],
filter_type=filter_type,
page=page,
has_next=len(combined_logs) > per_page,
has_prev=page > 1)
# Pagination for single type logs
offset = (page - 1) * per_page
total = total_query.count()
logs = logs_query.offset(offset).limit(per_page).all()
has_next = offset + per_page < total
has_prev = page > 1
return render_template('logs.html',
logs=logs,
filter_type=filter_type,
page=page,
has_next=has_next,
has_prev=has_prev)
finally:
session.close()
# Error handlers
@email_bp.errorhandler(404)
def not_found(error):
"""Handle 404 errors."""
from datetime import datetime
return render_template('error.html',
error_code=404,
error_message='Page not found',
current_time=datetime.now()), 404
@email_bp.errorhandler(500)
def internal_error(error):
"""Handle 500 errors."""
from datetime import datetime
logger.error(f"Internal error: {error}")
return render_template('error.html',
error_code=500,
error_message='Internal server error',
current_time=datetime.now()), 500