360 lines
14 KiB
Python
360 lines
14 KiB
Python
"""
|
|
Database models for the SMTP server using ESRV schema.
|
|
|
|
Enhanced security features:
|
|
- Users can only send as their own email or domain emails (if permitted)
|
|
- IP authentication is domain-specific
|
|
- All tables use 'esrv_' prefix for namespace isolation
|
|
"""
|
|
|
|
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text, Boolean, ForeignKey
|
|
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
|
|
from sqlalchemy.sql import func
|
|
from datetime import datetime
|
|
import bcrypt
|
|
from email_server.settings_loader import load_settings
|
|
from email_server.tool_box import ensure_folder_exists, get_logger
|
|
|
|
settings = load_settings()
|
|
# ConfigParser keys are case-insensitive, so we can use either case
|
|
DATABASE_URL = settings['Database']['database_url']
|
|
|
|
ensure_folder_exists(DATABASE_URL)
|
|
|
|
# SQLAlchemy setup
|
|
Base = declarative_base()
|
|
engine = create_engine(DATABASE_URL, echo=False)
|
|
Session = sessionmaker(bind=engine)
|
|
|
|
logger = get_logger()
|
|
|
|
class Domain(Base):
|
|
"""Domain model with enhanced security features."""
|
|
__tablename__ = 'esrv_domains'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
domain_name = Column(String, unique=True, nullable=False)
|
|
is_active = Column(Boolean, default=True)
|
|
created_at = Column(DateTime, default=func.now())
|
|
|
|
# Add relationships with proper foreign key references
|
|
senders = relationship("Sender", backref="domain", lazy="joined")
|
|
dkim_keys = relationship("DKIMKey", backref="domain", lazy="joined")
|
|
whitelisted_ips = relationship("WhitelistedIP", backref="domain", lazy="joined")
|
|
custom_headers = relationship("CustomHeader", backref="domain", lazy="joined")
|
|
|
|
def __repr__(self):
|
|
return f"<Domain(id={self.id}, domain_name='{self.domain_name}', active={self.is_active})>"
|
|
|
|
class Sender(Base):
|
|
"""
|
|
Sender model with enhanced authentication controls.
|
|
|
|
Security features:
|
|
- can_send_as_domain: If True, sender can send as any email from their domain
|
|
- If False, sender can only send as their own email address
|
|
"""
|
|
__tablename__ = 'esrv_senders'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
email = Column(String, unique=True, nullable=False)
|
|
password_hash = Column(String, nullable=False)
|
|
domain_id = Column(Integer, ForeignKey('esrv_domains.id'), nullable=False)
|
|
can_send_as_domain = Column(Boolean, default=False)
|
|
is_active = Column(Boolean, default=True)
|
|
created_at = Column(DateTime, default=func.now())
|
|
store_message_content = Column(Boolean, default=False) # Store message body/attachments
|
|
|
|
def can_send_as(self, from_address: str) -> bool:
|
|
"""
|
|
Check if this sender can send emails as the given from_address.
|
|
|
|
Args:
|
|
from_address: The email address the sender wants to send from
|
|
Returns:
|
|
True if sender is allowed to send as this address
|
|
"""
|
|
# Sender can always send as their own email
|
|
if from_address.lower() == self.email.lower():
|
|
return True
|
|
# If sender has domain privileges, check if from_address is from same domain
|
|
if self.can_send_as_domain:
|
|
sender_domain = self.email.split('@')[1].lower()
|
|
from_domain = from_address.split('@')[1].lower() if '@' in from_address else ''
|
|
return sender_domain == from_domain
|
|
return False
|
|
|
|
def __repr__(self):
|
|
return f"<Sender(id={self.id}, email='{self.email}', domain_id={self.domain_id}, can_send_as_domain={self.can_send_as_domain})>"
|
|
|
|
class WhitelistedIP(Base):
|
|
"""
|
|
IP whitelist model with domain-specific authentication.
|
|
|
|
Security feature:
|
|
- IPs can only send emails for their specific authorized domain
|
|
"""
|
|
__tablename__ = 'esrv_whitelisted_ips'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
ip_address = Column(String, nullable=False)
|
|
domain_id = Column(Integer, ForeignKey('esrv_domains.id'), nullable=False)
|
|
is_active = Column(Boolean, default=True)
|
|
created_at = Column(DateTime, default=func.now())
|
|
store_message_content = Column(Boolean, default=False) # Store message body/attachments
|
|
|
|
def can_send_for_domain(self, domain_name: str) -> bool:
|
|
"""
|
|
Check if this IP can send emails for the given domain.
|
|
|
|
Args:
|
|
domain_name: The domain name to check
|
|
|
|
Returns:
|
|
True if IP is authorized for this domain
|
|
"""
|
|
if not self.is_active:
|
|
return False
|
|
|
|
# Need to check against the actual domain
|
|
session = Session()
|
|
try:
|
|
domain = session.query(Domain).filter_by(
|
|
domain_name=domain_name.lower(),
|
|
is_active=True
|
|
).first()
|
|
return domain and domain.id == self.domain_id
|
|
finally:
|
|
session.close()
|
|
|
|
def __repr__(self):
|
|
return f"<WhitelistedIP(id={self.id}, ip='{self.ip_address}', domain_id={self.domain_id})>"
|
|
|
|
class EmailLog(Base):
|
|
"""Email log model for tracking sent emails."""
|
|
__tablename__ = 'esrv_email_logs'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
message_id = Column(String, unique=True, nullable=False)
|
|
timestamp = Column(DateTime, nullable=False)
|
|
peer_ip = Column(String, nullable=False) # Store only IP address
|
|
mail_from = Column(String, nullable=False)
|
|
to_address = Column(String, nullable=False, server_default='')
|
|
cc_addresses = Column(String, nullable=True, server_default='') # Comma-separated CC
|
|
bcc_addresses = Column(String, nullable=True, server_default='') # Comma-separated BCC
|
|
subject = Column(Text, nullable=True)
|
|
email_headers = Column(Text, nullable=False) # Store only email headers
|
|
message_body = Column(Text, nullable=True) # Store actual message content
|
|
status = Column(String, nullable=False)
|
|
dkim_signed = Column(Boolean, default=False)
|
|
username = Column(String, nullable=True) # Authenticated username
|
|
created_at = Column(DateTime, default=func.now())
|
|
|
|
recipients = relationship("EmailRecipientLog", back_populates="email_log", cascade="all, delete-orphan")
|
|
attachments = relationship("EmailAttachment", back_populates="email_log", cascade="all, delete-orphan")
|
|
|
|
def __repr__(self):
|
|
return f"<EmailLog(id={self.id}, message_id='{self.message_id}', from='{self.mail_from}', to='{self.to_address}', status='{self.status}')>"
|
|
|
|
class EmailRecipientLog(Base):
|
|
"""Log for each recipient of an email, including status and error details."""
|
|
__tablename__ = 'esrv_email_recipient_logs'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
email_log_id = Column(Integer, ForeignKey('esrv_email_logs.id'), nullable=False)
|
|
recipient = Column(String, nullable=False)
|
|
recipient_type = Column(String, nullable=False) # 'to', 'cc', 'bcc'
|
|
status = Column(String, nullable=False) # 'success', 'failed', etc.
|
|
error_code = Column(String, nullable=True)
|
|
error_message = Column(Text, nullable=True)
|
|
server_response = Column(Text, nullable=True)
|
|
|
|
email_log = relationship("EmailLog", back_populates="recipients")
|
|
|
|
def __repr__(self):
|
|
return f"<EmailRecipientLog(id={self.id}, recipient='{self.recipient}', type='{self.recipient_type}', status='{self.status}')>"
|
|
|
|
class AuthLog(Base):
|
|
"""Authentication log model for security auditing."""
|
|
__tablename__ = 'esrv_auth_logs'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
auth_type = Column(String, nullable=False) # 'user' or 'ip'
|
|
identifier = Column(String, nullable=False) # email or IP address
|
|
ip_address = Column(String)
|
|
success = Column(Boolean, nullable=False)
|
|
message = Column(Text)
|
|
created_at = Column(DateTime, default=func.now())
|
|
|
|
def __repr__(self):
|
|
return f"<AuthLog(id={self.id}, type='{self.auth_type}', identifier='{self.identifier}', success={self.success})>"
|
|
|
|
class DKIMKey(Base):
|
|
"""DKIM key model for email signing."""
|
|
__tablename__ = 'esrv_dkim_keys'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
domain_id = Column(Integer, ForeignKey('esrv_domains.id'), nullable=False)
|
|
selector = Column(String, nullable=False, default='default')
|
|
private_key = Column(Text, nullable=False)
|
|
public_key = Column(Text, nullable=False)
|
|
is_active = Column(Boolean, default=True)
|
|
created_at = Column(DateTime, default=func.now())
|
|
replaced_at = Column(DateTime, nullable=True) # When this key was replaced by a new one
|
|
|
|
def __repr__(self):
|
|
return f"<DKIMKey(id={self.id}, domain_id={self.domain_id}, selector='{self.selector}', active={self.is_active})>"
|
|
|
|
class CustomHeader(Base):
|
|
"""Custom header model for domain-specific email headers."""
|
|
__tablename__ = 'esrv_custom_headers'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
domain_id = Column(Integer, ForeignKey('esrv_domains.id'), nullable=False)
|
|
header_name = Column(String, nullable=False)
|
|
header_value = Column(String, nullable=False)
|
|
is_active = Column(Boolean, default=True)
|
|
created_at = Column(DateTime, default=func.now())
|
|
|
|
def __repr__(self):
|
|
return f"<CustomHeader(id={self.id}, domain_id={self.domain_id}, header='{self.header_name}: {self.header_value}', active={self.is_active})>"
|
|
|
|
class EmailAttachment(Base):
|
|
"""Attachment metadata and file path, linked to EmailLog."""
|
|
__tablename__ = 'esrv_email_attachments'
|
|
|
|
id = Column(Integer, primary_key=True)
|
|
email_log_id = Column(Integer, ForeignKey('esrv_email_logs.id'), nullable=False)
|
|
filename = Column(String, nullable=False)
|
|
content_type = Column(String, nullable=True)
|
|
file_path = Column(String, nullable=False) # Path on disk
|
|
size = Column(Integer, nullable=True)
|
|
uploaded_at = Column(DateTime, default=func.now())
|
|
|
|
email_log = relationship("EmailLog", back_populates="attachments")
|
|
|
|
def __repr__(self):
|
|
return f"<EmailAttachment(id={self.id}, filename='{self.filename}', file_path='{self.file_path}')>"
|
|
|
|
|
|
def create_tables():
|
|
"""Create all database tables using ESRV schema."""
|
|
Base.metadata.create_all(engine)
|
|
logger.info("Created ESRV database tables")
|
|
|
|
def hash_password(password: str) -> str:
|
|
"""Hash a password using bcrypt."""
|
|
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
|
|
|
|
def check_password(password: str, hashed: str) -> bool:
|
|
"""Check a password against its hash."""
|
|
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
|
|
|
|
def log_auth_attempt(auth_type: str, identifier: str, ip_address: str,
|
|
success: bool, message: str) -> None:
|
|
"""
|
|
Log an authentication attempt for security auditing.
|
|
|
|
Args:
|
|
auth_type: Type of auth ('user' or 'ip')
|
|
identifier: User email or IP address
|
|
ip_address: Client IP address
|
|
success: Whether auth was successful
|
|
message: Additional details
|
|
"""
|
|
session = Session()
|
|
try:
|
|
auth_log = AuthLog(
|
|
auth_type=auth_type,
|
|
identifier=identifier,
|
|
ip_address=ip_address,
|
|
success=success,
|
|
message=message
|
|
)
|
|
session.add(auth_log)
|
|
session.commit()
|
|
logger.info(f"Auth log: {auth_type} {identifier} from {ip_address} - {'SUCCESS' if success else 'FAILED'}")
|
|
except Exception as e:
|
|
session.rollback()
|
|
logger.error(f"Failed to log auth attempt: {e}")
|
|
finally:
|
|
session.close()
|
|
|
|
def log_email(from_address: str, to_address: str, subject: str,
|
|
status: str, message: str = None) -> None:
|
|
"""
|
|
Log an email send attempt.
|
|
|
|
Args:
|
|
from_address: Sender email
|
|
to_address: Recipient email
|
|
subject: Email subject
|
|
status: Send status ('sent', 'failed', etc.)
|
|
message: Additional details
|
|
"""
|
|
session = Session()
|
|
try:
|
|
email_log = EmailLog(
|
|
from_address=from_address,
|
|
to_address=to_address,
|
|
subject=subject,
|
|
status=status,
|
|
message=message
|
|
)
|
|
session.add(email_log)
|
|
session.commit()
|
|
logger.info(f"Email log: {from_address} -> {to_address} - {status}")
|
|
except Exception as e:
|
|
session.rollback()
|
|
logger.error(f"Failed to log email: {e}")
|
|
finally:
|
|
session.close()
|
|
|
|
def get_sender_by_email(email: str):
|
|
"""Get sender by email address."""
|
|
session = Session()
|
|
try:
|
|
return session.query(Sender).filter_by(email=email.lower(), is_active=True).first()
|
|
finally:
|
|
session.close()
|
|
|
|
def get_domain_by_name(domain_name: str):
|
|
"""Get domain by name."""
|
|
session = Session()
|
|
try:
|
|
return session.query(Domain).filter_by(domain_name=domain_name.lower(), is_active=True).first()
|
|
finally:
|
|
session.close()
|
|
|
|
def get_whitelisted_ip(ip_address: str, domain_name: str = None):
|
|
"""
|
|
Get whitelisted IP, optionally filtered by domain.
|
|
|
|
Args:
|
|
ip_address: IP address to check
|
|
domain_name: Optional domain name to restrict to
|
|
|
|
Returns:
|
|
WhitelistedIP object if found and authorized for domain
|
|
"""
|
|
session = Session()
|
|
try:
|
|
query = session.query(WhitelistedIP).filter_by(
|
|
ip_address=ip_address,
|
|
is_active=True
|
|
)
|
|
|
|
if domain_name:
|
|
# Join with domain to check authorization
|
|
domain = session.query(Domain).filter_by(
|
|
domain_name=domain_name.lower(),
|
|
is_active=True
|
|
).first()
|
|
if not domain:
|
|
return None
|
|
query = query.filter_by(domain_id=domain.id)
|
|
|
|
return query.first()
|
|
finally:
|
|
session.close()
|