moved server files to its own directory and update tests

This commit is contained in:
nahakubuilde
2025-05-30 07:12:42 +01:00
parent a01aa22a4b
commit 98ecce507e
11 changed files with 11 additions and 6 deletions

View File

@@ -0,0 +1,212 @@
"""
DKIM key management and email signing functionality.
"""
import logging
import dkim
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa
from datetime import datetime
from models import Session, Domain, DKIMKey
from config import DKIM_SELECTOR, DKIM_KEY_SIZE
logger = logging.getLogger(__name__)
class DKIMManager:
"""Manages DKIM keys and email signing."""
def __init__(self):
self.selector = DKIM_SELECTOR
def generate_dkim_keypair(self, domain_name):
"""Generate DKIM key pair for a domain."""
session = Session()
try:
# Check if domain exists
domain = session.query(Domain).filter_by(domain_name=domain_name).first()
if not domain:
logger.error(f"Domain {domain_name} not found")
return False
# Check if DKIM key already exists
existing_key = session.query(DKIMKey).filter_by(domain_id=domain.id, is_active=True).first()
if existing_key:
logger.info(f"DKIM key already exists for domain {domain_name}")
return True
# Generate RSA key pair
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=DKIM_KEY_SIZE
)
# Get private key in PEM format
private_pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
).decode('utf-8')
# Get public key in PEM format
public_key = private_key.public_key()
public_pem = public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
).decode('utf-8')
# Store in database
dkim_key = DKIMKey(
domain_id=domain.id,
selector=self.selector,
private_key=private_pem,
public_key=public_pem,
created_at=datetime.now(),
is_active=True
)
session.add(dkim_key)
session.commit()
logger.info(f"Generated DKIM key for domain: {domain_name}")
return True
except Exception as e:
session.rollback()
logger.error(f"Error generating DKIM key for {domain_name}: {e}")
return False
finally:
session.close()
def get_dkim_private_key(self, domain_name):
"""Get DKIM private key for a domain."""
session = Session()
try:
domain = session.query(Domain).filter_by(domain_name=domain_name).first()
if not domain:
return None
dkim_key = session.query(DKIMKey).filter_by(
domain_id=domain.id,
is_active=True
).first()
if dkim_key:
return dkim_key.private_key
return None
except Exception as e:
logger.error(f"Error getting DKIM private key for {domain_name}: {e}")
return None
finally:
session.close()
def get_dkim_public_key_record(self, domain_name):
"""Get DKIM public key DNS record for a domain."""
session = Session()
try:
domain = session.query(Domain).filter_by(domain_name=domain_name).first()
if not domain:
return None
dkim_key = session.query(DKIMKey).filter_by(
domain_id=domain.id,
is_active=True
).first()
if dkim_key:
# Extract public key from PEM format for DNS record
public_key_lines = dkim_key.public_key.strip().split('\n')
public_key_data = ''.join(public_key_lines[1:-1]) # Remove header/footer
return {
'name': f'{self.selector}._domainkey.{domain_name}',
'type': 'TXT',
'value': f'v=DKIM1; k=rsa; p={public_key_data}'
}
return None
except Exception as e:
logger.error(f"Error getting DKIM public key record for {domain_name}: {e}")
return None
finally:
session.close()
def sign_email(self, email_content, domain_name):
"""Sign email content with DKIM."""
try:
private_key = self.get_dkim_private_key(domain_name)
if not private_key:
logger.warning(f"No DKIM key found for domain: {domain_name}")
return email_content
# Convert content to bytes if it's a string
if isinstance(email_content, str):
email_bytes = email_content.encode('utf-8')
else:
email_bytes = email_content
# Sign the email
signature = dkim.sign(
email_bytes,
self.selector.encode('utf-8'),
domain_name.encode('utf-8'),
private_key.encode('utf-8'),
include_headers=[b'from', b'to', b'subject', b'date', b'message-id']
)
# Combine signature with original content
signed_content = signature + email_bytes
logger.info(f"Successfully signed email for domain: {domain_name}")
# Return as string if input was string
if isinstance(email_content, str):
return signed_content.decode('utf-8')
else:
return signed_content
except Exception as e:
logger.error(f"Error signing email for domain {domain_name}: {e}")
return email_content
def list_dkim_keys(self):
"""List all DKIM keys."""
session = Session()
try:
keys = session.query(DKIMKey, Domain).join(Domain).all()
result = []
for dkim_key, domain in keys:
result.append({
'domain': domain.domain_name,
'selector': dkim_key.selector,
'created_at': dkim_key.created_at,
'active': dkim_key.is_active
})
return result
except Exception as e:
logger.error(f"Error listing DKIM keys: {e}")
return []
finally:
session.close()
def initialize_default_keys(self):
"""Initialize DKIM keys for existing domains that don't have them."""
session = Session()
try:
domains = session.query(Domain).all()
for domain in domains:
existing_key = session.query(DKIMKey).filter_by(
domain_id=domain.id,
is_active=True
).first()
if not existing_key:
logger.info(f"Generating DKIM key for existing domain: {domain.domain_name}")
self.generate_dkim_keypair(domain.domain_name)
except Exception as e:
logger.error(f"Error initializing default DKIM keys: {e}")
finally:
session.close()