2026-01-04 20:59:26 +00:00
|
|
|
// Package intake monitors an IMAP mailbox for new messages and dispatches
|
|
|
|
|
// them to answer workers for processing. It supports two modes: IDLE for
|
|
|
|
|
// servers with real-time push notifications, and poll for periodic checking.
|
|
|
|
|
package intake
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"log/slog"
|
|
|
|
|
"slices"
|
|
|
|
|
"time"
|
|
|
|
|
|
2026-01-05 01:29:32 +00:00
|
|
|
"github.com/chimerical-llc/raven/internal/backoff"
|
|
|
|
|
"github.com/chimerical-llc/raven/internal/filter"
|
|
|
|
|
"github.com/chimerical-llc/raven/internal/imap"
|
|
|
|
|
"github.com/chimerical-llc/raven/internal/tracker"
|
2026-01-04 20:59:26 +00:00
|
|
|
|
|
|
|
|
goimap "github.com/emersion/go-imap/v2"
|
|
|
|
|
"github.com/emersion/go-imap/v2/imapclient"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Config holds intake worker settings.
type Config struct {
	// Mode selects IDLE or poll-based message retrieval.
	Mode Mode `yaml:"mode"`
	// PollInterval is the duration between checks in poll mode, written in
	// time.ParseDuration syntax. It must be strictly positive in poll mode
	// and is ignored in IDLE mode. Examples: "5s", "1m", "24h".
	PollInterval string `yaml:"poll_interval"`
}

// Validate checks that the configuration values are sensible.
//
// It returns an error when Mode is not a recognized value, or when Mode is
// ModePoll and PollInterval does not parse as a strictly positive duration.
// PollInterval is not inspected in IDLE mode, where it is unused.
func (c *Config) Validate() error {
	if !c.Mode.Valid() {
		return fmt.Errorf("invalid mode")
	}

	// Only poll mode consumes the interval; do not reject an IDLE
	// configuration over a field it never reads.
	if c.Mode != ModePoll {
		return nil
	}

	d, err := time.ParseDuration(c.PollInterval)
	if err != nil {
		return fmt.Errorf(
			"invalid poll_interval %q: %w",
			c.PollInterval, err,
		)
	}
	// A zero or negative interval would make the poll timer fire
	// immediately on every cycle, turning the poller into a busy loop.
	if d <= 0 {
		return fmt.Errorf(
			"invalid poll_interval %q: must be positive",
			c.PollInterval,
		)
	}

	return nil
}

// Mode represents the message retrieval strategy.
type Mode string

// Valid returns true if the mode is a recognized value.
func (m Mode) Valid() bool {
	return slices.Contains([]Mode{ModeIdle, ModePoll}, m)
}

// String returns the mode as a string.
func (m Mode) String() string {
	return string(m)
}

const (
	// ModeIdle uses IMAP IDLE for real-time notifications.
	ModeIdle Mode = "idle"
	// ModePoll uses periodic polling at a configured interval.
	ModePoll Mode = "poll"
)
|
|
|
|
|
|
|
|
|
|
// Worker monitors an IMAP mailbox and dispatches message UIDs to answer
|
|
|
|
|
// workers. It maintains a persistent connection with automatic reconnection
|
|
|
|
|
// and exponential backoff on errors.
|
|
|
|
|
type Worker struct {
|
|
|
|
|
// backoff controls retry delays after connection failures.
|
|
|
|
|
backoff *backoff.Backoff
|
|
|
|
|
// cfg holds the worker configuration.
|
|
|
|
|
cfg Config
|
|
|
|
|
// ic is the IMAP client for mailbox operations.
|
|
|
|
|
ic *imap.Client
|
|
|
|
|
// interval is the parsed poll interval duration.
|
|
|
|
|
interval time.Duration
|
|
|
|
|
// log is the worker's logger with worker context.
|
|
|
|
|
log *slog.Logger
|
|
|
|
|
// tracker prevents duplicate processing across workers.
|
|
|
|
|
tracker *tracker.Tracker
|
|
|
|
|
// update signals new messages during IDLE.
|
|
|
|
|
update chan struct{}
|
|
|
|
|
// work sends fetched message UIDs to answer workers.
|
|
|
|
|
work chan<- goimap.UID
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewWorker creates an intake Worker with the provided configuration.
|
|
|
|
|
// The tracker coordinates with answer workers to prevent duplicate processing.
|
|
|
|
|
// The work channel receives UIDs for messages that need responses.
|
|
|
|
|
func NewWorker(
|
|
|
|
|
cfg Config,
|
|
|
|
|
filters filter.Filters,
|
|
|
|
|
imapConfig imap.Config,
|
|
|
|
|
log *slog.Logger,
|
|
|
|
|
tracker *tracker.Tracker,
|
|
|
|
|
work chan<- goimap.UID,
|
|
|
|
|
) (*Worker, error) {
|
|
|
|
|
if err := cfg.Validate(); err != nil {
|
|
|
|
|
return nil, fmt.Errorf("invalid config: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if tracker == nil {
|
|
|
|
|
return nil, fmt.Errorf("missing tracker")
|
|
|
|
|
}
|
|
|
|
|
if work == nil {
|
|
|
|
|
return nil, fmt.Errorf("missing work channel")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
b, err := backoff.New(backoff.Config{
|
|
|
|
|
Initial: time.Second,
|
|
|
|
|
Max: time.Minute,
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("create backoff: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ic, err := imap.NewClient(imapConfig, filters, log)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf("create imap client: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
var w = &Worker{
|
|
|
|
|
backoff: b,
|
|
|
|
|
cfg: cfg,
|
|
|
|
|
ic: ic,
|
|
|
|
|
log: log.With(slog.String("worker", "intake")),
|
|
|
|
|
tracker: tracker,
|
|
|
|
|
work: work,
|
|
|
|
|
}
|
|
|
|
|
switch w.cfg.Mode {
|
|
|
|
|
case ModeIdle:
|
|
|
|
|
w.update = make(chan struct{}, 1)
|
|
|
|
|
|
|
|
|
|
case ModePoll:
|
|
|
|
|
d, err := time.ParseDuration(cfg.PollInterval)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, fmt.Errorf(
|
|
|
|
|
"parse poll interval: %v", err,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
w.interval = d
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return w, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Run starts the intake worker. It connects to the IMAP server and monitors
|
|
|
|
|
// for new messages using either IDLE or poll mode based on configuration.
|
|
|
|
|
// Handles reconnection automatically with exponential backoff on errors.
|
|
|
|
|
// Returns nil when the context is canceled.
|
|
|
|
|
func (w *Worker) Run(ctx context.Context) error {
|
|
|
|
|
w.log.InfoContext(ctx, "running intake worker")
|
|
|
|
|
defer w.log.InfoContext(ctx, "intake worker terminating")
|
|
|
|
|
defer w.ic.Disconnect(ctx)
|
|
|
|
|
|
|
|
|
|
// For IDLE mode, set up handler to signal new messages.
|
|
|
|
|
var opts *imapclient.Options
|
|
|
|
|
if w.cfg.Mode == ModeIdle {
|
|
|
|
|
opts = &imapclient.Options{
|
|
|
|
|
UnilateralDataHandler: &imapclient.UnilateralDataHandler{
|
|
|
|
|
Mailbox: func(data *imapclient.UnilateralDataMailbox) {
|
|
|
|
|
if data.NumMessages != nil {
|
|
|
|
|
select {
|
|
|
|
|
case w.update <- struct{}{}:
|
|
|
|
|
default:
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
},
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
// Context closed -- terminate.
|
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"context closed",
|
|
|
|
|
slog.Any("reason", ctx.Err()),
|
|
|
|
|
)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Attempt to connect.
|
|
|
|
|
if w.ic.Client() == nil {
|
|
|
|
|
if err := w.ic.Connect(
|
|
|
|
|
ctx, w.log, opts,
|
|
|
|
|
); err != nil {
|
|
|
|
|
// Failed to connect; backoff and try again.
|
|
|
|
|
w.log.ErrorContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"failed to connect to IMAP server",
|
|
|
|
|
slog.Any("error", err),
|
|
|
|
|
)
|
|
|
|
|
wait := w.backoff.Next()
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"retrying after backoff",
|
|
|
|
|
slog.Any("wait", wait),
|
|
|
|
|
)
|
|
|
|
|
// Wait for backoff, unless context closed.
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
continue
|
|
|
|
|
case <-time.After(wait):
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Connected.
|
|
|
|
|
// For IDLE mode, verify server capability.
|
|
|
|
|
if w.cfg.Mode == ModeIdle && !w.ic.CanIdle() {
|
|
|
|
|
return fmt.Errorf(
|
|
|
|
|
"server lacks IDLE capability",
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Retrieve messages with the appropriate mode.
|
|
|
|
|
var err error
|
|
|
|
|
switch w.cfg.Mode {
|
|
|
|
|
case ModeIdle:
|
|
|
|
|
err = w.idle(ctx)
|
|
|
|
|
case ModePoll:
|
|
|
|
|
err = w.poll(ctx)
|
|
|
|
|
default:
|
|
|
|
|
return fmt.Errorf("unrecognized mode: %q", w.cfg.Mode)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Diagnose the error.
|
|
|
|
|
// Context canceled: loop, log, return.
|
|
|
|
|
if err := ctx.Err(); err != nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
// Otherwise, error on intake or idle.
|
|
|
|
|
w.log.ErrorContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"IMAP error",
|
|
|
|
|
slog.Any("error", err),
|
|
|
|
|
)
|
|
|
|
|
wait := w.backoff.Next()
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"retrying after backoff",
|
|
|
|
|
slog.Any("wait", wait),
|
|
|
|
|
)
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
continue
|
|
|
|
|
case <-time.After(wait):
|
|
|
|
|
}
|
|
|
|
|
// Force reconnect on next loop.
|
|
|
|
|
w.ic.Disconnect(ctx)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// poll runs the polling loop, checking for new messages at the configured
|
|
|
|
|
// interval. The interval could be 5 seconds or 24 hours depending on use case.
|
|
|
|
|
// Attempts to reuse the established connection, but the caller handles
|
|
|
|
|
// reconnection if the server closes an idle connection.
|
|
|
|
|
func (w *Worker) poll(ctx context.Context) error {
|
|
|
|
|
for {
|
|
|
|
|
// Refresh mailbox state before searching.
|
|
|
|
|
if err := w.ic.Noop(); err != nil {
|
|
|
|
|
return fmt.Errorf("noop: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err := w.intake(ctx); err != nil {
|
|
|
|
|
return fmt.Errorf("intake: %v", err)
|
|
|
|
|
}
|
|
|
|
|
// Connection is healthy. Reset backoff.
|
|
|
|
|
w.backoff.Reset()
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
return fmt.Errorf("context closed: %v", ctx.Err())
|
|
|
|
|
case <-time.After(w.interval):
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// idle runs the IDLE loop, using IMAP IDLE for real-time message notifications.
|
|
|
|
|
// After each intake cycle, issues an IDLE command and waits for the server
|
|
|
|
|
// to signal new messages. Handles IDLE termination and reconnection.
|
|
|
|
|
func (w *Worker) idle(ctx context.Context) error {
|
|
|
|
|
for {
|
|
|
|
|
if err := w.intake(ctx); err != nil {
|
|
|
|
|
return fmt.Errorf("intake: %v", err)
|
|
|
|
|
}
|
|
|
|
|
// Connection is healthy. Reset backoff.
|
|
|
|
|
w.backoff.Reset()
|
|
|
|
|
|
|
|
|
|
// Issue the IDLE command.
|
|
|
|
|
w.log.InfoContext(ctx, "entering IDLE mode")
|
|
|
|
|
idleCmd, err := w.ic.Idle()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("idle: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Monitor the IDLE command.
|
|
|
|
|
idleDone := make(chan error, 1)
|
|
|
|
|
go func() {
|
|
|
|
|
idleDone <- idleCmd.Wait()
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Wait for: shutdown, connection death, or new message.
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
if err := idleCmd.Close(); err != nil {
|
|
|
|
|
w.log.WarnContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"failed to close IDLE command",
|
|
|
|
|
slog.Any("error", err),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
return fmt.Errorf("context closed: %v", ctx.Err())
|
|
|
|
|
|
|
|
|
|
case err := <-idleDone:
|
|
|
|
|
// Connection died or IDLE ended unexpectedly.
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("idle wait: %v", err)
|
|
|
|
|
}
|
|
|
|
|
// IDLE ended without error (server closed).
|
|
|
|
|
w.log.InfoContext(ctx, "IDLE ended; refreshing")
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case <-w.update:
|
|
|
|
|
w.log.InfoContext(ctx, "IDLE: new message received")
|
|
|
|
|
if err := idleCmd.Close(); err != nil {
|
|
|
|
|
<-idleDone
|
|
|
|
|
return fmt.Errorf("idle close: %v", err)
|
|
|
|
|
}
|
|
|
|
|
<-idleDone
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// intake fetches unseen messages and sends their UIDs to the work channel.
|
|
|
|
|
// Uses the tracker to skip messages already being processed by answer workers.
|
|
|
|
|
// Returns early if the work queue is full, deferring remaining messages to
|
|
|
|
|
// the next cycle.
|
|
|
|
|
func (w *Worker) intake(ctx context.Context) error {
|
|
|
|
|
w.log.InfoContext(ctx, "fetching unseen messages")
|
|
|
|
|
|
|
|
|
|
uids, err := w.ic.Unseen(ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("retrieve unseen messages: %v", err)
|
|
|
|
|
}
|
|
|
|
|
if len(uids) == 0 {
|
|
|
|
|
w.log.InfoContext(ctx, "no new messages")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"found unseen messages",
|
|
|
|
|
slog.Int("count", len(uids)),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
for _, uid := range uids {
|
|
|
|
|
if !w.tracker.TryAcquire(uid) {
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"skipping message; already acquired",
|
|
|
|
|
slog.Any("uid", uid),
|
|
|
|
|
)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
w.tracker.Release(uid)
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx,
|
|
|
|
|
"context closed",
|
|
|
|
|
slog.Any("reason", ctx.Err()),
|
|
|
|
|
)
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
|
|
case w.work <- uid:
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx, "enqueued", slog.Any("uid", uid),
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
// Queue full, release and defer to next cycle.
|
|
|
|
|
w.tracker.Release(uid)
|
|
|
|
|
w.log.InfoContext(
|
|
|
|
|
ctx, "work queue full, deferring remaining",
|
|
|
|
|
)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|