Files
mailgosend/internal/smtp/queue.go
T
2026-05-24 17:15:48 +00:00

298 lines
8.5 KiB
Go

package smtp
import (
"bytes"
"context"
"fmt"
"log"
"mime/multipart"
"net/textproto"
"strings"
"time"
"ghb.freebede.com/nahakubuilder/mailgosend/internal/crypto"
"ghb.freebede.com/nahakubuilder/mailgosend/internal/delivery"
"ghb.freebede.com/nahakubuilder/mailgosend/internal/models"
"ghb.freebede.com/nahakubuilder/mailgosend/internal/storage"
)
// QueueWorker periodically drains the outbound delivery queue: it hands each
// due entry to delivery.Deliver and records the outcome in the queue and the
// delivery log.
//
// Construct it with NewQueueWorker. In the zero value deps is nil and interval
// is zero; Run passes interval to time.NewTicker, which panics on a
// non-positive duration.
type QueueWorker struct {
// deps supplies the database, crypto, storage and configuration the worker uses.
deps *Deps
interval time.Duration // how often Run polls the queue; NewQueueWorker sets 30s
}
// NewQueueWorker returns a QueueWorker that uses deps for all database,
// crypto, storage and configuration access and polls the queue every
// 30 seconds.
func NewQueueWorker(deps *Deps) *QueueWorker {
	const defaultPollInterval = 30 * time.Second
	w := new(QueueWorker)
	w.deps = deps
	w.interval = defaultPollInterval
	return w
}
// Run is the worker's main loop. It drains the queue once immediately, then
// again on every tick of w.interval. It returns only when stopCh is closed or
// receives a value. stopCh is checked between drains, so a drain that is
// already running is allowed to finish first.
func (w *QueueWorker) Run(stopCh <-chan struct{}) {
	log.Println("[queue] worker started")
	poll := time.NewTicker(w.interval)
	defer poll.Stop()
	for {
		w.drainQueue()
		select {
		case <-poll.C:
			// Next poll is due: go round again and drain.
		case <-stopCh:
			log.Println("[queue] worker stopped")
			return
		}
	}
}
// drainQueue claims and delivers the queue entries that are currently due.
// PeekQueue is asked for up to 100 entries per pass.
//
// For each entry it:
//   - marks the row "sending";
//   - decrypts the stored message with the global "queue" key;
//   - hands the message to delivery.Deliver.
//
// A 250 reply marks the entry "delivered" and logs it. Any other reply goes
// through markFailed. A decryption failure is treated as permanent.
//
// The whole pass shares a single 5-minute deadline. Once that deadline has
// passed, the loop stops and leaves the remaining entries untouched for the
// next poll, instead of running every DB call and delivery against a dead
// context.
func (w *QueueWorker) drainQueue() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()
	entries, err := w.deps.DB.PeekQueue(ctx, 100)
	if err != nil {
		log.Printf("[queue] peek error: %v", err)
		return
	}
	if len(entries) == 0 {
		return
	}
	log.Printf("[queue] %d entries ready for delivery", len(entries))
	// Derive the global queue decryption key once per pass.
	queueKey, err := w.deps.Crypt.DeriveKeyGlobal("queue")
	if err != nil {
		log.Printf("[queue] derive key: %v", err)
		return
	}
	// The EHLO name does not depend on the entry, so resolve it once per pass.
	ehlo := w.deps.Cfg.SMTPHostname
	if ehlo == "" {
		ehlo = w.deps.Cfg.Hostname
	}
	for i, entry := range entries {
		// After the pass deadline, every DB write and delivery would just fail
		// with ctx.Err(). Leave the rest of the batch for the next tick.
		if err := ctx.Err(); err != nil {
			log.Printf("[queue] drain deadline reached, %d entries left for next poll: %v", len(entries)-i, err)
			return
		}
		// Mark in-progress to prevent parallel workers picking the same entry.
		// next_attempt is pushed 5 minutes out. Presumably this makes the entry
		// due again if the worker dies mid-delivery.
		// NOTE(review): confirm that PeekQueue filters on next_attempt.
		nextAttempt := time.Now().Add(5 * time.Minute)
		if err := w.deps.DB.SetQueueStatus(ctx, entry.ID, "sending", "", &nextAttempt); err != nil {
			log.Printf("[queue] mark sending %d: %v", entry.ID, err)
			continue
		}
		raw, err := crypto.Decrypt(queueKey, entry.RawEnc)
		if err != nil {
			log.Printf("[queue] decrypt %d: %v", entry.ID, err)
			w.markFailed(ctx, entry.ID, entry.FromAddr, entry.ToAddr, "decrypt error: "+err.Error(), true)
			continue
		}
		result := delivery.Deliver(ctx, ehlo, entry.FromAddr, entry.ToAddr, raw)
		if result.SMTPCode != 250 {
			w.markFailed(ctx, entry.ID, entry.FromAddr, entry.ToAddr, result.Message, result.Perm)
			continue
		}
		// Success.
		if err := w.deps.DB.SetQueueStatus(ctx, entry.ID, "delivered", "delivered", nil); err != nil {
			log.Printf("[queue] mark delivered %d: %v", entry.ID, err)
		}
		w.deps.DB.LogDelivery(ctx, entry.ID, //nolint:errcheck
			entry.FromAddr, entry.ToAddr,
			"delivered", result.SMTPCode, result.Message, result.MXHost)
		log.Printf("[queue] delivered %d: %s → %s via %s", entry.ID, entry.FromAddr, entry.ToAddr, result.MXHost)
	}
}
// markFailed records a failed delivery attempt for queueID.
//
// A permanent failure (perm) sets the entry to "bounced" with no next attempt.
// A temporary failure sets it to "failed" and schedules a retry after
// nextBackoff. The default back-off steps are 5m, 15m, 1h, 4h and 8h, after
// which the entry gives up per expires_at. In both cases the outcome goes to
// the delivery log.
//
// For a permanent failure, a DSN bounce is sent to the sender, unless the
// sender is the null sender ("" or "<>"). A null-sender message is already a
// DSN and must never be bounced again.
func (w *QueueWorker) markFailed(ctx context.Context, queueID int64, from, to, errMsg string, perm bool) {
	var (
		status  = "failed"
		retryAt *time.Time
	)
	if perm {
		status = "bounced"
	} else {
		delay := w.nextBackoff(ctx, queueID)
		when := time.Now().Add(delay)
		retryAt = &when
	}
	if err := w.deps.DB.SetQueueStatus(ctx, queueID, status, errMsg, retryAt); err != nil {
		log.Printf("[queue] update status %d: %v", queueID, err)
	}
	w.deps.DB.LogDelivery(ctx, queueID, from, to, status, 0, errMsg, "") //nolint:errcheck
	log.Printf("[queue] %s %d → %s: %s", status, queueID, to, errMsg)

	// Only permanent failures bounce, and a bounce is never bounced.
	if !perm || from == "" || from == "<>" {
		return
	}
	w.sendDSN(ctx, from, to, errMsg)
}
// sendDSN delivers a Delivery Status Notification (RFC 3464) to the original
// sender of a permanently failed message.
//
// If the lower-cased originalFrom belongs to an enabled local user, the DSN is
// saved directly into that user's INBOX. Otherwise it is sent over SMTP with
// an empty envelope sender. Failures here are logged but never re-queued, to
// avoid bounce loops.
func (w *QueueWorker) sendDSN(ctx context.Context, originalFrom, failedTo, reason string) {
	sender := strings.ToLower(originalFrom)
	// Determine if original sender is a local user.
	user, err := w.deps.DB.GetUserByEmail(ctx, sender)
	if err != nil {
		log.Printf("[queue] dsn lookup sender %s: %v", sender, err)
		return
	}
	// Render the DSN once. The raw message and the mailbox index below share
	// the same Message-ID, Date and Subject, so they always agree.
	dsn, err := buildDSNMessage(w.deps.Cfg.Hostname, sender, failedTo, reason, time.Now().UTC())
	if err != nil {
		log.Printf("[queue] dsn build: %v", err)
		return
	}
	if user == nil || !user.Enabled {
		// Sender is remote — attempt external SMTP delivery of the DSN.
		ehlo := w.deps.Cfg.SMTPHostname
		if ehlo == "" {
			ehlo = w.deps.Cfg.Hostname
		}
		result := delivery.Deliver(ctx, ehlo, "", sender, dsn.raw)
		if result.SMTPCode != 250 {
			log.Printf("[queue] dsn delivery to %s failed: %s", sender, result.Message)
		}
		return
	}
	// Local sender — save the DSN directly to their INBOX.
	inbox, err := w.deps.DB.GetMailboxByType(ctx, user.ID, models.MailboxInbox)
	if err != nil || inbox == nil {
		log.Printf("[queue] dsn inbox for %s: %v", sender, err)
		return
	}
	msg := &storage.IncomingMessage{
		Raw:       dsn.raw,
		FromEmail: "",
		Subject:   dsn.subject,
		Date:      dsn.date,
		MessageID: dsn.messageID,
	}
	if _, err := w.deps.Store.SaveIncoming(ctx, user.ID, inbox.ID, msg); err != nil {
		log.Printf("[queue] dsn save: %v", err)
	}
}

// sanitizeDSNField strips CR and LF from a DSN field to prevent header injection.
func sanitizeDSNField(s string) string {
	return strings.Map(func(r rune) rune {
		if r == '\r' || r == '\n' {
			return -1
		}
		return r
	}, s)
}

// dsnSubject is the Subject header written into every failure DSN.
const dsnSubject = "Delivery Status Notification (Failure)"

// dsnMessage is a rendered DSN together with the header values written into
// it, so callers can index the message without re-parsing raw.
type dsnMessage struct {
	raw       []byte
	messageID string // Message-ID header value, angle brackets included
	subject   string
	date      time.Time
}

// buildDSNMessage renders a minimal RFC 3464 multipart/report message.
//
// The message is addressed to sender and reports that delivery to failedTo
// failed permanently because of reason. now becomes the Date header and seeds
// the Message-ID. An empty hostname falls back to "localhost". CR and LF are
// stripped from sender, failedTo and reason to prevent header injection.
func buildDSNMessage(hostname, sender, failedTo, reason string, now time.Time) (*dsnMessage, error) {
	if hostname == "" {
		hostname = "localhost"
	}
	sender = sanitizeDSNField(sender)
	failedTo = sanitizeDSNField(failedTo)
	reason = sanitizeDSNField(reason)
	msgID := fmt.Sprintf("<dsn-%d@%s>", now.UnixNano(), hostname)

	var buf bytes.Buffer
	// The multipart writer emits nothing until CreatePart. The outer headers
	// written to buf below therefore come first.
	mw := multipart.NewWriter(&buf)
	// Outer headers. To names the DSN's actual recipient (the original
	// sender). The address that failed is reported in the body parts.
	fmt.Fprintf(&buf,
		"From: Mail Delivery Subsystem <mailer-daemon@%s>\r\n"+
			"To: <%s>\r\n"+
			"Subject: %s\r\n"+
			"Date: %s\r\n"+
			"Message-ID: %s\r\n"+
			"MIME-Version: 1.0\r\n"+
			"Content-Type: multipart/report; report-type=delivery-status; boundary=%q\r\n"+
			"\r\n",
		hostname, sender, dsnSubject,
		now.Format("Mon, 02 Jan 2006 15:04:05 -0700"),
		msgID, mw.Boundary(),
	)
	// Part 1: human-readable explanation.
	ph := make(textproto.MIMEHeader)
	ph.Set("Content-Type", "text/plain; charset=utf-8")
	pw, err := mw.CreatePart(ph)
	if err != nil {
		return nil, err
	}
	fmt.Fprintf(pw,
		"Your message could not be delivered to the following recipient:\r\n\r\n"+
			" Recipient: %s\r\n"+
			" Reason: %s\r\n\r\n"+
			"This is a permanent error. The message has not been delivered and will not be retried.\r\n",
		failedTo, reason)
	// Part 2: machine-readable delivery-status (RFC 3464).
	sh := make(textproto.MIMEHeader)
	sh.Set("Content-Type", "message/delivery-status")
	sw, err := mw.CreatePart(sh)
	if err != nil {
		return nil, err
	}
	fmt.Fprintf(sw,
		"Reporting-MTA: dns; %s\r\n\r\n"+
			"Final-Recipient: rfc822; %s\r\n"+
			"Action: failed\r\n"+
			"Status: 5.0.0\r\n"+
			"Diagnostic-Code: smtp; %s\r\n",
		hostname, failedTo, reason)
	if err := mw.Close(); err != nil {
		return nil, err
	}
	return &dsnMessage{
		raw:       buf.Bytes(),
		messageID: msgID,
		subject:   dsnSubject,
		date:      now,
	}, nil
}

// buildDSN renders a failure DSN for failedTo, stamped with the current UTC
// time.
//
// It is kept for existing callers and produces the same output as before.
// It has no sender to address, so the To header carries failedTo. Callers
// that know the original sender should use buildDSNMessage instead.
func buildDSN(hostname, failedTo, reason string) ([]byte, error) {
	dsn, err := buildDSNMessage(hostname, failedTo, failedTo, reason, time.Now().UTC())
	if err != nil {
		return nil, err
	}
	return dsn.raw, nil
}
// nextBackoff returns how long to wait before retrying queueID. It uses the
// entry's attempts counter as an index into the retry schedule.
//
// The schedule is Cfg.QueueRetryMins (minutes per step). When that is empty,
// the default is 5, 15, 60, 240 and 480 minutes. Counts past the last step
// keep using the last step. If the counter cannot be read, the error is logged
// and the first step is used, so a failed lookup never blocks rescheduling.
func (w *QueueWorker) nextBackoff(ctx context.Context, queueID int64) time.Duration {
	var attempts int
	err := w.deps.DB.SQL().QueryRowContext(ctx,
		"SELECT attempts FROM queue WHERE id=?", queueID).Scan(&attempts)
	if err != nil {
		// Best-effort: keep scheduling a retry, but don't hide the failure.
		log.Printf("[queue] read attempts %d: %v", queueID, err)
		attempts = 0
	}
	return retryDelay(attempts, w.deps.Cfg.QueueRetryMins)
}

// retryDelay maps an attempt count onto scheduleMins (minutes per step). When
// scheduleMins is empty, the default is 5, 15, 60, 240 and 480 minutes.
// Negative counts use the first step rather than indexing out of range, and
// counts past the end use the last step.
func retryDelay(attempts int, scheduleMins []int) time.Duration {
	if len(scheduleMins) == 0 {
		scheduleMins = []int{5, 15, 60, 240, 480}
	}
	idx := attempts
	if idx < 0 {
		idx = 0
	}
	if idx >= len(scheduleMins) {
		idx = len(scheduleMins) - 1
	}
	return time.Duration(scheduleMins[idx]) * time.Minute
}