DKIM key management front end - ok

This commit is contained in:
nahakubuilde
2025-06-07 14:43:00 +01:00
parent ce0f7e0ac9
commit ed3d28d34e
17 changed files with 1030 additions and 949 deletions

View File

@@ -20,6 +20,7 @@ import requests
import dns.resolver
import re
from datetime import datetime
from datetime import datetime
from typing import Optional, Dict, List, Tuple
# Import email server modules
@@ -46,15 +47,32 @@ email_bp = Blueprint('email', __name__,
def get_public_ip() -> str:
"""Get the public IP address of the server."""
try:
response = requests.get('https://api.ipify.org', timeout=5)
return response.text.strip()
response1 = requests.get('https://ifconfig.me/ip', timeout=3, verify=False)
ip = response1.text.strip()
if ip and ip != 'unknown':
return ip
except Exception:
try:
# Fallback method
response = requests.get('https://httpbin.org/ip', timeout=5)
return response.json()['origin'].split(',')[0].strip()
except Exception:
return 'unknown'
response = requests.get('https://httpbin.org/ip', timeout=3, verify=False)
ip = response.json()['origin'].split(',')[0].strip()
if ip and ip != 'unknown':
return ip
except Exception as e:
pass
# Use fallback from settings.ini if available
try:
settings = load_settings()
fallback_ip = settings.get('DKIM', 'SPF_SERVER_IP', fallback=None)
if fallback_ip and fallback_ip.strip() and fallback_ip != '""':
# Check if it's a valid IPv4 address (basic check)
parts = fallback_ip.split('.')
if len(parts) == 4 and all(part.isdigit() and 0 <= int(part) <= 255 for part in parts):
return fallback_ip.strip()
except Exception as e:
return {'success': False, 'message': f'DNS lookup error, If it continues, consider setting up public IP in settings - SPF_SERVER_IP. Details: {str(e)}'}
def check_dns_record(domain: str, record_type: str, expected_value: str = None) -> Dict:
"""Check DNS record for a domain."""
@@ -94,10 +112,15 @@ def generate_spf_record(domain: str, public_ip: str, existing_spf: str = None) -
parts = spf_clean.split()
base_mechanisms = [part for part in parts[1:] if not part.startswith('ip4:') and part != 'all' and part != '-all' and part != '~all']
# Add our server IP
our_ip = f"ip4:{public_ip}"
if our_ip not in base_mechanisms:
base_mechanisms.append(our_ip)
# Add our server IP if it's not unknown
if public_ip and public_ip != 'unknown':
our_ip = f"ip4:{public_ip}"
if our_ip not in base_mechanisms:
base_mechanisms.append(our_ip)
# If no IP available, just use existing mechanisms
if not base_mechanisms and public_ip == 'unknown':
return existing_spf or 'v=spf1 ~all'
# Construct SPF record
spf_parts = ['v=spf1'] + base_mechanisms + ['~all']
@@ -712,14 +735,26 @@ def dkim_list():
"""List all DKIM keys and DNS records."""
session = Session()
try:
dkim_keys = session.query(DKIMKey, Domain).join(Domain, DKIMKey.domain_id == Domain.id).order_by(Domain.domain_name).all()
# Get active DKIM keys
active_dkim_keys = session.query(DKIMKey, Domain).join(
Domain, DKIMKey.domain_id == Domain.id
).filter(DKIMKey.is_active == True).order_by(Domain.domain_name).all()
# Get old/inactive DKIM keys (prioritize replaced keys over disabled ones)
old_dkim_keys = session.query(DKIMKey, Domain).join(
Domain, DKIMKey.domain_id == Domain.id
).filter(DKIMKey.is_active == False).order_by(
Domain.domain_name,
DKIMKey.replaced_at.desc().nullslast(), # Replaced keys first, then disabled ones
DKIMKey.created_at.desc()
).all()
# Get public IP for SPF records
public_ip = get_public_ip()
# Prepare DKIM data with DNS information
dkim_data = []
for dkim_key, domain in dkim_keys:
# Prepare active DKIM data with DNS information
active_dkim_data = []
for dkim_key, domain in active_dkim_keys:
# Get DKIM DNS record
dkim_manager = DKIMManager()
dns_record = dkim_manager.get_dkim_public_key_record(domain.domain_name)
@@ -736,7 +771,7 @@ def dkim_list():
# Generate recommended SPF
recommended_spf = generate_spf_record(domain.domain_name, public_ip, existing_spf)
dkim_data.append({
active_dkim_data.append({
'dkim_key': dkim_key,
'domain': domain,
'dns_record': dns_record,
@@ -745,7 +780,20 @@ def dkim_list():
'public_ip': public_ip
})
return render_template('dkim.html', dkim_data=dkim_data)
# Prepare old DKIM data with status information
old_dkim_data = []
for dkim_key, domain in old_dkim_keys:
old_dkim_data.append({
'dkim_key': dkim_key,
'domain': domain,
'public_ip': public_ip,
'is_replaced': dkim_key.replaced_at is not None,
'status_text': 'Replaced' if dkim_key.replaced_at else 'Disabled'
})
return render_template('dkim.html',
dkim_data=active_dkim_data,
old_dkim_data=old_dkim_data)
finally:
session.close()
@@ -756,21 +804,116 @@ def regenerate_dkim(domain_id: int):
try:
domain = session.query(Domain).get(domain_id)
if not domain:
if request.headers.get('Content-Type') == 'application/json':
return jsonify({'success': False, 'message': 'Domain not found'})
flash('Domain not found', 'error')
return redirect(url_for('email.dkim_list'))
# Deactivate existing keys
# Get the current active DKIM key's selector to preserve it
existing_keys = session.query(DKIMKey).filter_by(domain_id=domain_id, is_active=True).all()
current_selector = None
if existing_keys:
# Use the selector from the first active key (there should typically be only one)
current_selector = existing_keys[0].selector
# Mark existing keys as replaced
for key in existing_keys:
key.is_active = False
key.replaced_at = datetime.now() # Mark when this key was replaced
# Generate new DKIM key
# Generate new DKIM key preserving the existing selector
dkim_manager = DKIMManager()
if dkim_manager.generate_dkim_keypair(domain.domain_name):
if dkim_manager.generate_dkim_keypair(domain.domain_name, selector=current_selector, force_new_key=True):
session.commit()
# Get the new key data for AJAX response
new_key = session.query(DKIMKey).filter_by(
domain_id=domain_id, is_active=True
).order_by(DKIMKey.created_at.desc()).first()
if not new_key:
session.rollback()
if request.headers.get('Content-Type') == 'application/json' or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'success': False, 'message': f'Failed to create new DKIM key for {domain.domain_name}'})
flash(f'Failed to create new DKIM key for {domain.domain_name}', 'error')
return redirect(url_for('email.dkim_list'))
if request.headers.get('Content-Type') == 'application/json' or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
# Get updated DNS record for the new key
dns_record = dkim_manager.get_dkim_public_key_record(domain.domain_name)
public_ip = get_public_ip()
# Check existing SPF record
spf_check = check_dns_record(domain.domain_name, 'TXT')
existing_spf = None
if spf_check['success']:
for record in spf_check['records']:
if 'v=spf1' in record:
existing_spf = record
break
recommended_spf = generate_spf_record(domain.domain_name, public_ip, existing_spf)
# Get replaced keys for the Old DKIM section update
old_keys = session.query(DKIMKey, Domain).join(
Domain, DKIMKey.domain_id == Domain.id
).filter(
DKIMKey.domain_id == domain_id,
DKIMKey.is_active == False
).order_by(DKIMKey.created_at.desc()).all()
old_dkim_data = []
for old_key, old_domain in old_keys:
status_text = "Replaced" if old_key.replaced_at else "Disabled"
old_dkim_data.append({
'dkim_key': {
'id': old_key.id,
'selector': old_key.selector,
'created_at': old_key.created_at.strftime('%Y-%m-%d %H:%M'),
'replaced_at': old_key.replaced_at.strftime('%Y-%m-%d %H:%M') if old_key.replaced_at else None,
'is_active': old_key.is_active
},
'domain': {
'id': old_domain.id,
'domain_name': old_domain.domain_name
},
'status_text': status_text,
'public_ip': public_ip
})
# Additional null check for new_key before accessing its attributes
if not new_key:
logger.error(f"new_key is None after generation for domain {domain.domain_name}")
return jsonify({'success': False, 'message': f'Failed to retrieve new DKIM key for {domain.domain_name}'})
return jsonify({
'success': True,
'message': f'DKIM key regenerated for {domain.domain_name}',
'new_key': {
'id': new_key.id,
'selector': new_key.selector,
'created_at': new_key.created_at.strftime('%Y-%m-%d %H:%M'),
'is_active': new_key.is_active
},
'dns_record': {
'name': dns_record['name'] if dns_record else '',
'value': dns_record['value'] if dns_record else ''
},
'existing_spf': existing_spf,
'recommended_spf': recommended_spf,
'public_ip': public_ip,
'domain': {
'id': domain.id,
'domain_name': domain.domain_name
},
'old_dkim_data': old_dkim_data
})
flash(f'DKIM key regenerated for {domain.domain_name}', 'success')
else:
session.rollback()
if request.headers.get('Content-Type') == 'application/json' or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'success': False, 'message': f'Failed to regenerate DKIM key for {domain.domain_name}'})
flash(f'Failed to regenerate DKIM key for {domain.domain_name}', 'error')
return redirect(url_for('email.dkim_list'))
@@ -778,6 +921,8 @@ def regenerate_dkim(domain_id: int):
except Exception as e:
session.rollback()
logger.error(f"Error regenerating DKIM: {e}")
if request.headers.get('Content-Type') == 'application/json' or request.headers.get('X-Requested-With') == 'XMLHttpRequest':
return jsonify({'success': False, 'message': f'Error regenerating DKIM: {str(e)}'})
flash(f'Error regenerating DKIM: {str(e)}', 'error')
return redirect(url_for('email.dkim_list'))
finally:
@@ -844,16 +989,25 @@ def toggle_dkim(dkim_id: int):
if not dkim_key:
flash('DKIM key not found', 'error')
return redirect(url_for('email.dkim_list'))
domain = session.query(Domain).get(dkim_key.domain_id)
old_status = dkim_key.is_active
if not old_status:
# About to activate this key, so deactivate any other active DKIM for this domain
other_active_keys = session.query(DKIMKey).filter(
DKIMKey.domain_id == dkim_key.domain_id,
DKIMKey.is_active == True,
DKIMKey.id != dkim_id
).all()
for key in other_active_keys:
key.is_active = False
key.replaced_at = datetime.now()
dkim_key.is_active = not old_status
if dkim_key.is_active:
dkim_key.replaced_at = None
session.commit()
status_text = "enabled" if dkim_key.is_active else "disabled"
flash(f'DKIM key for {domain.domain_name} (selector: {dkim_key.selector}) has been {status_text}', 'success')
return redirect(url_for('email.dkim_list'))
except Exception as e:
session.rollback()
logger.error(f"Error toggling DKIM status: {e}")
@@ -895,15 +1049,27 @@ def check_dkim_dns():
"""Check DKIM DNS record via AJAX."""
domain = request.form.get('domain')
selector = request.form.get('selector')
expected_value = request.form.get('expected_value')
if not all([domain, selector, expected_value]):
return jsonify({'success': False, 'message': 'Missing parameters'})
if not all([domain, selector]):
return jsonify({'success': False, 'message': 'Missing domain or selector parameters'})
dns_name = f"{selector}._domainkey.{domain}"
result = check_dns_record(dns_name, 'TXT', expected_value)
return jsonify(result)
# Get the expected DKIM value from the DKIM manager
try:
dkim_manager = DKIMManager()
dns_record = dkim_manager.get_dkim_public_key_record(domain)
if not dns_record or not dns_record.get('value'):
return jsonify({'success': False, 'message': 'No DKIM key found for domain'})
expected_value = dns_record['value']
dns_name = f"{selector}._domainkey.{domain}"
result = check_dns_record(dns_name, 'TXT', expected_value)
return jsonify(result)
except Exception as e:
logger.error(f"Error checking DKIM DNS: {e}")
return jsonify({'success': False, 'message': f'Error checking DKIM DNS: {str(e)}'})
@email_bp.route('/dkim/check_spf', methods=['POST'])
def check_spf_dns():
@@ -1049,3 +1215,38 @@ def internal_error(error):
error_code=500,
error_message='Internal server error',
current_time=datetime.now()), 500
@email_bp.route('/dkim/create', methods=['POST'], endpoint='create_dkim')
def create_dkim():
"""Create a new DKIM key for a domain, optionally with a custom selector."""
from flask import request, jsonify
data = request.get_json() if request.is_json else request.form
domain_name = data.get('domain')
selector = data.get('selector', None)
session = Session()
try:
if not domain_name:
return jsonify({'success': False, 'message': 'Domain is required.'}), 400
domain = session.query(Domain).filter_by(domain_name=domain_name).first()
if not domain:
return jsonify({'success': False, 'message': 'Domain not found.'}), 404
# Deactivate any existing active DKIM key for this domain
active_keys = session.query(DKIMKey).filter_by(domain_id=domain.id, is_active=True).all()
for key in active_keys:
key.is_active = False
key.replaced_at = datetime.now()
# Create new DKIM key
dkim_manager = DKIMManager()
created = dkim_manager.generate_dkim_keypair(domain_name, selector=selector, force_new_key=True)
if created:
session.commit()
return jsonify({'success': True, 'message': f'DKIM key created for {domain_name}.'})
else:
session.rollback()
return jsonify({'success': False, 'message': f'Failed to create DKIM key for {domain_name}.'}), 500
except Exception as e:
session.rollback()
logger.error(f"Error creating DKIM: {e}")
return jsonify({'success': False, 'message': f'Error creating DKIM: {str(e)}'}), 500
finally:
session.close()