updated config generation

This commit is contained in:
nahakubuilde
2025-05-31 16:32:11 +01:00
parent 16d5f961e2
commit 1d4a37922d
15 changed files with 249 additions and 90 deletions

4
.gitignore vendored
View File

@@ -4,10 +4,14 @@ __pycache__/
*$py.class *$py.class
# Certs, db, private test files # Certs, db, private test files
settings.ini
*.crt *.crt
*.key *.key
*.db *.db
test.txt test.txt
custom_test.py
custom_test.sh
.github/
# C extensions # C extensions
*.so *.so

View File

@@ -2,12 +2,12 @@
Authentication modules for the SMTP server. Authentication modules for the SMTP server.
""" """
import logging
from datetime import datetime from datetime import datetime
from aiosmtpd.smtp import AuthResult, LoginPassword from aiosmtpd.smtp import AuthResult, LoginPassword
from email_server.models import Session, User, Domain, WhitelistedIP, AuthLog, check_password from email_server.models import Session, User, Domain, WhitelistedIP, AuthLog, check_password
from email_server.tool_box import get_logger
logger = logging.getLogger(__name__) logger = get_logger()
class Authenticator: class Authenticator:
"""Username/password authenticator.""" """Username/password authenticator."""
@@ -47,7 +47,7 @@ class Authenticator:
session_db.add(auth_log) session_db.add(auth_log)
session_db.commit() session_db.commit()
logger.info(f'Authenticated user: {username} from domain {domain.domain_name if domain else "unknown"}') logger.debug(f'Authenticated user: {username} from domain {domain.domain_name if domain else "unknown"}')
# Don't include the SMTP response code in the message - let aiosmtpd handle it # Don't include the SMTP response code in the message - let aiosmtpd handle it
return AuthResult(success=True, handled=True) return AuthResult(success=True, handled=True)
else: else:
@@ -94,7 +94,7 @@ class IPAuthenticator:
) )
session_db.add(auth_log) session_db.add(auth_log)
session_db.commit() session_db.commit()
logger.info(f'Authenticated via whitelist: IP {peer_ip} for {domain.domain_name}') logger.debug(f'Authenticated via whitelist: IP {peer_ip} for {domain.domain_name}')
return AuthResult(success=True, handled=True, message='Authenticated via whitelist') return AuthResult(success=True, handled=True, message='Authenticated via whitelist')
return AuthResult(success=False, handled=True, message='IP not whitelisted') return AuthResult(success=False, handled=True, message='IP not whitelisted')

View File

@@ -5,10 +5,9 @@ Command-line tools for managing the SMTP server.
import argparse import argparse
from email_server.models import Session, Domain, User, WhitelistedIP, hash_password, create_tables from email_server.models import Session, Domain, User, WhitelistedIP, hash_password, create_tables
from email_server.dkim_manager import DKIMManager from email_server.dkim_manager import DKIMManager
import logging from email_server.tool_box import get_logger
logging.basicConfig(level=logging.INFO) logger = get_logger()
logger = logging.getLogger(__name__)
def add_domain(domain_name, requires_auth=True): def add_domain(domain_name, requires_auth=True):
"""Add a new domain to the database.""" """Add a new domain to the database."""

View File

@@ -1,26 +0,0 @@
"""
Configuration settings for the SMTP server.
"""
# Server settings
SMTP_PORT = 4025
SMTP_TLS_PORT = 40587
HOSTNAME = 'localhost'
BIND_IP = '0.0.0.0'
# Database settings
DATABASE_URL = 'sqlite:///email_server/server_data/smtp_server.db'
# Logging settings
LOG_LEVEL = 'INFO'
# Email relay settings
RELAY_TIMEOUT = 10
# TLS settings
TLS_CERT_FILE = 'email_server/ssl_certs/server.crt'
TLS_KEY_FILE = 'email_server/ssl_certs/server.key'
# DKIM settings
DKIM_SELECTOR = 'default'
DKIM_KEY_SIZE = 2048

View File

@@ -2,15 +2,19 @@
DKIM key management and email signing functionality. DKIM key management and email signing functionality.
""" """
import logging
import dkim import dkim
from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric import rsa
from datetime import datetime from datetime import datetime
from email_server.models import Session, Domain, DKIMKey from email_server.models import Session, Domain, DKIMKey
from email_server.config import DKIM_SELECTOR, DKIM_KEY_SIZE from email_server.settings_loader import load_settings
from email_server.tool_box import get_logger
logger = logging.getLogger(__name__) settings = load_settings()
DKIM_SELECTOR = settings['DKIM']['DKIM_SELECTOR']
DKIM_KEY_SIZE = int(settings['DKIM']['DKIM_KEY_SIZE'])
logger = get_logger()
class DKIMManager: class DKIMManager:
"""Manages DKIM keys and email signing.""" """Manages DKIM keys and email signing."""
@@ -31,7 +35,7 @@ class DKIMManager:
# Check if DKIM key already exists # Check if DKIM key already exists
existing_key = session.query(DKIMKey).filter_by(domain_id=domain.id, is_active=True).first() existing_key = session.query(DKIMKey).filter_by(domain_id=domain.id, is_active=True).first()
if existing_key: if existing_key:
logger.info(f"DKIM key already exists for domain {domain_name}") logger.debug(f"DKIM key already exists for domain {domain_name}")
return True return True
# Generate RSA key pair # Generate RSA key pair
@@ -66,7 +70,7 @@ class DKIMManager:
session.add(dkim_key) session.add(dkim_key)
session.commit() session.commit()
logger.info(f"Generated DKIM key for domain: {domain_name}") logger.debug(f"Generated DKIM key for domain: {domain_name}")
return True return True
except Exception as e: except Exception as e:
@@ -156,7 +160,7 @@ class DKIMManager:
# Combine signature with original content # Combine signature with original content
signed_content = signature + email_bytes signed_content = signature + email_bytes
logger.info(f"Successfully signed email for domain: {domain_name}") logger.debug(f"Successfully signed email for domain: {domain_name}")
# Return as string if input was string # Return as string if input was string
if isinstance(email_content, str): if isinstance(email_content, str):
@@ -203,7 +207,7 @@ class DKIMManager:
).first() ).first()
if not existing_key: if not existing_key:
logger.info(f"Generating DKIM key for existing domain: {domain.domain_name}") logger.debug(f"Generating DKIM key for existing domain: {domain.domain_name}")
self.generate_dkim_keypair(domain.domain_name) self.generate_dkim_keypair(domain.domain_name)
except Exception as e: except Exception as e:

View File

@@ -5,11 +5,11 @@ Email relay functionality for the SMTP server.
import dns.resolver import dns.resolver
import smtplib import smtplib
import ssl import ssl
import logging
from datetime import datetime from datetime import datetime
from email_server.models import Session, EmailLog from email_server.models import Session, EmailLog
from email_server.tool_box import get_logger
logger = logging.getLogger(__name__) logger = get_logger()
class EmailRelay: class EmailRelay:
"""Handles relaying emails to recipient mail servers.""" """Handles relaying emails to recipient mail servers."""
@@ -29,7 +29,7 @@ class EmailRelay:
# Sort by priority (lower number = higher priority) # Sort by priority (lower number = higher priority)
mx_records = sorted(mx_records, key=lambda x: x.preference) mx_records = sorted(mx_records, key=lambda x: x.preference)
mx_host = mx_records[0].exchange.to_text().rstrip('.') mx_host = mx_records[0].exchange.to_text().rstrip('.')
logger.info(f'Found MX record for {domain}: {mx_host}') logger.debug(f'Found MX record for {domain}: {mx_host}')
except Exception as e: except Exception as e:
logger.error(f'Failed to resolve MX for {domain}: {e}') logger.error(f'Failed to resolve MX for {domain}: {e}')
return False return False
@@ -56,14 +56,14 @@ class EmailRelay:
# Check if server supports STARTTLS # Check if server supports STARTTLS
relay_server.ehlo() relay_server.ehlo()
if relay_server.has_extn('starttls'): if relay_server.has_extn('starttls'):
logger.info(f'Starting TLS connection to {mx_host}') logger.debug(f'Starting TLS connection to {mx_host}')
context = ssl.create_default_context() context = ssl.create_default_context()
# Allow self-signed certificates for mail servers # Allow self-signed certificates for mail servers
context.check_hostname = False context.check_hostname = False
context.verify_mode = ssl.CERT_NONE context.verify_mode = ssl.CERT_NONE
relay_server.starttls(context=context) relay_server.starttls(context=context)
relay_server.ehlo() # Say hello again after STARTTLS relay_server.ehlo() # Say hello again after STARTTLS
logger.info(f'TLS connection established to {mx_host}') logger.debug(f'TLS connection established to {mx_host}')
else: else:
logger.warning(f'Server {mx_host} does not support STARTTLS, using plain text') logger.warning(f'Server {mx_host} does not support STARTTLS, using plain text')
except Exception as tls_e: except Exception as tls_e:
@@ -71,7 +71,7 @@ class EmailRelay:
# Send the email # Send the email
relay_server.sendmail(mail_from, rcpt, content) relay_server.sendmail(mail_from, rcpt, content)
logger.info(f'Successfully relayed email to {rcpt} via {mx_host}') logger.debug(f'Successfully relayed email to {rcpt} via {mx_host}')
return True return True
except Exception as e: except Exception as e:
@@ -86,7 +86,7 @@ class EmailRelay:
# Try other MX records # Try other MX records
for mx_record in mx_records[1:3]: # Try up to 2 backup MX records for mx_record in mx_records[1:3]: # Try up to 2 backup MX records
backup_mx = mx_record.exchange.to_text().rstrip('.') backup_mx = mx_record.exchange.to_text().rstrip('.')
logger.info(f'Trying backup MX record: {backup_mx}') logger.debug(f'Trying backup MX record: {backup_mx}')
try: try:
with smtplib.SMTP(backup_mx, 25, timeout=self.timeout) as backup_server: with smtplib.SMTP(backup_mx, 25, timeout=self.timeout) as backup_server:
@@ -101,12 +101,12 @@ class EmailRelay:
context.verify_mode = ssl.CERT_NONE context.verify_mode = ssl.CERT_NONE
backup_server.starttls(context=context) backup_server.starttls(context=context)
backup_server.ehlo() backup_server.ehlo()
logger.info(f'TLS connection established to backup {backup_mx}') logger.debug(f'TLS connection established to backup {backup_mx}')
except Exception: except Exception:
logger.warning(f'STARTTLS failed with backup {backup_mx}, using plain text') logger.warning(f'STARTTLS failed with backup {backup_mx}, using plain text')
backup_server.sendmail(mail_from, rcpt, content) backup_server.sendmail(mail_from, rcpt, content)
logger.info(f'Successfully relayed email to {rcpt} via backup {backup_mx}') logger.debug(f'Successfully relayed email to {rcpt} via backup {backup_mx}')
return True return True
except Exception as backup_e: except Exception as backup_e:
logger.warning(f'Backup MX {backup_mx} also failed: {backup_e}') logger.warning(f'Backup MX {backup_mx} also failed: {backup_e}')

View File

@@ -6,8 +6,11 @@ from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, B
from sqlalchemy.orm import declarative_base, sessionmaker from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import datetime from datetime import datetime
import bcrypt import bcrypt
from email_server.config import DATABASE_URL from email_server.settings_loader import load_settings
from email_server.tool_box import ensure_folder_exists from email_server.tool_box import ensure_folder_exists, get_logger
settings = load_settings()
DATABASE_URL = settings['Database']['DATABASE_URL']
ensure_folder_exists(DATABASE_URL) ensure_folder_exists(DATABASE_URL)
@@ -16,6 +19,8 @@ Base = declarative_base()
engine = create_engine(DATABASE_URL, echo=False) engine = create_engine(DATABASE_URL, echo=False)
Session = sessionmaker(bind=engine) Session = sessionmaker(bind=engine)
logger = get_logger()
class Domain(Base): class Domain(Base):
__tablename__ = 'domains' __tablename__ = 'domains'
id = Column(Integer, primary_key=True) id = Column(Integer, primary_key=True)

View File

@@ -4,10 +4,10 @@ Main server file that ties all modules together.
""" """
import asyncio import asyncio
import logging from email_server.settings_loader import load_settings
from email_server.tool_box import get_logger
# Import our modules # Import our modules
from email_server.config import SMTP_PORT, SMTP_TLS_PORT, HOSTNAME, LOG_LEVEL, BIND_IP
from email_server.models import create_tables from email_server.models import create_tables
from email_server.smtp_handler import CustomSMTPHandler, PlainController from email_server.smtp_handler import CustomSMTPHandler, PlainController
from email_server.tls_utils import generate_self_signed_cert, create_ssl_context from email_server.tls_utils import generate_self_signed_cert, create_ssl_context
@@ -15,12 +15,14 @@ from email_server.dkim_manager import DKIMManager
from aiosmtpd.controller import Controller from aiosmtpd.controller import Controller
from aiosmtpd.smtp import SMTP as AIOSMTP from aiosmtpd.smtp import SMTP as AIOSMTP
# Configure logging settings = load_settings()
logging.basicConfig( SMTP_PORT = int(settings['Server']['SMTP_PORT'])
level=getattr(logging, LOG_LEVEL), SMTP_TLS_PORT = int(settings['Server']['SMTP_TLS_PORT'])
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' HOSTNAME = settings['Server']['HOSTNAME']
) LOG_LEVEL = settings['Logging']['LOG_LEVEL']
logger = logging.getLogger(__name__) BIND_IP = settings['Server']['BIND_IP']
logger = get_logger()
# Enable asyncio debugging # Enable asyncio debugging
try: try:
@@ -32,14 +34,14 @@ except RuntimeError:
async def start_server(): async def start_server():
"""Main server function.""" """Main server function."""
logger.info("Starting SMTP Server with DKIM support...") logger.debug("Starting SMTP Server with DKIM support...")
# Initialize database # Initialize database
logger.info("Initializing database...") logger.debug("Initializing database...")
create_tables() create_tables()
# Initialize DKIM manager and generate keys for domains without them # Initialize DKIM manager and generate keys for domains without them
logger.info("Initializing DKIM manager...") logger.debug("Initializing DKIM manager...")
dkim_manager = DKIMManager() dkim_manager = DKIMManager()
dkim_manager.initialize_default_keys() dkim_manager.initialize_default_keys()
@@ -53,7 +55,7 @@ async def start_server():
domain = Domain(domain_name='example.com', requires_auth=True) domain = Domain(domain_name='example.com', requires_auth=True)
session.add(domain) session.add(domain)
session.commit() session.commit()
logger.info("Added example.com domain") logger.debug("Added example.com domain")
# Add test user if not exists # Add test user if not exists
user = session.query(User).filter_by(email='test@example.com').first() user = session.query(User).filter_by(email='test@example.com').first()
@@ -65,7 +67,7 @@ async def start_server():
) )
session.add(user) session.add(user)
session.commit() session.commit()
logger.info("Added test user: test@example.com") logger.debug("Added test user: test@example.com")
# Add whitelisted IP if not exists # Add whitelisted IP if not exists
whitelist = session.query(WhitelistedIP).filter_by(ip_address='127.0.0.1').first() whitelist = session.query(WhitelistedIP).filter_by(ip_address='127.0.0.1').first()
@@ -73,7 +75,7 @@ async def start_server():
whitelist = WhitelistedIP(ip_address='127.0.0.1', domain_id=domain.id) whitelist = WhitelistedIP(ip_address='127.0.0.1', domain_id=domain.id)
session.add(whitelist) session.add(whitelist)
session.commit() session.commit()
logger.info("Added whitelisted IP: 127.0.0.1") logger.debug("Added whitelisted IP: 127.0.0.1")
except Exception as e: except Exception as e:
session.rollback() session.rollback()
logger.error(f"Error adding test data: {e}") logger.error(f"Error adding test data: {e}")
@@ -81,7 +83,7 @@ async def start_server():
session.close() session.close()
# Generate SSL certificate if it doesn't exist # Generate SSL certificate if it doesn't exist
logger.info("Checking SSL certificates...") logger.debug("Checking SSL certificates...")
if not generate_self_signed_cert(): if not generate_self_signed_cert():
logger.error("Failed to generate SSL certificate") logger.error("Failed to generate SSL certificate")
return return
@@ -101,7 +103,7 @@ async def start_server():
port=SMTP_PORT port=SMTP_PORT
) )
controller_plain.start() controller_plain.start()
logger.info(f'Starting plain SMTP server on {HOSTNAME}:{SMTP_PORT}...') logger.debug(f'Starting plain SMTP server on {HOSTNAME}:{SMTP_PORT}...')
# Start TLS SMTP server using closure pattern like the original # Start TLS SMTP server using closure pattern like the original
handler_tls = CustomSMTPHandler() handler_tls = CustomSMTPHandler()
@@ -126,15 +128,15 @@ async def start_server():
port=SMTP_TLS_PORT port=SMTP_TLS_PORT
) )
controller_tls.start() controller_tls.start()
logger.info(f' - Plain SMTP (IP whitelist): {BIND_IP}:{SMTP_PORT}') logger.debug(f' - Plain SMTP (IP whitelist): {BIND_IP}:{SMTP_PORT}')
logger.info(f' - STARTTLS SMTP (auth required): {BIND_IP}:{SMTP_TLS_PORT}') logger.debug(f' - STARTTLS SMTP (auth required): {BIND_IP}:{SMTP_TLS_PORT}')
logger.info('Management commands:') logger.debug('Management commands:')
logger.info(' python cli_tools.py --help') logger.debug(' python cli_tools.py --help')
try: try:
await asyncio.Event().wait() await asyncio.Event().wait()
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info('Shutting down SMTP servers...') logger.debug('Shutting down SMTP servers...')
controller_plain.stop() controller_plain.stop()
controller_tls.stop() controller_tls.stop()
logger.info('SMTP servers stopped.') logger.debug('SMTP servers stopped.')

View File

@@ -0,0 +1,54 @@
"""
Settings loader for the SMTP server.
Automatically generates settings.ini with default values if not present.
"""
import configparser
from pathlib import Path
SETTINGS_PATH = Path(__file__).parent.parent / 'settings.ini'
# Default values for settings.ini
DEFAULTS = {
'Server': {
'SMTP_PORT': '4025',
'SMTP_TLS_PORT': '40587',
'HOSTNAME': 'localhost',
'BIND_IP': '0.0.0.0',
},
'Database': {
'DATABASE_URL': 'sqlite:///email_server/server_data/smtp_server.db',
},
'Logging': {
'LOG_LEVEL': 'INFO',
'hide_info_aiosmtpd': 'true',
},
'Relay': {
'RELAY_TIMEOUT': '10',
},
'TLS': {
'TLS_CERT_FILE': 'email_server/ssl_certs/server.crt',
'TLS_KEY_FILE': 'email_server/ssl_certs/server.key',
},
'DKIM': {
'DKIM_SELECTOR': 'default',
'DKIM_KEY_SIZE': '2048',
},
}
def generate_settings_ini(settings_path: Path = SETTINGS_PATH) -> None:
"""Generate settings.ini with default values if it does not exist."""
if settings_path.exists():
return
config_parser = configparser.ConfigParser()
for section, values in DEFAULTS.items():
config_parser[section] = values
with open(settings_path, 'w') as f:
config_parser.write(f)
def load_settings(settings_path: Path = SETTINGS_PATH) -> configparser.ConfigParser:
"""Load settings from settings.ini, generating it if needed."""
generate_settings_ini(settings_path)
config_parser = configparser.ConfigParser()
config_parser.read(settings_path)
return config_parser

View File

@@ -2,7 +2,6 @@
SMTP handler for processing incoming emails. SMTP handler for processing incoming emails.
""" """
import logging
import uuid import uuid
from datetime import datetime from datetime import datetime
from aiosmtpd.smtp import SMTP as AIOSMTP, AuthResult from aiosmtpd.smtp import SMTP as AIOSMTP, AuthResult
@@ -10,8 +9,9 @@ from aiosmtpd.controller import Controller
from email_server.auth import Authenticator, IPAuthenticator from email_server.auth import Authenticator, IPAuthenticator
from email_server.email_relay import EmailRelay from email_server.email_relay import EmailRelay
from email_server.dkim_manager import DKIMManager from email_server.dkim_manager import DKIMManager
from email_server.tool_box import get_logger
logger = logging.getLogger(__name__) logger = get_logger()
class CombinedAuthenticator: class CombinedAuthenticator:
"""Combined authenticator that tries username/password first, then falls back to IP whitelist.""" """Combined authenticator that tries username/password first, then falls back to IP whitelist."""
@@ -50,7 +50,7 @@ class CustomSMTPHandler:
"""Handle incoming email data.""" """Handle incoming email data."""
try: try:
message_id = str(uuid.uuid4()) message_id = str(uuid.uuid4())
logger.info(f'Received email {message_id} from {envelope.mail_from} to {envelope.rcpt_tos}') logger.debug(f'Received email {message_id} from {envelope.mail_from} to {envelope.rcpt_tos}')
# Convert content to string if it's bytes # Convert content to string if it's bytes
if isinstance(envelope.content, bytes): if isinstance(envelope.content, bytes):
@@ -69,7 +69,7 @@ class CustomSMTPHandler:
# Check if signing was successful (content changed) # Check if signing was successful (content changed)
dkim_signed = signed_content != content dkim_signed = signed_content != content
if dkim_signed: if dkim_signed:
logger.info(f'Email {message_id} signed with DKIM for domain {sender_domain}') logger.debug(f'Email {message_id} signed with DKIM for domain {sender_domain}')
# Relay the email # Relay the email
success = self.email_relay.relay_email( success = self.email_relay.relay_email(
@@ -91,7 +91,7 @@ class CustomSMTPHandler:
) )
if success: if success:
logger.info(f'Email {message_id} successfully relayed') logger.debug(f'Email {message_id} successfully relayed')
return '250 Message accepted for delivery' return '250 Message accepted for delivery'
else: else:
logger.error(f'Email {message_id} failed to relay') logger.error(f'Email {message_id} failed to relay')

View File

@@ -4,12 +4,15 @@ TLS utilities for the SMTP server.
import ssl import ssl
import os import os
import logging
from OpenSSL import crypto from OpenSSL import crypto
from email_server.config import TLS_CERT_FILE, TLS_KEY_FILE from email_server.settings_loader import load_settings
from email_server.tool_box import ensure_folder_exists from email_server.tool_box import ensure_folder_exists, get_logger
logger = logging.getLogger(__name__) settings = load_settings()
TLS_CERT_FILE = settings['TLS']['TLS_CERT_FILE']
TLS_KEY_FILE = settings['TLS']['TLS_KEY_FILE']
logger = get_logger()
ensure_folder_exists(TLS_CERT_FILE) ensure_folder_exists(TLS_CERT_FILE)
ensure_folder_exists(TLS_KEY_FILE) ensure_folder_exists(TLS_KEY_FILE)
@@ -17,11 +20,11 @@ ensure_folder_exists(TLS_KEY_FILE)
def generate_self_signed_cert(): def generate_self_signed_cert():
"""Generate self-signed SSL certificate if it doesn't exist.""" """Generate self-signed SSL certificate if it doesn't exist."""
if os.path.exists(TLS_CERT_FILE) and os.path.exists(TLS_KEY_FILE): if os.path.exists(TLS_CERT_FILE) and os.path.exists(TLS_KEY_FILE):
logger.info("SSL certificate already exists") logger.debug("SSL certificate already exists")
return True return True
try: try:
logger.info("Generating self-signed SSL certificate...") logger.debug("Generating self-signed SSL certificate...")
# Generate private key # Generate private key
k = crypto.PKey() k = crypto.PKey()
@@ -47,7 +50,7 @@ def generate_self_signed_cert():
with open(TLS_KEY_FILE, 'wb') as f: with open(TLS_KEY_FILE, 'wb') as f:
f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k)) f.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
logger.info(f"SSL certificate generated: {TLS_CERT_FILE}, {TLS_KEY_FILE}") logger.debug(f"SSL certificate generated: {TLS_CERT_FILE}, {TLS_KEY_FILE}")
return True return True
except Exception as e: except Exception as e:
@@ -60,7 +63,7 @@ def create_ssl_context():
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=TLS_CERT_FILE, keyfile=TLS_KEY_FILE) ssl_context.load_cert_chain(certfile=TLS_CERT_FILE, keyfile=TLS_KEY_FILE)
ssl_context.set_ciphers('DEFAULT') # Relax ciphers for compatibility ssl_context.set_ciphers('DEFAULT') # Relax ciphers for compatibility
logger.info('SSL context created successfully') logger.debug('SSL context created successfully')
return ssl_context return ssl_context
except Exception as e: except Exception as e:
logger.error(f'Failed to create SSL context: {e}') logger.error(f'Failed to create SSL context: {e}')

View File

@@ -3,6 +3,10 @@ Utility functions for the email server.
""" """
import os import os
import logging
from email_server.settings_loader import load_settings
settings = load_settings()
def ensure_folder_exists(filepath): def ensure_folder_exists(filepath):
""" """
@@ -11,3 +15,45 @@ def ensure_folder_exists(filepath):
if filepath.startswith("sqlite:///"): if filepath.startswith("sqlite:///"):
filepath = filepath.replace("sqlite:///", "", 1) filepath = filepath.replace("sqlite:///", "", 1)
os.makedirs(os.path.dirname(filepath), exist_ok=True) os.makedirs(os.path.dirname(filepath), exist_ok=True)
def setup_logging():
"""
Set up global logging configuration using settings.ini.
Should be called once at program entry point.
Optionally hides aiosmtpd 'mail.log' INFO logs when global logging is INFO based on settings.
"""
log_level = getattr(logging, settings['Logging']['LOG_LEVEL'], logging.INFO)
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
if not logging.getLogger().hasHandlers():
logging.basicConfig(level=log_level, format=log_format)
else:
logging.getLogger().setLevel(log_level)
# Hide aiosmtpd INFO logs if configured `hide_info_aiosmtpd = true`
hide_info_aiosmtpd = settings['Logging'].get('hide_info_aiosmtpd', 'true').lower() == 'true'
if hide_info_aiosmtpd and log_level == logging.INFO:
# Set aiosmtpd mail.log to WARNING level to hide INFO logs
logging.getLogger('mail.log').setLevel(logging.WARNING)
def get_logger(name=None):
"""
Get a logger with the given name (default: module name).
Ensures logging is set up before returning the logger.
"""
setup_logging()
if name is None:
# Get the caller's file name
import inspect
frame = inspect.currentframe()
# Go back one frame to the caller
caller_frame = frame.f_back
filename = caller_frame.f_globals.get('__file__', None)
if filename:
base = os.path.basename(filename)
name, ext = os.path.splitext(base)
name = name if ext == '.py' else base
else:
name = '__main__'
return logging.getLogger(name)

View File

@@ -1,13 +1,13 @@
from email_server.server_runner import start_server from email_server.server_runner import start_server
from email_server.tool_box import get_logger
import asyncio import asyncio
import sys import sys
import logging
logging.basicConfig(level=logging.INFO) logger = get_logger()
logger = logging.getLogger(__name__)
if __name__ == '__main__': if __name__ == '__main__':
try: try:
logger.info('Server started')
asyncio.run(start_server()) asyncio.run(start_server())
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info('Server interrupted by user') logger.info('Server interrupted by user')

BIN
tests/Hello.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 76 KiB

68
tests/bash_send_email.sh Normal file
View File

@@ -0,0 +1,68 @@
#!/bin/bash
sender="test@example.com"
receiver="info@example.com"
password="testpass123"
domain="example.com"
body_content_file="@tests/email_body.txt"
SMTP_PORT=4025
SMTP_TLS_PORT=40587
cc_recipient="targetcc@example.com"
bcc_recipient="targetbcc@example.com"
<<com
python -m email_server.cli_tools add-domain $domain
python -m email_server.cli_tools add-user $sender $password $domain
python -m email_server.cli_tools add-ip 127.0.0.1 $domain
python -m email_server.cli_tools add-ip 10.100.111.1 $domain
python -m email_server.cli_tools generate-dkim $domain
# options to add CC and BCC recipients for swaks
--cc $cc_recipient
--bcc $bcc_recipient
com
swaks --to $receiver \
--from $sender \
--server localhost \
--port $SMTP_TLS_PORT \
--auth LOGIN \
--auth-user $sender \
--auth-password $password \
--tls \
--header "Subject: TLS - Large body email" \
--body $body_content_file \
--attach tests/email_body.txt \
--attach tests/Hello.jpg
swaks --to $receiver \
--from $sender \
--server localhost \
--port $SMTP_PORT \
--auth LOGIN \
--auth-user $sender \
--auth-password $password \
--data "Subject: Test Email - authenticated\n\nThis is the message body."
swaks --to $receiver \
--from $sender \
--server localhost \
--port $SMTP_TLS_PORT \
--auth LOGIN \
--auth-user $sender \
--auth-password $password \
--tls \
--data "Subject: Test via STARTTLS - authenticated\n\nThis is the body."
swaks --to $receiver \
--from $sender \
--server localhost \
--port $SMTP_TLS_PORT \
--tls \
--data "Subject: Test via STARTTLS - no auth\n\nThis is the body."
com
swaks --to $receiver \
--from $sender \
--server localhost \
--port $SMTP_PORT \
--data "Subject: Test Email - no auth\n\nThis is the message body."