Wire the intake and answer subsystems together into a running application. - config: maps YAML file to internal package configuration. - service: manages lifecycle and graceful shutdown. - cmd/raven: entry point for flag parsing and signal handling.
220 lines
4.8 KiB
Go
220 lines
4.8 KiB
Go
// Package service orchestrates the raven email processing pipeline.
|
|
// It coordinates intake workers (IMAP monitoring), answer workers (LLM
|
|
// processing and reply composition), and handles graceful shutdown.
|
|
package service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"os"
|
|
"os/signal"
|
|
"runtime/debug"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"raven/internal/answer"
|
|
"raven/internal/config"
|
|
"raven/internal/intake"
|
|
"raven/internal/llm"
|
|
"raven/internal/tool"
|
|
"raven/internal/tracker"
|
|
|
|
"github.com/emersion/go-imap/v2"
|
|
)
|
|
|
|
// Service is the main application coordinator.
|
|
// It owns the intake worker, answer workers, and shared resources like
|
|
// the LLM client and message tracker.
|
|
type Service struct {
|
|
cfg *config.Config
|
|
iw *intake.Worker
|
|
llm *llm.Client
|
|
log *slog.Logger
|
|
rw []*answer.Worker
|
|
tools *tool.Registry
|
|
tracker *tracker.Tracker
|
|
// work queues message UIDs for processing by answer workers.
|
|
work chan imap.UID
|
|
}
|
|
|
|
// Run starts the service and blocks until shutdown.
|
|
// It launches the intake worker to monitor IMAP for new messages and
|
|
// multiple answer workers to process them concurrently.
|
|
// Shutdown is triggered by SIGINT or SIGTERM.
|
|
func (svc *Service) Run(ctx context.Context) error {
|
|
svc.log.Info(
|
|
"starting raven",
|
|
slog.Int("concurrency", svc.cfg.Concurrency),
|
|
slog.Group(
|
|
"filters",
|
|
slog.String(
|
|
"body",
|
|
strings.Join(svc.cfg.Filters.Body, ","),
|
|
),
|
|
slog.String("from", svc.cfg.Filters.From),
|
|
slog.Any(
|
|
"senders",
|
|
strings.Join(svc.cfg.Filters.Senders, ","),
|
|
),
|
|
slog.String("subject", svc.cfg.Filters.Subject),
|
|
slog.String("to", svc.cfg.Filters.To),
|
|
),
|
|
slog.Group(
|
|
"imap",
|
|
slog.String("server", svc.cfg.IMAP.Address()),
|
|
slog.String("user", svc.cfg.IMAP.User),
|
|
),
|
|
slog.Group(
|
|
"intake",
|
|
slog.Any("mode", svc.cfg.Intake.Mode),
|
|
slog.Any("poll_interval", svc.cfg.Intake.PollInterval),
|
|
),
|
|
slog.Group(
|
|
"llm",
|
|
slog.String("url", svc.cfg.LLM.URL),
|
|
slog.String("model", svc.cfg.LLM.Model),
|
|
),
|
|
slog.Group(
|
|
"smtp",
|
|
slog.String("server", svc.cfg.SMTP.Address()),
|
|
slog.String("user", svc.cfg.SMTP.User),
|
|
),
|
|
slog.Any("shutdown_timeout", svc.cfg.GetShutdownTimeout()),
|
|
slog.Group(
|
|
"tools",
|
|
slog.Int("count", len(svc.tools.List())),
|
|
slog.Any(
|
|
"names",
|
|
strings.Join(svc.tools.List(), ","),
|
|
),
|
|
),
|
|
)
|
|
if len(svc.cfg.Filters.Senders) == 0 {
|
|
svc.log.Warn("ACCEPTING MAIL FROM ANY SENDER")
|
|
}
|
|
|
|
// Coordinate worker shutdown.
|
|
var wg sync.WaitGroup
|
|
|
|
// Setup signal handling for graceful shutdown.
|
|
ctx, cancel := signal.NotifyContext(
|
|
ctx,
|
|
os.Interrupt, syscall.SIGTERM,
|
|
)
|
|
defer cancel()
|
|
|
|
// Start intake worker.
|
|
wg.Go(func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
svc.log.Error(
|
|
"intake worker panicked",
|
|
slog.Any("panic", r),
|
|
slog.String(
|
|
"stack", string(debug.Stack()),
|
|
),
|
|
)
|
|
}
|
|
close(svc.work)
|
|
cancel()
|
|
}()
|
|
if err := svc.iw.Run(ctx); err != nil {
|
|
svc.log.ErrorContext(
|
|
ctx,
|
|
"failed to run intake worker",
|
|
slog.Any("error", err),
|
|
)
|
|
}
|
|
})
|
|
|
|
// Start processor workers.
|
|
for _, p := range svc.rw {
|
|
wg.Go(func() { p.Run(ctx) })
|
|
}
|
|
|
|
// Wait for shutdown signal.
|
|
<-ctx.Done()
|
|
svc.log.Info("shutdown signal received")
|
|
|
|
// Wait for graceful shutdown with timeout.
|
|
done := make(chan struct{})
|
|
go func() {
|
|
wg.Wait()
|
|
close(done)
|
|
}()
|
|
select {
|
|
case <-done:
|
|
svc.log.Info("all workers stopped")
|
|
svc.log.Info("terminating")
|
|
case <-time.After(svc.cfg.GetShutdownTimeout()):
|
|
svc.log.Warn("shutdown timeout reached; workers may still be running")
|
|
svc.log.Info("terminating")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// New creates a Service from the provided configuration.
|
|
// It initializes the tool registry, LLM client, intake worker,
|
|
// and the configured number of answer workers.
|
|
func New(cfg *config.Config, log *slog.Logger) (*Service, error) {
|
|
var svc = &Service{
|
|
cfg: cfg,
|
|
log: log,
|
|
tracker: tracker.New(),
|
|
work: make(chan imap.UID),
|
|
}
|
|
|
|
// Setup clients and tools.
|
|
registry, err := tool.NewRegistry(cfg.Tools)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("load tools: %v", err)
|
|
}
|
|
svc.tools = registry
|
|
|
|
// Create the LLM client.
|
|
llmClient, err := llm.NewClient(cfg.LLM, registry, log)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create LLM client: %v", err)
|
|
}
|
|
svc.llm = llmClient
|
|
|
|
// Create the IMAP worker.
|
|
w, err := intake.NewWorker(
|
|
svc.cfg.Intake,
|
|
svc.cfg.Filters,
|
|
svc.cfg.IMAP,
|
|
svc.log,
|
|
svc.tracker,
|
|
svc.work,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create intake worker: %v", err)
|
|
}
|
|
svc.iw = w
|
|
|
|
// Create processors.
|
|
for id := range svc.cfg.Concurrency {
|
|
w, err := answer.NewWorker(
|
|
id,
|
|
svc.cfg.Filters,
|
|
svc.cfg.IMAP,
|
|
svc.cfg.SMTP,
|
|
svc.llm,
|
|
svc.tracker,
|
|
svc.log,
|
|
svc.work,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create answer worker: %v", err)
|
|
}
|
|
|
|
svc.rw = append(svc.rw, w)
|
|
}
|
|
|
|
return svc, nil
|
|
}
|