diff --git a/internal/db/db.go b/internal/db/db.go index 7f49bdb..8ef9552 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -174,11 +174,30 @@ func (d *DB) Migrate() error { `ALTER TABLE folders ADD COLUMN sync_enabled INTEGER NOT NULL DEFAULT 1`, // Plaintext search index column — stores decrypted subject+from+preview for LIKE search. `ALTER TABLE messages ADD COLUMN search_text TEXT NOT NULL DEFAULT ''`, + // Per-folder IMAP sync state for incremental/delta sync. + `ALTER TABLE folders ADD COLUMN uid_validity INTEGER NOT NULL DEFAULT 0`, + `ALTER TABLE folders ADD COLUMN last_seen_uid INTEGER NOT NULL DEFAULT 0`, } for _, stmt := range alterStmts { d.sql.Exec(stmt) // ignore "duplicate column" errors intentionally } + // Pending IMAP operations queue — survives server restarts. + // op_type: "delete" | "move" | "flag_read" | "flag_star" + _, err := d.sql.Exec(`CREATE TABLE IF NOT EXISTS pending_imap_ops ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + account_id INTEGER NOT NULL REFERENCES email_accounts(id) ON DELETE CASCADE, + op_type TEXT NOT NULL, + remote_uid INTEGER NOT NULL, + folder_path TEXT NOT NULL DEFAULT '', + extra TEXT NOT NULL DEFAULT '', + attempts INTEGER NOT NULL DEFAULT 0, + created_at DATETIME DEFAULT (datetime('now')) + )`) + if err != nil { + return fmt.Errorf("create pending_imap_ops: %w", err) + } + // Bootstrap admin account if no users exist return d.bootstrapAdmin() } @@ -1279,3 +1298,170 @@ func (d *DB) ListStarredMessages(userID int64, page, pageSize int) (*models.Page HasMore: offset+len(summaries) < total, }, nil } + +// ---- Pending IMAP ops queue ---- + +// PendingIMAPOp represents an IMAP write operation that needs to be applied to the server. +type PendingIMAPOp struct { + ID int64 + AccountID int64 + OpType string // "delete" | "move" | "flag_read" | "flag_star" + RemoteUID uint32 + FolderPath string + Extra string // for move: dest folder path; for flag_*: "1" or "0" + Attempts int +} + +// EnqueueIMAPOp adds an operation to the pending queue atomically. +func (d *DB) EnqueueIMAPOp(op *PendingIMAPOp) error { + _, err := d.sql.Exec( + `INSERT INTO pending_imap_ops (account_id, op_type, remote_uid, folder_path, extra) VALUES (?,?,?,?,?)`, + op.AccountID, op.OpType, op.RemoteUID, op.FolderPath, op.Extra, + ) + return err +} + +// DequeuePendingOps returns up to `limit` pending ops for a given account. +func (d *DB) DequeuePendingOps(accountID int64, limit int) ([]*PendingIMAPOp, error) { + rows, err := d.sql.Query( + `SELECT id, account_id, op_type, remote_uid, folder_path, extra, attempts + FROM pending_imap_ops WHERE account_id=? ORDER BY id ASC LIMIT ?`, + accountID, limit, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var ops []*PendingIMAPOp + for rows.Next() { + op := &PendingIMAPOp{} + rows.Scan(&op.ID, &op.AccountID, &op.OpType, &op.RemoteUID, &op.FolderPath, &op.Extra, &op.Attempts) + ops = append(ops, op) + } + return ops, rows.Err() +} + +// DeletePendingOp removes a successfully applied op. +func (d *DB) DeletePendingOp(id int64) error { + _, err := d.sql.Exec(`DELETE FROM pending_imap_ops WHERE id=?`, id) + return err +} + +// IncrementPendingOpAttempts bumps attempt count; ops with >5 attempts are abandoned. +func (d *DB) IncrementPendingOpAttempts(id int64) { + d.sql.Exec(`UPDATE pending_imap_ops SET attempts=attempts+1 WHERE id=?`, id) + d.sql.Exec(`DELETE FROM pending_imap_ops WHERE id=? AND attempts>5`, id) +} + +// CountPendingOps returns number of queued ops for an account (for logging). +func (d *DB) CountPendingOps(accountID int64) int { + var n int + d.sql.QueryRow(`SELECT COUNT(*) FROM pending_imap_ops WHERE account_id=?`, accountID).Scan(&n) + return n +} + +// ---- Folder delta-sync state ---- + +// GetFolderSyncState returns uid_validity and last_seen_uid for incremental sync. +func (d *DB) GetFolderSyncState(folderID int64) (uidValidity, lastSeenUID uint32) { + d.sql.QueryRow(`SELECT COALESCE(uid_validity,0), COALESCE(last_seen_uid,0) FROM folders WHERE id=?`, folderID). + Scan(&uidValidity, &lastSeenUID) + return +} + +// SetFolderSyncState persists uid_validity and last_seen_uid after a successful sync. +func (d *DB) SetFolderSyncState(folderID int64, uidValidity, lastSeenUID uint32) { + d.sql.Exec(`UPDATE folders SET uid_validity=?, last_seen_uid=? WHERE id=?`, uidValidity, lastSeenUID, folderID) +} + +// PurgeDeletedMessages removes local messages whose remote_uid is no longer +// in the server's UID list for a folder. Returns count purged. +func (d *DB) PurgeDeletedMessages(folderID int64, serverUIDs []uint32) (int, error) { + if len(serverUIDs) == 0 { + // Don't purge everything if server returned empty (connection issue) + return 0, nil + } + // Build placeholder list + args := make([]interface{}, len(serverUIDs)+1) + args[0] = folderID + placeholders := make([]string, len(serverUIDs)) + for i, uid := range serverUIDs { + args[i+1] = fmt.Sprintf("%d", uid) + placeholders[i] = "?" + } + q := fmt.Sprintf( + `DELETE FROM messages WHERE folder_id=? AND remote_uid NOT IN (%s)`, + strings.Join(placeholders, ","), + ) + res, err := d.sql.Exec(q, args...) + if err != nil { + return 0, err + } + n, _ := res.RowsAffected() + return int(n), nil +} + +// DeleteAllFolderMessages removes all messages from a folder (used on UIDVALIDITY change). +func (d *DB) DeleteAllFolderMessages(folderID int64) { + d.sql.Exec(`DELETE FROM messages WHERE folder_id=?`, folderID) +} + +// GetFolderMessageCount returns the local message count for a folder by account and path. +func (d *DB) GetFolderMessageCount(accountID int64, folderPath string) int { + var n int + d.sql.QueryRow(` + SELECT COUNT(*) FROM messages m + JOIN folders f ON f.id=m.folder_id + WHERE f.account_id=? AND f.full_path=?`, accountID, folderPath, + ).Scan(&n) + return n +} + +// ReconcileFlags updates is_read and is_starred from server flags, but ONLY for +// messages that do NOT have a pending local write op (to avoid overwriting in-flight changes). +func (d *DB) ReconcileFlags(folderID int64, serverFlags map[uint32][]string) { + // Get set of UIDs with pending ops so we don't overwrite them + rows, _ := d.sql.Query( + `SELECT DISTINCT remote_uid FROM pending_imap_ops po + JOIN folders f ON f.account_id=po.account_id + WHERE f.id=? AND (po.op_type='flag_read' OR po.op_type='flag_star')`, folderID, + ) + pendingUIDs := make(map[uint32]bool) + if rows != nil { + for rows.Next() { + var uid uint32 + rows.Scan(&uid) + pendingUIDs[uid] = true + } + rows.Close() + } + + for uid, flags := range serverFlags { + if pendingUIDs[uid] { + continue // don't reconcile — we have a pending write for this message + } + isRead := false + isStarred := false + for _, f := range flags { + switch f { + case `\Seen`: + isRead = true + case `\Flagged`: + isStarred = true + } + } + d.sql.Exec( + `UPDATE messages SET is_read=?, is_starred=? + WHERE folder_id=? AND remote_uid=?`, + boolToInt(isRead), boolToInt(isStarred), + folderID, fmt.Sprintf("%d", uid), + ) + } +} + +func boolToInt(b bool) int { + if b { + return 1 + } + return 0 +} diff --git a/internal/email/imap.go b/internal/email/imap.go index 177aecb..ca5c0c3 100644 --- a/internal/email/imap.go +++ b/internal/email/imap.go @@ -938,3 +938,123 @@ func partPathToInts(path string) []int { } return result } + +// ---- Delta sync helpers ---- + +// FolderStatus returns the current UIDVALIDITY, UIDNEXT, and message count +// for a mailbox without fetching any messages. +type FolderStatus struct { + UIDValidity uint32 + UIDNext uint32 + Messages uint32 +} + +func (c *Client) GetFolderStatus(mailboxName string) (*FolderStatus, error) { + mbox, err := c.imap.Select(mailboxName, true) + if err != nil { + return nil, fmt.Errorf("select %s: %w", mailboxName, err) + } + return &FolderStatus{ + UIDValidity: mbox.UidValidity, + UIDNext: mbox.UidNext, + Messages: mbox.Messages, + }, nil +} + +// ListAllUIDs returns all UIDs currently in the mailbox. Used for purge detection. +func (c *Client) ListAllUIDs(mailboxName string) ([]uint32, error) { + mbox, err := c.imap.Select(mailboxName, true) + if err != nil { + return nil, fmt.Errorf("select %s: %w", mailboxName, err) + } + if mbox.Messages == 0 { + return nil, nil + } + uids, err := c.imap.UidSearch(imap.NewSearchCriteria()) + if err != nil { + return nil, fmt.Errorf("uid search all: %w", err) + } + return uids, nil +} + +// FetchNewMessages fetches only messages with UID > afterUID (incremental). +func (c *Client) FetchNewMessages(mailboxName string, afterUID uint32) ([]*gomailModels.Message, error) { + mbox, err := c.imap.Select(mailboxName, true) + if err != nil { + return nil, fmt.Errorf("select %s: %w", mailboxName, err) + } + if mbox.Messages == 0 { + return nil, nil + } + + // SEARCH UID afterUID+1:* + seqSet := new(imap.SeqSet) + seqSet.AddRange(afterUID+1, ^uint32(0)) // afterUID+1 to * (max) + + items := []imap.FetchItem{ + imap.FetchUid, imap.FetchEnvelope, + imap.FetchFlags, imap.FetchBodyStructure, + imap.FetchRFC822, + } + + ch := make(chan *imap.Message, 64) + done := make(chan error, 1) + go func() { done <- c.imap.UidFetch(seqSet, items, ch) }() + + var results []*gomailModels.Message + for msg := range ch { + if msg.Uid <= afterUID { + continue // skip if server returns older (shouldn't happen) + } + m, err := parseIMAPMessage(msg, c.account) + if err != nil { + log.Printf("parse message uid=%d: %v", msg.Uid, err) + continue + } + results = append(results, m) + } + if err := <-done; err != nil { + // UID range with no results gives an error on some servers — treat as empty + if strings.Contains(err.Error(), "No matching messages") || + strings.Contains(err.Error(), "BADUID") || + strings.Contains(err.Error(), "UID range") { + return nil, nil + } + return results, fmt.Errorf("uid fetch new: %w", err) + } + return results, nil +} + +// SyncFlags fetches FLAGS for all messages in a mailbox efficiently. +// Returns map[uid]->flags for reconciliation with local state. +func (c *Client) SyncFlags(mailboxName string) (map[uint32][]string, error) { + mbox, err := c.imap.Select(mailboxName, true) + if err != nil { + return nil, fmt.Errorf("select %s: %w", mailboxName, err) + } + if mbox.Messages == 0 { + return map[uint32][]string{}, nil + } + + seqSet := new(imap.SeqSet) + seqSet.AddRange(1, mbox.Messages) + items := []imap.FetchItem{imap.FetchUid, imap.FetchFlags} + + ch := make(chan *imap.Message, 256) + done := make(chan error, 1) + go func() { done <- c.imap.Fetch(seqSet, items, ch) }() + + result := make(map[uint32][]string, mbox.Messages) + for msg := range ch { + result[msg.Uid] = msg.Flags + } + if err := <-done; err != nil { + return result, fmt.Errorf("fetch flags: %w", err) + } + return result, nil +} + +// SelectMailbox selects a mailbox and returns its status info. +func (c *Client) SelectMailbox(name string) (*imap.MailboxStatus, error) { + return c.imap.Select(name, true) +} diff --git a/internal/handlers/api.go b/internal/handlers/api.go index 40b0e52..ed483c2 100644 --- a/internal/handlers/api.go +++ b/internal/handlers/api.go @@ -540,19 +540,21 @@ func (h *APIHandler) MarkRead(w http.ResponseWriter, r *http.Request) { messageID := pathInt64(r, "id") var req struct{ Read bool `json:"read"` } json.NewDecoder(r.Body).Decode(&req) + + // Update local DB first h.db.MarkMessageRead(messageID, userID, req.Read) - go func() { - uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID) - if err != nil || uid == 0 || account == nil { - return - } - c, err := email.Connect(context.Background(), account) - if err != nil { - return - } - defer c.Close() - _ = c.SetFlagByUID(folderPath, uid, `\Seen`, req.Read) - }() + + // Enqueue IMAP op — drained by background worker with retry + uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID) + if err == nil && uid != 0 && account != nil { + val := "0" + if req.Read { val = "1" } + h.db.EnqueueIMAPOp(&db.PendingIMAPOp{ + AccountID: account.ID, OpType: "flag_read", + RemoteUID: uid, FolderPath: folderPath, Extra: val, + }) + h.syncer.TriggerAccountSync(account.ID) + } h.writeJSON(w, map[string]bool{"ok": true}) } @@ -564,18 +566,16 @@ func (h *APIHandler) ToggleStar(w http.ResponseWriter, r *http.Request) { h.writeError(w, http.StatusInternalServerError, "failed to toggle star") return } - go func() { - uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID) - if err != nil || uid == 0 || account == nil { - return - } - c, err := email.Connect(context.Background(), account) - if err != nil { - return - } - defer c.Close() - _ = c.SetFlagByUID(folderPath, uid, `\Flagged`, starred) - }() + uid, folderPath, account, ierr := h.db.GetMessageIMAPInfo(messageID, userID) + if ierr == nil && uid != 0 && account != nil { + val := "0" + if starred { val = "1" } + h.db.EnqueueIMAPOp(&db.PendingIMAPOp{ + AccountID: account.ID, OpType: "flag_star", + RemoteUID: uid, FolderPath: folderPath, Extra: val, + }) + h.syncer.TriggerAccountSync(account.ID) + } h.writeJSON(w, map[string]bool{"ok": true, "starred": starred}) } @@ -588,33 +588,24 @@ func (h *APIHandler) MoveMessage(w http.ResponseWriter, r *http.Request) { return } - // IMAP move (best-effort, non-blocking) - go func() { - uid, srcPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID) - if err != nil || uid == 0 || account == nil { - log.Printf("IMAP move: GetMessageIMAPInfo msg=%d err=%v uid=%d", messageID, err, uid) - return - } - destFolder, err := h.db.GetFolderByID(req.FolderID) - if err != nil || destFolder == nil { - log.Printf("IMAP move: GetFolderByID folder=%d err=%v", req.FolderID, err) - return - } - c, err := email.Connect(context.Background(), account) - if err != nil { - log.Printf("IMAP move: Connect account=%d err=%v", account.ID, err) - return - } - defer c.Close() - if err := c.MoveByUID(srcPath, destFolder.FullPath, uid); err != nil { - log.Printf("IMAP move: MoveByUID uid=%d src=%s dst=%s err=%v", uid, srcPath, destFolder.FullPath, err) - } - }() + // Get IMAP info before changing folder_id in DB + uid, srcPath, account, imapErr := h.db.GetMessageIMAPInfo(messageID, userID) + destFolder, _ := h.db.GetFolderByID(req.FolderID) + // Update local DB if err := h.db.MoveMessage(messageID, userID, req.FolderID); err != nil { h.writeError(w, http.StatusInternalServerError, "move failed") return } + + // Enqueue IMAP move + if imapErr == nil && uid != 0 && account != nil && destFolder != nil { + h.db.EnqueueIMAPOp(&db.PendingIMAPOp{ + AccountID: account.ID, OpType: "move", + RemoteUID: uid, FolderPath: srcPath, Extra: destFolder.FullPath, + }) + h.syncer.TriggerAccountSync(account.ID) + } h.writeJSON(w, map[string]bool{"ok": true}) } @@ -622,36 +613,23 @@ func (h *APIHandler) DeleteMessage(w http.ResponseWriter, r *http.Request) { userID := middleware.GetUserID(r) messageID := pathInt64(r, "id") - // IMAP delete (best-effort, non-blocking) - go func() { - uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID) - if err != nil || uid == 0 || account == nil { - log.Printf("IMAP delete: GetMessageIMAPInfo msg=%d err=%v uid=%d", messageID, err, uid) - return - } - c, err := email.Connect(context.Background(), account) - if err != nil { - log.Printf("IMAP delete: Connect account=%d err=%v", account.ID, err) - return - } - defer c.Close() - mailboxes, _ := c.ListMailboxes() - var trashName string - for _, mb := range mailboxes { - if email.InferFolderType(mb.Name, mb.Attributes) == "trash" { - trashName = mb.Name - break - } - } - if err := c.DeleteByUID(folderPath, uid, trashName); err != nil { - log.Printf("IMAP delete: DeleteByUID uid=%d folder=%s trash=%s err=%v", uid, folderPath, trashName, err) - } - }() + // Get IMAP info before deleting from DB + uid, folderPath, account, imapErr := h.db.GetMessageIMAPInfo(messageID, userID) + // Delete from local DB if err := h.db.DeleteMessage(messageID, userID); err != nil { h.writeError(w, http.StatusInternalServerError, "delete failed") return } + + // Enqueue IMAP delete + if imapErr == nil && uid != 0 && account != nil { + h.db.EnqueueIMAPOp(&db.PendingIMAPOp{ + AccountID: account.ID, OpType: "delete", + RemoteUID: uid, FolderPath: folderPath, + }) + h.syncer.TriggerAccountSync(account.ID) + } h.writeJSON(w, map[string]bool{"ok": true}) } diff --git a/internal/syncer/syncer.go b/internal/syncer/syncer.go index de91bb2..9e89010 100644 --- a/internal/syncer/syncer.go +++ b/internal/syncer/syncer.go @@ -1,10 +1,15 @@ // Package syncer provides background IMAP synchronisation for all active accounts. +// Architecture: +// - One goroutine per account runs IDLE on the INBOX to receive push notifications. +// - A separate drain goroutine flushes pending_imap_ops (delete/move/flag writes). +// - Periodic full-folder delta sync catches changes made by other clients. package syncer import ( "context" "fmt" "log" + "sync" "time" "github.com/yourusername/gomail/internal/db" @@ -12,68 +17,536 @@ import ( "github.com/yourusername/gomail/internal/models" ) -// Scheduler runs background sync for all active accounts according to their -// individual sync_interval settings. +// Scheduler coordinates all background sync activity. type Scheduler struct { db *db.DB stop chan struct{} + wg sync.WaitGroup + + // push channels: accountID -> channel to signal "something changed on server" + pushMu sync.Mutex + pushCh map[int64]chan struct{} } -// New creates a new Scheduler. Call Start() to begin background syncing. +// New creates a new Scheduler. func New(database *db.DB) *Scheduler { - return &Scheduler{db: database, stop: make(chan struct{})} + return &Scheduler{ + db: database, + stop: make(chan struct{}), + pushCh: make(map[int64]chan struct{}), + } } -// Start launches the scheduler goroutine. Ticks every minute and checks -// which accounts are due for sync based on last_sync and sync_interval. +// Start launches all background goroutines. func (s *Scheduler) Start() { + s.wg.Add(1) go func() { - log.Println("Background sync scheduler started") - s.runDue() - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - for { - select { - case <-ticker.C: - s.runDue() - case <-s.stop: - log.Println("Background sync scheduler stopped") - return - } - } + defer s.wg.Done() + s.mainLoop() }() + log.Println("[sync] scheduler started") } -// Stop signals the scheduler to exit. +// Stop signals all goroutines to exit and waits for them. func (s *Scheduler) Stop() { close(s.stop) + s.wg.Wait() + log.Println("[sync] scheduler stopped") } -func (s *Scheduler) runDue() { +// TriggerAccountSync signals an immediate sync for an account (called after IMAP write ops). +func (s *Scheduler) TriggerAccountSync(accountID int64) { + s.pushMu.Lock() + ch, ok := s.pushCh[accountID] + s.pushMu.Unlock() + if ok { + select { + case ch <- struct{}{}: + default: // already pending + } + } +} + +// ---- Main coordination loop ---- + +func (s *Scheduler) mainLoop() { + // Ticker for the outer "check which accounts are due" loop. + // Runs every 30s; individual accounts control their own interval. + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + // Track per-account goroutines so we only launch one per account. + type accountWorker struct { + stop chan struct{} + pushCh chan struct{} + } + workers := make(map[int64]*accountWorker) + + spawnWorker := func(account *models.EmailAccount) { + if _, exists := workers[account.ID]; exists { + return + } + w := &accountWorker{ + stop: make(chan struct{}), + pushCh: make(chan struct{}, 1), + } + workers[account.ID] = w + + s.pushMu.Lock() + s.pushCh[account.ID] = w.pushCh + s.pushMu.Unlock() + + s.wg.Add(1) + go func(acc *models.EmailAccount, w *accountWorker) { + defer s.wg.Done() + s.accountWorker(acc, w.stop, w.pushCh) + }(account, w) + } + + stopWorker := func(accountID int64) { + if w, ok := workers[accountID]; ok { + close(w.stop) + delete(workers, accountID) + s.pushMu.Lock() + delete(s.pushCh, accountID) + s.pushMu.Unlock() + } + } + + // Initial spawn + s.spawnForActive(spawnWorker) + + for { + select { + case <-s.stop: + for id := range workers { + stopWorker(id) + } + return + case <-ticker.C: + // Build active IDs map for reconciliation + activeIDs := make(map[int64]bool, len(workers)) + for id := range workers { + activeIDs[id] = true + } + s.reconcileWorkers(activeIDs, spawnWorker, stopWorker) + } + } +} + +func (s *Scheduler) spawnForActive(spawn func(*models.EmailAccount)) { accounts, err := s.db.ListAllActiveAccounts() if err != nil { - log.Printf("Sync scheduler: list accounts: %v", err) + log.Printf("[sync] list accounts: %v", err) return } - now := time.Now() - for _, account := range accounts { - if account.SyncInterval <= 0 { - continue + for _, acc := range accounts { + spawn(acc) + } +} + +func (s *Scheduler) reconcileWorkers( + activeIDs map[int64]bool, + spawn func(*models.EmailAccount), + stop func(int64), +) { + accounts, err := s.db.ListAllActiveAccounts() + if err != nil { + return + } + serverActive := make(map[int64]bool) + for _, acc := range accounts { + serverActive[acc.ID] = true + if !activeIDs[acc.ID] { + spawn(acc) } - nextSync := account.LastSync.Add(time.Duration(account.SyncInterval) * time.Minute) - if account.LastSync.IsZero() || now.After(nextSync) { - go s.syncAccount(account) + } + for id := range activeIDs { + if !serverActive[id] { + stop(id) } } } -// SyncAccountNow performs an immediate sync of one account. Returns messages synced. +// ---- Per-account worker ---- +// Each worker: +// 1. On startup: drain pending ops, then do a full delta sync. +// 2. Runs an IDLE loop on INBOX for push notifications. +// 3. Every syncInterval minutes (or on push signal): delta sync all enabled folders. +// 4. Every 2 minutes: drain pending ops (retries failed writes). + +func (s *Scheduler) accountWorker(account *models.EmailAccount, stop chan struct{}, push chan struct{}) { + log.Printf("[sync] worker started for %s", account.EmailAddress) + + // Fresh account data function (interval can change at runtime) + getAccount := func() *models.EmailAccount { + a, _ := s.db.GetAccount(account.ID) + if a == nil { + return account + } + return a + } + + // Initial sync on startup + s.drainPendingOps(account) + s.deltaSync(getAccount()) + + // Drain ticker: retry pending ops every 90 seconds + drainTicker := time.NewTicker(90 * time.Second) + defer drainTicker.Stop() + + // Full sync ticker: based on account sync_interval, check every 30s + syncTicker := time.NewTicker(30 * time.Second) + defer syncTicker.Stop() + + // IDLE watcher for INBOX push notifications + idleCh := make(chan struct{}, 1) + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.idleWatcher(account, stop, idleCh) + }() + + for { + select { + case <-stop: + log.Printf("[sync] worker stopped for %s", account.EmailAddress) + return + case <-drainTicker.C: + s.drainPendingOps(getAccount()) + case <-idleCh: + // Server signalled new mail/changes in INBOX — sync just INBOX + acc := getAccount() + s.syncInbox(acc) + case <-push: + // Local trigger (after write op) — drain ops then sync + acc := getAccount() + s.drainPendingOps(acc) + s.deltaSync(acc) + case <-syncTicker.C: + acc := getAccount() + if acc.SyncInterval <= 0 { + continue + } + nextSync := acc.LastSync.Add(time.Duration(acc.SyncInterval) * time.Minute) + if acc.LastSync.IsZero() || time.Now().After(nextSync) { + s.deltaSync(acc) + } + } + } +} + +// ---- IDLE watcher ---- +// Maintains a persistent IMAP connection to INBOX and issues IDLE. +// When EXISTS or EXPUNGE arrives, sends to idleCh. +func (s *Scheduler) idleWatcher(account *models.EmailAccount, stop chan struct{}, idleCh chan struct{}) { + const reconnectDelay = 30 * time.Second + const idleTimeout = 25 * time.Minute // RFC 2177 recommends < 29min + + signal := func() { + select { + case idleCh <- struct{}{}: + default: + } + } + + for { + select { + case <-stop: + return + default: + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + c, err := email.Connect(ctx, account) + cancel() + if err != nil { + log.Printf("[idle:%s] connect: %v — retry in %s", account.EmailAddress, err, reconnectDelay) + select { + case <-stop: + return + case <-time.After(reconnectDelay): + continue + } + } + + // Select INBOX + _, err = c.SelectMailbox("INBOX") + if err != nil { + c.Close() + select { + case <-stop: + return + case <-time.After(reconnectDelay): + continue + } + } + + // IDLE loop — go-imap v1 does not have built-in IDLE, we poll with short + // CHECK + NOOP and rely on the EXISTS response to wake us. + // We use a 1-minute poll since go-imap v1 doesn't expose IDLE directly. + pollTicker := time.NewTicker(60 * time.Second) + idleTimer := time.NewTimer(idleTimeout) + + pollLoop: + for { + select { + case <-stop: + pollTicker.Stop() + idleTimer.Stop() + c.Close() + return + case <-idleTimer.C: + // Reconnect to keep connection alive + pollTicker.Stop() + c.Close() + break pollLoop + case <-pollTicker.C: + // Poll server for changes + status, err := c.GetFolderStatus("INBOX") + if err != nil { + log.Printf("[idle:%s] status check: %v", account.EmailAddress, err) + pollTicker.Stop() + idleTimer.Stop() + c.Close() + break pollLoop + } + // Check if message count changed + localCount := s.db.GetFolderMessageCount(account.ID, "INBOX") + if status.Messages != uint32(localCount) { + signal() + } + } + } + + select { + case <-stop: + return + case <-time.After(2 * time.Second): + } + } +} + +// ---- Delta sync ---- +// For each enabled folder: +// 1. Check UIDVALIDITY — if changed, full re-sync (folder was recreated on server). +// 2. Fetch only new messages (UID > last_seen_uid). +// 3. Fetch FLAGS for all existing messages to catch read/star changes from other clients. +// 4. Fetch all server UIDs and purge locally deleted messages. + +func (s *Scheduler) deltaSync(account *models.EmailAccount) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + c, err := email.Connect(ctx, account) + if err != nil { + log.Printf("[sync:%s] connect: %v", account.EmailAddress, err) + s.db.SetAccountError(account.ID, err.Error()) + return + } + defer c.Close() + s.db.ClearAccountError(account.ID) + + mailboxes, err := c.ListMailboxes() + if err != nil { + log.Printf("[sync:%s] list mailboxes: %v", account.EmailAddress, err) + return + } + + totalNew := 0 + for _, mb := range mailboxes { + folderType := email.InferFolderType(mb.Name, mb.Attributes) + folder := &models.Folder{ + AccountID: account.ID, + Name: mb.Name, + FullPath: mb.Name, + FolderType: folderType, + } + if err := s.db.UpsertFolder(folder); err != nil { + continue + } + dbFolder, _ := s.db.GetFolderByPath(account.ID, mb.Name) + if dbFolder == nil || !dbFolder.SyncEnabled { + continue + } + + n, err := s.syncFolder(c, account, dbFolder) + if err != nil { + log.Printf("[sync:%s] folder %s: %v", account.EmailAddress, mb.Name, err) + continue + } + totalNew += n + } + + s.db.UpdateAccountLastSync(account.ID) + if totalNew > 0 { + log.Printf("[sync:%s] %d new messages", account.EmailAddress, totalNew) + } +} + +// syncInbox is a fast path that only syncs the INBOX folder. +func (s *Scheduler) syncInbox(account *models.EmailAccount) { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + c, err := email.Connect(ctx, account) + if err != nil { + return + } + defer c.Close() + + dbFolder, _ := s.db.GetFolderByPath(account.ID, "INBOX") + if dbFolder == nil { + return + } + n, err := s.syncFolder(c, account, dbFolder) + if err != nil { + log.Printf("[idle:%s] INBOX sync: %v", account.EmailAddress, err) + return + } + if n > 0 { + log.Printf("[idle:%s] %d new messages in INBOX", account.EmailAddress, n) + } +} + +func (s *Scheduler) syncFolder(c *email.Client, account *models.EmailAccount, dbFolder *models.Folder) (int, error) { + status, err := c.GetFolderStatus(dbFolder.FullPath) + if err != nil { + return 0, fmt.Errorf("status: %w", err) + } + + storedValidity, lastSeenUID := s.db.GetFolderSyncState(dbFolder.ID) + newMessages := 0 + + // UIDVALIDITY changed = folder was recreated on server; wipe local and re-fetch all + if storedValidity != 0 && status.UIDValidity != storedValidity { + log.Printf("[sync] UIDVALIDITY changed for %s/%s — full re-sync", account.EmailAddress, dbFolder.FullPath) + s.db.DeleteAllFolderMessages(dbFolder.ID) + lastSeenUID = 0 + } + + // 1. Fetch new messages (UID > lastSeenUID) + var msgs []*models.Message + if lastSeenUID == 0 { + // First sync: respect the account's days/all setting + days := account.SyncDays + if days <= 0 || account.SyncMode == "all" { + days = 0 + } + msgs, err = c.FetchMessages(dbFolder.FullPath, days) + } else { + msgs, err = c.FetchNewMessages(dbFolder.FullPath, lastSeenUID) + } + if err != nil { + return 0, fmt.Errorf("fetch new: %w", err) + } + + maxUID := lastSeenUID + for _, msg := range msgs { + msg.FolderID = dbFolder.ID + if err := s.db.UpsertMessage(msg); err == nil { + newMessages++ + } + uid := uint32(0) + fmt.Sscanf(msg.RemoteUID, "%d", &uid) + if uid > maxUID { + maxUID = uid + } + } + + // 2. Sync flags for ALL existing messages (catch read/star changes from other clients) + flags, err := c.SyncFlags(dbFolder.FullPath) + if err != nil { + log.Printf("[sync] flags %s/%s: %v", account.EmailAddress, dbFolder.FullPath, err) + } else if len(flags) > 0 { + s.db.ReconcileFlags(dbFolder.ID, flags) + } + + // 3. Fetch all server UIDs and purge messages deleted on server + serverUIDs, err := c.ListAllUIDs(dbFolder.FullPath) + if err != nil { + log.Printf("[sync] list uids %s/%s: %v", account.EmailAddress, dbFolder.FullPath, err) + } else { + purged, _ := s.db.PurgeDeletedMessages(dbFolder.ID, serverUIDs) + if purged > 0 { + log.Printf("[sync] purged %d server-deleted messages from %s/%s", purged, account.EmailAddress, dbFolder.FullPath) + } + } + + // Save sync state + s.db.SetFolderSyncState(dbFolder.ID, status.UIDValidity, maxUID) + s.db.UpdateFolderCounts(dbFolder.ID) + + return newMessages, nil +} + +// ---- Pending ops drain ---- +// Applies queued IMAP write operations (delete/move/flag) with retry logic. + +func (s *Scheduler) drainPendingOps(account *models.EmailAccount) { + ops, err := s.db.DequeuePendingOps(account.ID, 50) + if err != nil || len(ops) == 0 { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + + c, err := email.Connect(ctx, account) + if err != nil { + log.Printf("[ops:%s] connect for drain: %v", account.EmailAddress, err) + return + } + defer c.Close() + + // Find trash folder name once + trashName := "" + if mboxes, err := c.ListMailboxes(); err == nil { + for _, mb := range mboxes { + if email.InferFolderType(mb.Name, mb.Attributes) == "trash" { + trashName = mb.Name + break + } + } + } + + for _, op := range ops { + var applyErr error + switch op.OpType { + case "delete": + applyErr = c.DeleteByUID(op.FolderPath, op.RemoteUID, trashName) + case "move": + applyErr = c.MoveByUID(op.FolderPath, op.Extra, op.RemoteUID) + case "flag_read": + applyErr = c.SetFlagByUID(op.FolderPath, op.RemoteUID, `\Seen`, op.Extra == "1") + case "flag_star": + applyErr = c.SetFlagByUID(op.FolderPath, op.RemoteUID, `\Flagged`, op.Extra == "1") + } + + if applyErr != nil { + log.Printf("[ops:%s] %s uid=%d folder=%s: %v", account.EmailAddress, op.OpType, op.RemoteUID, op.FolderPath, applyErr) + s.db.IncrementPendingOpAttempts(op.ID) + } else { + s.db.DeletePendingOp(op.ID) + } + } + + if n := s.db.CountPendingOps(account.ID); n > 0 { + log.Printf("[ops:%s] %d ops still pending after drain", account.EmailAddress, n) + } +} + +// ---- Public API (called by HTTP handlers) ---- + +// SyncAccountNow performs an immediate delta sync of one account. func (s *Scheduler) SyncAccountNow(accountID int64) (int, error) { account, err := s.db.GetAccount(accountID) if err != nil || account == nil { return 0, fmt.Errorf("account %d not found", accountID) } - return s.doSync(account) + s.drainPendingOps(account) + s.deltaSync(account) + return 0, nil } // SyncFolderNow syncs a single folder for an account. @@ -86,6 +559,7 @@ func (s *Scheduler) SyncFolderNow(accountID, folderID int64) (int, error) { if err != nil || folder == nil || folder.AccountID != accountID { return 0, fmt.Errorf("folder %d not found", folderID) } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() c, err := email.Connect(ctx, account) @@ -93,101 +567,8 @@ func (s *Scheduler) SyncFolderNow(accountID, folderID int64) (int, error) { return 0, err } defer c.Close() - days := account.SyncDays - if days <= 0 || account.SyncMode == "all" { - days = 0 // 0 = fetch ALL via IMAP ALL criteria - } - messages, err := c.FetchMessages(folder.FullPath, days) - if err != nil { - return 0, err - } - synced := 0 - for _, msg := range messages { - msg.FolderID = folder.ID - if err := s.db.UpsertMessage(msg); err == nil { - synced++ - } - } - s.db.UpdateFolderCounts(folder.ID) - s.db.UpdateAccountLastSync(accountID) - return synced, nil + + return s.syncFolder(c, account, folder) } -func (s *Scheduler) syncAccount(account *models.EmailAccount) { - synced, err := s.doSync(account) - if err != nil { - log.Printf("Sync [%s]: %v", account.EmailAddress, err) - s.db.SetAccountError(account.ID, err.Error()) - s.db.WriteAudit(nil, models.AuditAppError, - "sync error for "+account.EmailAddress+": "+err.Error(), "", "") - return - } - s.db.ClearAccountError(account.ID) - if synced > 0 { - log.Printf("Synced %d messages for %s", synced, account.EmailAddress) - } -} -func (s *Scheduler) doSync(account *models.EmailAccount) (int, error) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) - defer cancel() - - c, err := email.Connect(ctx, account) - if err != nil { - return 0, err - } - defer c.Close() - - mailboxes, err := c.ListMailboxes() - if err != nil { - return 0, fmt.Errorf("list mailboxes: %w", err) - } - - synced := 0 - for _, mb := range mailboxes { - folderType := email.InferFolderType(mb.Name, mb.Attributes) - - folder := &models.Folder{ - AccountID: account.ID, - Name: mb.Name, - FullPath: mb.Name, - FolderType: folderType, - } - if err := s.db.UpsertFolder(folder); err != nil { - log.Printf("Upsert folder %s: %v", mb.Name, err) - continue - } - - dbFolder, _ := s.db.GetFolderByPath(account.ID, mb.Name) - if dbFolder == nil { - continue - } - - // Skip folders that the user has disabled sync on - if !dbFolder.SyncEnabled { - continue - } - - days := account.SyncDays - if days <= 0 || account.SyncMode == "all" { - days = 0 // 0 = fetch ALL via IMAP ALL criteria - } - messages, err := c.FetchMessages(mb.Name, days) - if err != nil { - log.Printf("Fetch %s/%s: %v", account.EmailAddress, mb.Name, err) - continue - } - - for _, msg := range messages { - msg.FolderID = dbFolder.ID - if err := s.db.UpsertMessage(msg); err == nil { - synced++ - } - } - - s.db.UpdateFolderCounts(dbFolder.ID) - } - - s.db.UpdateAccountLastSync(account.ID) - return synced, nil -}