mirror of
https://github.com/ghostersk/gowebmail.git
synced 2026-04-17 08:36:01 +01:00
message deletion sync fixed
This commit is contained in:
@@ -174,11 +174,30 @@ func (d *DB) Migrate() error {
|
||||
`ALTER TABLE folders ADD COLUMN sync_enabled INTEGER NOT NULL DEFAULT 1`,
|
||||
// Plaintext search index column — stores decrypted subject+from+preview for LIKE search.
|
||||
`ALTER TABLE messages ADD COLUMN search_text TEXT NOT NULL DEFAULT ''`,
|
||||
// Per-folder IMAP sync state for incremental/delta sync.
|
||||
`ALTER TABLE folders ADD COLUMN uid_validity INTEGER NOT NULL DEFAULT 0`,
|
||||
`ALTER TABLE folders ADD COLUMN last_seen_uid INTEGER NOT NULL DEFAULT 0`,
|
||||
}
|
||||
for _, stmt := range alterStmts {
|
||||
d.sql.Exec(stmt) // ignore "duplicate column" errors intentionally
|
||||
}
|
||||
|
||||
// Pending IMAP operations queue — survives server restarts.
|
||||
// op_type: "delete" | "move" | "flag_read" | "flag_star"
|
||||
_, err := d.sql.Exec(`CREATE TABLE IF NOT EXISTS pending_imap_ops (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
account_id INTEGER NOT NULL REFERENCES email_accounts(id) ON DELETE CASCADE,
|
||||
op_type TEXT NOT NULL,
|
||||
remote_uid INTEGER NOT NULL,
|
||||
folder_path TEXT NOT NULL DEFAULT '',
|
||||
extra TEXT NOT NULL DEFAULT '',
|
||||
attempts INTEGER NOT NULL DEFAULT 0,
|
||||
created_at DATETIME DEFAULT (datetime('now'))
|
||||
)`)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create pending_imap_ops: %w", err)
|
||||
}
|
||||
|
||||
// Bootstrap admin account if no users exist
|
||||
return d.bootstrapAdmin()
|
||||
}
|
||||
@@ -1279,3 +1298,170 @@ func (d *DB) ListStarredMessages(userID int64, page, pageSize int) (*models.Page
|
||||
HasMore: offset+len(summaries) < total,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ---- Pending IMAP ops queue ----

// PendingIMAPOp represents an IMAP write operation that needs to be applied to the server.
// Rows live in the pending_imap_ops table and survive restarts; a background
// worker drains them and deletes each row once the server accepts the change.
type PendingIMAPOp struct {
	ID         int64  // pending_imap_ops primary key
	AccountID  int64  // owning email account (FK, cascade-deleted with the account)
	OpType     string // "delete" | "move" | "flag_read" | "flag_star"
	RemoteUID  uint32 // server-side UID of the target message
	FolderPath string // source mailbox path the UID is valid in
	Extra      string // for move: dest folder path; for flag_*: "1" or "0"
	Attempts   int    // retry counter; ops with >5 attempts are abandoned
}
|
||||
|
||||
// EnqueueIMAPOp adds an operation to the pending queue atomically.
|
||||
func (d *DB) EnqueueIMAPOp(op *PendingIMAPOp) error {
|
||||
_, err := d.sql.Exec(
|
||||
`INSERT INTO pending_imap_ops (account_id, op_type, remote_uid, folder_path, extra) VALUES (?,?,?,?,?)`,
|
||||
op.AccountID, op.OpType, op.RemoteUID, op.FolderPath, op.Extra,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
// DequeuePendingOps returns up to `limit` pending ops for a given account.
|
||||
func (d *DB) DequeuePendingOps(accountID int64, limit int) ([]*PendingIMAPOp, error) {
|
||||
rows, err := d.sql.Query(
|
||||
`SELECT id, account_id, op_type, remote_uid, folder_path, extra, attempts
|
||||
FROM pending_imap_ops WHERE account_id=? ORDER BY id ASC LIMIT ?`,
|
||||
accountID, limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var ops []*PendingIMAPOp
|
||||
for rows.Next() {
|
||||
op := &PendingIMAPOp{}
|
||||
rows.Scan(&op.ID, &op.AccountID, &op.OpType, &op.RemoteUID, &op.FolderPath, &op.Extra, &op.Attempts)
|
||||
ops = append(ops, op)
|
||||
}
|
||||
return ops, rows.Err()
|
||||
}
|
||||
|
||||
// DeletePendingOp removes a successfully applied op.
|
||||
func (d *DB) DeletePendingOp(id int64) error {
|
||||
_, err := d.sql.Exec(`DELETE FROM pending_imap_ops WHERE id=?`, id)
|
||||
return err
|
||||
}
|
||||
|
||||
// IncrementPendingOpAttempts bumps attempt count; ops with >5 attempts are abandoned.
|
||||
func (d *DB) IncrementPendingOpAttempts(id int64) {
|
||||
d.sql.Exec(`UPDATE pending_imap_ops SET attempts=attempts+1 WHERE id=?`, id)
|
||||
d.sql.Exec(`DELETE FROM pending_imap_ops WHERE id=? AND attempts>5`, id)
|
||||
}
|
||||
|
||||
// CountPendingOps returns number of queued ops for an account (for logging).
|
||||
func (d *DB) CountPendingOps(accountID int64) int {
|
||||
var n int
|
||||
d.sql.QueryRow(`SELECT COUNT(*) FROM pending_imap_ops WHERE account_id=?`, accountID).Scan(&n)
|
||||
return n
|
||||
}
|
||||
|
||||
// ---- Folder delta-sync state ----

// GetFolderSyncState returns uid_validity and last_seen_uid for incremental sync.
// Best-effort: if the folder row is missing or the query fails, both values
// remain zero, which callers treat as "never synced" (full fetch).
func (d *DB) GetFolderSyncState(folderID int64) (uidValidity, lastSeenUID uint32) {
	d.sql.QueryRow(`SELECT COALESCE(uid_validity,0), COALESCE(last_seen_uid,0) FROM folders WHERE id=?`, folderID).
		Scan(&uidValidity, &lastSeenUID)
	return
}
|
||||
|
||||
// SetFolderSyncState persists uid_validity and last_seen_uid after a successful sync.
|
||||
func (d *DB) SetFolderSyncState(folderID int64, uidValidity, lastSeenUID uint32) {
|
||||
d.sql.Exec(`UPDATE folders SET uid_validity=?, last_seen_uid=? WHERE id=?`, uidValidity, lastSeenUID, folderID)
|
||||
}
|
||||
|
||||
// PurgeDeletedMessages removes local messages whose remote_uid is no longer
|
||||
// in the server's UID list for a folder. Returns count purged.
|
||||
func (d *DB) PurgeDeletedMessages(folderID int64, serverUIDs []uint32) (int, error) {
|
||||
if len(serverUIDs) == 0 {
|
||||
// Don't purge everything if server returned empty (connection issue)
|
||||
return 0, nil
|
||||
}
|
||||
// Build placeholder list
|
||||
args := make([]interface{}, len(serverUIDs)+1)
|
||||
args[0] = folderID
|
||||
placeholders := make([]string, len(serverUIDs))
|
||||
for i, uid := range serverUIDs {
|
||||
args[i+1] = fmt.Sprintf("%d", uid)
|
||||
placeholders[i] = "?"
|
||||
}
|
||||
q := fmt.Sprintf(
|
||||
`DELETE FROM messages WHERE folder_id=? AND remote_uid NOT IN (%s)`,
|
||||
strings.Join(placeholders, ","),
|
||||
)
|
||||
res, err := d.sql.Exec(q, args...)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
n, _ := res.RowsAffected()
|
||||
return int(n), nil
|
||||
}
|
||||
|
||||
// DeleteAllFolderMessages removes all messages from a folder (used on UIDVALIDITY change).
|
||||
func (d *DB) DeleteAllFolderMessages(folderID int64) {
|
||||
d.sql.Exec(`DELETE FROM messages WHERE folder_id=?`, folderID)
|
||||
}
|
||||
|
||||
// GetFolderMessageCount returns the local message count for a folder by account and path.
|
||||
func (d *DB) GetFolderMessageCount(accountID int64, folderPath string) int {
|
||||
var n int
|
||||
d.sql.QueryRow(`
|
||||
SELECT COUNT(*) FROM messages m
|
||||
JOIN folders f ON f.id=m.folder_id
|
||||
WHERE f.account_id=? AND f.full_path=?`, accountID, folderPath,
|
||||
).Scan(&n)
|
||||
return n
|
||||
}
|
||||
|
||||
// ReconcileFlags updates is_read and is_starred from server flags, but ONLY for
|
||||
// messages that do NOT have a pending local write op (to avoid overwriting in-flight changes).
|
||||
func (d *DB) ReconcileFlags(folderID int64, serverFlags map[uint32][]string) {
|
||||
// Get set of UIDs with pending ops so we don't overwrite them
|
||||
rows, _ := d.sql.Query(
|
||||
`SELECT DISTINCT remote_uid FROM pending_imap_ops po
|
||||
JOIN folders f ON f.account_id=po.account_id
|
||||
WHERE f.id=? AND (po.op_type='flag_read' OR po.op_type='flag_star')`, folderID,
|
||||
)
|
||||
pendingUIDs := make(map[uint32]bool)
|
||||
if rows != nil {
|
||||
for rows.Next() {
|
||||
var uid uint32
|
||||
rows.Scan(&uid)
|
||||
pendingUIDs[uid] = true
|
||||
}
|
||||
rows.Close()
|
||||
}
|
||||
|
||||
for uid, flags := range serverFlags {
|
||||
if pendingUIDs[uid] {
|
||||
continue // don't reconcile — we have a pending write for this message
|
||||
}
|
||||
isRead := false
|
||||
isStarred := false
|
||||
for _, f := range flags {
|
||||
switch f {
|
||||
case `\Seen`:
|
||||
isRead = true
|
||||
case `\Flagged`:
|
||||
isStarred = true
|
||||
}
|
||||
}
|
||||
d.sql.Exec(
|
||||
`UPDATE messages SET is_read=?, is_starred=?
|
||||
WHERE folder_id=? AND remote_uid=?`,
|
||||
boolToInt(isRead), boolToInt(isStarred),
|
||||
folderID, fmt.Sprintf("%d", uid),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// boolToInt converts a bool to the 0/1 integer form stored in SQLite columns.
func boolToInt(b bool) int {
	if !b {
		return 0
	}
	return 1
}
|
||||
|
||||
@@ -938,3 +938,123 @@ func partPathToInts(path string) []int {
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ---- Delta sync helpers ----

// FolderStatus returns the current UIDVALIDITY, UIDNEXT, and message count
// for a mailbox without fetching any messages.
type FolderStatus struct {
	UIDValidity uint32 // changes when the server recreates the mailbox; invalidates cached UIDs
	UIDNext     uint32 // predicted UID of the next arriving message
	Messages    uint32 // total message count in the mailbox
}

// GetFolderStatus selects the mailbox read-only and reports its status fields.
// NOTE(review): this issues SELECT (not STATUS), so it also switches the
// connection's selected mailbox — confirm callers account for that.
func (c *Client) GetFolderStatus(mailboxName string) (*FolderStatus, error) {
	mbox, err := c.imap.Select(mailboxName, true) // true = read-only
	if err != nil {
		return nil, fmt.Errorf("select %s: %w", mailboxName, err)
	}
	return &FolderStatus{
		UIDValidity: mbox.UidValidity,
		UIDNext:     mbox.UidNext,
		Messages:    mbox.Messages,
	}, nil
}
|
||||
|
||||
// ListAllUIDs returns all UIDs currently in the mailbox. Used for purge detection.
// An empty mailbox returns (nil, nil), which callers must NOT treat as
// "everything deleted" — see PurgeDeletedMessages' empty-slice guard.
func (c *Client) ListAllUIDs(mailboxName string) ([]uint32, error) {
	mbox, err := c.imap.Select(mailboxName, true) // read-only select
	if err != nil {
		return nil, fmt.Errorf("select %s: %w", mailboxName, err)
	}
	if mbox.Messages == 0 {
		return nil, nil
	}
	// An empty SearchCriteria is presumably serialized as an ALL search,
	// returning every UID — TODO confirm against the go-imap version in use.
	uids, err := c.imap.UidSearch(imap.NewSearchCriteria())
	if err != nil {
		return nil, fmt.Errorf("uid search all: %w", err)
	}
	return uids, nil
}
|
||||
|
||||
// FetchNewMessages fetches only messages with UID > afterUID (incremental).
// Returns (nil, nil) when the mailbox is empty or the UID range matches
// nothing on servers that report that condition as an error.
func (c *Client) FetchNewMessages(mailboxName string, afterUID uint32) ([]*gomailModels.Message, error) {
	mbox, err := c.imap.Select(mailboxName, true) // read-only select
	if err != nil {
		return nil, fmt.Errorf("select %s: %w", mailboxName, err)
	}
	if mbox.Messages == 0 {
		return nil, nil
	}

	// SEARCH UID afterUID+1:*
	// NOTE(review): afterUID+1 wraps to 0 if afterUID is MaxUint32; real UIDs
	// never get near that, but confirm if it matters for this deployment.
	seqSet := new(imap.SeqSet)
	seqSet.AddRange(afterUID+1, ^uint32(0)) // afterUID+1 to * (max)

	items := []imap.FetchItem{
		imap.FetchUid, imap.FetchEnvelope,
		imap.FetchFlags, imap.FetchBodyStructure,
		imap.FetchRFC822, // full raw message body: new mail is stored locally in full
	}

	ch := make(chan *imap.Message, 64)
	done := make(chan error, 1)
	go func() { done <- c.imap.UidFetch(seqSet, items, ch) }()

	var results []*gomailModels.Message
	for msg := range ch {
		if msg.Uid <= afterUID {
			continue // skip if server returns older (shouldn't happen)
		}
		m, err := parseIMAPMessage(msg, c.account)
		if err != nil {
			// One unparsable message must not abort the whole batch.
			log.Printf("parse message uid=%d: %v", msg.Uid, err)
			continue
		}
		results = append(results, m)
	}
	if err := <-done; err != nil {
		// UID range with no results gives an error on some servers — treat as empty
		if strings.Contains(err.Error(), "No matching messages") ||
			strings.Contains(err.Error(), "BADUID") ||
			strings.Contains(err.Error(), "UID range") {
			return nil, nil
		}
		// Partial results are returned alongside the error so the caller can decide.
		return results, fmt.Errorf("uid fetch new: %w", err)
	}
	return results, nil
}
|
||||
|
||||
// SyncFlags fetches FLAGS for all messages in a mailbox efficiently.
// Returns map[uid]->flags for reconciliation with local state. The map is
// non-nil even for an empty mailbox so callers can range over it safely.
func (c *Client) SyncFlags(mailboxName string) (map[uint32][]string, error) {
	mbox, err := c.imap.Select(mailboxName, true) // read-only select
	if err != nil {
		return nil, fmt.Errorf("select %s: %w", mailboxName, err)
	}
	if mbox.Messages == 0 {
		return map[uint32][]string{}, nil
	}

	// Fetch by sequence numbers 1:N; only UID + FLAGS are requested, so this
	// stays cheap even for large mailboxes.
	seqSet := new(imap.SeqSet)
	seqSet.AddRange(1, mbox.Messages)
	items := []imap.FetchItem{imap.FetchUid, imap.FetchFlags}

	ch := make(chan *imap.Message, 256)
	done := make(chan error, 1)
	go func() { done <- c.imap.Fetch(seqSet, items, ch) }()

	result := make(map[uint32][]string, mbox.Messages)
	for msg := range ch {
		result[msg.Uid] = msg.Flags
	}
	// Partial results are returned alongside the error for best-effort callers.
	if err := <-done; err != nil {
		return result, fmt.Errorf("fetch flags: %w", err)
	}
	return result, nil
}
|
||||
|
||||
// SelectMailbox selects a mailbox read-only and returns its status info
// (message count, UIDVALIDITY, UIDNEXT, flags).
func (c *Client) SelectMailbox(name string) (*imap.MailboxStatus, error) {
	return c.imap.Select(name, true) // true = read-only
}
|
||||
|
||||
@@ -540,19 +540,21 @@ func (h *APIHandler) MarkRead(w http.ResponseWriter, r *http.Request) {
|
||||
messageID := pathInt64(r, "id")
|
||||
var req struct{ Read bool `json:"read"` }
|
||||
json.NewDecoder(r.Body).Decode(&req)
|
||||
|
||||
// Update local DB first
|
||||
h.db.MarkMessageRead(messageID, userID, req.Read)
|
||||
go func() {
|
||||
uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
if err != nil || uid == 0 || account == nil {
|
||||
return
|
||||
}
|
||||
c, err := email.Connect(context.Background(), account)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
_ = c.SetFlagByUID(folderPath, uid, `\Seen`, req.Read)
|
||||
}()
|
||||
|
||||
// Enqueue IMAP op — drained by background worker with retry
|
||||
uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
if err == nil && uid != 0 && account != nil {
|
||||
val := "0"
|
||||
if req.Read { val = "1" }
|
||||
h.db.EnqueueIMAPOp(&db.PendingIMAPOp{
|
||||
AccountID: account.ID, OpType: "flag_read",
|
||||
RemoteUID: uid, FolderPath: folderPath, Extra: val,
|
||||
})
|
||||
h.syncer.TriggerAccountSync(account.ID)
|
||||
}
|
||||
h.writeJSON(w, map[string]bool{"ok": true})
|
||||
}
|
||||
|
||||
@@ -564,18 +566,16 @@ func (h *APIHandler) ToggleStar(w http.ResponseWriter, r *http.Request) {
|
||||
h.writeError(w, http.StatusInternalServerError, "failed to toggle star")
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
if err != nil || uid == 0 || account == nil {
|
||||
return
|
||||
}
|
||||
c, err := email.Connect(context.Background(), account)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
_ = c.SetFlagByUID(folderPath, uid, `\Flagged`, starred)
|
||||
}()
|
||||
uid, folderPath, account, ierr := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
if ierr == nil && uid != 0 && account != nil {
|
||||
val := "0"
|
||||
if starred { val = "1" }
|
||||
h.db.EnqueueIMAPOp(&db.PendingIMAPOp{
|
||||
AccountID: account.ID, OpType: "flag_star",
|
||||
RemoteUID: uid, FolderPath: folderPath, Extra: val,
|
||||
})
|
||||
h.syncer.TriggerAccountSync(account.ID)
|
||||
}
|
||||
h.writeJSON(w, map[string]bool{"ok": true, "starred": starred})
|
||||
}
|
||||
|
||||
@@ -588,33 +588,24 @@ func (h *APIHandler) MoveMessage(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// IMAP move (best-effort, non-blocking)
|
||||
go func() {
|
||||
uid, srcPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
if err != nil || uid == 0 || account == nil {
|
||||
log.Printf("IMAP move: GetMessageIMAPInfo msg=%d err=%v uid=%d", messageID, err, uid)
|
||||
return
|
||||
}
|
||||
destFolder, err := h.db.GetFolderByID(req.FolderID)
|
||||
if err != nil || destFolder == nil {
|
||||
log.Printf("IMAP move: GetFolderByID folder=%d err=%v", req.FolderID, err)
|
||||
return
|
||||
}
|
||||
c, err := email.Connect(context.Background(), account)
|
||||
if err != nil {
|
||||
log.Printf("IMAP move: Connect account=%d err=%v", account.ID, err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
if err := c.MoveByUID(srcPath, destFolder.FullPath, uid); err != nil {
|
||||
log.Printf("IMAP move: MoveByUID uid=%d src=%s dst=%s err=%v", uid, srcPath, destFolder.FullPath, err)
|
||||
}
|
||||
}()
|
||||
// Get IMAP info before changing folder_id in DB
|
||||
uid, srcPath, account, imapErr := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
destFolder, _ := h.db.GetFolderByID(req.FolderID)
|
||||
|
||||
// Update local DB
|
||||
if err := h.db.MoveMessage(messageID, userID, req.FolderID); err != nil {
|
||||
h.writeError(w, http.StatusInternalServerError, "move failed")
|
||||
return
|
||||
}
|
||||
|
||||
// Enqueue IMAP move
|
||||
if imapErr == nil && uid != 0 && account != nil && destFolder != nil {
|
||||
h.db.EnqueueIMAPOp(&db.PendingIMAPOp{
|
||||
AccountID: account.ID, OpType: "move",
|
||||
RemoteUID: uid, FolderPath: srcPath, Extra: destFolder.FullPath,
|
||||
})
|
||||
h.syncer.TriggerAccountSync(account.ID)
|
||||
}
|
||||
h.writeJSON(w, map[string]bool{"ok": true})
|
||||
}
|
||||
|
||||
@@ -622,36 +613,23 @@ func (h *APIHandler) DeleteMessage(w http.ResponseWriter, r *http.Request) {
|
||||
userID := middleware.GetUserID(r)
|
||||
messageID := pathInt64(r, "id")
|
||||
|
||||
// IMAP delete (best-effort, non-blocking)
|
||||
go func() {
|
||||
uid, folderPath, account, err := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
if err != nil || uid == 0 || account == nil {
|
||||
log.Printf("IMAP delete: GetMessageIMAPInfo msg=%d err=%v uid=%d", messageID, err, uid)
|
||||
return
|
||||
}
|
||||
c, err := email.Connect(context.Background(), account)
|
||||
if err != nil {
|
||||
log.Printf("IMAP delete: Connect account=%d err=%v", account.ID, err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
mailboxes, _ := c.ListMailboxes()
|
||||
var trashName string
|
||||
for _, mb := range mailboxes {
|
||||
if email.InferFolderType(mb.Name, mb.Attributes) == "trash" {
|
||||
trashName = mb.Name
|
||||
break
|
||||
}
|
||||
}
|
||||
if err := c.DeleteByUID(folderPath, uid, trashName); err != nil {
|
||||
log.Printf("IMAP delete: DeleteByUID uid=%d folder=%s trash=%s err=%v", uid, folderPath, trashName, err)
|
||||
}
|
||||
}()
|
||||
// Get IMAP info before deleting from DB
|
||||
uid, folderPath, account, imapErr := h.db.GetMessageIMAPInfo(messageID, userID)
|
||||
|
||||
// Delete from local DB
|
||||
if err := h.db.DeleteMessage(messageID, userID); err != nil {
|
||||
h.writeError(w, http.StatusInternalServerError, "delete failed")
|
||||
return
|
||||
}
|
||||
|
||||
// Enqueue IMAP delete
|
||||
if imapErr == nil && uid != 0 && account != nil {
|
||||
h.db.EnqueueIMAPOp(&db.PendingIMAPOp{
|
||||
AccountID: account.ID, OpType: "delete",
|
||||
RemoteUID: uid, FolderPath: folderPath,
|
||||
})
|
||||
h.syncer.TriggerAccountSync(account.ID)
|
||||
}
|
||||
h.writeJSON(w, map[string]bool{"ok": true})
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
// Package syncer provides background IMAP synchronisation for all active accounts.
|
||||
// Architecture:
|
||||
// - One goroutine per account runs IDLE on the INBOX to receive push notifications.
|
||||
// - A separate drain goroutine flushes pending_imap_ops (delete/move/flag writes).
|
||||
// - Periodic full-folder delta sync catches changes made by other clients.
|
||||
package syncer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/yourusername/gomail/internal/db"
|
||||
@@ -12,68 +17,536 @@ import (
|
||||
"github.com/yourusername/gomail/internal/models"
|
||||
)
|
||||
|
||||
// Scheduler coordinates all background sync activity: per-account worker
// goroutines (INBOX watcher, pending-op drain, periodic delta sync) plus a
// coordination loop that spawns/stops workers as accounts change.
type Scheduler struct {
	db   *db.DB         // storage for accounts, folders, messages, pending ops
	stop chan struct{}  // closed to signal every goroutine to exit
	wg   sync.WaitGroup // tracks spawned goroutines so Stop can wait

	// push channels: accountID -> channel to signal "something changed on server"
	pushMu sync.Mutex
	pushCh map[int64]chan struct{}
}
|
||||
|
||||
// New creates a new Scheduler. Call Start() to begin background syncing.
|
||||
// New creates a new Scheduler.
|
||||
func New(database *db.DB) *Scheduler {
|
||||
return &Scheduler{db: database, stop: make(chan struct{})}
|
||||
return &Scheduler{
|
||||
db: database,
|
||||
stop: make(chan struct{}),
|
||||
pushCh: make(map[int64]chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Start launches the scheduler goroutine. Ticks every minute and checks
|
||||
// which accounts are due for sync based on last_sync and sync_interval.
|
||||
// Start launches all background goroutines.
|
||||
func (s *Scheduler) Start() {
|
||||
s.wg.Add(1)
|
||||
go func() {
|
||||
log.Println("Background sync scheduler started")
|
||||
s.runDue()
|
||||
ticker := time.NewTicker(1 * time.Minute)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.runDue()
|
||||
case <-s.stop:
|
||||
log.Println("Background sync scheduler stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
defer s.wg.Done()
|
||||
s.mainLoop()
|
||||
}()
|
||||
log.Println("[sync] scheduler started")
|
||||
}
|
||||
|
||||
// Stop signals all goroutines to exit and waits for them. Safe to call once;
// a second call would panic on the double close of s.stop.
func (s *Scheduler) Stop() {
	close(s.stop)
	s.wg.Wait()
	log.Println("[sync] scheduler stopped")
}
|
||||
|
||||
func (s *Scheduler) runDue() {
|
||||
// TriggerAccountSync signals an immediate sync for an account (called after IMAP write ops).
|
||||
func (s *Scheduler) TriggerAccountSync(accountID int64) {
|
||||
s.pushMu.Lock()
|
||||
ch, ok := s.pushCh[accountID]
|
||||
s.pushMu.Unlock()
|
||||
if ok {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default: // already pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Main coordination loop ----

// mainLoop owns the per-account worker lifecycle: it spawns one worker per
// active account, reconciles the worker set against the DB every 30s, and
// tears everything down when s.stop is closed. Runs until Stop().
func (s *Scheduler) mainLoop() {
	// Ticker for the outer "check which accounts are due" loop.
	// Runs every 30s; individual accounts control their own interval.
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	// Track per-account goroutines so we only launch one per account.
	type accountWorker struct {
		stop   chan struct{}
		pushCh chan struct{}
	}
	workers := make(map[int64]*accountWorker)

	// spawnWorker starts a worker goroutine for an account, registering its
	// push channel so TriggerAccountSync can reach it. Idempotent per account.
	spawnWorker := func(account *models.EmailAccount) {
		if _, exists := workers[account.ID]; exists {
			return
		}
		w := &accountWorker{
			stop:   make(chan struct{}),
			pushCh: make(chan struct{}, 1), // buffer 1: triggers coalesce
		}
		workers[account.ID] = w

		s.pushMu.Lock()
		s.pushCh[account.ID] = w.pushCh
		s.pushMu.Unlock()

		s.wg.Add(1)
		go func(acc *models.EmailAccount, w *accountWorker) {
			defer s.wg.Done()
			s.accountWorker(acc, w.stop, w.pushCh)
		}(account, w)
	}

	// stopWorker signals a worker to exit and unregisters its push channel.
	stopWorker := func(accountID int64) {
		if w, ok := workers[accountID]; ok {
			close(w.stop)
			delete(workers, accountID)
			s.pushMu.Lock()
			delete(s.pushCh, accountID)
			s.pushMu.Unlock()
		}
	}

	// Initial spawn
	s.spawnForActive(spawnWorker)

	for {
		select {
		case <-s.stop:
			for id := range workers {
				stopWorker(id)
			}
			return
		case <-ticker.C:
			// Build active IDs map for reconciliation
			activeIDs := make(map[int64]bool, len(workers))
			for id := range workers {
				activeIDs[id] = true
			}
			s.reconcileWorkers(activeIDs, spawnWorker, stopWorker)
		}
	}
}
|
||||
|
||||
func (s *Scheduler) spawnForActive(spawn func(*models.EmailAccount)) {
|
||||
accounts, err := s.db.ListAllActiveAccounts()
|
||||
if err != nil {
|
||||
log.Printf("Sync scheduler: list accounts: %v", err)
|
||||
log.Printf("[sync] list accounts: %v", err)
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
for _, account := range accounts {
|
||||
if account.SyncInterval <= 0 {
|
||||
continue
|
||||
for _, acc := range accounts {
|
||||
spawn(acc)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) reconcileWorkers(
|
||||
activeIDs map[int64]bool,
|
||||
spawn func(*models.EmailAccount),
|
||||
stop func(int64),
|
||||
) {
|
||||
accounts, err := s.db.ListAllActiveAccounts()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
serverActive := make(map[int64]bool)
|
||||
for _, acc := range accounts {
|
||||
serverActive[acc.ID] = true
|
||||
if !activeIDs[acc.ID] {
|
||||
spawn(acc)
|
||||
}
|
||||
nextSync := account.LastSync.Add(time.Duration(account.SyncInterval) * time.Minute)
|
||||
if account.LastSync.IsZero() || now.After(nextSync) {
|
||||
go s.syncAccount(account)
|
||||
}
|
||||
for id := range activeIDs {
|
||||
if !serverActive[id] {
|
||||
stop(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Per-account worker ----
// Each worker:
// 1. On startup: drain pending ops, then do a full delta sync.
// 2. Runs an IDLE loop on INBOX for push notifications.
// 3. Every syncInterval minutes (or on push signal): delta sync all enabled folders.
// 4. Every 2 minutes: drain pending ops (retries failed writes).

// accountWorker is the long-lived goroutine for one account. It exits when
// `stop` is closed; `push` receives local triggers from TriggerAccountSync.
func (s *Scheduler) accountWorker(account *models.EmailAccount, stop chan struct{}, push chan struct{}) {
	log.Printf("[sync] worker started for %s", account.EmailAddress)

	// Fresh account data function (interval can change at runtime).
	// Falls back to the startup snapshot if the account row cannot be re-read.
	getAccount := func() *models.EmailAccount {
		a, _ := s.db.GetAccount(account.ID)
		if a == nil {
			return account
		}
		return a
	}

	// Initial sync on startup: apply queued writes first so they aren't
	// clobbered by the delta sync that follows.
	s.drainPendingOps(account)
	s.deltaSync(getAccount())

	// Drain ticker: retry pending ops every 90 seconds
	drainTicker := time.NewTicker(90 * time.Second)
	defer drainTicker.Stop()

	// Full sync ticker: based on account sync_interval, check every 30s
	syncTicker := time.NewTicker(30 * time.Second)
	defer syncTicker.Stop()

	// IDLE watcher for INBOX push notifications (buffer 1: signals coalesce)
	idleCh := make(chan struct{}, 1)
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.idleWatcher(account, stop, idleCh)
	}()

	for {
		select {
		case <-stop:
			log.Printf("[sync] worker stopped for %s", account.EmailAddress)
			return
		case <-drainTicker.C:
			s.drainPendingOps(getAccount())
		case <-idleCh:
			// Server signalled new mail/changes in INBOX — sync just INBOX
			acc := getAccount()
			s.syncInbox(acc)
		case <-push:
			// Local trigger (after write op) — drain ops then sync
			acc := getAccount()
			s.drainPendingOps(acc)
			s.deltaSync(acc)
		case <-syncTicker.C:
			acc := getAccount()
			if acc.SyncInterval <= 0 {
				continue // periodic sync disabled for this account
			}
			nextSync := acc.LastSync.Add(time.Duration(acc.SyncInterval) * time.Minute)
			if acc.LastSync.IsZero() || time.Now().After(nextSync) {
				s.deltaSync(acc)
			}
		}
	}
}
|
||||
|
||||
// ---- IDLE watcher ----
// Maintains a persistent IMAP connection to INBOX and issues IDLE.
// When EXISTS or EXPUNGE arrives, sends to idleCh.
// NOTE(review): despite the name, this is a 60s poll loop (go-imap v1 has no
// IDLE here). It compares the server's message count to the local count, so a
// change that leaves the count equal (e.g. one add + one delete between polls)
// is missed until the next periodic delta sync — confirm that's acceptable.
func (s *Scheduler) idleWatcher(account *models.EmailAccount, stop chan struct{}, idleCh chan struct{}) {
	const reconnectDelay = 30 * time.Second
	const idleTimeout = 25 * time.Minute // RFC 2177 recommends < 29min

	// signal performs a non-blocking send; an already-pending signal coalesces.
	signal := func() {
		select {
		case idleCh <- struct{}{}:
		default:
		}
	}

	for {
		// Exit promptly if asked to stop between reconnect cycles.
		select {
		case <-stop:
			return
		default:
		}

		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		c, err := email.Connect(ctx, account)
		cancel()
		if err != nil {
			log.Printf("[idle:%s] connect: %v — retry in %s", account.EmailAddress, err, reconnectDelay)
			select {
			case <-stop:
				return
			case <-time.After(reconnectDelay):
				continue
			}
		}

		// Select INBOX
		_, err = c.SelectMailbox("INBOX")
		if err != nil {
			c.Close()
			select {
			case <-stop:
				return
			case <-time.After(reconnectDelay):
				continue
			}
		}

		// IDLE loop — go-imap v1 does not have built-in IDLE, we poll with short
		// CHECK + NOOP and rely on the EXISTS response to wake us.
		// We use a 1-minute poll since go-imap v1 doesn't expose IDLE directly.
		pollTicker := time.NewTicker(60 * time.Second)
		idleTimer := time.NewTimer(idleTimeout)

	pollLoop:
		for {
			select {
			case <-stop:
				pollTicker.Stop()
				idleTimer.Stop()
				c.Close()
				return
			case <-idleTimer.C:
				// Reconnect to keep connection alive
				pollTicker.Stop()
				c.Close()
				break pollLoop
			case <-pollTicker.C:
				// Poll server for changes
				status, err := c.GetFolderStatus("INBOX")
				if err != nil {
					// Connection likely dropped; tear down and reconnect below.
					log.Printf("[idle:%s] status check: %v", account.EmailAddress, err)
					pollTicker.Stop()
					idleTimer.Stop()
					c.Close()
					break pollLoop
				}
				// Check if message count changed
				localCount := s.db.GetFolderMessageCount(account.ID, "INBOX")
				if status.Messages != uint32(localCount) {
					signal()
				}
			}
		}

		// Short breather before reconnecting after a normal cycle end.
		select {
		case <-stop:
			return
		case <-time.After(2 * time.Second):
		}
	}
}
|
||||
|
||||
// ---- Delta sync ----
// For each enabled folder:
// 1. Check UIDVALIDITY — if changed, full re-sync (folder was recreated on server).
// 2. Fetch only new messages (UID > last_seen_uid).
// 3. Fetch FLAGS for all existing messages to catch read/star changes from other clients.
// 4. Fetch all server UIDs and purge locally deleted messages.

// deltaSync connects once per account (5-minute budget), refreshes the local
// folder list, and incrementally syncs every sync-enabled folder. Connection
// failures are recorded on the account row; per-folder errors are logged and
// skipped so one bad folder cannot abort the whole account.
func (s *Scheduler) deltaSync(account *models.EmailAccount) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	c, err := email.Connect(ctx, account)
	if err != nil {
		log.Printf("[sync:%s] connect: %v", account.EmailAddress, err)
		s.db.SetAccountError(account.ID, err.Error())
		return
	}
	defer c.Close()
	// A successful connect clears any previously recorded account error.
	s.db.ClearAccountError(account.ID)

	mailboxes, err := c.ListMailboxes()
	if err != nil {
		log.Printf("[sync:%s] list mailboxes: %v", account.EmailAddress, err)
		return
	}

	totalNew := 0
	for _, mb := range mailboxes {
		// Mirror the server's folder list into the local DB before syncing.
		folderType := email.InferFolderType(mb.Name, mb.Attributes)
		folder := &models.Folder{
			AccountID:  account.ID,
			Name:       mb.Name,
			FullPath:   mb.Name,
			FolderType: folderType,
		}
		if err := s.db.UpsertFolder(folder); err != nil {
			continue // cannot record the folder locally; skip it this round
		}
		dbFolder, _ := s.db.GetFolderByPath(account.ID, mb.Name)
		if dbFolder == nil || !dbFolder.SyncEnabled {
			continue // folder missing or user disabled syncing for it
		}

		n, err := s.syncFolder(c, account, dbFolder)
		if err != nil {
			log.Printf("[sync:%s] folder %s: %v", account.EmailAddress, mb.Name, err)
			continue
		}
		totalNew += n
	}

	s.db.UpdateAccountLastSync(account.ID)
	if totalNew > 0 {
		log.Printf("[sync:%s] %d new messages", account.EmailAddress, totalNew)
	}
}
|
||||
|
||||
// syncInbox is a fast path that only syncs the INBOX folder.
// Used by the IDLE watcher so push notifications don't trigger a full
// all-folder delta sync. Errors are logged and swallowed (best-effort);
// the periodic deltaSync will catch anything missed.
func (s *Scheduler) syncInbox(account *models.EmailAccount) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	c, err := email.Connect(ctx, account)
	if err != nil {
		return
	}
	defer c.Close()

	dbFolder, _ := s.db.GetFolderByPath(account.ID, "INBOX")
	if dbFolder == nil {
		return // INBOX not recorded locally yet; full delta sync will create it
	}
	n, err := s.syncFolder(c, account, dbFolder)
	if err != nil {
		log.Printf("[idle:%s] INBOX sync: %v", account.EmailAddress, err)
		return
	}
	if n > 0 {
		log.Printf("[idle:%s] %d new messages in INBOX", account.EmailAddress, n)
	}
}
|
||||
|
||||
// syncFolder performs an incremental (delta) sync of one IMAP folder:
// it fetches messages above the stored UID watermark, reconciles
// read/star flags for existing messages, purges local copies of messages
// deleted on the server, and finally persists the new sync state.
// It returns the number of messages newly stored locally.
//
// Statement order matters here: the UID watermark is saved last, so a
// crash mid-sync re-fetches messages on the next run rather than
// silently skipping them.
func (s *Scheduler) syncFolder(c *email.Client, account *models.EmailAccount, dbFolder *models.Folder) (int, error) {
	status, err := c.GetFolderStatus(dbFolder.FullPath)
	if err != nil {
		return 0, fmt.Errorf("status: %w", err)
	}

	storedValidity, lastSeenUID := s.db.GetFolderSyncState(dbFolder.ID)
	newMessages := 0

	// UIDVALIDITY changed = folder was recreated on server; wipe local and re-fetch all
	if storedValidity != 0 && status.UIDValidity != storedValidity {
		log.Printf("[sync] UIDVALIDITY changed for %s/%s — full re-sync", account.EmailAddress, dbFolder.FullPath)
		s.db.DeleteAllFolderMessages(dbFolder.ID)
		lastSeenUID = 0 // watermark reset: the branch below takes the first-sync path
	}

	// 1. Fetch new messages (UID > lastSeenUID)
	var msgs []*models.Message
	if lastSeenUID == 0 {
		// First sync: respect the account's days/all setting
		days := account.SyncDays
		if days <= 0 || account.SyncMode == "all" {
			days = 0 // 0 = fetch ALL (same convention as doSync's full fetch)
		}
		msgs, err = c.FetchMessages(dbFolder.FullPath, days)
	} else {
		msgs, err = c.FetchNewMessages(dbFolder.FullPath, lastSeenUID)
	}
	if err != nil {
		return 0, fmt.Errorf("fetch new: %w", err)
	}

	// Track the highest UID seen; the watermark only ever advances.
	maxUID := lastSeenUID
	for _, msg := range msgs {
		msg.FolderID = dbFolder.ID
		if err := s.db.UpsertMessage(msg); err == nil {
			newMessages++
		}
		uid := uint32(0)
		// Parse failure leaves uid at 0, which never advances maxUID.
		fmt.Sscanf(msg.RemoteUID, "%d", &uid)
		if uid > maxUID {
			maxUID = uid
		}
	}

	// 2. Sync flags for ALL existing messages (catch read/star changes from other clients)
	flags, err := c.SyncFlags(dbFolder.FullPath)
	if err != nil {
		// Non-fatal: flags catch up on the next sync pass.
		log.Printf("[sync] flags %s/%s: %v", account.EmailAddress, dbFolder.FullPath, err)
	} else if len(flags) > 0 {
		s.db.ReconcileFlags(dbFolder.ID, flags)
	}

	// 3. Fetch all server UIDs and purge messages deleted on server
	serverUIDs, err := c.ListAllUIDs(dbFolder.FullPath)
	if err != nil {
		// Non-fatal: purge is retried on the next sync pass.
		log.Printf("[sync] list uids %s/%s: %v", account.EmailAddress, dbFolder.FullPath, err)
	} else {
		purged, _ := s.db.PurgeDeletedMessages(dbFolder.ID, serverUIDs)
		if purged > 0 {
			log.Printf("[sync] purged %d server-deleted messages from %s/%s", purged, account.EmailAddress, dbFolder.FullPath)
		}
	}

	// Save sync state
	s.db.SetFolderSyncState(dbFolder.ID, status.UIDValidity, maxUID)
	s.db.UpdateFolderCounts(dbFolder.ID)

	return newMessages, nil
}
|
||||
|
||||
// ---- Pending ops drain ----
|
||||
// Applies queued IMAP write operations (delete/move/flag) with retry logic.
|
||||
|
||||
func (s *Scheduler) drainPendingOps(account *models.EmailAccount) {
|
||||
ops, err := s.db.DequeuePendingOps(account.ID, 50)
|
||||
if err != nil || len(ops) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
c, err := email.Connect(ctx, account)
|
||||
if err != nil {
|
||||
log.Printf("[ops:%s] connect for drain: %v", account.EmailAddress, err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
// Find trash folder name once
|
||||
trashName := ""
|
||||
if mboxes, err := c.ListMailboxes(); err == nil {
|
||||
for _, mb := range mboxes {
|
||||
if email.InferFolderType(mb.Name, mb.Attributes) == "trash" {
|
||||
trashName = mb.Name
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, op := range ops {
|
||||
var applyErr error
|
||||
switch op.OpType {
|
||||
case "delete":
|
||||
applyErr = c.DeleteByUID(op.FolderPath, op.RemoteUID, trashName)
|
||||
case "move":
|
||||
applyErr = c.MoveByUID(op.FolderPath, op.Extra, op.RemoteUID)
|
||||
case "flag_read":
|
||||
applyErr = c.SetFlagByUID(op.FolderPath, op.RemoteUID, `\Seen`, op.Extra == "1")
|
||||
case "flag_star":
|
||||
applyErr = c.SetFlagByUID(op.FolderPath, op.RemoteUID, `\Flagged`, op.Extra == "1")
|
||||
}
|
||||
|
||||
if applyErr != nil {
|
||||
log.Printf("[ops:%s] %s uid=%d folder=%s: %v", account.EmailAddress, op.OpType, op.RemoteUID, op.FolderPath, applyErr)
|
||||
s.db.IncrementPendingOpAttempts(op.ID)
|
||||
} else {
|
||||
s.db.DeletePendingOp(op.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if n := s.db.CountPendingOps(account.ID); n > 0 {
|
||||
log.Printf("[ops:%s] %d ops still pending after drain", account.EmailAddress, n)
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Public API (called by HTTP handlers) ----
|
||||
|
||||
// SyncAccountNow performs an immediate delta sync of one account.
|
||||
func (s *Scheduler) SyncAccountNow(accountID int64) (int, error) {
|
||||
account, err := s.db.GetAccount(accountID)
|
||||
if err != nil || account == nil {
|
||||
return 0, fmt.Errorf("account %d not found", accountID)
|
||||
}
|
||||
return s.doSync(account)
|
||||
s.drainPendingOps(account)
|
||||
s.deltaSync(account)
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// SyncFolderNow syncs a single folder for an account.
|
||||
@@ -86,6 +559,7 @@ func (s *Scheduler) SyncFolderNow(accountID, folderID int64) (int, error) {
|
||||
if err != nil || folder == nil || folder.AccountID != accountID {
|
||||
return 0, fmt.Errorf("folder %d not found", folderID)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
c, err := email.Connect(ctx, account)
|
||||
@@ -93,101 +567,8 @@ func (s *Scheduler) SyncFolderNow(accountID, folderID int64) (int, error) {
|
||||
return 0, err
|
||||
}
|
||||
defer c.Close()
|
||||
days := account.SyncDays
|
||||
if days <= 0 || account.SyncMode == "all" {
|
||||
days = 0 // 0 = fetch ALL via IMAP ALL criteria
|
||||
}
|
||||
messages, err := c.FetchMessages(folder.FullPath, days)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
synced := 0
|
||||
for _, msg := range messages {
|
||||
msg.FolderID = folder.ID
|
||||
if err := s.db.UpsertMessage(msg); err == nil {
|
||||
synced++
|
||||
}
|
||||
}
|
||||
s.db.UpdateFolderCounts(folder.ID)
|
||||
s.db.UpdateAccountLastSync(accountID)
|
||||
return synced, nil
|
||||
|
||||
return s.syncFolder(c, account, folder)
|
||||
}
|
||||
|
||||
func (s *Scheduler) syncAccount(account *models.EmailAccount) {
|
||||
synced, err := s.doSync(account)
|
||||
if err != nil {
|
||||
log.Printf("Sync [%s]: %v", account.EmailAddress, err)
|
||||
s.db.SetAccountError(account.ID, err.Error())
|
||||
s.db.WriteAudit(nil, models.AuditAppError,
|
||||
"sync error for "+account.EmailAddress+": "+err.Error(), "", "")
|
||||
return
|
||||
}
|
||||
s.db.ClearAccountError(account.ID)
|
||||
if synced > 0 {
|
||||
log.Printf("Synced %d messages for %s", synced, account.EmailAddress)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Scheduler) doSync(account *models.EmailAccount) (int, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
c, err := email.Connect(ctx, account)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
mailboxes, err := c.ListMailboxes()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("list mailboxes: %w", err)
|
||||
}
|
||||
|
||||
synced := 0
|
||||
for _, mb := range mailboxes {
|
||||
folderType := email.InferFolderType(mb.Name, mb.Attributes)
|
||||
|
||||
folder := &models.Folder{
|
||||
AccountID: account.ID,
|
||||
Name: mb.Name,
|
||||
FullPath: mb.Name,
|
||||
FolderType: folderType,
|
||||
}
|
||||
if err := s.db.UpsertFolder(folder); err != nil {
|
||||
log.Printf("Upsert folder %s: %v", mb.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
dbFolder, _ := s.db.GetFolderByPath(account.ID, mb.Name)
|
||||
if dbFolder == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip folders that the user has disabled sync on
|
||||
if !dbFolder.SyncEnabled {
|
||||
continue
|
||||
}
|
||||
|
||||
days := account.SyncDays
|
||||
if days <= 0 || account.SyncMode == "all" {
|
||||
days = 0 // 0 = fetch ALL via IMAP ALL criteria
|
||||
}
|
||||
messages, err := c.FetchMessages(mb.Name, days)
|
||||
if err != nil {
|
||||
log.Printf("Fetch %s/%s: %v", account.EmailAddress, mb.Name, err)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, msg := range messages {
|
||||
msg.FolderID = dbFolder.ID
|
||||
if err := s.db.UpsertMessage(msg); err == nil {
|
||||
synced++
|
||||
}
|
||||
}
|
||||
|
||||
s.db.UpdateFolderCounts(dbFolder.ID)
|
||||
}
|
||||
|
||||
s.db.UpdateAccountLastSync(account.ID)
|
||||
return synced, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user