Files
PyMTA-server/email_server/smtp_handler.py

723 lines
34 KiB
Python

"""
Enhanced SMTP handler for processing incoming emails with security controls.
Security Features:
- Users can only send as their own email or domain emails (if permitted)
- IP authentication is domain-specific
- Sender authorization validation
- Enhanced header management
"""
import email.utils
import os
import mimetypes
from aiosmtpd.smtp import SMTP as AIOSMTP, AuthResult
from aiosmtpd.controller import Controller
from email_server.auth import EnhancedAuthenticator, EnhancedIPAuthenticator, validate_sender_authorization
from email_server.email_relay import EmailRelay
from email_server.dkim_manager import DKIMManager
from email_server.settings_loader import load_settings
from email_server.tool_box import get_logger, ensure_folder_exists, generate_message_id, get_current_time
from email import policy
from email.parser import BytesParser
from email_server.models import Session, EmailAttachment, EmailLog
# Module-wide logger, obtained from the project's tool_box helper
logger = get_logger()
# Settings are loaded once, when this module is imported
settings = load_settings()
# Hostname compared against (and substituted into) the domain part of
# Message-ID headers; falls back to Server.hostname, then to 'localhost'
helo_hostname = settings['Server'].get('helo_hostname', settings['Server'].get('hostname', 'localhost'))
class CustomSMTP(AIOSMTP):
    """aiosmtpd SMTP subclass with a configurable banner and TLS-aware AUTH handling."""

    def __init__(self, *args, **kwargs):
        # The banner text comes from settings; a literal pair of double quotes
        # in the configuration stands for "no banner"
        banner = settings['Server'].get('server_banner', '')
        self.custom_banner = '' if banner == '""' else banner

        # Keep our own copies of the auth configuration before handing the
        # keyword arguments over to the base class
        self._custom_authenticator = kwargs.get('authenticator', None)
        self._custom_auth_require_tls = kwargs.get('auth_require_tls', False)

        super().__init__(*args, **kwargs)

        # Replace the identification string with the configured banner
        self.__ident__ = self.custom_banner

    def _get_auth_methods(self):
        """Return the AUTH methods to advertise, or an empty list when AUTH is unavailable.

        AUTH is offered only when an authenticator was supplied and either TLS
        is not required for AUTH or the current session is secured.
        """
        # NOTE(review): relies on the base class providing _get_auth_methods -- confirm
        # against the installed aiosmtpd version
        if not self._custom_authenticator:
            return []
        tls_satisfied = not self._custom_auth_require_tls or (self.session and self.session.ssl)
        if tls_satisfied:
            return super()._get_auth_methods()
        return []

    async def smtp_AUTH(self, arg):
        """Run the base AUTH command and drop the connection if it reports failure."""
        outcome = await super().smtp_AUTH(arg)

        # NOTE(review): this only triggers if the base smtp_AUTH returns an
        # AuthResult and the session exposes a transport -- confirm both
        auth_failed = isinstance(outcome, AuthResult) and not outcome.success
        if auth_failed:
            transport = getattr(getattr(self, 'session', None), 'transport', None)
            if transport:
                transport.close()
        return outcome
class EnhancedCombinedAuthenticator:
    """
    Authenticator callable that combines credential checks with deferred IP checks.

    Behaviour:
    - Username/password credentials are checked by EnhancedAuthenticator and
      that verdict is final (no IP fallback after a failed login).
    - Without credentials the call succeeds; IP authorization is expected to
      be enforced later, when MAIL FROM is processed.
    """

    def __init__(self):
        self.user_auth = EnhancedAuthenticator()
        # Kept for callers; the IP check itself is not performed in __call__
        self.ip_auth = EnhancedIPAuthenticator()

    def __call__(self, server, session, envelope, mechanism, auth_data):
        """Authenticate one AUTH attempt.

        Args:
            server, session, envelope: aiosmtpd objects, passed through to the
                user authenticator.
            mechanism: AUTH mechanism name, passed through.
            auth_data: Credentials; only ``LoginPassword`` instances are checked.

        Returns:
            The user authenticator's result when credentials were supplied,
            otherwise ``AuthResult(success=True, handled=True)``.
        """
        from aiosmtpd.smtp import LoginPassword

        if auth_data and isinstance(auth_data, LoginPassword):
            # Success and failure are returned alike: a rejected login must not
            # be rescued by IP-based authentication
            return self.user_auth(server, session, envelope, mechanism, auth_data)

        # NOTE(review): any AUTH attempt that does not carry a LoginPassword is
        # reported as successful here, relying on the MAIL FROM check to reject
        # unauthorized senders -- confirm this is intended
        return AuthResult(success=True, handled=True)
class EnhancedCustomSMTPHandler:
"""Enhanced custom SMTP handler with security controls."""
def __init__(self):
    """Create the authenticators, relay and DKIM manager this handler works with."""
    # Standalone user and IP authenticators; not referenced by the other
    # methods visible in this file -- presumably used by callers (TODO confirm)
    self.authenticator = EnhancedAuthenticator()
    self.ip_authenticator = EnhancedIPAuthenticator()
    # Handed to CustomSMTP as its authenticator by the controllers' factory()
    self.combined_authenticator = EnhancedCombinedAuthenticator()
    # Relays messages and writes the email log (used in handle_DATA)
    self.email_relay = EmailRelay()
    # Supplies per-domain custom headers and DKIM signatures (used in handle_DATA)
    self.dkim_manager = DKIMManager()
    # NOTE(review): the two attributes below are not read anywhere in the
    # visible code; the controllers pass their own auth_require_tls value
    self.auth_require_tls = False
    self.auth_methods = ['LOGIN', 'PLAIN']
def _ensure_required_headers(self, content: str, envelope, message_id: str, custom_headers: list = None) -> str:
    """Rebuild the header block so every essential header is present exactly once.

    The headers of ``content`` are parsed (folded lines are unfolded) and
    re-emitted in a fixed order: Message-ID, Date, MIME-Version, To, Cc, From,
    Subject, Content-Type, Content-Transfer-Encoding. Missing ones receive a
    default, except Cc, which is only emitted when the message carries it.
    All other original headers follow in their original order (a Bcc header
    is dropped so blind recipients are not disclosed), and finally the
    ``custom_headers`` the message does not already contain.

    Args:
        content: Full message text (headers, blank line, body).
        envelope: SMTP envelope; ``rcpt_tos`` and ``mail_from`` provide the
            defaults for To and From.
        message_id: ID (angle brackets optional) used when the message has no
            usable Message-ID header; a new ID is generated if it is empty.
        custom_headers: Optional ``(name, value)`` pairs to append.

    Returns:
        The rebuilt message with CRLF line endings, or ``content`` unchanged
        when it is not a ``str`` or rebuilding fails.
    """
    try:
        if not isinstance(content, str):
            # Callers decode first; refuse to mix bytes and str operations
            logger.error(f"_ensure_required_headers: expected str content, got {type(content)}")
            return content

        lines = content.splitlines()

        # --- Parse the header block (everything before the first blank line) ---
        existing_headers = {}        # lowercase name -> unfolded value, last occurrence wins
        original_header_order = []   # (name, value) pairs in source order, duplicates kept
        body_start = None
        first_non_header = None
        for i, line in enumerate(lines):
            if line.strip() == '':
                body_start = i + 1
                break
            if line.startswith((' ', '\t')):
                # Continuation line; folded into its header below
                continue
            if ':' not in line:
                if first_non_header is None:
                    first_non_header = i
                continue
            header_name, header_value = line.split(':', 1)
            header_name = header_name.strip()
            header_value = header_value.strip()
            # Unfold continuation lines, stopping at a whitespace-only line so
            # the header/body separator is never swallowed
            j = i + 1
            while j < len(lines) and lines[j].startswith((' ', '\t')) and lines[j].strip():
                header_value += ' ' + lines[j].strip()
                j += 1
            header_value = header_value.strip()
            existing_headers[header_name.lower()] = header_value
            original_header_order.append((header_name, header_value))
            logger.debug(f"Found existing header: {header_name.lower()} = {header_value}")

        if body_start is None:
            # No blank separator line was found
            if not original_header_order:
                body_start = 0                   # no headers at all: everything is body
            elif first_non_header is not None:
                body_start = first_non_header    # body begins at the first non-header line
            else:
                body_start = len(lines)          # header-only message: empty body

        # --- Body: drop trailing blank lines, use CRLF like the header block ---
        body_lines = lines[body_start:]
        while body_lines and body_lines[-1].strip() == '':
            body_lines.pop()
        body = '\r\n'.join(body_lines)

        # --- 1. Message-ID: keep the local part, force our hostname ---
        final_id = ''
        raw_id = existing_headers.get('message-id', '').strip().strip('<>')
        if '@' in raw_id:
            prefix, hostname = raw_id.rsplit('@', 1)
            hostname = hostname.rstrip('>')
            if hostname.lower() != helo_hostname.lower():
                final_id = f"{prefix}@{helo_hostname}"
            else:
                final_id = raw_id
        if not final_id:
            # Missing or malformed header: prefer the caller's ID so the header
            # matches what the caller logs; generate one only as a last resort
            final_id = (message_id or generate_message_id()).strip().strip('<>')
        required_headers = [f"Message-ID: <{final_id}>"]

        # --- 2-9. Remaining essential headers, in fixed order ---
        # Each entry is (canonical name, default factory); None means "only if present"
        # NOTE(review): the To default lists every envelope recipient, which
        # exposes Bcc recipients of a message sent without a To header
        essential = [
            ('Date', lambda: email.utils.formatdate(localtime=True)),
            ('MIME-Version', lambda: '1.0'),
            ('To', lambda: ', '.join(envelope.rcpt_tos)),
            ('Cc', None),
            ('From', lambda: envelope.mail_from),
            ('Subject', lambda: ''),
            ('Content-Type', lambda: 'text/plain; charset=UTF-8; format=flowed'),
            ('Content-Transfer-Encoding', lambda: '7bit'),
        ]
        handled = {'message-id', 'bcc'}
        for name, default_factory in essential:
            key = name.lower()
            handled.add(key)
            if key in existing_headers:
                required_headers.append(f"{name}: {existing_headers[key]}")
            elif default_factory is not None:
                required_headers.append(f"{name}: {default_factory()}")

        # --- Every other original header is preserved in its original order ---
        for name, value in original_header_order:
            if name.lower() not in handled:
                required_headers.append(f"{name}: {value}")

        # --- Custom headers, unless the message already carries them ---
        for header_name, header_value in custom_headers or []:
            if header_name.lower() not in existing_headers:
                required_headers.append(f"{header_name}: {header_value}")
                logger.debug(f"Added custom header: {header_name}: {header_value}")

        # --- Assemble: headers, blank line, body (whitespace-only bodies are dropped) ---
        final_content = '\r\n'.join(required_headers) + '\r\n\r\n'
        if body.strip():
            final_content += body
        return final_content
    except Exception as e:
        import traceback
        logger.error(f"Error ensuring headers: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        logger.error(f"Locals: {locals()}")
        # Fallback to original content if parsing fails
        return content
async def handle_DATA(self, server, session, envelope):
    """Normalise, sign, relay and log one received message.

    Steps, in order:
      1. Decode the payload and determine the Message-ID to track it by.
      2. Resolve the sender identity (session username, else whitelisted IP).
      3. Rebuild the headers (adding custom headers) and DKIM-sign the result.
      4. Optionally store attachments on disk (per sender / IP setting).
      5. Relay to every envelope recipient and collect per-recipient results.
      6. Write the email log and attachment rows (best effort).

    Returns:
        '250 Message accepted for delivery' when every recipient succeeded,
        '550 Message relay failed' when any recipient failed, or
        '550 Internal server error' when processing raised before or during
        the relay.
    """
    import traceback
    try:
        # Keep both views of the payload: text for header work, bytes for MIME parsing
        if isinstance(envelope.content, bytes):
            raw_bytes = envelope.content
            content = raw_bytes.decode('utf-8', errors='replace')
        else:
            content = envelope.content
            raw_bytes = content.encode('utf-8', errors='replace')

        # Always yields an ID, even when the header is missing or malformed
        message_id = self._extract_message_id(content)
        logger.debug(f'Processing email with ID: {message_id} from {envelope.mail_from} to {envelope.rcpt_tos}')

        # The domain is whatever follows the last '@' of the envelope sender
        mail_from = envelope.mail_from or ''
        sender_domain = mail_from.rsplit('@', 1)[1] if '@' in mail_from else None

        # The peer host is used as is: splitting on ':' would truncate IPv6 addresses
        client_ip = self._peer_ip(session)

        # Get authenticated username from session, else fall back to IP whitelist
        username = getattr(session, 'username', None)
        if not username and client_ip:
            from email_server.models import get_whitelisted_ip
            if get_whitelisted_ip(client_ip, sender_domain):
                username = f"IP:{client_ip}"
        logger.debug(f'Authenticated username: {username}')

        # Custom headers: copy the manager's list so appending never mutates its data
        custom_headers = []
        if sender_domain:
            custom_headers = list(self.dkim_manager.get_active_custom_headers(sender_domain) or [])
        if client_ip:
            custom_headers.append(('X-Originating-IP', f'[{client_ip}]'))
        custom_headers.append(('X-Mailer', 'NetBro Mail Server 1.0'))
        custom_headers.append(('X-Priority', '3'))

        # Ensure required headers are present (including custom headers)
        content = self._ensure_required_headers(content, envelope, message_id, custom_headers)

        # DKIM-sign the final version of the message (only once, after all modifications)
        signed_content = content
        dkim_signed = False
        if sender_domain:
            signed_content = self.dkim_manager.sign_email(content, sender_domain)
            if not isinstance(signed_content, (str, bytes)):
                logger.error(f"DKIMManager.sign_email returned non-str/bytes: {type(signed_content)}: {signed_content}")
                raise TypeError(f"DKIMManager.sign_email returned non-str/bytes: {type(signed_content)}")
            dkim_signed = signed_content != content
            if dkim_signed:
                logger.debug(f'Email {message_id} signed with DKIM for domain {sender_domain}')

        # Extract To / Cc / Subject from the rebuilt header block for logging
        to_address = ''
        cc_addresses = ''
        subject = ''
        for line in content.splitlines():
            if line.strip() == '':
                break
            lower_line = line.lower()
            if lower_line.startswith('to:'):
                to_address = line[3:].strip()
            elif lower_line.startswith('cc:'):
                cc_addresses = line[3:].strip()
            elif lower_line.startswith('subject:'):
                subject = line[8:].strip()

        # Check if message content should be stored (sender or IP whitelist)
        from email_server.models import get_sender_by_email, get_whitelisted_ip
        store_message = False
        sender_obj = get_sender_by_email(envelope.mail_from)
        if sender_obj and getattr(sender_obj, 'store_message_content', False):
            store_message = True
        elif client_ip:
            ip_obj = get_whitelisted_ip(client_ip, sender_domain)
            if ip_obj and getattr(ip_obj, 'store_message_content', False):
                store_message = True

        # Parse the original bytes once; used for attachments and for logging
        msg = BytesParser(policy=policy.default).parsebytes(raw_bytes)

        attachments_path = settings['Attachments'].get('attachments_path', 'email_server/server_data/attachments')
        logger.debug(f"Using attachments base path: {attachments_path}")
        attachments_to_save = []
        if store_message and msg.is_multipart():
            storage_path = self.get_attachment_storage_path(
                attachments_base_path=attachments_path,
                sender_domain=sender_domain or 'unknown',
                username=username,
                client_ip=client_ip
            )
            ensure_folder_exists(storage_path)
            # Strip @domain from message_id for the filename prefix
            clean_message_id = message_id.split('@')[0] if '@' in message_id else message_id
            for part in msg.walk():
                if part.get_content_disposition() != 'attachment':
                    continue
                filename = part.get_filename()
                if not filename:
                    continue
                file_data = part.get_payload(decode=True)
                if not file_data:
                    continue
                content_type = self.get_content_type(part, filename)
                # Both the filename and the Message-ID come from the sender:
                # reduce them to a single safe path component
                safe_filename = self._safe_attachment_name(clean_message_id, filename)
                file_path = os.path.join(storage_path, safe_filename)
                try:
                    # NOTE(review): assumes ensure_folder_exists accepts a file
                    # path and creates its parent folder -- confirm in tool_box
                    ensure_folder_exists(file_path)
                    with open(file_path, 'wb') as f:
                        f.write(file_data)
                    logger.debug(f"Saved attachment {filename} ({content_type}) to {file_path}")
                    attachments_to_save.append({
                        'filename': filename,
                        'content_type': content_type,
                        'file_path': file_path,
                        'size': len(file_data)
                    })
                except Exception as e:
                    logger.error(f"Failed to save attachment {filename}: {str(e)}")
                    continue

        # Classify each envelope recipient as to / cc / bcc based on the headers.
        # getaddresses() also handles "Display Name <addr>" entries.
        def parse_addresses(addr_str):
            return {
                addr.strip().lower()
                for _name, addr in email.utils.getaddresses([addr_str])
                if addr.strip()
            }

        to_set = parse_addresses(to_address)
        cc_set = parse_addresses(cc_addresses)
        recipient_results = []
        for rcpt in envelope.rcpt_tos:
            rcpt_l = str(rcpt).lower()
            if rcpt_l in to_set:
                rtype = 'to'
            elif rcpt_l in cc_set:
                rtype = 'cc'
            else:
                rtype = 'bcc'  # Any recipient not in To/Cc is a Bcc
            recipient_results.append({'recipient': rcpt, 'recipient_type': rtype, 'status': 'pending'})
        recipient_types = [r['recipient_type'] for r in recipient_results]

        # Relay the email and get per-recipient results
        relay_results = await self.email_relay.relay_email_async(
            envelope.mail_from,
            envelope.rcpt_tos,
            signed_content,
            username=username,
            cc_addresses=cc_addresses,
            bcc_addresses=None,  # BCC addresses are handled through envelope.rcpt_tos
            recipient_types=recipient_types
        )

        # Merge each relay result into the matching pending entry
        for result in relay_results or []:
            for r in recipient_results:
                if r['recipient'] == result['recipient'] and r['recipient_type'] == result.get('recipient_type', 'to'):
                    r.update(result)
                    break

        # Determine overall status
        status = 'relayed' if all(r['status'] == 'success' for r in recipient_results) else 'failed'

        # Logging is best effort: the relay has already happened, so a logging
        # failure must not change the reply sent to the client
        try:
            email_headers = '\n'.join(f"{name}: {value}" for name, value in msg.items())

            # Extract only the text content, not attachments
            message_body = ""
            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_maintype() == 'text' and part.get_content_disposition() is None:
                        charset = part.get_content_charset() or 'utf-8'
                        try:
                            message_body += part.get_payload(decode=True).decode(charset) + "\n"
                        except Exception as e:
                            logger.warning(f"Failed to decode message part: {e}")
            elif msg.get_content_maintype() == 'text':
                charset = msg.get_content_charset() or 'utf-8'
                try:
                    message_body = msg.get_payload(decode=True).decode(charset)
                except Exception as e:
                    logger.warning(f"Failed to decode message: {e}")
            message_body = message_body.strip()

            self.email_relay.log_email(
                message_id=message_id,
                peer=client_ip or 'unknown',
                mail_from=envelope.mail_from,
                to_address=to_address,
                cc_addresses=cc_addresses,
                bcc_addresses=', '.join([r['recipient'] for r in recipient_results if r['recipient_type'] == 'bcc']),
                subject=subject,
                email_headers=email_headers,
                message_body=message_body,
                status=status,
                dkim_signed=dkim_signed,
                username=username,
                recipient_results=recipient_results
            )

            # Save attachments to DB, linked to the correct EmailLog
            if attachments_to_save:
                db_session = Session()
                try:
                    email_log = db_session.query(EmailLog).filter_by(message_id=message_id).first()
                    if email_log:
                        for att in attachments_to_save:
                            db_session.add(EmailAttachment(
                                email_log_id=email_log.id,
                                filename=att['filename'],
                                content_type=att['content_type'],
                                file_path=att['file_path'],
                                size=att['size']
                            ))
                        db_session.commit()
                except Exception as e:
                    logger.error(f"Failed to save attachments to DB: {e}")
                    db_session.rollback()
                finally:
                    db_session.close()
        except Exception as e:
            logger.error(f'Failed to log email {message_id}: {e}')
            logger.error(f'Traceback: {traceback.format_exc()}')

        if status == 'relayed':
            logger.debug(f'Email {message_id} successfully relayed')
            return '250 Message accepted for delivery'
        logger.error(f'Email {message_id} failed to relay')
        return '550 Message relay failed'
    except Exception as e:
        logger.error(f'Error handling email: {e}')
        logger.error(f'Traceback: {traceback.format_exc()}')
        logger.error(f'Locals: {locals()}')
        return '550 Internal server error'

def _extract_message_id(self, content):
    """Return the Message-ID (without angle brackets) used to track a message.

    Only the header block is searched. An existing ID keeps its local part but
    has its host replaced by ``helo_hostname`` when they differ. A new ID is
    generated when the header is missing or contains no '@'.
    """
    lines = content.splitlines()
    for idx, line in enumerate(lines):
        if line.strip() == '':
            break  # end of headers; never match text inside the body
        if not line.lower().startswith('message-id:'):
            continue
        value = line[11:].strip()  # text after "Message-ID:"
        # The ID may sit on a folded continuation line
        for extra in lines[idx + 1:]:
            if not extra.startswith((' ', '\t')) or not extra.strip():
                break
            value += ' ' + extra.strip()
        value = value.strip().strip('<>')
        if '@' in value:
            prefix, hostname = value.rsplit('@', 1)
            hostname = hostname.rstrip('>')
            if hostname.lower() != helo_hostname.lower():
                return f"{prefix}@{helo_hostname}"
            return value
        break
    return generate_message_id().strip().strip('<>')

def _peer_ip(self, session):
    """Return the client host from ``session.peer``, or None when unavailable."""
    peer = getattr(session, 'peer', None)
    if isinstance(peer, (tuple, list)) and peer:
        # (host, port[, ...]) -- the host needs no further splitting
        return str(peer[0])
    return None

def _safe_attachment_name(self, prefix, filename):
    """Build '<prefix>_<filename>' as a single path component.

    Directory parts of ``filename`` are discarded and separators in ``prefix``
    are replaced, so the result cannot point outside the storage folder.
    """
    base = os.path.basename(str(filename).replace('\\', '/')).replace('\x00', '').strip()
    if base in ('', '.', '..'):
        base = 'attachment'
    safe_prefix = str(prefix).replace('/', '_').replace('\\', '_').replace('\x00', '')
    return f"{safe_prefix}_{base}"
async def handle_RCPT(self, server, session, envelope, address, rcpt_options):
    """Handle RCPT TO by accepting the recipient unconditionally.

    No validation is performed here: the address is appended to
    ``envelope.rcpt_tos`` and '250 OK' is returned. ``server``, ``session``
    and ``rcpt_options`` are not used.
    """
    logger.debug(f'RCPT TO: {address}')
    envelope.rcpt_tos.append(address)
    return '250 OK'
async def handle_MAIL(self, server, session, envelope, address, mail_options):
    """
    Handle MAIL FROM: accept the sender only if the session may use it.

    The decision is delegated to validate_sender_authorization(session, address),
    which returns an (authorized, message) pair. On acceptance the address is
    stored on the envelope and '250 OK' is returned; otherwise the reply is
    '550 <message>'. Both outcomes are logged.
    """
    logger.debug(f'MAIL FROM: {address}')

    is_allowed, reason = validate_sender_authorization(session, address)
    if is_allowed:
        envelope.mail_from = address
        logger.info(f'MAIL FROM accepted: {address} - {reason}')
        return '250 OK'

    logger.warning(f'MAIL FROM rejected: {address} - {reason}')
    return f'550 {reason}'
def get_attachment_storage_path(self, attachments_base_path: str, sender_domain: str, username: str = None, client_ip: str = None) -> str:
    """Generate the storage path for attachments based on sender domain, authentication, and date.

    Args:
        attachments_base_path: Base path for attachments storage
        sender_domain: Domain of the sender; 'unknown' is used when it is empty or None
        username: Authenticated username (if any)
        client_ip: Client IP address (if IP-based authentication)

    Returns:
        str: Full path where attachments should be stored, format:
        base/domain/[username|ip]/YYYY-DD-MMM/
    """
    def as_component(value, extra_chars=''):
        # Turn a value into exactly one folder name: path separators (and any
        # extra characters) become '_'; empty or dot-only names such as '..'
        # would escape or collapse the directory level, so they are replaced
        cleaned = str(value) if value else ''
        for ch in '/\\' + extra_chars:
            cleaned = cleaned.replace(ch, '_')
        return cleaned if cleaned.strip('. ') else 'unknown'

    # Get current date in YYYY-DD-MMM format using consistent time function
    current_date = get_current_time().strftime('%Y-%d-%b')  # e.g., 2025-14-Jun

    domain_path = os.path.join(attachments_base_path, as_component(sender_domain))

    # Determine auth-based subfolder: username first, then client IP
    if username:
        auth_path = os.path.join(domain_path, as_component(username))
    elif client_ip:
        # ':' (IPv6 separators) is additionally replaced for IP folders
        auth_path = os.path.join(domain_path, as_component(client_ip, ':'))
    else:
        # Fallback to domain-only path
        auth_path = domain_path

    # Add date-based subfolder
    return os.path.join(auth_path, current_date)
def get_content_type(self, part, filename):
    """Return the content type for an attachment part.

    The type declared on the part is used unless it is the generic
    'application/octet-stream'. In that case the type is guessed from the
    filename via ``mimetypes``, then looked up in a small extension table,
    and finally left as 'application/octet-stream'.
    """
    declared_type = part.get_content_type()
    if declared_type != 'application/octet-stream':
        return declared_type

    # Generic type declared: try the standard filename-based guess first
    guessed_type, _encoding = mimetypes.guess_type(filename)
    if guessed_type:
        return guessed_type

    # Last resort: a fixed table keyed by the (lowercased) text after the last dot
    _stem, dot, suffix = filename.lower().rpartition('.')
    extension = suffix if dot else ''
    fallback_types = {
        'txt': 'text/plain',
        'csv': 'text/csv',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'png': 'image/png',
        'gif': 'image/gif',
        'pdf': 'application/pdf',
        'json': 'application/json',
        'xml': 'application/xml',
        'html': 'text/html',
        'htm': 'text/html',
    }
    return fallback_types.get(extension, 'application/octet-stream')
class TLSController(Controller):
    """
    Controller for the TLS listener (intended for SMTPS, port 465 style).

    The listener binds to all interfaces; the ``hostname`` argument is only
    used as the SMTP hostname passed to CustomSMTP.
    """

    def __init__(self, handler, ssl_context, hostname='localhost', port=40465):
        logger.debug(f"TLSController __init__: ssl_context={ssl_context is not None}")
        # Stored under a private name to avoid clashing with base-class attributes
        self._ssl_context = ssl_context
        # Kept for the factory: identifies the server in HELO/EHLO
        self.smtp_hostname = hostname
        # Bind to all interfaces regardless of the hostname argument
        super().__init__(handler, hostname='0.0.0.0', port=port)

    def factory(self):
        """Build the CustomSMTP instance that serves a connection on this listener."""
        logger.debug(f"TLSController factory: ssl_context={self._ssl_context is not None}")
        logger.debug(f"TLSController factory: ssl_context object={self._ssl_context}")
        logger.debug(f"TLSController factory: hostname={self.smtp_hostname}")

        # NOTE(review): the context is passed as tls_context to the SMTP object
        # rather than as ssl_context to the Controller -- confirm this yields
        # direct TLS and not STARTTLS with the installed aiosmtpd
        smtp_options = {
            'tls_context': self._ssl_context,
            'require_starttls': False,   # Direct TLS: do not advertise or require STARTTLS
            'auth_require_tls': True,    # If auth is used, require TLS
            'authenticator': self.handler.combined_authenticator,
            'decode_data': True,
            'hostname': self.smtp_hostname,  # Use proper hostname for HELO
        }
        smtp_instance = CustomSMTP(self.handler, **smtp_options)
        logger.debug(f"TLSController CustomSMTP instance created with TLS: {hasattr(smtp_instance, 'tls_context')}")
        return smtp_instance
class PlainController(Controller):
    """Controller for the plain SMTP listener, with AUTH allowed without TLS."""

    def __init__(self, handler, hostname='localhost', port=4025):
        # Kept for the factory: identifies the server in HELO/EHLO
        self.smtp_hostname = hostname
        # Bind to all interfaces regardless of the hostname argument
        super().__init__(handler, hostname='0.0.0.0', port=port)

    def factory(self):
        """Build the CustomSMTP instance that serves a connection on this listener."""
        smtp_options = {
            'authenticator': self.handler.combined_authenticator,
            'auth_require_tls': False,   # Allow AUTH on plain port
            'decode_data': True,
            'hostname': self.smtp_hostname,  # Use proper hostname for HELO
        }
        return CustomSMTP(self.handler, **smtp_options)