Add intake worker

Subsystem to monitor IMAP mailbox for new messages.

Introduces:
- intake: worker that uses IDLE or polling to detect new emails.
- imap: client wrapper for connection management and IMAP commands.
- filter: logic for IMAP search and sender allow-list.
- tracker: concurrency control to prevent processing the same UID twice.
- backoff: for handling connection retries with jitter.
This commit is contained in:
dwrz
2026-01-04 20:59:26 +00:00
parent ce3943cc1d
commit c53ee5f6ad
10 changed files with 1666 additions and 0 deletions

View File

@@ -0,0 +1,94 @@
// Package backoff implements exponential backoff with jitter for retry logic.
package backoff
import (
"fmt"
"math"
"math/rand/v2"
"time"
)
// Config holds the configuration for a Backoff instance.
type Config struct {
// Initial is the starting upper bound for the backoff duration.
// Must be > 0.
Initial time.Duration `yaml:"initial"`
// Max is the absolute maximum upper bound for the backoff duration.
// Must be >= Initial.
Max time.Duration `yaml:"max"`
// RNG is an optional source of randomness.
// If nil, the global math/rand/v2 source is used.
RNG *rand.Rand `yaml:"-"`
}
// Validate checks that the configuration values are sensible.
func (cfg Config) Validate() error {
if cfg.Initial <= 0 {
return fmt.Errorf("invalid initial backoff: %v", cfg.Initial)
}
if cfg.Max < cfg.Initial {
return fmt.Errorf("invalid max duration: %v", cfg.Max)
}
// MaxInt64 will overflow when calculating jitter.
if cfg.Max == math.MaxInt64 {
return fmt.Errorf("max duration cannot be MaxInt64")
}
return nil
}
// Backoff implements exponential backoff with full jitter.
// It is NOT safe for concurrent use.
type Backoff struct {
initial time.Duration
max time.Duration
current time.Duration
rng *rand.Rand
}
// New creates a new Backoff instance with the provided configuration.
func New(cfg Config) (*Backoff, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %v", err)
}
return &Backoff{
initial: cfg.Initial,
max: cfg.Max,
current: cfg.Initial,
rng: cfg.RNG,
}, nil
}
// Next returns the next backoff delay.
func (b *Backoff) Next() time.Duration {
limit := b.current
// Update state for the next call.
if b.current >= b.max {
b.current = b.max
} else if b.current > b.max/2 {
// If doubling would exceed max, just clamp to max.
b.current = b.max
} else {
b.current *= 2
}
// Calculate jitter; return random in [0, limit].
// Int64N(n) returns values in [0, n).
// For [0, limit], we use limit + 1.
var jitter int64
if b.rng != nil {
jitter = b.rng.Int64N(int64(limit) + 1)
} else {
jitter = rand.Int64N(int64(limit) + 1)
}
return time.Duration(jitter)
}
// Reset resets the current backoff cap to the initial value.
// This should be called after a successful operation to restart
// the backoff sequence for future retries.
func (b *Backoff) Reset() {
b.current = b.initial
}

View File

@@ -0,0 +1,196 @@
package backoff
import (
"math"
"math/rand/v2"
"strings"
"testing"
"time"
)
func TestConfigValidate(t *testing.T) {
var tests = []struct {
name string
cfg Config
shouldError bool
}{
{
name: "initial zero",
cfg: Config{Initial: 0, Max: time.Second},
shouldError: true,
},
{
name: "initial negative",
cfg: Config{Initial: -1, Max: time.Second},
shouldError: true,
},
{
name: "max less than initial",
cfg: Config{
Initial: time.Second,
Max: 500 * time.Millisecond,
},
shouldError: true,
},
{
name: "max equals maxint64",
cfg: Config{
Initial: time.Second,
Max: math.MaxInt64,
},
shouldError: true,
},
{
name: "valid",
cfg: Config{
Initial: time.Second,
Max: time.Second,
},
shouldError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.shouldError && err == nil {
t.Fatalf("expected error, got nil")
}
if !tt.shouldError && err != nil {
t.Fatalf("unexpected error: %v", err)
}
})
}
}
func TestNewInvalidConfig(t *testing.T) {
_, err := New(Config{Initial: 0, Max: time.Second})
if err == nil {
t.Fatalf("expected error, got nil")
}
if !strings.Contains(err.Error(), "invalid config") {
t.Fatalf("expected invalid config error, got %v", err)
}
}
// TestDistribution uses a fixed seed to check the logic flow and jitter.
func TestDistribution(t *testing.T) {
// RNG with seed (1, 2)
rng := rand.New(rand.NewPCG(1, 2))
initial := 100 * time.Millisecond
maxDur := 400 * time.Millisecond
b, _ := New(Config{
Initial: initial,
Max: maxDur,
RNG: rng,
})
// Generate reference numbers using the same seed to predict outcomes.
refRng := rand.New(rand.NewPCG(1, 2))
expectedCaps := []time.Duration{
100 * time.Millisecond,
200 * time.Millisecond,
400 * time.Millisecond,
400 * time.Millisecond, // Clamped
}
for i, cap := range expectedCaps {
// Expect Next to pick rand(0, cap).
expected := time.Duration(refRng.Int64N(int64(cap) + 1))
got := b.Next()
if got != expected {
t.Errorf(
"step %d: expected %v, got %v (cap was %v)",
i, expected, got, cap,
)
}
if got > cap {
t.Errorf(
"step %d: got %v greater than cap %v",
i, got, cap,
)
}
}
}
// TestOverflowProtection ensures that large Max values do not cause overflow
// during the doubling phase.
func TestOverflowProtection(t *testing.T) {
// Use a Max that is near MaxInt64.
max := time.Duration(math.MaxInt64 - 1)
start := max / 4
b, err := New(Config{
Initial: start,
Max: max,
})
if err != nil {
t.Fatalf("New failed: %v", err)
}
// 1. Current = start.
// Next returns [0, start]. state becomes start*2 (max/2).
_ = b.Next()
// 2. Current = max/2.
// Next returns [0, max/2].
_ = b.Next()
// 3. Current = max.
// Next returns [0, max].
// Logic check: current >= max. state stays max.
val := b.Next()
if val < 0 {
t.Errorf("got negative duration %v, likely overflow", val)
}
// Verify clamp.
// No panic implies success.
for range 5 {
b.Next()
}
}
func TestDefaultRNG(t *testing.T) {
b, err := New(Config{
Initial: time.Millisecond,
Max: 10 * time.Millisecond,
})
if err != nil {
t.Fatalf("New failed: %v", err)
}
got := b.Next()
if got < 0 || got > time.Millisecond {
t.Errorf("out of bounds: %v", got)
}
}
func TestReset(t *testing.T) {
b, _ := New(Config{
Initial: 10 * time.Millisecond,
Max: 100 * time.Millisecond,
})
// Advance state
b.Next() // 10
b.Next() // 20
b.Next() // 40
b.Reset()
// Verify that the cap is reset by checking the bounds of the next
// call. If reset works, the next call is bounded by Initial (10ms).
for range 10 {
got := b.Next()
if got > 10*time.Millisecond {
t.Fatalf(
"call after Reset returned %v; expected <= 10ms",
got,
)
}
b.Reset()
}
}

52
internal/filter/filter.go Normal file
View File

@@ -0,0 +1,52 @@
// Package filter provides message filtering configuration.
// It is used to restrict which messages are processed by the answer workers.
//
// NOTE: IMAP Search Limitations
// IMAP SEARCH (RFC 3501 §6.4.4) uses case-insensitive substring matching for
// text fields.
// A filter like From: "alice@example.com" would match:
// - alice@example.com
// - malice@example.com
// - alice@example.com.evil.org
//
// For this reason, server-side IMAP filters should be considered
// performance optimizations only, not security controls. Use the
// [Filters.Senders] allowlist for exact sender matching.
//
// [RFC 3501 §6.4.4]:
// https://datatracker.ietf.org/doc/html/rfc3501#section-6.4.4
package filter
import "strings"
// Filters holds message filtering configuration for IMAP searches
// and sender verification.
type Filters struct {
// Body contains keywords that must appear in the message body.
Body []string `yaml:"body"`
// From filters messages by the From header field.
From string `yaml:"from"`
// Senders is an allowlist of email addresses permitted to receive
// replies. If empty, all senders are allowed.
Senders []string `yaml:"allowed_senders"`
// Subject filters messages by the Subject header field.
Subject string `yaml:"subject"`
// To filters messages by the To header field.
To string `yaml:"to"`
}
// MatchSender checks if the sender is in the allowed list.
// Returns true if the allowlist is empty (allow all) or if the sender matches.
// Comparison is case-insensitive and ignores leading/trailing whitespace.
func (f *Filters) MatchSender(sender string) bool {
if len(f.Senders) == 0 {
return true
}
sender = strings.ToLower(strings.TrimSpace(sender))
for _, allowed := range f.Senders {
if strings.ToLower(strings.TrimSpace(allowed)) == sender {
return true
}
}
return false
}

View File

@@ -0,0 +1,91 @@
package filter
import "testing"
func TestMatchSender(t *testing.T) {
tests := []struct {
name string
senders []string
sender string
want bool
}{
{
name: "empty allowlist allows all",
senders: nil,
sender: "anyone@example.com",
want: true,
},
{
name: "exact match",
senders: []string{"allowed@example.com"},
sender: "allowed@example.com",
want: true,
},
{
name: "case insensitive match",
senders: []string{"Allowed@Example.COM"},
sender: "allowed@example.com",
want: true,
},
{
name: "sender case insensitive",
senders: []string{"allowed@example.com"},
sender: "ALLOWED@EXAMPLE.COM",
want: true,
},
{
name: "whitespace trimmed from sender",
senders: []string{"allowed@example.com"},
sender: " allowed@example.com ",
want: true,
},
{
name: "whitespace trimmed from allowlist",
senders: []string{" allowed@example.com "},
sender: "allowed@example.com",
want: true,
},
{
name: "not in allowlist",
senders: []string{"allowed@example.com"},
sender: "blocked@example.com",
want: false,
},
{
name: "multiple allowed senders",
senders: []string{"one@example.com", "two@example.com", "three@example.com"},
sender: "two@example.com",
want: true,
},
{
name: "partial match not allowed",
senders: []string{"allowed@example.com"},
sender: "allowed@example",
want: false,
},
{
name: "empty sender with allowlist",
senders: []string{"allowed@example.com"},
sender: "",
want: false,
},
{
name: "empty sender with empty allowlist",
senders: nil,
sender: "",
want: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
f := &Filters{Senders: tt.senders}
if got := f.MatchSender(tt.sender); got != tt.want {
t.Errorf(
"MatchSender(%q) = %v, want %v",
tt.sender, got, tt.want,
)
}
})
}
}

264
internal/imap/imap.go Normal file
View File

@@ -0,0 +1,264 @@
// Package imap provides an IMAP client wrapper for monitoring and fetching
// email messages. It handles connection management, IDLE support for push
// notifications, and message operations like fetching and marking as seen.
//
// The client supports server-side filtering via [filter.Filters] to limit
// which messages are returned by [Client.Unseen].
package imap
import (
"context"
"fmt"
"log/slog"
"time"
"raven/internal/filter"
"github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
// Config holds IMAP server connection parameters.
type Config struct {
// Host is the IMAP server hostname.
Host string `yaml:"host"`
// Password is the authentication password or app-specific password.
Password string `yaml:"password"`
// Port is the IMAP server port (typically "993" for TLS).
Port string `yaml:"port"`
// User is the authentication username, usually an email address.
User string `yaml:"user"`
}
// Address returns the host:port string for dialing.
func (c *Config) Address() string {
return fmt.Sprintf("%s:%s", c.Host, c.Port)
}
// Validate checks that all required fields are present.
func (c *Config) Validate() error {
if c.Host == "" {
return fmt.Errorf("missing host")
}
if c.Password == "" {
return fmt.Errorf("missing password")
}
if c.Port == "" {
return fmt.Errorf("missing port")
}
if c.User == "" {
return fmt.Errorf("missing user")
}
return nil
}
// Client wraps an IMAP client with connection management and filtering.
type Client struct {
cfg Config
filters filter.Filters
client *imapclient.Client
log *slog.Logger
}
// NewClient creates a new Client with the given configuration.
// The client is not connected until [Client.Connect] is called.
func NewClient(
cfg Config,
filters filter.Filters,
log *slog.Logger,
) (*Client, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %v", err)
}
return &Client{
cfg: cfg,
filters: filters,
log: log,
}, nil
}
// Client returns the underlying imapclient.Client.
// Returns nil if not connected.
func (c *Client) Client() *imapclient.Client {
return c.client
}
// Connect establishes a TLS connection to the IMAP server,
// authenticates, and selects the INBOX.
func (c *Client) Connect(
ctx context.Context,
log *slog.Logger,
opts *imapclient.Options,
) error {
c.log = log
c.log.InfoContext(ctx, "connecting to IMAP server")
client, err := imapclient.DialTLS(c.cfg.Address(), opts)
if err != nil {
return fmt.Errorf("dial TLS: %w", err)
}
c.client = client
c.log.InfoContext(ctx, "connected to IMAP server")
c.log.InfoContext(ctx, "logging into to IMAP server")
if err := client.Login(c.cfg.User, c.cfg.Password).Wait(); err != nil {
return fmt.Errorf("login: %w", err)
}
c.log.InfoContext(ctx, "logged into IMAP server")
c.log.InfoContext(ctx, "selecting INBOX")
if _, err := client.Select("INBOX", nil).Wait(); err != nil {
return fmt.Errorf("select: %w", err)
}
c.log.InfoContext(ctx, "selected INBOX")
return nil
}
// CanIdle reports whether the server advertises IDLE capability (RFC 2177).
func (c *Client) CanIdle() bool {
return c.client.Caps().Has(imap.CapIdle)
}
// Disconnect logs out from the server and closes the connection.
// Safe to call on a nil or disconnected client.
func (c *Client) Disconnect(ctx context.Context) {
if c.client == nil {
return
}
c.log.InfoContext(ctx, "logging out")
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
var done = make(chan error, 1)
go func() {
done <- c.client.Logout().Wait()
}()
select {
case <-ctx.Done():
c.log.InfoContext(
ctx,
"context closed",
slog.Any("reason", ctx.Err()),
)
case err := <-done:
if err != nil {
c.log.Warn(
"failed to logout from IMAP server",
slog.Any("error", err),
)
}
}
c.client.Close()
c.client = nil
}
// Fetch retrieves a message by UID. If peek is true, the message is
// fetched without marking it as seen (using BODY.PEEK[]).
func (c *Client) Fetch(uid imap.UID, peek bool) (
*imapclient.FetchMessageBuffer, error,
) {
if c.client == nil {
return nil, fmt.Errorf("client not connected")
}
opts := &imap.FetchOptions{Envelope: true, UID: true}
if peek {
opts.BodySection = []*imap.FetchItemBodySection{{Peek: true}}
}
res, err := c.client.Fetch(imap.UIDSetNum(uid), opts).Collect()
if err != nil {
return nil, fmt.Errorf("fetch: %w", err)
}
if len(res) == 0 {
return nil, fmt.Errorf("message not found")
}
return res[0], nil
}
// Idle starts an IDLE command (RFC 2177) to wait for server notifications.
// The caller must call Close on the returned command to end IDLE mode.
func (c *Client) Idle() (*imapclient.IdleCommand, error) {
return c.client.Idle()
}
// MarkSeen adds the \Seen flag to the message with the given UID.
func (c *Client) MarkSeen(uid imap.UID) error {
if c.client == nil {
return fmt.Errorf("client not connected")
}
if _, err := c.client.Store(
imap.UIDSetNum(uid),
&imap.StoreFlags{
Op: imap.StoreFlagsAdd,
Silent: true,
Flags: []imap.Flag{imap.FlagSeen},
},
nil,
).Collect(); err != nil {
return fmt.Errorf("collect: %w", err)
}
return nil
}
// Noop sends a NOOP command to refresh mailbox state.
func (c *Client) Noop() error {
if c.client == nil {
return fmt.Errorf("client not connected")
}
return c.client.Noop().Wait()
}
// Unseen returns UIDs of messages matching the client's filters that
// do not have the \Seen flag. Filters are applied server-side.
func (c *Client) Unseen(ctx context.Context) ([]imap.UID, error) {
sc := &imap.SearchCriteria{
NotFlag: []imap.Flag{imap.FlagSeen},
}
if len(c.filters.Body) > 0 {
sc.Body = c.filters.Body
}
if c.filters.From != "" {
sc.Header = append(
sc.Header,
imap.SearchCriteriaHeaderField{
Key: "From",
Value: c.filters.From,
},
)
}
if c.filters.Subject != "" {
sc.Header = append(
sc.Header,
imap.SearchCriteriaHeaderField{
Key: "Subject",
Value: c.filters.Subject,
},
)
}
if c.filters.To != "" {
sc.Header = append(
sc.Header,
imap.SearchCriteriaHeaderField{
Key: "To",
Value: c.filters.To,
},
)
}
res, err := c.client.UIDSearch(sc, nil).Wait()
if err != nil {
return nil, fmt.Errorf("search: %v", err)
}
uids := res.AllUIDs()
return uids, nil
}

224
internal/imap/imap_test.go Normal file
View File

@@ -0,0 +1,224 @@
package imap
import (
"log/slog"
"testing"
"raven/internal/filter"
)
func TestConfigAddress(t *testing.T) {
tests := []struct {
name string
cfg Config
want string
}{
{
name: "standard config",
cfg: Config{Host: "imap.example.com", Port: "993"},
want: "imap.example.com:993",
},
{
name: "empty host",
cfg: Config{Host: "", Port: "993"},
want: ":993",
},
{
name: "empty port",
cfg: Config{Host: "imap.example.com", Port: ""},
want: "imap.example.com:",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := tt.cfg.Address(); got != tt.want {
t.Errorf(
"Address() = %q, want %q",
got, tt.want,
)
}
})
}
}
func TestConfigValidate(t *testing.T) {
tests := []struct {
name string
cfg Config
wantErr string
}{
{
name: "valid config",
cfg: Config{
Host: "imap.example.com",
Password: "secret",
Port: "993",
User: "user@example.com",
},
wantErr: "",
},
{
name: "missing host",
cfg: Config{
Password: "secret",
Port: "993",
User: "user@example.com",
},
wantErr: "missing host",
},
{
name: "missing password",
cfg: Config{
Host: "imap.example.com",
Port: "993",
User: "user@example.com",
},
wantErr: "missing password",
},
{
name: "missing port",
cfg: Config{
Host: "imap.example.com",
Password: "secret",
User: "user@example.com",
},
wantErr: "missing port",
},
{
name: "missing user",
cfg: Config{
Host: "imap.example.com",
Password: "secret",
Port: "993",
},
wantErr: "missing user",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if tt.wantErr == "" {
if err != nil {
t.Errorf(
"Validate() unexpected error: %v",
err,
)
}
return
}
if err == nil {
t.Errorf(
"Validate() expected error containing %q, got nil",
tt.wantErr,
)
return
}
if got := err.Error(); got != tt.wantErr {
t.Errorf(
"Validate() error = %q, want %q",
got, tt.wantErr,
)
}
})
}
}
func TestNewClient(t *testing.T) {
validCfg := Config{
Host: "imap.example.com",
Password: "secret",
Port: "993",
User: "user@example.com",
}
tests := []struct {
name string
cfg Config
filters filter.Filters
wantErr bool
}{
{
name: "valid config",
cfg: validCfg,
filters: filter.Filters{},
wantErr: false,
},
{
name: "valid config with filters",
cfg: validCfg,
filters: filter.Filters{
Subject: "test",
To: "recipient@example.com",
Body: []string{"keyword"},
},
wantErr: false,
},
{
name: "invalid config",
cfg: Config{Host: "imap.example.com"},
filters: filter.Filters{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
client, err := NewClient(
tt.cfg, tt.filters, slog.Default(),
)
if tt.wantErr {
if err == nil {
t.Error(
"NewClient() expected error, got nil",
)
}
return
}
if err != nil {
t.Errorf(
"NewClient() unexpected error: %v", err,
)
return
}
if client == nil {
t.Error("NewClient() returned nil client")
return
}
// Verify client is not connected.
if client.Client() != nil {
t.Error(
"NewClient() client should not be connected",
)
}
})
}
}
func TestClientNotConnected(t *testing.T) {
cfg := Config{
Host: "imap.example.com",
Password: "secret",
Port: "993",
User: "user@example.com",
}
client, err := NewClient(cfg, filter.Filters{}, slog.Default())
if err != nil {
t.Fatalf("NewClient() unexpected error: %v", err)
}
t.Run("Fetch without connection", func(t *testing.T) {
_, err := client.Fetch(1, true)
if err == nil {
t.Error("Fetch() expected error when not connected")
}
})
t.Run("MarkSeen without connection", func(t *testing.T) {
err := client.MarkSeen(1)
if err == nil {
t.Error("MarkSeen() expected error when not connected")
}
})
}

398
internal/intake/intake.go Normal file
View File

@@ -0,0 +1,398 @@
// Package intake monitors an IMAP mailbox for new messages and dispatches
// them to answer workers for processing. It supports two modes: IDLE for
// servers with real-time push notifications, and poll for periodic checking.
package intake
import (
"context"
"fmt"
"log/slog"
"slices"
"time"
"raven/internal/backoff"
"raven/internal/filter"
"raven/internal/imap"
"raven/internal/tracker"
goimap "github.com/emersion/go-imap/v2"
"github.com/emersion/go-imap/v2/imapclient"
)
// Config holds intake worker settings.
type Config struct {
// Mode selects IDLE or poll-based message retrieval.
Mode Mode `yaml:"mode"`
// PollInterval is the duration between checks in poll mode.
// Ignored in IDLE mode. Examples: "5s", "1m", "24h".
PollInterval string `yaml:"poll_interval"`
}
// Validate checks that the configuration values are sensible.
func (c *Config) Validate() error {
if !c.Mode.Valid() {
return fmt.Errorf("invalid mode")
}
d, err := time.ParseDuration(c.PollInterval)
if d == 0 || err != nil {
return fmt.Errorf(
"invalid poll_interval %q: %v",
c.PollInterval, err,
)
}
return nil
}
// Mode represents the message retrieval strategy.
type Mode string
// Valid returns true if the mode is a recognized value.
func (m Mode) Valid() bool {
return slices.Contains([]Mode{ModeIdle, ModePoll}, m)
}
// String returns the mode as a string.
func (m Mode) String() string {
return string(m)
}
const (
// ModeIdle uses IMAP IDLE for real-time notifications.
ModeIdle Mode = "idle"
// ModePoll uses periodic polling at a configured interval.
ModePoll Mode = "poll"
)
// Worker monitors an IMAP mailbox and dispatches message UIDs to answer
// workers. It maintains a persistent connection with automatic reconnection
// and exponential backoff on errors.
type Worker struct {
// backoff controls retry delays after connection failures.
backoff *backoff.Backoff
// cfg holds the worker configuration.
cfg Config
// ic is the IMAP client for mailbox operations.
ic *imap.Client
// interval is the parsed poll interval duration.
interval time.Duration
// log is the worker's logger with worker context.
log *slog.Logger
// tracker prevents duplicate processing across workers.
tracker *tracker.Tracker
// update signals new messages during IDLE.
update chan struct{}
// work sends fetched message UIDs to answer workers.
work chan<- goimap.UID
}
// NewWorker creates an intake Worker with the provided configuration.
// The tracker coordinates with answer workers to prevent duplicate processing.
// The work channel receives UIDs for messages that need responses.
func NewWorker(
cfg Config,
filters filter.Filters,
imapConfig imap.Config,
log *slog.Logger,
tracker *tracker.Tracker,
work chan<- goimap.UID,
) (*Worker, error) {
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("invalid config: %v", err)
}
if tracker == nil {
return nil, fmt.Errorf("missing tracker")
}
if work == nil {
return nil, fmt.Errorf("missing work channel")
}
b, err := backoff.New(backoff.Config{
Initial: time.Second,
Max: time.Minute,
})
if err != nil {
return nil, fmt.Errorf("create backoff: %v", err)
}
ic, err := imap.NewClient(imapConfig, filters, log)
if err != nil {
return nil, fmt.Errorf("create imap client: %v", err)
}
var w = &Worker{
backoff: b,
cfg: cfg,
ic: ic,
log: log.With(slog.String("worker", "intake")),
tracker: tracker,
work: work,
}
switch w.cfg.Mode {
case ModeIdle:
w.update = make(chan struct{}, 1)
case ModePoll:
d, err := time.ParseDuration(cfg.PollInterval)
if err != nil {
return nil, fmt.Errorf(
"parse poll interval: %v", err,
)
}
w.interval = d
}
return w, nil
}
// Run starts the intake worker. It connects to the IMAP server and monitors
// for new messages using either IDLE or poll mode based on configuration.
// Handles reconnection automatically with exponential backoff on errors.
// Returns nil when the context is canceled.
func (w *Worker) Run(ctx context.Context) error {
w.log.InfoContext(ctx, "running intake worker")
defer w.log.InfoContext(ctx, "intake worker terminating")
defer w.ic.Disconnect(ctx)
// For IDLE mode, set up handler to signal new messages.
var opts *imapclient.Options
if w.cfg.Mode == ModeIdle {
opts = &imapclient.Options{
UnilateralDataHandler: &imapclient.UnilateralDataHandler{
Mailbox: func(data *imapclient.UnilateralDataMailbox) {
if data.NumMessages != nil {
select {
case w.update <- struct{}{}:
default:
}
}
},
},
}
}
for {
// Context closed -- terminate.
if err := ctx.Err(); err != nil {
w.log.InfoContext(
ctx,
"context closed",
slog.Any("reason", ctx.Err()),
)
return nil
}
// Attempt to connect.
if w.ic.Client() == nil {
if err := w.ic.Connect(
ctx, w.log, opts,
); err != nil {
// Failed to connect; backoff and try again.
w.log.ErrorContext(
ctx,
"failed to connect to IMAP server",
slog.Any("error", err),
)
wait := w.backoff.Next()
w.log.InfoContext(
ctx,
"retrying after backoff",
slog.Any("wait", wait),
)
// Wait for backoff, unless context closed.
select {
case <-ctx.Done():
continue
case <-time.After(wait):
continue
}
}
// Connected.
// For IDLE mode, verify server capability.
if w.cfg.Mode == ModeIdle && !w.ic.CanIdle() {
return fmt.Errorf(
"server lacks IDLE capability",
)
}
}
// Retrieve messages with the appropriate mode.
var err error
switch w.cfg.Mode {
case ModeIdle:
err = w.idle(ctx)
case ModePoll:
err = w.poll(ctx)
default:
return fmt.Errorf("unrecognized mode: %q", w.cfg.Mode)
}
if err != nil {
// Diagnose the error.
// Context canceled: loop, log, return.
if err := ctx.Err(); err != nil {
continue
}
// Otherwise, error on intake or idle.
w.log.ErrorContext(
ctx,
"IMAP error",
slog.Any("error", err),
)
wait := w.backoff.Next()
w.log.InfoContext(
ctx,
"retrying after backoff",
slog.Any("wait", wait),
)
select {
case <-ctx.Done():
continue
case <-time.After(wait):
}
// Force reconnect on next loop.
w.ic.Disconnect(ctx)
}
}
}
// poll runs the polling loop, checking for new messages at the configured
// interval. The interval could be 5 seconds or 24 hours depending on use case.
// Attempts to reuse the established connection, but the caller handles
// reconnection if the server closes an idle connection.
func (w *Worker) poll(ctx context.Context) error {
for {
// Refresh mailbox state before searching.
if err := w.ic.Noop(); err != nil {
return fmt.Errorf("noop: %v", err)
}
if err := w.intake(ctx); err != nil {
return fmt.Errorf("intake: %v", err)
}
// Connection is healthy. Reset backoff.
w.backoff.Reset()
select {
case <-ctx.Done():
return fmt.Errorf("context closed: %v", ctx.Err())
case <-time.After(w.interval):
continue
}
}
}
// idle runs the IDLE loop, using IMAP IDLE for real-time message notifications.
// After each intake cycle, issues an IDLE command and waits for the server
// to signal new messages. Handles IDLE termination and reconnection.
func (w *Worker) idle(ctx context.Context) error {
for {
if err := w.intake(ctx); err != nil {
return fmt.Errorf("intake: %v", err)
}
// Connection is healthy. Reset backoff.
w.backoff.Reset()
// Issue the IDLE command.
w.log.InfoContext(ctx, "entering IDLE mode")
idleCmd, err := w.ic.Idle()
if err != nil {
return fmt.Errorf("idle: %v", err)
}
// Monitor the IDLE command.
idleDone := make(chan error, 1)
go func() {
idleDone <- idleCmd.Wait()
}()
// Wait for: shutdown, connection death, or new message.
select {
case <-ctx.Done():
if err := idleCmd.Close(); err != nil {
w.log.WarnContext(
ctx,
"failed to close IDLE command",
slog.Any("error", err),
)
}
return fmt.Errorf("context closed: %v", ctx.Err())
case err := <-idleDone:
// Connection died or IDLE ended unexpectedly.
if err != nil {
return fmt.Errorf("idle wait: %v", err)
}
// IDLE ended without error (server closed).
w.log.InfoContext(ctx, "IDLE ended; refreshing")
return nil
case <-w.update:
w.log.InfoContext(ctx, "IDLE: new message received")
if err := idleCmd.Close(); err != nil {
<-idleDone
return fmt.Errorf("idle close: %v", err)
}
<-idleDone
}
}
}
// intake fetches unseen messages and sends their UIDs to the work channel.
// Uses the tracker to skip messages already being processed by answer workers.
// Returns early if the work queue is full, deferring remaining messages to
// the next cycle.
func (w *Worker) intake(ctx context.Context) error {
w.log.InfoContext(ctx, "fetching unseen messages")
uids, err := w.ic.Unseen(ctx)
if err != nil {
return fmt.Errorf("retrieve unseen messages: %v", err)
}
if len(uids) == 0 {
w.log.InfoContext(ctx, "no new messages")
return nil
}
w.log.InfoContext(
ctx,
"found unseen messages",
slog.Int("count", len(uids)),
)
for _, uid := range uids {
if !w.tracker.TryAcquire(uid) {
w.log.InfoContext(
ctx,
"skipping message; already acquired",
slog.Any("uid", uid),
)
continue
}
select {
case <-ctx.Done():
w.tracker.Release(uid)
w.log.InfoContext(
ctx,
"context closed",
slog.Any("reason", ctx.Err()),
)
return nil
case w.work <- uid:
w.log.InfoContext(
ctx, "enqueued", slog.Any("uid", uid),
)
default:
// Queue full, release and defer to next cycle.
w.tracker.Release(uid)
w.log.InfoContext(
ctx, "work queue full, deferring remaining",
)
return nil
}
}
return nil
}

View File

@@ -0,0 +1,210 @@
package intake
import (
"strings"
"testing"
"raven/internal/filter"
"raven/internal/imap"
"raven/internal/tracker"
goimap "github.com/emersion/go-imap/v2"
)
func TestConfigValidate(t *testing.T) {
tests := []struct {
name string
cfg Config
wantErr bool
}{
{
name: "valid idle mode",
cfg: Config{
Mode: ModeIdle,
PollInterval: "30s",
},
wantErr: false,
},
{
name: "valid poll mode",
cfg: Config{
Mode: ModePoll,
PollInterval: "5m",
},
wantErr: false,
},
{
name: "valid poll mode with long interval",
cfg: Config{
Mode: ModePoll,
PollInterval: "24h",
},
wantErr: false,
},
{
name: "invalid mode",
cfg: Config{
Mode: "invalid",
PollInterval: "30s",
},
wantErr: true,
},
{
name: "empty mode",
cfg: Config{
Mode: "",
PollInterval: "30s",
},
wantErr: true,
},
{
name: "missing poll interval",
cfg: Config{
Mode: ModeIdle,
PollInterval: "",
},
wantErr: true,
},
{
name: "invalid poll interval",
cfg: Config{
Mode: ModePoll,
PollInterval: "invalid",
},
wantErr: true,
},
{
name: "zero poll interval",
cfg: Config{
Mode: ModePoll,
PollInterval: "0s",
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
if (err != nil) != tt.wantErr {
t.Errorf(
"Validate() error = %v, wantErr %v",
err, tt.wantErr,
)
}
})
}
}
func TestModeValid(t *testing.T) {
tests := []struct {
mode Mode
want bool
}{
{ModeIdle, true},
{ModePoll, true},
{"idle", true},
{"poll", true},
{"", false},
{"invalid", false},
{"IDLE", false},
{"POLL", false},
}
for _, tt := range tests {
t.Run(string(tt.mode), func(t *testing.T) {
if got := tt.mode.Valid(); got != tt.want {
t.Errorf(
"Mode(%q).Valid() = %v, want %v",
tt.mode, got, tt.want,
)
}
})
}
}
func TestModeString(t *testing.T) {
tests := []struct {
mode Mode
want string
}{
{ModeIdle, "idle"},
{ModePoll, "poll"},
{"custom", "custom"},
}
for _, tt := range tests {
t.Run(tt.want, func(t *testing.T) {
if got := tt.mode.String(); got != tt.want {
t.Errorf(
"Mode.String() = %v, want %v",
got, tt.want,
)
}
})
}
}
func TestNewWorkerValidation(t *testing.T) {
validConfig := Config{
Mode: ModeIdle,
PollInterval: "30s",
}
validTracker := tracker.New()
validWork := make(chan goimap.UID, 1)
tests := []struct {
name string
cfg Config
tracker *tracker.Tracker
work chan goimap.UID
wantErr string
}{
{
name: "nil tracker",
cfg: validConfig,
tracker: nil,
work: validWork,
wantErr: "missing tracker",
},
{
name: "nil work channel",
cfg: validConfig,
tracker: validTracker,
work: nil,
wantErr: "missing work channel",
},
{
name: "invalid config",
cfg: Config{
Mode: "invalid",
PollInterval: "30s",
},
tracker: validTracker,
work: validWork,
wantErr: "invalid config",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewWorker(
tt.cfg,
filter.Filters{},
imap.Config{},
nil, // log
tt.tracker,
tt.work,
)
if err == nil {
t.Fatal("NewWorker() expected error, got nil")
}
if !strings.Contains(err.Error(), tt.wantErr) {
t.Errorf(
"NewWorker() error = %q, want containing %q",
err.Error(), tt.wantErr,
)
}
})
}
}

View File

@@ -0,0 +1,43 @@
// Package tracker prevents duplicate processing of IMAP messages by
// maintaining a transient set of active, in-flight UIDs.
package tracker
import (
"sync"
"github.com/emersion/go-imap/v2"
)
// Tracker ensures unique concurrent processing of messages.
// This prevents race conditions where a message might be fetched
// again before its initial processing completes.
type Tracker struct {
mu sync.Mutex
ids map[imap.UID]struct{}
}
// New returns a ready-to-use Tracker.
func New() *Tracker {
return &Tracker{
ids: make(map[imap.UID]struct{}),
}
}
// TryAcquire attempts to claim exclusive processing rights for a UID.
// It returns true only if the UID is not currently being tracked.
func (t *Tracker) TryAcquire(uid imap.UID) bool {
t.mu.Lock()
defer t.mu.Unlock()
if _, exists := t.ids[uid]; exists {
return false
}
t.ids[uid] = struct{}{}
return true
}
// Release relinquishes processing rights for a UID, allowing it to be acquired again.
func (t *Tracker) Release(uid imap.UID) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.ids, uid)
}

View File

@@ -0,0 +1,94 @@
package tracker
import (
"sync"
"testing"
"github.com/emersion/go-imap/v2"
)
func TestNew(t *testing.T) {
tr := New()
if tr == nil {
t.Fatal("New() returned nil")
}
if tr.ids == nil {
t.Fatal("New() initialized with nil map")
}
}
func TestLifecycle(t *testing.T) {
tr := New()
uid := imap.UID(123)
// 1. First acquire should succeed.
if !tr.TryAcquire(uid) {
t.Fatalf(
"expected TryAcquire(%d) to return true, got false",
uid,
)
}
// 2. Second acquire on same UID should fail.
if tr.TryAcquire(uid) {
t.Fatalf(
"expected TryAcquire(%d) to return false when already held, got true",
uid,
)
}
// 3. Different UID should succeed.
otherUID := imap.UID(456)
if !tr.TryAcquire(otherUID) {
t.Fatalf(
"expected TryAcquire(%d) to return true, got false",
otherUID,
)
}
// 4. Release first UID.
tr.Release(uid)
// 5. First UID should be acquirable again.
if !tr.TryAcquire(uid) {
t.Fatalf(
"expected TryAcquire(%d) to return true after Release, got false",
uid,
)
}
}
func TestConcurrency(t *testing.T) {
tr := New()
uid := imap.UID(1)
routines := 16
var wg sync.WaitGroup
// Counter for successful acquisitions.
successCount := 0
var mu sync.Mutex
for range routines {
wg.Go(func() {
if tr.TryAcquire(uid) {
mu.Lock()
successCount++
mu.Unlock()
}
})
}
wg.Wait()
if successCount != 1 {
t.Errorf(
"expected exactly 1 successful acquisition for concurrent access to same UID, got %d",
successCount,
)
}
// Verify we can release safely after concurrent hammer.
tr.Release(uid)
if !tr.TryAcquire(uid) {
t.Error("failed to acquire UID after concurrent test release")
}
}