PyMTA initial upload

This commit is contained in:
nahakubuilde
2025-05-30 06:15:06 +01:00
commit bed5477a19
15 changed files with 2061 additions and 0 deletions

85
tests/send_email.py Normal file
View File

@@ -0,0 +1,85 @@
import smtplib
from email.mime.text import MIMEText
import ssl
import argparse
parser = argparse.ArgumentParser()
parser.add_argument('--port', type=int, default=4025)
parser.add_argument('--porttls', type=int, default=40587)
parser.add_argument('--recipient', type=str, default="test@target-email.com")
args = parser.parse_args()
recipient = args.recipient
username='test@example.com'
password='testpass123'
def send_test_email(port=args.port, smtpserver='localhost', use_auth=False,
username=None, password=None, subject='SMTP Authentication Test',
recipient=args.recipient):
"""Send a test email using plain SMTP."""
msg = MIMEText('This is a test email.')
msg['Subject'] = subject
msg['From'] = username
msg['To'] = recipient
try:
with smtplib.SMTP(smtpserver, port, timeout=5) as server:
if use_auth:
server.login(username, password)
server.send_message(msg)
print(f'Email sent {subject}!')
except Exception as e:
print(f'Error sending email: {e}')
def send_test_email_tls(port=args.porttls, smtpserver='localhost', use_auth=False,
username=None, password=None, subject='StarTTLS Email Test',
recipient=args.recipient):
"""Send a test email using STARTTLS."""
msg = MIMEText('This is a test email over TLS.')
msg['Subject'] = subject
msg['From'] = username
msg['To'] = recipient
context = ssl._create_unverified_context()
server = None
try:
server = smtplib.SMTP(smtpserver, port, timeout=5)
#server.set_debuglevel(2)
server.ehlo()
if server.has_extn('STARTTLS'):
server.starttls(context=context)
server.ehlo()
else:
print("Server does not support STARTTLS")
return
if use_auth:
server.login(username, password)
server.send_message(msg)
print(f'Email sent {subject}!')
else:
server.send_message(msg)
print(f'Email sent {subject}!')
except Exception as e:
print(f"Error sending TLS email: {e}")
finally:
if server:
try:
server.quit()
except:
pass
if __name__ == '__main__':
# SMTP Authenticated test:
send_test_email(port=args.port, use_auth=True, username=username, password=password, recipient=recipient, subject='SMTP Authenticated Email Test')
# SMTP IP Whitelisted test:
send_test_email(port=args.port, username=username, recipient=recipient, subject='SMTP IP Whitelisted Test')
# SMTP Bad Credentials test:
send_test_email(port=args.port, use_auth=True, username='usser@example.com', password='badpasssw', recipient=recipient, subject='SMTP Bad Credentials')
# TLS IP Whitelisted test:
send_test_email_tls(port=args.porttls, username=username, recipient=recipient, subject='StarTTLS IP Whitelisted Test')
# TLS Authenticated test:
send_test_email_tls(port=args.porttls, use_auth=True, username=username, password=password, recipient=recipient, subject='StarTTLS Authenticated Email Test')
# TLS Bad Credentials test:
send_test_email_tls(port=args.porttls, use_auth=True, username='usser@example.com', password='badpasssw', recipient=recipient,subject='StarTTLS Bad Credentials Test')

53
tests/test_debug_db.py Normal file
View File

@@ -0,0 +1,53 @@
"""
Debug script to test database operations.
"""
import sys
import os
import sqlite3
# Add current directory to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
print("Testing database operations...")
# Test direct SQLite connection
try:
conn = sqlite3.connect('smtp_server.db')
cursor = conn.cursor()
# Check tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print(f"Tables in database: {[table[0] for table in tables]}")
# Check domains
cursor.execute("SELECT * FROM domains;")
domains = cursor.fetchall()
print(f"Domains: {domains}")
conn.close()
print("Direct SQLite test successful")
except Exception as e:
print(f"Direct SQLite test failed: {e}")
# Test SQLAlchemy models
try:
from models import Session, Domain, User, WhitelistedIP, create_tables
print("Models imported successfully")
# Create session
session = Session()
# Check domains
domains = session.query(Domain).all()
print(f"SQLAlchemy domains: {[(d.id, d.domain_name) for d in domains]}")
session.close()
print("SQLAlchemy test successful")
except Exception as e:
print(f"SQLAlchemy test failed: {e}")
import traceback
traceback.print_exc()