From 0de079350087c76f6eb7e1490e8e215f8be7eb7c Mon Sep 17 00:00:00 2001 From: dwrz Date: Sun, 4 Jan 2026 21:07:26 +0000 Subject: [PATCH] Add service orchestration and command Wire the intake and answer subsystems together into a running application. - config: maps YAML file to internal package configuration. - service: manages lifecycle and graceful shutdown. - cmd/raven: entry point for flag parsing and signal handling. --- cmd/raven/main.go | 82 +++++++ internal/config/config.go | 151 +++++++++++++ internal/config/config_test.go | 375 +++++++++++++++++++++++++++++++++ internal/service/service.go | 219 +++++++++++++++++++ 4 files changed, 827 insertions(+) create mode 100644 cmd/raven/main.go create mode 100644 internal/config/config.go create mode 100644 internal/config/config_test.go create mode 100644 internal/service/service.go diff --git a/cmd/raven/main.go b/cmd/raven/main.go new file mode 100644 index 0000000..1051087 --- /dev/null +++ b/cmd/raven/main.go @@ -0,0 +1,82 @@ +// Command raven monitors an IMAP mailbox for new messages, generates replies +// using an LLM, and sends them via SMTP. See package service for the main +// orchestration logic and package config for configuration options. +package main + +import ( + "context" + "flag" + "log/slog" + "os" + "runtime/debug" + + "raven/internal/config" + "raven/internal/service" +) + +func main() { + var cfgPath = flag.String( + "c", "", "path to configuration file (required)", + ) + flag.Parse() + + var log = slog.New(slog.NewTextHandler( + os.Stderr, + &slog.HandlerOptions{ + AddSource: true, + Level: slog.LevelDebug, + }, + )) + if *cfgPath == "" { + log.Error("missing config path") + os.Exit(1) + } + + // Log build info. + bi, ok := debug.ReadBuildInfo() + if ok { + var commit, time string + for _, s := range bi.Settings { + switch s.Key { + case "vcs.revision": + commit = s.Value + case "vcs.time": + time = s.Value + } + } + log.Debug( + "raven", + slog.Group( + "build", + slog.String("commit", commit), + slog.String("time", time), + ), + ) + } + + cfg, err := config.Load(*cfgPath) + if err != nil { + log.Error( + "failed to load config", + slog.Any("error", err), + ) + os.Exit(1) + } + + svc, err := service.New(cfg, log) + if err != nil { + log.Error( + "failed to create service", + slog.Any("error", err), + ) + os.Exit(1) + } + + if err := svc.Run(context.Background()); err != nil { + log.Error( + "failed to run", + slog.Any("error", err), + ) + os.Exit(1) + } +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..d9ce414 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,151 @@ +// Package config loads and validates YAML configuration for raven. +package config + +import ( + "fmt" + "os" + "regexp" + "time" + + "raven/internal/filter" + "raven/internal/imap" + "raven/internal/intake" + "raven/internal/llm" + "raven/internal/smtp" + "raven/internal/tool" + + "gopkg.in/yaml.v3" +) + +// Config holds all application configuration sections. +type Config struct { + // Concurrency is the number of concurrent answer workers. + // Defaults to 1. + Concurrency int `yaml:"concurrency"` + // ShutdownTimeout is the maximum time to wait for graceful shutdown. + // Defaults to "30s". + ShutdownTimeout string `yaml:"shutdown_timeout"` + // shutdownTimeout is the parsed duration, set during Load. + shutdownTimeout time.Duration `yaml:"-"` + + // Filters configures server-side IMAP search criteria. + Filters filter.Filters `yaml:"filters"` + // IMAP configures the mail server connection for reading messages. + IMAP imap.Config `yaml:"imap"` + // Intake configures the message retrieval strategy (IDLE or poll). + Intake intake.Config `yaml:"intake"` + // LLM configures the language model client. + LLM llm.Config `yaml:"llm"` + // SMTP configures the mail server connection for sending responses. + // Credentials default to IMAP values if not specified. + SMTP smtp.SMTP `yaml:"smtp"` + // Tools defines external commands available to the LLM. + Tools []tool.Tool `yaml:"tools"` +} + +// ApplyDefaults sets default values for optional configuration fields. +// Called automatically by Load before validation. +func (cfg *Config) ApplyDefaults() { + if cfg.Concurrency == 0 { + cfg.Concurrency = 1 + } + if cfg.ShutdownTimeout == "" { + cfg.ShutdownTimeout = "30s" + } + + // Intake + if cfg.Intake.Mode == "" { + cfg.Intake.Mode = intake.ModePoll + } + if cfg.Intake.PollInterval == "" { + cfg.Intake.PollInterval = "30s" + } + + // SMTP defaults to IMAP credentials. + if cfg.SMTP.User == "" { + cfg.SMTP.User = cfg.IMAP.User + } + if cfg.SMTP.Password == "" { + cfg.SMTP.Password = cfg.IMAP.Password + } + if cfg.SMTP.From == "" { + cfg.SMTP.From = cfg.IMAP.User + } +} + +// GetShutdownTimeout returns the parsed shutdown timeout duration. +func (c *Config) GetShutdownTimeout() time.Duration { + return c.shutdownTimeout +} + +// Validate checks that all required fields are present and valid. +// Delegates to each subsection's Validate method. +func (c *Config) Validate() error { + if err := c.IMAP.Validate(); err != nil { + return fmt.Errorf("invalid imap config: %v", err) + } + if err := c.Intake.Validate(); err != nil { + return fmt.Errorf("invalid intake config: %v", err) + } + if err := c.LLM.Validate(); err != nil { + return fmt.Errorf("invalid llm config: %v", err) + } + if err := c.SMTP.Validate(); err != nil { + return fmt.Errorf("invalid smtp config: %v", err) + } + if _, err := time.ParseDuration(c.ShutdownTimeout); err != nil { + return fmt.Errorf( + "invalid shutdown_timeout %q: %w", + c.ShutdownTimeout, err, + ) + } + for i, t := range c.Tools { + if err := t.Validate(); err != nil { + return fmt.Errorf("tools[%d] invalid: %v", i, err) + } + } + + return nil +} + +// Load reads a YAML configuration file, expands environment variables, +// applies defaults, and validates the result. Returns an error if the +// file cannot be read, parsed, or contains invalid configuration. +func Load(path string) (*Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read file: %w", err) + } + + // Expand environment variables. + // Unset variables are replaced with empty strings. + re := regexp.MustCompile(`\$\{([^}]+)\}`) + expanded := re.ReplaceAllStringFunc( + string(data), + func(match string) string { + // Extract variable name from ${VAR}. + v := match[2 : len(match)-1] + return os.Getenv(v) + }, + ) + + var cfg Config + if err := yaml.Unmarshal([]byte(expanded), &cfg); err != nil { + return nil, fmt.Errorf("parse config: %w", err) + } + cfg.ApplyDefaults() + + if err := cfg.Validate(); err != nil { + return nil, fmt.Errorf("invalid config: %v", err) + } + + d, err := time.ParseDuration(cfg.ShutdownTimeout) + if err != nil { + return nil, fmt.Errorf( + "parse shutdown timeout: %v", err, + ) + } + cfg.shutdownTimeout = d + + return &cfg, nil +} diff --git a/internal/config/config_test.go b/internal/config/config_test.go new file mode 100644 index 0000000..f0c305d --- /dev/null +++ b/internal/config/config_test.go @@ -0,0 +1,375 @@ +package config + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +// validConfig returns a minimal valid YAML configuration. +func validConfig() string { + return ` +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: secret + mailbox: INBOX + +intake: + mode: poll + poll_interval: 30s + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 +` +} + +func writeConfig(t *testing.T, content string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + if err := os.WriteFile(path, []byte(content), 0644); err != nil { + t.Fatalf("write config file: %v", err) + } + return path +} + +func TestLoad(t *testing.T) { + tests := []struct { + name string + config string + setup func(t *testing.T) + shouldErr bool + check func(t *testing.T, cfg *Config) + }{ + { + name: "valid config with defaults", + config: validConfig(), + check: func(t *testing.T, cfg *Config) { + if cfg.Concurrency != 1 { + t.Errorf( + "Concurrency = %d, want 1", + cfg.Concurrency, + ) + } + if cfg.ShutdownTimeout != "30s" { + t.Errorf( + "ShutdownTimeout = %q, want %q", + cfg.ShutdownTimeout, "30s", + ) + } + if cfg.GetShutdownTimeout() != 30*time.Second { + t.Errorf( + "GetShutdownTimeout() = %v, want 30s", + cfg.GetShutdownTimeout(), + ) + } + if cfg.SMTP.User != cfg.IMAP.User { + t.Errorf( + "SMTP.User = %q, want %q", + cfg.SMTP.User, cfg.IMAP.User, + ) + } + if cfg.SMTP.Password != cfg.IMAP.Password { + t.Errorf( + "SMTP.Password not inherited from IMAP", + ) + } + if cfg.SMTP.From != cfg.IMAP.User { + t.Errorf( + "SMTP.From = %q, want %q", + cfg.SMTP.From, cfg.IMAP.User, + ) + } + if cfg.Intake.Mode != "poll" { + t.Errorf( + "Intake.Mode = %q, want %q", + cfg.Intake.Mode, "poll", + ) + } + if cfg.Intake.PollInterval != "30s" { + t.Errorf( + "Intake.PollInterval = %q, want %q", + cfg.Intake.PollInterval, "30s", + ) + } + }, + }, + { + name: "explicit values not overwritten", + config: ` +concurrency: 5 +shutdown_timeout: "60s" + +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: secret + mailbox: INBOX + +intake: + mode: idle + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 + user: smtp-user + password: smtp-password + from: sender@example.com +`, + check: func(t *testing.T, cfg *Config) { + if cfg.Concurrency != 5 { + t.Errorf( + "Concurrency = %d, want 5", + cfg.Concurrency, + ) + } + if cfg.ShutdownTimeout != "60s" { + t.Errorf( + "ShutdownTimeout = %q, want %q", + cfg.ShutdownTimeout, "60s", + ) + } + if cfg.GetShutdownTimeout() != 60*time.Second { + t.Errorf( + "GetShutdownTimeout() = %v, want 60s", + cfg.GetShutdownTimeout(), + ) + } + if cfg.SMTP.User != "smtp-user" { + t.Errorf( + "SMTP.User = %q, want %q", + cfg.SMTP.User, "smtp-user", + ) + } + if cfg.SMTP.Password != "smtp-password" { + t.Errorf( + "SMTP.Password = %q, want %q", + cfg.SMTP.Password, + "smtp-password", + ) + } + if cfg.SMTP.From != "sender@example.com" { + t.Errorf( + "SMTP.From = %q, want %q", + cfg.SMTP.From, + "sender@example.com", + ) + } + }, + }, + { + name: "env expansion", + config: ` +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: ${TEST_IMAP_PASSWORD} + mailbox: INBOX + +intake: + mode: poll + poll_interval: 30s + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 +`, + setup: func(t *testing.T) { + t.Setenv("TEST_IMAP_PASSWORD", "env-secret") + }, + check: func(t *testing.T, cfg *Config) { + if cfg.IMAP.Password != "env-secret" { + t.Errorf( + "IMAP.Password = %q, want %q", + cfg.IMAP.Password, + "env-secret", + ) + } + }, + }, + { + name: "env expansion unset var becomes empty", + config: ` +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: ${TEST_UNSET_VAR} + mailbox: INBOX + +intake: + mode: poll + poll_interval: 30s + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 +`, + setup: func(t *testing.T) { + t.Setenv("TEST_UNSET_VAR", "") + }, + shouldErr: true, + }, + { + name: "invalid yaml", + config: `imap: [invalid yaml`, + shouldErr: true, + }, + { + name: "invalid shutdown_timeout", + config: ` +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: secret + mailbox: INBOX + +intake: + mode: poll + poll_interval: 30s + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 + +shutdown_timeout: "not-a-duration" +`, + shouldErr: true, + }, + { + name: "config with tools", + config: ` +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: secret + mailbox: INBOX + +intake: + mode: poll + poll_interval: 30s + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 + +tools: + - name: echo + description: echoes input + command: echo +`, + check: func(t *testing.T, cfg *Config) { + if len(cfg.Tools) != 1 { + t.Errorf( + "len(Tools) = %d, want 1", + len(cfg.Tools), + ) + } + if cfg.Tools[0].Name != "echo" { + t.Errorf( + "Tools[0].Name = %q, want %q", + cfg.Tools[0].Name, "echo", + ) + } + }, + }, + { + name: "intake mode defaults when empty", + config: ` +imap: + host: imap.example.com + port: 993 + user: test@example.com + password: secret + mailbox: INBOX + +llm: + provider: llama.cpp + model: qwen3-4b + url: http://localhost:8080 + +smtp: + host: smtp.example.com + port: 587 +`, + check: func(t *testing.T, cfg *Config) { + if cfg.Intake.Mode != "poll" { + t.Errorf( + "Intake.Mode = %q, want %q", + cfg.Intake.Mode, "poll", + ) + } + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.setup != nil { + tt.setup(t) + } + + path := writeConfig(t, tt.config) + cfg, err := Load(path) + + if tt.shouldErr { + if err == nil { + t.Fatal("Load() expected error") + } + return + } + + if err != nil { + t.Fatalf("Load() error = %v", err) + } + + if tt.check != nil { + tt.check(t, cfg) + } + }) + } +} + +func TestLoad_FileNotFound(t *testing.T) { + if _, err := Load("/nonexistent/path/config.yaml"); err == nil { + t.Fatal("Load() expected error for missing file") + } +} diff --git a/internal/service/service.go b/internal/service/service.go new file mode 100644 index 0000000..3d5b0e5 --- /dev/null +++ b/internal/service/service.go @@ -0,0 +1,219 @@ +// Package service orchestrates the raven email processing pipeline. +// It coordinates intake workers (IMAP monitoring), answer workers (LLM +// processing and reply composition), and handles graceful shutdown. +package service + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/signal" + "runtime/debug" + "strings" + "sync" + "syscall" + "time" + + "raven/internal/answer" + "raven/internal/config" + "raven/internal/intake" + "raven/internal/llm" + "raven/internal/tool" + "raven/internal/tracker" + + "github.com/emersion/go-imap/v2" +) + +// Service is the main application coordinator. +// It owns the intake worker, answer workers, and shared resources like +// the LLM client and message tracker. +type Service struct { + cfg *config.Config + iw *intake.Worker + llm *llm.Client + log *slog.Logger + rw []*answer.Worker + tools *tool.Registry + tracker *tracker.Tracker + // work queues message UIDs for processing by answer workers. + work chan imap.UID +} + +// Run starts the service and blocks until shutdown. +// It launches the intake worker to monitor IMAP for new messages and +// multiple answer workers to process them concurrently. +// Shutdown is triggered by SIGINT or SIGTERM. +func (svc *Service) Run(ctx context.Context) error { + svc.log.Info( + "starting raven", + slog.Int("concurrency", svc.cfg.Concurrency), + slog.Group( + "filters", + slog.String( + "body", + strings.Join(svc.cfg.Filters.Body, ","), + ), + slog.String("from", svc.cfg.Filters.From), + slog.Any( + "senders", + strings.Join(svc.cfg.Filters.Senders, ","), + ), + slog.String("subject", svc.cfg.Filters.Subject), + slog.String("to", svc.cfg.Filters.To), + ), + slog.Group( + "imap", + slog.String("server", svc.cfg.IMAP.Address()), + slog.String("user", svc.cfg.IMAP.User), + ), + slog.Group( + "intake", + slog.Any("mode", svc.cfg.Intake.Mode), + slog.Any("poll_interval", svc.cfg.Intake.PollInterval), + ), + slog.Group( + "llm", + slog.String("url", svc.cfg.LLM.URL), + slog.String("model", svc.cfg.LLM.Model), + ), + slog.Group( + "smtp", + slog.String("server", svc.cfg.SMTP.Address()), + slog.String("user", svc.cfg.SMTP.User), + ), + slog.Any("shutdown_timeout", svc.cfg.GetShutdownTimeout()), + slog.Group( + "tools", + slog.Int("count", len(svc.tools.List())), + slog.Any( + "names", + strings.Join(svc.tools.List(), ","), + ), + ), + ) + if len(svc.cfg.Filters.Senders) == 0 { + svc.log.Warn("ACCEPTING MAIL FROM ANY SENDER") + } + + // Coordinate worker shutdown. + var wg sync.WaitGroup + + // Setup signal handling for graceful shutdown. + ctx, cancel := signal.NotifyContext( + ctx, + os.Interrupt, syscall.SIGTERM, + ) + defer cancel() + + // Start intake worker. + wg.Go(func() { + defer func() { + if r := recover(); r != nil { + svc.log.Error( + "intake worker panicked", + slog.Any("panic", r), + slog.String( + "stack", string(debug.Stack()), + ), + ) + } + close(svc.work) + cancel() + }() + if err := svc.iw.Run(ctx); err != nil { + svc.log.ErrorContext( + ctx, + "failed to run intake worker", + slog.Any("error", err), + ) + } + }) + + // Start processor workers. + for _, p := range svc.rw { + wg.Go(func() { p.Run(ctx) }) + } + + // Wait for shutdown signal. + <-ctx.Done() + svc.log.Info("shutdown signal received") + + // Wait for graceful shutdown with timeout. + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + select { + case <-done: + svc.log.Info("all workers stopped") + svc.log.Info("terminating") + case <-time.After(svc.cfg.GetShutdownTimeout()): + svc.log.Warn("shutdown timeout reached; workers may still be running") + svc.log.Info("terminating") + } + + return nil +} + +// New creates a Service from the provided configuration. +// It initializes the tool registry, LLM client, intake worker, +// and the configured number of answer workers. +func New(cfg *config.Config, log *slog.Logger) (*Service, error) { + var svc = &Service{ + cfg: cfg, + log: log, + tracker: tracker.New(), + work: make(chan imap.UID), + } + + // Setup clients and tools. + registry, err := tool.NewRegistry(cfg.Tools) + if err != nil { + return nil, fmt.Errorf("load tools: %v", err) + } + svc.tools = registry + + // Create the LLM client. + llmClient, err := llm.NewClient(cfg.LLM, registry, log) + if err != nil { + return nil, fmt.Errorf("create LLM client: %v", err) + } + svc.llm = llmClient + + // Create the IMAP worker. + w, err := intake.NewWorker( + svc.cfg.Intake, + svc.cfg.Filters, + svc.cfg.IMAP, + svc.log, + svc.tracker, + svc.work, + ) + if err != nil { + return nil, fmt.Errorf("create intake worker: %v", err) + } + svc.iw = w + + // Create processors. + for id := range svc.cfg.Concurrency { + w, err := answer.NewWorker( + id, + svc.cfg.Filters, + svc.cfg.IMAP, + svc.cfg.SMTP, + svc.llm, + svc.tracker, + svc.log, + svc.work, + ) + if err != nil { + return nil, fmt.Errorf("create answer worker: %v", err) + } + + svc.rw = append(svc.rw, w) + } + + return svc, nil +}