// Package intake monitors an IMAP mailbox for new messages and dispatches // them to answer workers for processing. It supports two modes: IDLE for // servers with real-time push notifications, and poll for periodic checking. package intake import ( "context" "fmt" "log/slog" "slices" "time" "raven/internal/backoff" "raven/internal/filter" "raven/internal/imap" "raven/internal/tracker" goimap "github.com/emersion/go-imap/v2" "github.com/emersion/go-imap/v2/imapclient" ) // Config holds intake worker settings. type Config struct { // Mode selects IDLE or poll-based message retrieval. Mode Mode `yaml:"mode"` // PollInterval is the duration between checks in poll mode. // Ignored in IDLE mode. Examples: "5s", "1m", "24h". PollInterval string `yaml:"poll_interval"` } // Validate checks that the configuration values are sensible. func (c *Config) Validate() error { if !c.Mode.Valid() { return fmt.Errorf("invalid mode") } d, err := time.ParseDuration(c.PollInterval) if d == 0 || err != nil { return fmt.Errorf( "invalid poll_interval %q: %v", c.PollInterval, err, ) } return nil } // Mode represents the message retrieval strategy. type Mode string // Valid returns true if the mode is a recognized value. func (m Mode) Valid() bool { return slices.Contains([]Mode{ModeIdle, ModePoll}, m) } // String returns the mode as a string. func (m Mode) String() string { return string(m) } const ( // ModeIdle uses IMAP IDLE for real-time notifications. ModeIdle Mode = "idle" // ModePoll uses periodic polling at a configured interval. ModePoll Mode = "poll" ) // Worker monitors an IMAP mailbox and dispatches message UIDs to answer // workers. It maintains a persistent connection with automatic reconnection // and exponential backoff on errors. type Worker struct { // backoff controls retry delays after connection failures. backoff *backoff.Backoff // cfg holds the worker configuration. cfg Config // ic is the IMAP client for mailbox operations. ic *imap.Client // interval is the parsed poll interval duration. interval time.Duration // log is the worker's logger with worker context. log *slog.Logger // tracker prevents duplicate processing across workers. tracker *tracker.Tracker // update signals new messages during IDLE. update chan struct{} // work sends fetched message UIDs to answer workers. work chan<- goimap.UID } // NewWorker creates an intake Worker with the provided configuration. // The tracker coordinates with answer workers to prevent duplicate processing. // The work channel receives UIDs for messages that need responses. func NewWorker( cfg Config, filters filter.Filters, imapConfig imap.Config, log *slog.Logger, tracker *tracker.Tracker, work chan<- goimap.UID, ) (*Worker, error) { if err := cfg.Validate(); err != nil { return nil, fmt.Errorf("invalid config: %v", err) } if tracker == nil { return nil, fmt.Errorf("missing tracker") } if work == nil { return nil, fmt.Errorf("missing work channel") } b, err := backoff.New(backoff.Config{ Initial: time.Second, Max: time.Minute, }) if err != nil { return nil, fmt.Errorf("create backoff: %v", err) } ic, err := imap.NewClient(imapConfig, filters, log) if err != nil { return nil, fmt.Errorf("create imap client: %v", err) } var w = &Worker{ backoff: b, cfg: cfg, ic: ic, log: log.With(slog.String("worker", "intake")), tracker: tracker, work: work, } switch w.cfg.Mode { case ModeIdle: w.update = make(chan struct{}, 1) case ModePoll: d, err := time.ParseDuration(cfg.PollInterval) if err != nil { return nil, fmt.Errorf( "parse poll interval: %v", err, ) } w.interval = d } return w, nil } // Run starts the intake worker. It connects to the IMAP server and monitors // for new messages using either IDLE or poll mode based on configuration. // Handles reconnection automatically with exponential backoff on errors. // Returns nil when the context is canceled. func (w *Worker) Run(ctx context.Context) error { w.log.InfoContext(ctx, "running intake worker") defer w.log.InfoContext(ctx, "intake worker terminating") defer w.ic.Disconnect(ctx) // For IDLE mode, set up handler to signal new messages. var opts *imapclient.Options if w.cfg.Mode == ModeIdle { opts = &imapclient.Options{ UnilateralDataHandler: &imapclient.UnilateralDataHandler{ Mailbox: func(data *imapclient.UnilateralDataMailbox) { if data.NumMessages != nil { select { case w.update <- struct{}{}: default: } } }, }, } } for { // Context closed -- terminate. if err := ctx.Err(); err != nil { w.log.InfoContext( ctx, "context closed", slog.Any("reason", ctx.Err()), ) return nil } // Attempt to connect. if w.ic.Client() == nil { if err := w.ic.Connect( ctx, w.log, opts, ); err != nil { // Failed to connect; backoff and try again. w.log.ErrorContext( ctx, "failed to connect to IMAP server", slog.Any("error", err), ) wait := w.backoff.Next() w.log.InfoContext( ctx, "retrying after backoff", slog.Any("wait", wait), ) // Wait for backoff, unless context closed. select { case <-ctx.Done(): continue case <-time.After(wait): continue } } // Connected. // For IDLE mode, verify server capability. if w.cfg.Mode == ModeIdle && !w.ic.CanIdle() { return fmt.Errorf( "server lacks IDLE capability", ) } } // Retrieve messages with the appropriate mode. var err error switch w.cfg.Mode { case ModeIdle: err = w.idle(ctx) case ModePoll: err = w.poll(ctx) default: return fmt.Errorf("unrecognized mode: %q", w.cfg.Mode) } if err != nil { // Diagnose the error. // Context canceled: loop, log, return. if err := ctx.Err(); err != nil { continue } // Otherwise, error on intake or idle. w.log.ErrorContext( ctx, "IMAP error", slog.Any("error", err), ) wait := w.backoff.Next() w.log.InfoContext( ctx, "retrying after backoff", slog.Any("wait", wait), ) select { case <-ctx.Done(): continue case <-time.After(wait): } // Force reconnect on next loop. w.ic.Disconnect(ctx) } } } // poll runs the polling loop, checking for new messages at the configured // interval. The interval could be 5 seconds or 24 hours depending on use case. // Attempts to reuse the established connection, but the caller handles // reconnection if the server closes an idle connection. func (w *Worker) poll(ctx context.Context) error { for { // Refresh mailbox state before searching. if err := w.ic.Noop(); err != nil { return fmt.Errorf("noop: %v", err) } if err := w.intake(ctx); err != nil { return fmt.Errorf("intake: %v", err) } // Connection is healthy. Reset backoff. w.backoff.Reset() select { case <-ctx.Done(): return fmt.Errorf("context closed: %v", ctx.Err()) case <-time.After(w.interval): continue } } } // idle runs the IDLE loop, using IMAP IDLE for real-time message notifications. // After each intake cycle, issues an IDLE command and waits for the server // to signal new messages. Handles IDLE termination and reconnection. func (w *Worker) idle(ctx context.Context) error { for { if err := w.intake(ctx); err != nil { return fmt.Errorf("intake: %v", err) } // Connection is healthy. Reset backoff. w.backoff.Reset() // Issue the IDLE command. w.log.InfoContext(ctx, "entering IDLE mode") idleCmd, err := w.ic.Idle() if err != nil { return fmt.Errorf("idle: %v", err) } // Monitor the IDLE command. idleDone := make(chan error, 1) go func() { idleDone <- idleCmd.Wait() }() // Wait for: shutdown, connection death, or new message. select { case <-ctx.Done(): if err := idleCmd.Close(); err != nil { w.log.WarnContext( ctx, "failed to close IDLE command", slog.Any("error", err), ) } return fmt.Errorf("context closed: %v", ctx.Err()) case err := <-idleDone: // Connection died or IDLE ended unexpectedly. if err != nil { return fmt.Errorf("idle wait: %v", err) } // IDLE ended without error (server closed). w.log.InfoContext(ctx, "IDLE ended; refreshing") return nil case <-w.update: w.log.InfoContext(ctx, "IDLE: new message received") if err := idleCmd.Close(); err != nil { <-idleDone return fmt.Errorf("idle close: %v", err) } <-idleDone } } } // intake fetches unseen messages and sends their UIDs to the work channel. // Uses the tracker to skip messages already being processed by answer workers. // Returns early if the work queue is full, deferring remaining messages to // the next cycle. func (w *Worker) intake(ctx context.Context) error { w.log.InfoContext(ctx, "fetching unseen messages") uids, err := w.ic.Unseen(ctx) if err != nil { return fmt.Errorf("retrieve unseen messages: %v", err) } if len(uids) == 0 { w.log.InfoContext(ctx, "no new messages") return nil } w.log.InfoContext( ctx, "found unseen messages", slog.Int("count", len(uids)), ) for _, uid := range uids { if !w.tracker.TryAcquire(uid) { w.log.InfoContext( ctx, "skipping message; already acquired", slog.Any("uid", uid), ) continue } select { case <-ctx.Done(): w.tracker.Release(uid) w.log.InfoContext( ctx, "context closed", slog.Any("reason", ctx.Err()), ) return nil case w.work <- uid: w.log.InfoContext( ctx, "enqueued", slog.Any("uid", uid), ) default: // Queue full, release and defer to next cycle. w.tracker.Release(uid) w.log.InfoContext( ctx, "work queue full, deferring remaining", ) return nil } } return nil }