351 lines
15 KiB
Python
351 lines
15 KiB
Python
"""
|
|
Email relay functionality for the SMTP server.
|
|
"""
|
|
|
|
import asyncio
|
|
import dns.resolver
|
|
from email_server.models import Session, EmailLog, EmailRecipientLog
|
|
from email_server.settings_loader import load_settings
|
|
from email_server.tool_box import get_logger, get_current_time
|
|
import aiosmtplib
|
|
|
|
logger = get_logger()
|
|
|
|
settings = load_settings()
|
|
_relay_tls_timeout = settings['Server'].get('relay_timeout', 30)
|
|
|
|
port = 25 # Default MX SMTP port for relaying emails
|
|
|
|
class EmailRelay:
|
|
"""Handles relaying emails to recipient mail servers."""
|
|
|
|
def __init__(self):
|
|
|
|
self.timeout = _relay_tls_timeout # Increased timeout for TLS negotiations
|
|
# Get the configured hostname for HELO/EHLO identification
|
|
self.hostname = settings['Server'].get('helo_hostname', settings['Server'].get('hostname', 'localhost'))
|
|
logger.debug(f"EmailRelay initialized with hostname: {self.hostname}")
|
|
|
|
def _modify_headers_for_recipients(self, content, to_addresses, cc_addresses=None):
|
|
"""Modify email headers to set To and Cc fields, preserving original structure for DKIM.
|
|
|
|
Args:
|
|
content: Raw email content
|
|
to_addresses: List of TO recipients
|
|
cc_addresses: List of CC recipients (optional)
|
|
"""
|
|
lines = content.splitlines()
|
|
new_headers = []
|
|
body_start = 0
|
|
has_to = False
|
|
has_cc = False
|
|
|
|
# First pass: find header/body boundary and examine existing headers
|
|
for i, line in enumerate(lines):
|
|
if line.strip() == '':
|
|
body_start = i
|
|
break
|
|
# Skip BCC headers but preserve TO and CC
|
|
if line.lower().startswith('bcc:'):
|
|
continue
|
|
# Track if we have TO/CC headers
|
|
if line.lower().startswith('to:'):
|
|
has_to = True
|
|
elif line.lower().startswith('cc:'):
|
|
has_cc = True
|
|
new_headers.append(line)
|
|
|
|
# Only add headers if they don't exist
|
|
if not has_to and to_addresses:
|
|
new_headers.append(f"To: {', '.join(to_addresses)}")
|
|
if not has_cc and cc_addresses:
|
|
new_headers.append(f"Cc: {', '.join(cc_addresses)}")
|
|
|
|
# Reconstruct the message
|
|
body = '\n'.join(lines[body_start:]) if body_start < len(lines) else ''
|
|
return '\r\n'.join(new_headers) + '\r\n\r\n' + body
|
|
|
|
def _prepare_email_for_recipient(self, content: str, bcc_recipient: str = None) -> str:
|
|
"""Prepare a copy of the email for a specific recipient without modifying original content.
|
|
|
|
Args:
|
|
content: The original signed email content
|
|
bcc_recipient: If specified, prepare content for this BCC recipient
|
|
|
|
Returns:
|
|
str: Email content ready for the specific recipient
|
|
"""
|
|
lines = content.splitlines()
|
|
new_lines = []
|
|
headers_done = False
|
|
empty_line_added = False
|
|
|
|
for line in lines:
|
|
if not headers_done:
|
|
if line.strip() == '':
|
|
headers_done = True
|
|
empty_line_added = True
|
|
new_lines.append(line) # Keep the empty line separator
|
|
# Skip BCC headers
|
|
elif not line.lower().startswith('bcc:'):
|
|
new_lines.append(line)
|
|
else:
|
|
new_lines.append(line)
|
|
|
|
# Ensure there's a blank line between headers and body if not already present
|
|
if not empty_line_added:
|
|
new_lines.append('')
|
|
|
|
return '\r\n'.join(new_lines)
|
|
|
|
async def relay_email_async(
|
|
self,
|
|
mail_from: str,
|
|
rcpt_tos: list[str],
|
|
content: str,
|
|
username: str = None,
|
|
cc_addresses: list[str] = None,
|
|
bcc_addresses: list[str] = None,
|
|
recipient_types: list[str] = None
|
|
) -> list[dict]:
|
|
"""Relay email to recipients' mail servers asynchronously with encryption.
|
|
Preserves DKIM signatures by not modifying the signed content."""
|
|
results = []
|
|
recipient_type_map = {}
|
|
if recipient_types and len(recipient_types) == len(rcpt_tos):
|
|
for addr, rtype in zip(rcpt_tos, recipient_types):
|
|
recipient_type_map[addr] = rtype
|
|
else:
|
|
for addr in rcpt_tos:
|
|
recipient_type_map[addr] = 'to'
|
|
|
|
# Separate visible recipients (TO/CC) and BCC recipients
|
|
visible_recipients = []
|
|
bcc_list = []
|
|
|
|
for rcpt in rcpt_tos:
|
|
if recipient_type_map.get(rcpt) in ['to', 'cc']:
|
|
visible_recipients.append(rcpt)
|
|
elif recipient_type_map.get(rcpt) == 'bcc':
|
|
bcc_list.append(rcpt)
|
|
|
|
# Group recipients by domain for efficient delivery
|
|
domain_groups = {}
|
|
for rcpt in visible_recipients:
|
|
domain = rcpt.split('@')[1].lower()
|
|
rtype = recipient_type_map.get(rcpt, 'to')
|
|
if domain not in domain_groups:
|
|
domain_groups[domain] = {'to': [], 'cc': [], 'bcc': []}
|
|
domain_groups[domain][rtype].append(rcpt)
|
|
|
|
# Handle TO/CC recipients - use original signed content
|
|
for domain, recipients in domain_groups.items():
|
|
to_recipients = recipients['to']
|
|
cc_recipients = recipients['cc']
|
|
if not to_recipients and not cc_recipients:
|
|
continue
|
|
|
|
# Prepare content for TO/CC recipients without modifying headers
|
|
prepared_content = self._prepare_email_for_recipient(content)
|
|
|
|
try:
|
|
mx_records = dns.resolver.resolve(domain, 'MX')
|
|
mx_records = sorted(mx_records, key=lambda x: x.preference)
|
|
mx_hosts = [mx.exchange.to_text().rstrip('.') for mx in mx_records]
|
|
logger.debug(f'Found MX records for {domain}: {mx_hosts}')
|
|
except Exception as e:
|
|
logger.error(f'Failed to resolve MX for {domain}: {e}')
|
|
for rcpt in to_recipients + cc_recipients:
|
|
results.append({
|
|
'recipient': rcpt,
|
|
'status': 'failed',
|
|
'error_code': 'MX',
|
|
'error_message': str(e),
|
|
'server_response': None,
|
|
'recipient_type': recipient_type_map.get(rcpt, 'to')
|
|
})
|
|
continue
|
|
|
|
delivered = False
|
|
last_error = None
|
|
for mx_host in mx_hosts:
|
|
try:
|
|
smtp = aiosmtplib.SMTP(hostname=mx_host, port=port, timeout=self.timeout, local_hostname=self.hostname)
|
|
await smtp.connect()
|
|
ext = getattr(smtp, 'extensions', None)
|
|
if ext is None:
|
|
ext = getattr(smtp, 'esmtp_extensions', None)
|
|
if ext is None:
|
|
logger.error(f"SMTP object has no 'extensions' or 'esmtp_extensions'. Available attributes: {dir(smtp)}")
|
|
ext = {}
|
|
if 'starttls' in ext:
|
|
logger.debug(f'STARTTLS supported by {mx_host}:{port}, upgrading to TLS')
|
|
await smtp.starttls()
|
|
else:
|
|
logger.warning(f'STARTTLS not supported by {mx_host}:{port}, sending in plain text!')
|
|
response = await smtp.sendmail(mail_from, to_recipients + cc_recipients, prepared_content)
|
|
logger.debug(f'Successfully relayed email to {to_recipients + cc_recipients} via {mx_host}:{port}')
|
|
for rcpt in to_recipients + cc_recipients:
|
|
results.append({
|
|
'recipient': rcpt,
|
|
'status': 'success',
|
|
'error_code': None,
|
|
'error_message': None,
|
|
'server_response': str(response),
|
|
'recipient_type': recipient_type_map.get(rcpt, 'to')
|
|
})
|
|
await smtp.quit()
|
|
delivered = True
|
|
break
|
|
except Exception as e:
|
|
logger.error(f'Failed to relay email to {to_recipients + cc_recipients} via {mx_host}:{port}: {e}')
|
|
last_error = {
|
|
'status': 'failed',
|
|
'error_code': 'RELAY',
|
|
'error_message': str(e),
|
|
'server_response': None
|
|
}
|
|
continue
|
|
|
|
if not delivered and last_error:
|
|
for rcpt in to_recipients + cc_recipients:
|
|
results.append({
|
|
'recipient': rcpt,
|
|
'status': last_error['status'],
|
|
'error_code': last_error['error_code'],
|
|
'error_message': last_error['error_message'],
|
|
'server_response': last_error['server_response'],
|
|
'recipient_type': recipient_type_map.get(rcpt, 'to')
|
|
})
|
|
|
|
# Handle BCC recipients - each gets their own copy with original headers
|
|
for bcc in bcc_list:
|
|
domain = bcc.split('@')[1].lower()
|
|
# Prepare content for BCC recipient - remove BCC headers but keep everything else
|
|
prepared_content = self._prepare_email_for_recipient(content, bcc)
|
|
|
|
try:
|
|
mx_records = dns.resolver.resolve(domain, 'MX')
|
|
mx_records = sorted(mx_records, key=lambda x: x.preference)
|
|
mx_hosts = [mx.exchange.to_text().rstrip('.') for mx in mx_records]
|
|
logger.debug(f'Found MX records for {domain}: {mx_hosts}')
|
|
except Exception as e:
|
|
logger.error(f'Failed to resolve MX for {domain}: {e}')
|
|
results.append({
|
|
'recipient': bcc,
|
|
'status': 'failed',
|
|
'error_code': 'MX',
|
|
'error_message': str(e),
|
|
'server_response': None,
|
|
'recipient_type': 'bcc'
|
|
})
|
|
continue
|
|
|
|
delivered = False
|
|
last_error = None
|
|
for mx_host in mx_hosts:
|
|
try:
|
|
smtp = aiosmtplib.SMTP(hostname=mx_host, port=port, timeout=self.timeout, local_hostname=self.hostname)
|
|
await smtp.connect()
|
|
ext = getattr(smtp, 'extensions', None)
|
|
if ext is None:
|
|
ext = getattr(smtp, 'esmtp_extensions', None)
|
|
if ext is None:
|
|
logger.error(f"SMTP object has no 'extensions' or 'esmtp_extensions'. Available attributes: {dir(smtp)}")
|
|
ext = {}
|
|
if 'starttls' in ext:
|
|
logger.debug(f'STARTTLS supported by {mx_host}:{port} for BCC, upgrading to TLS')
|
|
await smtp.starttls()
|
|
else:
|
|
logger.warning(f'STARTTLS not supported by {mx_host}:{port} for BCC, sending in plain text!')
|
|
response = await smtp.sendmail(mail_from, [bcc], prepared_content)
|
|
logger.debug(f'Successfully relayed BCC email to {bcc} via {mx_host}:{port}')
|
|
results.append({
|
|
'recipient': bcc,
|
|
'status': 'success',
|
|
'error_code': None,
|
|
'error_message': None,
|
|
'server_response': str(response),
|
|
'recipient_type': 'bcc'
|
|
})
|
|
await smtp.quit()
|
|
delivered = True
|
|
break
|
|
except Exception as e:
|
|
logger.error(f'Failed to relay BCC email to {bcc} via {mx_host}:{port}: {e}')
|
|
last_error = {
|
|
'status': 'failed',
|
|
'error_code': 'RELAY',
|
|
'error_message': str(e),
|
|
'server_response': None
|
|
}
|
|
continue
|
|
|
|
if not delivered and last_error:
|
|
results.append({
|
|
'recipient': bcc,
|
|
'status': last_error['status'],
|
|
'error_code': last_error['error_code'],
|
|
'error_message': last_error['error_message'],
|
|
'server_response': last_error['server_response'],
|
|
'recipient_type': 'bcc'
|
|
})
|
|
|
|
return results
|
|
|
|
def relay_email(self, *args, **kwargs):
|
|
"""Synchronous wrapper for relay_email_async for compatibility."""
|
|
return asyncio.run(self.relay_email_async(*args, **kwargs))
|
|
|
|
def log_email(self, message_id, peer, mail_from, to_address, cc_addresses, bcc_addresses, subject, email_headers, message_body, status, dkim_signed=False, username=None, recipient_results=None):
|
|
"""Log email activity to database, including per-recipient results."""
|
|
session_db = Session()
|
|
try:
|
|
# Determine status: relayed, partial, failed
|
|
delivered = [r for r in (recipient_results or []) if r['status'] == 'success']
|
|
failed = [r for r in (recipient_results or []) if r['status'] != 'success']
|
|
if delivered and failed:
|
|
overall_status = 'partial'
|
|
elif delivered:
|
|
overall_status = 'relayed'
|
|
else:
|
|
overall_status = 'failed'
|
|
|
|
email_log = EmailLog(
|
|
message_id=message_id,
|
|
timestamp=get_current_time(),
|
|
peer_ip=peer,
|
|
mail_from=mail_from,
|
|
to_address=to_address or '',
|
|
cc_addresses=cc_addresses or '',
|
|
bcc_addresses=bcc_addresses or '',
|
|
subject=subject,
|
|
email_headers=email_headers,
|
|
message_body=message_body,
|
|
status=overall_status,
|
|
dkim_signed=dkim_signed,
|
|
username=username
|
|
)
|
|
session_db.add(email_log)
|
|
session_db.flush()
|
|
|
|
# Log per-recipient results
|
|
if recipient_results:
|
|
for r in recipient_results:
|
|
recipient_log = EmailRecipientLog(
|
|
email_log_id=email_log.id,
|
|
recipient=r['recipient'],
|
|
recipient_type=r.get('recipient_type', 'to'),
|
|
status=r['status'],
|
|
error_code=r.get('error_code'),
|
|
error_message=r.get('error_message'),
|
|
server_response=r.get('server_response')
|
|
)
|
|
session_db.add(recipient_log)
|
|
session_db.commit()
|
|
logger.debug(f'Logged email: {message_id}')
|
|
except Exception as e:
|
|
session_db.rollback()
|
|
logger.error(f'Error logging email: {e}')
|
|
finally:
|
|
session_db.close() |