// Package service orchestrates the raven email processing pipeline. // It coordinates intake workers (IMAP monitoring), answer workers (LLM // processing and reply composition), and handles graceful shutdown. package service import ( "context" "fmt" "log/slog" "os" "os/signal" "runtime/debug" "strings" "sync" "syscall" "time" "code.chimeric.al/chimerical/raven/internal/answer" "code.chimeric.al/chimerical/raven/internal/config" "code.chimeric.al/chimerical/raven/internal/intake" "code.chimeric.al/chimerical/raven/internal/llm" "code.chimeric.al/chimerical/raven/internal/tool" "code.chimeric.al/chimerical/raven/internal/tracker" "github.com/emersion/go-imap/v2" ) // Service is the main application coordinator. // It owns the intake worker, answer workers, and shared resources like // the LLM client and message tracker. type Service struct { cfg *config.Config iw *intake.Worker llm *llm.Client log *slog.Logger rw []*answer.Worker tools *tool.Registry tracker *tracker.Tracker // work queues message UIDs for processing by answer workers. work chan imap.UID } // Run starts the service and blocks until shutdown. // It launches the intake worker to monitor IMAP for new messages and // multiple answer workers to process them concurrently. // Shutdown is triggered by SIGINT or SIGTERM. func (svc *Service) Run(ctx context.Context) error { svc.log.Info( "starting raven", slog.Int("concurrency", svc.cfg.Concurrency), slog.Group( "filters", slog.String( "body", strings.Join(svc.cfg.Filters.Body, ","), ), slog.String("from", svc.cfg.Filters.From), slog.Any( "senders", strings.Join(svc.cfg.Filters.Senders, ","), ), slog.String("subject", svc.cfg.Filters.Subject), slog.String("to", svc.cfg.Filters.To), ), slog.Group( "imap", slog.String("server", svc.cfg.IMAP.Address()), slog.String("user", svc.cfg.IMAP.User), ), slog.Group( "intake", slog.Any("mode", svc.cfg.Intake.Mode), slog.Any("poll_interval", svc.cfg.Intake.PollInterval), ), slog.Group( "llm", slog.String("url", svc.cfg.LLM.URL), slog.String("model", svc.cfg.LLM.Model), ), slog.Group( "smtp", slog.String("server", svc.cfg.SMTP.Address()), slog.String("user", svc.cfg.SMTP.User), ), slog.Any("shutdown_timeout", svc.cfg.GetShutdownTimeout()), slog.Group( "tools", slog.Int("count", len(svc.tools.List())), slog.Any( "names", strings.Join(svc.tools.List(), ","), ), ), ) if len(svc.cfg.Filters.Senders) == 0 { svc.log.Warn("ACCEPTING MAIL FROM ANY SENDER") } // Coordinate worker shutdown. var wg sync.WaitGroup // Setup signal handling for graceful shutdown. ctx, cancel := signal.NotifyContext( ctx, os.Interrupt, syscall.SIGTERM, ) defer cancel() // Start intake worker. wg.Go(func() { defer func() { if r := recover(); r != nil { svc.log.Error( "intake worker panicked", slog.Any("panic", r), slog.String( "stack", string(debug.Stack()), ), ) } close(svc.work) cancel() }() if err := svc.iw.Run(ctx); err != nil { svc.log.ErrorContext( ctx, "failed to run intake worker", slog.Any("error", err), ) } }) // Start processor workers. for _, p := range svc.rw { wg.Go(func() { p.Run(ctx) }) } // Wait for shutdown signal. <-ctx.Done() svc.log.Info("shutdown signal received") // Wait for graceful shutdown with timeout. done := make(chan struct{}) go func() { wg.Wait() close(done) }() select { case <-done: svc.log.Info("all workers stopped") svc.log.Info("terminating") case <-time.After(svc.cfg.GetShutdownTimeout()): svc.log.Warn("shutdown timeout reached; workers may still be running") svc.log.Info("terminating") } return nil } // New creates a Service from the provided configuration. // It initializes the tool registry, LLM client, intake worker, // and the configured number of answer workers. func New(cfg *config.Config, log *slog.Logger) (*Service, error) { var svc = &Service{ cfg: cfg, log: log, tracker: tracker.New(), work: make(chan imap.UID), } // Setup clients and tools. registry, err := tool.NewRegistry(cfg.Tools) if err != nil { return nil, fmt.Errorf("load tools: %v", err) } svc.tools = registry // Create the LLM client. llmClient, err := llm.NewClient(cfg.LLM, registry, log) if err != nil { return nil, fmt.Errorf("create LLM client: %v", err) } svc.llm = llmClient // Create the IMAP worker. w, err := intake.NewWorker( svc.cfg.Intake, svc.cfg.Filters, svc.cfg.IMAP, svc.log, svc.tracker, svc.work, ) if err != nil { return nil, fmt.Errorf("create intake worker: %v", err) } svc.iw = w // Create processors. for id := range svc.cfg.Concurrency { w, err := answer.NewWorker( id, svc.cfg.Filters, svc.cfg.IMAP, svc.cfg.SMTP, svc.llm, svc.tracker, svc.log, svc.work, ) if err != nil { return nil, fmt.Errorf("create answer worker: %v", err) } svc.rw = append(svc.rw, w) } return svc, nil }