diff --git a/internal/backoff/backoff.go b/internal/backoff/backoff.go new file mode 100644 index 0000000..add9444 --- /dev/null +++ b/internal/backoff/backoff.go @@ -0,0 +1,94 @@ +// Package backoff implements exponential backoff with jitter for retry logic. +package backoff + +import ( + "fmt" + "math" + "math/rand/v2" + "time" +) + +// Config holds the configuration for a Backoff instance. +type Config struct { + // Initial is the starting upper bound for the backoff duration. + // Must be > 0. + Initial time.Duration `yaml:"initial"` + // Max is the absolute maximum upper bound for the backoff duration. + // Must be >= Initial. + Max time.Duration `yaml:"max"` + // RNG is an optional source of randomness. + // If nil, the global math/rand/v2 source is used. + RNG *rand.Rand `yaml:"-"` +} + +// Validate checks that the configuration values are sensible. +func (cfg Config) Validate() error { + if cfg.Initial <= 0 { + return fmt.Errorf("invalid initial backoff: %v", cfg.Initial) + } + if cfg.Max < cfg.Initial { + return fmt.Errorf("invalid max duration: %v", cfg.Max) + } + // MaxInt64 will overflow when calculating jitter. + if cfg.Max == math.MaxInt64 { + return fmt.Errorf("max duration cannot be MaxInt64") + } + return nil +} + +// Backoff implements exponential backoff with full jitter. +// It is NOT safe for concurrent use. +type Backoff struct { + initial time.Duration + max time.Duration + current time.Duration + rng *rand.Rand +} + +// New creates a new Backoff instance with the provided configuration. +func New(cfg Config) (*Backoff, error) { + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %v", err) + } + + return &Backoff{ + initial: cfg.Initial, + max: cfg.Max, + current: cfg.Initial, + rng: cfg.RNG, + }, nil +} + +// Next returns the next backoff delay. +func (b *Backoff) Next() time.Duration { + limit := b.current + + // Update state for the next call. + if b.current >= b.max { + b.current = b.max + } else if b.current > b.max/2 { + // If doubling would exceed max, just clamp to max. + b.current = b.max + } else { + b.current *= 2 + } + + // Calculate jitter; return random in [0, limit]. + // Int64N(n) returns values in [0, n). + // For [0, limit], we use limit + 1. + var jitter int64 + if b.rng != nil { + jitter = b.rng.Int64N(int64(limit) + 1) + } else { + jitter = rand.Int64N(int64(limit) + 1) + } + + return time.Duration(jitter) +} + +// Reset resets the current backoff cap to the initial value. +// This should be called after a successful operation to restart +// the backoff sequence for future retries. +func (b *Backoff) Reset() { + b.current = b.initial +} diff --git a/internal/backoff/backoff_test.go b/internal/backoff/backoff_test.go new file mode 100644 index 0000000..d84b64a --- /dev/null +++ b/internal/backoff/backoff_test.go @@ -0,0 +1,196 @@ +package backoff + +import ( + "math" + "math/rand/v2" + "strings" + "testing" + "time" +) + +func TestConfigValidate(t *testing.T) { + var tests = []struct { + name string + cfg Config + shouldError bool + }{ + { + name: "initial zero", + cfg: Config{Initial: 0, Max: time.Second}, + shouldError: true, + }, + { + name: "initial negative", + cfg: Config{Initial: -1, Max: time.Second}, + shouldError: true, + }, + { + name: "max less than initial", + cfg: Config{ + Initial: time.Second, + Max: 500 * time.Millisecond, + }, + shouldError: true, + }, + { + name: "max equals maxint64", + cfg: Config{ + Initial: time.Second, + Max: math.MaxInt64, + }, + shouldError: true, + }, + { + name: "valid", + cfg: Config{ + Initial: time.Second, + Max: time.Second, + }, + shouldError: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if tt.shouldError && err == nil { + t.Fatalf("expected error, got nil") + } + if !tt.shouldError && err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + } +} + +func TestNewInvalidConfig(t *testing.T) { + _, err := New(Config{Initial: 0, Max: time.Second}) + if err == nil { + t.Fatalf("expected error, got nil") + } + if !strings.Contains(err.Error(), "invalid config") { + t.Fatalf("expected invalid config error, got %v", err) + } +} + +// TestDistribution uses a fixed seed to check the logic flow and jitter. +func TestDistribution(t *testing.T) { + // RNG with seed (1, 2) + rng := rand.New(rand.NewPCG(1, 2)) + + initial := 100 * time.Millisecond + maxDur := 400 * time.Millisecond + + b, _ := New(Config{ + Initial: initial, + Max: maxDur, + RNG: rng, + }) + + // Generate reference numbers using the same seed to predict outcomes. + refRng := rand.New(rand.NewPCG(1, 2)) + + expectedCaps := []time.Duration{ + 100 * time.Millisecond, + 200 * time.Millisecond, + 400 * time.Millisecond, + 400 * time.Millisecond, // Clamped + } + + for i, cap := range expectedCaps { + // Expect Next to pick rand(0, cap). + expected := time.Duration(refRng.Int64N(int64(cap) + 1)) + got := b.Next() + + if got != expected { + t.Errorf( + "step %d: expected %v, got %v (cap was %v)", + i, expected, got, cap, + ) + } + if got > cap { + t.Errorf( + "step %d: got %v greater than cap %v", + i, got, cap, + ) + } + } +} + +// TestOverflowProtection ensures that large Max values do not cause overflow +// during the doubling phase. +func TestOverflowProtection(t *testing.T) { + // Use a Max that is near MaxInt64. + max := time.Duration(math.MaxInt64 - 1) + start := max / 4 + + b, err := New(Config{ + Initial: start, + Max: max, + }) + if err != nil { + t.Fatalf("New failed: %v", err) + } + + // 1. Current = start. + // Next returns [0, start]. state becomes start*2 (max/2). + _ = b.Next() + + // 2. Current = max/2. + // Next returns [0, max/2]. + _ = b.Next() + + // 3. Current = max. + // Next returns [0, max]. + // Logic check: current >= max. state stays max. + val := b.Next() + if val < 0 { + t.Errorf("got negative duration %v, likely overflow", val) + } + + // Verify clamp. + // No panic implies success. + for range 5 { + b.Next() + } +} + +func TestDefaultRNG(t *testing.T) { + b, err := New(Config{ + Initial: time.Millisecond, + Max: 10 * time.Millisecond, + }) + if err != nil { + t.Fatalf("New failed: %v", err) + } + got := b.Next() + if got < 0 || got > time.Millisecond { + t.Errorf("out of bounds: %v", got) + } +} + +func TestReset(t *testing.T) { + b, _ := New(Config{ + Initial: 10 * time.Millisecond, + Max: 100 * time.Millisecond, + }) + + // Advance state + b.Next() // 10 + b.Next() // 20 + b.Next() // 40 + + b.Reset() + + // Verify that the cap is reset by checking the bounds of the next + // call. If reset works, the next call is bounded by Initial (10ms). + for range 10 { + got := b.Next() + if got > 10*time.Millisecond { + t.Fatalf( + "call after Reset returned %v; expected <= 10ms", + got, + ) + } + b.Reset() + } +} diff --git a/internal/filter/filter.go b/internal/filter/filter.go new file mode 100644 index 0000000..ecba55c --- /dev/null +++ b/internal/filter/filter.go @@ -0,0 +1,52 @@ +// Package filter provides message filtering configuration. +// It is used to restrict which messages are processed by the answer workers. +// +// NOTE: IMAP Search Limitations +// IMAP SEARCH (RFC 3501 §6.4.4) uses case-insensitive substring matching for +// text fields. +// A filter like From: "alice@example.com" would match: +// - alice@example.com +// - malice@example.com +// - alice@example.com.evil.org +// +// For this reason, server-side IMAP filters should be considered +// performance optimizations only, not security controls. Use the +// [Filters.Senders] allowlist for exact sender matching. +// +// [RFC 3501 §6.4.4]: +// https://datatracker.ietf.org/doc/html/rfc3501#section-6.4.4 +package filter + +import "strings" + +// Filters holds message filtering configuration for IMAP searches +// and sender verification. +type Filters struct { + // Body contains keywords that must appear in the message body. + Body []string `yaml:"body"` + // From filters messages by the From header field. + From string `yaml:"from"` + // Senders is an allowlist of email addresses permitted to receive + // replies. If empty, all senders are allowed. + Senders []string `yaml:"allowed_senders"` + // Subject filters messages by the Subject header field. + Subject string `yaml:"subject"` + // To filters messages by the To header field. + To string `yaml:"to"` +} + +// MatchSender checks if the sender is in the allowed list. +// Returns true if the allowlist is empty (allow all) or if the sender matches. +// Comparison is case-insensitive and ignores leading/trailing whitespace. +func (f *Filters) MatchSender(sender string) bool { + if len(f.Senders) == 0 { + return true + } + sender = strings.ToLower(strings.TrimSpace(sender)) + for _, allowed := range f.Senders { + if strings.ToLower(strings.TrimSpace(allowed)) == sender { + return true + } + } + return false +} diff --git a/internal/filter/filter_test.go b/internal/filter/filter_test.go new file mode 100644 index 0000000..340ce76 --- /dev/null +++ b/internal/filter/filter_test.go @@ -0,0 +1,91 @@ +package filter + +import "testing" + +func TestMatchSender(t *testing.T) { + tests := []struct { + name string + senders []string + sender string + want bool + }{ + { + name: "empty allowlist allows all", + senders: nil, + sender: "anyone@example.com", + want: true, + }, + { + name: "exact match", + senders: []string{"allowed@example.com"}, + sender: "allowed@example.com", + want: true, + }, + { + name: "case insensitive match", + senders: []string{"Allowed@Example.COM"}, + sender: "allowed@example.com", + want: true, + }, + { + name: "sender case insensitive", + senders: []string{"allowed@example.com"}, + sender: "ALLOWED@EXAMPLE.COM", + want: true, + }, + { + name: "whitespace trimmed from sender", + senders: []string{"allowed@example.com"}, + sender: " allowed@example.com ", + want: true, + }, + { + name: "whitespace trimmed from allowlist", + senders: []string{" allowed@example.com "}, + sender: "allowed@example.com", + want: true, + }, + { + name: "not in allowlist", + senders: []string{"allowed@example.com"}, + sender: "blocked@example.com", + want: false, + }, + { + name: "multiple allowed senders", + senders: []string{"one@example.com", "two@example.com", "three@example.com"}, + sender: "two@example.com", + want: true, + }, + { + name: "partial match not allowed", + senders: []string{"allowed@example.com"}, + sender: "allowed@example", + want: false, + }, + { + name: "empty sender with allowlist", + senders: []string{"allowed@example.com"}, + sender: "", + want: false, + }, + { + name: "empty sender with empty allowlist", + senders: nil, + sender: "", + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + f := &Filters{Senders: tt.senders} + if got := f.MatchSender(tt.sender); got != tt.want { + t.Errorf( + "MatchSender(%q) = %v, want %v", + tt.sender, got, tt.want, + ) + } + }) + } +} diff --git a/internal/imap/imap.go b/internal/imap/imap.go new file mode 100644 index 0000000..1cd6852 --- /dev/null +++ b/internal/imap/imap.go @@ -0,0 +1,264 @@ +// Package imap provides an IMAP client wrapper for monitoring and fetching +// email messages. It handles connection management, IDLE support for push +// notifications, and message operations like fetching and marking as seen. +// +// The client supports server-side filtering via [filter.Filters] to limit +// which messages are returned by [Client.Unseen]. +package imap + +import ( + "context" + "fmt" + "log/slog" + "time" + + "raven/internal/filter" + + "github.com/emersion/go-imap/v2" + "github.com/emersion/go-imap/v2/imapclient" +) + +// Config holds IMAP server connection parameters. +type Config struct { + // Host is the IMAP server hostname. + Host string `yaml:"host"` + // Password is the authentication password or app-specific password. + Password string `yaml:"password"` + // Port is the IMAP server port (typically "993" for TLS). + Port string `yaml:"port"` + // User is the authentication username, usually an email address. + User string `yaml:"user"` +} + +// Address returns the host:port string for dialing. +func (c *Config) Address() string { + return fmt.Sprintf("%s:%s", c.Host, c.Port) +} + +// Validate checks that all required fields are present. +func (c *Config) Validate() error { + if c.Host == "" { + return fmt.Errorf("missing host") + } + if c.Password == "" { + return fmt.Errorf("missing password") + } + if c.Port == "" { + return fmt.Errorf("missing port") + } + if c.User == "" { + return fmt.Errorf("missing user") + } + + return nil +} + +// Client wraps an IMAP client with connection management and filtering. +type Client struct { + cfg Config + filters filter.Filters + client *imapclient.Client + log *slog.Logger +} + +// NewClient creates a new Client with the given configuration. +// The client is not connected until [Client.Connect] is called. +func NewClient( + cfg Config, + filters filter.Filters, + log *slog.Logger, +) (*Client, error) { + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %v", err) + } + return &Client{ + cfg: cfg, + filters: filters, + log: log, + }, nil +} + +// Client returns the underlying imapclient.Client. +// Returns nil if not connected. +func (c *Client) Client() *imapclient.Client { + return c.client +} + +// Connect establishes a TLS connection to the IMAP server, +// authenticates, and selects the INBOX. +func (c *Client) Connect( + ctx context.Context, + log *slog.Logger, + opts *imapclient.Options, +) error { + c.log = log + + c.log.InfoContext(ctx, "connecting to IMAP server") + client, err := imapclient.DialTLS(c.cfg.Address(), opts) + if err != nil { + return fmt.Errorf("dial TLS: %w", err) + } + c.client = client + c.log.InfoContext(ctx, "connected to IMAP server") + + c.log.InfoContext(ctx, "logging into to IMAP server") + if err := client.Login(c.cfg.User, c.cfg.Password).Wait(); err != nil { + return fmt.Errorf("login: %w", err) + } + c.log.InfoContext(ctx, "logged into IMAP server") + + c.log.InfoContext(ctx, "selecting INBOX") + if _, err := client.Select("INBOX", nil).Wait(); err != nil { + return fmt.Errorf("select: %w", err) + } + c.log.InfoContext(ctx, "selected INBOX") + + return nil +} + +// CanIdle reports whether the server advertises IDLE capability (RFC 2177). +func (c *Client) CanIdle() bool { + return c.client.Caps().Has(imap.CapIdle) +} + +// Disconnect logs out from the server and closes the connection. +// Safe to call on a nil or disconnected client. +func (c *Client) Disconnect(ctx context.Context) { + if c.client == nil { + return + } + + c.log.InfoContext(ctx, "logging out") + ctx, cancel := context.WithTimeout(ctx, 3*time.Second) + defer cancel() + + var done = make(chan error, 1) + go func() { + done <- c.client.Logout().Wait() + }() + select { + case <-ctx.Done(): + c.log.InfoContext( + ctx, + "context closed", + slog.Any("reason", ctx.Err()), + ) + + case err := <-done: + if err != nil { + c.log.Warn( + "failed to logout from IMAP server", + slog.Any("error", err), + ) + } + } + + c.client.Close() + c.client = nil +} + +// Fetch retrieves a message by UID. If peek is true, the message is +// fetched without marking it as seen (using BODY.PEEK[]). +func (c *Client) Fetch(uid imap.UID, peek bool) ( + *imapclient.FetchMessageBuffer, error, +) { + if c.client == nil { + return nil, fmt.Errorf("client not connected") + } + + opts := &imap.FetchOptions{Envelope: true, UID: true} + if peek { + opts.BodySection = []*imap.FetchItemBodySection{{Peek: true}} + } + + res, err := c.client.Fetch(imap.UIDSetNum(uid), opts).Collect() + if err != nil { + return nil, fmt.Errorf("fetch: %w", err) + } + if len(res) == 0 { + return nil, fmt.Errorf("message not found") + } + + return res[0], nil +} + +// Idle starts an IDLE command (RFC 2177) to wait for server notifications. +// The caller must call Close on the returned command to end IDLE mode. +func (c *Client) Idle() (*imapclient.IdleCommand, error) { + return c.client.Idle() +} + +// MarkSeen adds the \Seen flag to the message with the given UID. +func (c *Client) MarkSeen(uid imap.UID) error { + if c.client == nil { + return fmt.Errorf("client not connected") + } + + if _, err := c.client.Store( + imap.UIDSetNum(uid), + &imap.StoreFlags{ + Op: imap.StoreFlagsAdd, + Silent: true, + Flags: []imap.Flag{imap.FlagSeen}, + }, + nil, + ).Collect(); err != nil { + return fmt.Errorf("collect: %w", err) + } + + return nil +} + +// Noop sends a NOOP command to refresh mailbox state. +func (c *Client) Noop() error { + if c.client == nil { + return fmt.Errorf("client not connected") + } + return c.client.Noop().Wait() +} + +// Unseen returns UIDs of messages matching the client's filters that +// do not have the \Seen flag. Filters are applied server-side. +func (c *Client) Unseen(ctx context.Context) ([]imap.UID, error) { + sc := &imap.SearchCriteria{ + NotFlag: []imap.Flag{imap.FlagSeen}, + } + if len(c.filters.Body) > 0 { + sc.Body = c.filters.Body + } + if c.filters.From != "" { + sc.Header = append( + sc.Header, + imap.SearchCriteriaHeaderField{ + Key: "From", + Value: c.filters.From, + }, + ) + } + if c.filters.Subject != "" { + sc.Header = append( + sc.Header, + imap.SearchCriteriaHeaderField{ + Key: "Subject", + Value: c.filters.Subject, + }, + ) + } + if c.filters.To != "" { + sc.Header = append( + sc.Header, + imap.SearchCriteriaHeaderField{ + Key: "To", + Value: c.filters.To, + }, + ) + } + + res, err := c.client.UIDSearch(sc, nil).Wait() + if err != nil { + return nil, fmt.Errorf("search: %v", err) + } + uids := res.AllUIDs() + + return uids, nil +} diff --git a/internal/imap/imap_test.go b/internal/imap/imap_test.go new file mode 100644 index 0000000..24e3c7d --- /dev/null +++ b/internal/imap/imap_test.go @@ -0,0 +1,224 @@ +package imap + +import ( + "log/slog" + "testing" + + "raven/internal/filter" +) + +func TestConfigAddress(t *testing.T) { + tests := []struct { + name string + cfg Config + want string + }{ + { + name: "standard config", + cfg: Config{Host: "imap.example.com", Port: "993"}, + want: "imap.example.com:993", + }, + { + name: "empty host", + cfg: Config{Host: "", Port: "993"}, + want: ":993", + }, + { + name: "empty port", + cfg: Config{Host: "imap.example.com", Port: ""}, + want: "imap.example.com:", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.cfg.Address(); got != tt.want { + t.Errorf( + "Address() = %q, want %q", + got, tt.want, + ) + } + }) + } +} + +func TestConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg Config + wantErr string + }{ + { + name: "valid config", + cfg: Config{ + Host: "imap.example.com", + Password: "secret", + Port: "993", + User: "user@example.com", + }, + wantErr: "", + }, + { + name: "missing host", + cfg: Config{ + Password: "secret", + Port: "993", + User: "user@example.com", + }, + wantErr: "missing host", + }, + { + name: "missing password", + cfg: Config{ + Host: "imap.example.com", + Port: "993", + User: "user@example.com", + }, + wantErr: "missing password", + }, + { + name: "missing port", + cfg: Config{ + Host: "imap.example.com", + Password: "secret", + User: "user@example.com", + }, + wantErr: "missing port", + }, + { + name: "missing user", + cfg: Config{ + Host: "imap.example.com", + Password: "secret", + Port: "993", + }, + wantErr: "missing user", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if tt.wantErr == "" { + if err != nil { + t.Errorf( + "Validate() unexpected error: %v", + err, + ) + } + return + } + if err == nil { + t.Errorf( + "Validate() expected error containing %q, got nil", + tt.wantErr, + ) + return + } + if got := err.Error(); got != tt.wantErr { + t.Errorf( + "Validate() error = %q, want %q", + got, tt.wantErr, + ) + } + }) + } +} + +func TestNewClient(t *testing.T) { + validCfg := Config{ + Host: "imap.example.com", + Password: "secret", + Port: "993", + User: "user@example.com", + } + + tests := []struct { + name string + cfg Config + filters filter.Filters + wantErr bool + }{ + { + name: "valid config", + cfg: validCfg, + filters: filter.Filters{}, + wantErr: false, + }, + { + name: "valid config with filters", + cfg: validCfg, + filters: filter.Filters{ + Subject: "test", + To: "recipient@example.com", + Body: []string{"keyword"}, + }, + wantErr: false, + }, + { + name: "invalid config", + cfg: Config{Host: "imap.example.com"}, + filters: filter.Filters{}, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewClient( + tt.cfg, tt.filters, slog.Default(), + ) + if tt.wantErr { + if err == nil { + t.Error( + "NewClient() expected error, got nil", + ) + } + return + } + if err != nil { + t.Errorf( + "NewClient() unexpected error: %v", err, + ) + return + } + if client == nil { + t.Error("NewClient() returned nil client") + return + } + // Verify client is not connected. + if client.Client() != nil { + t.Error( + "NewClient() client should not be connected", + ) + } + }) + } +} + +func TestClientNotConnected(t *testing.T) { + cfg := Config{ + Host: "imap.example.com", + Password: "secret", + Port: "993", + User: "user@example.com", + } + client, err := NewClient(cfg, filter.Filters{}, slog.Default()) + if err != nil { + t.Fatalf("NewClient() unexpected error: %v", err) + } + + t.Run("Fetch without connection", func(t *testing.T) { + _, err := client.Fetch(1, true) + if err == nil { + t.Error("Fetch() expected error when not connected") + } + }) + + t.Run("MarkSeen without connection", func(t *testing.T) { + err := client.MarkSeen(1) + if err == nil { + t.Error("MarkSeen() expected error when not connected") + } + }) +} diff --git a/internal/intake/intake.go b/internal/intake/intake.go new file mode 100644 index 0000000..b9b5679 --- /dev/null +++ b/internal/intake/intake.go @@ -0,0 +1,398 @@ +// Package intake monitors an IMAP mailbox for new messages and dispatches +// them to answer workers for processing. It supports two modes: IDLE for +// servers with real-time push notifications, and poll for periodic checking. +package intake + +import ( + "context" + "fmt" + "log/slog" + "slices" + "time" + + "raven/internal/backoff" + "raven/internal/filter" + "raven/internal/imap" + "raven/internal/tracker" + + goimap "github.com/emersion/go-imap/v2" + "github.com/emersion/go-imap/v2/imapclient" +) + +// Config holds intake worker settings. +type Config struct { + // Mode selects IDLE or poll-based message retrieval. + Mode Mode `yaml:"mode"` + // PollInterval is the duration between checks in poll mode. + // Ignored in IDLE mode. Examples: "5s", "1m", "24h". + PollInterval string `yaml:"poll_interval"` +} + +// Validate checks that the configuration values are sensible. +func (c *Config) Validate() error { + if !c.Mode.Valid() { + return fmt.Errorf("invalid mode") + } + d, err := time.ParseDuration(c.PollInterval) + if d == 0 || err != nil { + + return fmt.Errorf( + "invalid poll_interval %q: %v", + c.PollInterval, err, + ) + } + + return nil +} + +// Mode represents the message retrieval strategy. +type Mode string + +// Valid returns true if the mode is a recognized value. +func (m Mode) Valid() bool { + return slices.Contains([]Mode{ModeIdle, ModePoll}, m) +} + +// String returns the mode as a string. +func (m Mode) String() string { + return string(m) +} + +const ( + // ModeIdle uses IMAP IDLE for real-time notifications. + ModeIdle Mode = "idle" + // ModePoll uses periodic polling at a configured interval. + ModePoll Mode = "poll" +) + +// Worker monitors an IMAP mailbox and dispatches message UIDs to answer +// workers. It maintains a persistent connection with automatic reconnection +// and exponential backoff on errors. +type Worker struct { + // backoff controls retry delays after connection failures. + backoff *backoff.Backoff + // cfg holds the worker configuration. + cfg Config + // ic is the IMAP client for mailbox operations. + ic *imap.Client + // interval is the parsed poll interval duration. + interval time.Duration + // log is the worker's logger with worker context. + log *slog.Logger + // tracker prevents duplicate processing across workers. + tracker *tracker.Tracker + // update signals new messages during IDLE. + update chan struct{} + // work sends fetched message UIDs to answer workers. + work chan<- goimap.UID +} + +// NewWorker creates an intake Worker with the provided configuration. +// The tracker coordinates with answer workers to prevent duplicate processing. +// The work channel receives UIDs for messages that need responses. +func NewWorker( + cfg Config, + filters filter.Filters, + imapConfig imap.Config, + log *slog.Logger, + tracker *tracker.Tracker, + work chan<- goimap.UID, +) (*Worker, error) { + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %v", err) + } + if tracker == nil { + return nil, fmt.Errorf("missing tracker") + } + if work == nil { + return nil, fmt.Errorf("missing work channel") + } + + b, err := backoff.New(backoff.Config{ + Initial: time.Second, + Max: time.Minute, + }) + if err != nil { + return nil, fmt.Errorf("create backoff: %v", err) + } + + ic, err := imap.NewClient(imapConfig, filters, log) + if err != nil { + return nil, fmt.Errorf("create imap client: %v", err) + } + + var w = &Worker{ + backoff: b, + cfg: cfg, + ic: ic, + log: log.With(slog.String("worker", "intake")), + tracker: tracker, + work: work, + } + switch w.cfg.Mode { + case ModeIdle: + w.update = make(chan struct{}, 1) + + case ModePoll: + d, err := time.ParseDuration(cfg.PollInterval) + if err != nil { + return nil, fmt.Errorf( + "parse poll interval: %v", err, + ) + } + w.interval = d + } + + return w, nil +} + +// Run starts the intake worker. It connects to the IMAP server and monitors +// for new messages using either IDLE or poll mode based on configuration. +// Handles reconnection automatically with exponential backoff on errors. +// Returns nil when the context is canceled. +func (w *Worker) Run(ctx context.Context) error { + w.log.InfoContext(ctx, "running intake worker") + defer w.log.InfoContext(ctx, "intake worker terminating") + defer w.ic.Disconnect(ctx) + + // For IDLE mode, set up handler to signal new messages. + var opts *imapclient.Options + if w.cfg.Mode == ModeIdle { + opts = &imapclient.Options{ + UnilateralDataHandler: &imapclient.UnilateralDataHandler{ + Mailbox: func(data *imapclient.UnilateralDataMailbox) { + if data.NumMessages != nil { + select { + case w.update <- struct{}{}: + default: + } + } + }, + }, + } + } + for { + // Context closed -- terminate. + if err := ctx.Err(); err != nil { + w.log.InfoContext( + ctx, + "context closed", + slog.Any("reason", ctx.Err()), + ) + return nil + } + + // Attempt to connect. + if w.ic.Client() == nil { + if err := w.ic.Connect( + ctx, w.log, opts, + ); err != nil { + // Failed to connect; backoff and try again. + w.log.ErrorContext( + ctx, + "failed to connect to IMAP server", + slog.Any("error", err), + ) + wait := w.backoff.Next() + w.log.InfoContext( + ctx, + "retrying after backoff", + slog.Any("wait", wait), + ) + // Wait for backoff, unless context closed. + select { + case <-ctx.Done(): + continue + case <-time.After(wait): + continue + } + } + + // Connected. + // For IDLE mode, verify server capability. + if w.cfg.Mode == ModeIdle && !w.ic.CanIdle() { + return fmt.Errorf( + "server lacks IDLE capability", + ) + } + } + + // Retrieve messages with the appropriate mode. + var err error + switch w.cfg.Mode { + case ModeIdle: + err = w.idle(ctx) + case ModePoll: + err = w.poll(ctx) + default: + return fmt.Errorf("unrecognized mode: %q", w.cfg.Mode) + } + if err != nil { + // Diagnose the error. + // Context canceled: loop, log, return. + if err := ctx.Err(); err != nil { + continue + } + // Otherwise, error on intake or idle. + w.log.ErrorContext( + ctx, + "IMAP error", + slog.Any("error", err), + ) + wait := w.backoff.Next() + w.log.InfoContext( + ctx, + "retrying after backoff", + slog.Any("wait", wait), + ) + select { + case <-ctx.Done(): + continue + case <-time.After(wait): + } + // Force reconnect on next loop. + w.ic.Disconnect(ctx) + } + } +} + +// poll runs the polling loop, checking for new messages at the configured +// interval. The interval could be 5 seconds or 24 hours depending on use case. +// Attempts to reuse the established connection, but the caller handles +// reconnection if the server closes an idle connection. +func (w *Worker) poll(ctx context.Context) error { + for { + // Refresh mailbox state before searching. + if err := w.ic.Noop(); err != nil { + return fmt.Errorf("noop: %v", err) + } + + if err := w.intake(ctx); err != nil { + return fmt.Errorf("intake: %v", err) + } + // Connection is healthy. Reset backoff. + w.backoff.Reset() + + select { + case <-ctx.Done(): + return fmt.Errorf("context closed: %v", ctx.Err()) + case <-time.After(w.interval): + continue + } + } +} + +// idle runs the IDLE loop, using IMAP IDLE for real-time message notifications. +// After each intake cycle, issues an IDLE command and waits for the server +// to signal new messages. Handles IDLE termination and reconnection. +func (w *Worker) idle(ctx context.Context) error { + for { + if err := w.intake(ctx); err != nil { + return fmt.Errorf("intake: %v", err) + } + // Connection is healthy. Reset backoff. + w.backoff.Reset() + + // Issue the IDLE command. + w.log.InfoContext(ctx, "entering IDLE mode") + idleCmd, err := w.ic.Idle() + if err != nil { + return fmt.Errorf("idle: %v", err) + } + + // Monitor the IDLE command. + idleDone := make(chan error, 1) + go func() { + idleDone <- idleCmd.Wait() + }() + + // Wait for: shutdown, connection death, or new message. + select { + case <-ctx.Done(): + if err := idleCmd.Close(); err != nil { + w.log.WarnContext( + ctx, + "failed to close IDLE command", + slog.Any("error", err), + ) + } + return fmt.Errorf("context closed: %v", ctx.Err()) + + case err := <-idleDone: + // Connection died or IDLE ended unexpectedly. + if err != nil { + return fmt.Errorf("idle wait: %v", err) + } + // IDLE ended without error (server closed). + w.log.InfoContext(ctx, "IDLE ended; refreshing") + return nil + + case <-w.update: + w.log.InfoContext(ctx, "IDLE: new message received") + if err := idleCmd.Close(); err != nil { + <-idleDone + return fmt.Errorf("idle close: %v", err) + } + <-idleDone + } + } +} + +// intake fetches unseen messages and sends their UIDs to the work channel. +// Uses the tracker to skip messages already being processed by answer workers. +// Returns early if the work queue is full, deferring remaining messages to +// the next cycle. +func (w *Worker) intake(ctx context.Context) error { + w.log.InfoContext(ctx, "fetching unseen messages") + + uids, err := w.ic.Unseen(ctx) + if err != nil { + return fmt.Errorf("retrieve unseen messages: %v", err) + } + if len(uids) == 0 { + w.log.InfoContext(ctx, "no new messages") + return nil + } + w.log.InfoContext( + ctx, + "found unseen messages", + slog.Int("count", len(uids)), + ) + + for _, uid := range uids { + if !w.tracker.TryAcquire(uid) { + w.log.InfoContext( + ctx, + "skipping message; already acquired", + slog.Any("uid", uid), + ) + continue + } + + select { + case <-ctx.Done(): + w.tracker.Release(uid) + w.log.InfoContext( + ctx, + "context closed", + slog.Any("reason", ctx.Err()), + ) + return nil + + case w.work <- uid: + w.log.InfoContext( + ctx, "enqueued", slog.Any("uid", uid), + ) + + default: + // Queue full, release and defer to next cycle. + w.tracker.Release(uid) + w.log.InfoContext( + ctx, "work queue full, deferring remaining", + ) + return nil + } + } + + return nil +} diff --git a/internal/intake/intake_test.go b/internal/intake/intake_test.go new file mode 100644 index 0000000..7a06183 --- /dev/null +++ b/internal/intake/intake_test.go @@ -0,0 +1,210 @@ +package intake + +import ( + "strings" + "testing" + + "raven/internal/filter" + "raven/internal/imap" + "raven/internal/tracker" + + goimap "github.com/emersion/go-imap/v2" +) + +func TestConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg Config + wantErr bool + }{ + { + name: "valid idle mode", + cfg: Config{ + Mode: ModeIdle, + PollInterval: "30s", + }, + wantErr: false, + }, + { + name: "valid poll mode", + cfg: Config{ + Mode: ModePoll, + PollInterval: "5m", + }, + wantErr: false, + }, + { + name: "valid poll mode with long interval", + cfg: Config{ + Mode: ModePoll, + PollInterval: "24h", + }, + wantErr: false, + }, + { + name: "invalid mode", + cfg: Config{ + Mode: "invalid", + PollInterval: "30s", + }, + wantErr: true, + }, + { + name: "empty mode", + cfg: Config{ + Mode: "", + PollInterval: "30s", + }, + wantErr: true, + }, + { + name: "missing poll interval", + cfg: Config{ + Mode: ModeIdle, + PollInterval: "", + }, + wantErr: true, + }, + { + name: "invalid poll interval", + cfg: Config{ + Mode: ModePoll, + PollInterval: "invalid", + }, + wantErr: true, + }, + { + name: "zero poll interval", + cfg: Config{ + Mode: ModePoll, + PollInterval: "0s", + }, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if (err != nil) != tt.wantErr { + t.Errorf( + "Validate() error = %v, wantErr %v", + err, tt.wantErr, + ) + } + }) + } +} + +func TestModeValid(t *testing.T) { + tests := []struct { + mode Mode + want bool + }{ + {ModeIdle, true}, + {ModePoll, true}, + {"idle", true}, + {"poll", true}, + {"", false}, + {"invalid", false}, + {"IDLE", false}, + {"POLL", false}, + } + + for _, tt := range tests { + t.Run(string(tt.mode), func(t *testing.T) { + if got := tt.mode.Valid(); got != tt.want { + t.Errorf( + "Mode(%q).Valid() = %v, want %v", + tt.mode, got, tt.want, + ) + } + }) + } +} + +func TestModeString(t *testing.T) { + tests := []struct { + mode Mode + want string + }{ + {ModeIdle, "idle"}, + {ModePoll, "poll"}, + {"custom", "custom"}, + } + + for _, tt := range tests { + t.Run(tt.want, func(t *testing.T) { + if got := tt.mode.String(); got != tt.want { + t.Errorf( + "Mode.String() = %v, want %v", + got, tt.want, + ) + } + }) + } +} + +func TestNewWorkerValidation(t *testing.T) { + validConfig := Config{ + Mode: ModeIdle, + PollInterval: "30s", + } + validTracker := tracker.New() + validWork := make(chan goimap.UID, 1) + + tests := []struct { + name string + cfg Config + tracker *tracker.Tracker + work chan goimap.UID + wantErr string + }{ + { + name: "nil tracker", + cfg: validConfig, + tracker: nil, + work: validWork, + wantErr: "missing tracker", + }, + { + name: "nil work channel", + cfg: validConfig, + tracker: validTracker, + work: nil, + wantErr: "missing work channel", + }, + { + name: "invalid config", + cfg: Config{ + Mode: "invalid", + PollInterval: "30s", + }, + tracker: validTracker, + work: validWork, + wantErr: "invalid config", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewWorker( + tt.cfg, + filter.Filters{}, + imap.Config{}, + nil, // log + tt.tracker, + tt.work, + ) + if err == nil { + t.Fatal("NewWorker() expected error, got nil") + } + if !strings.Contains(err.Error(), tt.wantErr) { + t.Errorf( + "NewWorker() error = %q, want containing %q", + err.Error(), tt.wantErr, + ) + } + }) + } +} diff --git a/internal/tracker/tracker.go b/internal/tracker/tracker.go new file mode 100644 index 0000000..510cbad --- /dev/null +++ b/internal/tracker/tracker.go @@ -0,0 +1,43 @@ +// Package tracker prevents duplicate processing of IMAP messages by +// maintaining a transient set of active, in-flight UIDs. +package tracker + +import ( + "sync" + + "github.com/emersion/go-imap/v2" +) + +// Tracker ensures unique concurrent processing of messages. +// This prevents race conditions where a message might be fetched +// again before its initial processing completes. +type Tracker struct { + mu sync.Mutex + ids map[imap.UID]struct{} +} + +// New returns a ready-to-use Tracker. +func New() *Tracker { + return &Tracker{ + ids: make(map[imap.UID]struct{}), + } +} + +// TryAcquire attempts to claim exclusive processing rights for a UID. +// It returns true only if the UID is not currently being tracked. +func (t *Tracker) TryAcquire(uid imap.UID) bool { + t.mu.Lock() + defer t.mu.Unlock() + if _, exists := t.ids[uid]; exists { + return false + } + t.ids[uid] = struct{}{} + return true +} + +// Release relinquishes processing rights for a UID, allowing it to be acquired again. +func (t *Tracker) Release(uid imap.UID) { + t.mu.Lock() + defer t.mu.Unlock() + delete(t.ids, uid) +} diff --git a/internal/tracker/tracker_test.go b/internal/tracker/tracker_test.go new file mode 100644 index 0000000..cf8abd7 --- /dev/null +++ b/internal/tracker/tracker_test.go @@ -0,0 +1,94 @@ +package tracker + +import ( + "sync" + "testing" + + "github.com/emersion/go-imap/v2" +) + +func TestNew(t *testing.T) { + tr := New() + if tr == nil { + t.Fatal("New() returned nil") + } + if tr.ids == nil { + t.Fatal("New() initialized with nil map") + } +} + +func TestLifecycle(t *testing.T) { + tr := New() + uid := imap.UID(123) + + // 1. First acquire should succeed. + if !tr.TryAcquire(uid) { + t.Fatalf( + "expected TryAcquire(%d) to return true, got false", + uid, + ) + } + + // 2. Second acquire on same UID should fail. + if tr.TryAcquire(uid) { + t.Fatalf( + "expected TryAcquire(%d) to return false when already held, got true", + uid, + ) + } + + // 3. Different UID should succeed. + otherUID := imap.UID(456) + if !tr.TryAcquire(otherUID) { + t.Fatalf( + "expected TryAcquire(%d) to return true, got false", + otherUID, + ) + } + + // 4. Release first UID. + tr.Release(uid) + + // 5. First UID should be acquirable again. + if !tr.TryAcquire(uid) { + t.Fatalf( + "expected TryAcquire(%d) to return true after Release, got false", + uid, + ) + } +} + +func TestConcurrency(t *testing.T) { + tr := New() + uid := imap.UID(1) + routines := 16 + var wg sync.WaitGroup + + // Counter for successful acquisitions. + successCount := 0 + var mu sync.Mutex + + for range routines { + wg.Go(func() { + if tr.TryAcquire(uid) { + mu.Lock() + successCount++ + mu.Unlock() + } + }) + } + wg.Wait() + + if successCount != 1 { + t.Errorf( + "expected exactly 1 successful acquisition for concurrent access to same UID, got %d", + successCount, + ) + } + + // Verify we can release safely after concurrent hammer. + tr.Release(uid) + if !tr.TryAcquire(uid) { + t.Error("failed to acquire UID after concurrent test release") + } +}