Files
raven/internal/service/service.go

220 lines
4.8 KiB
Go
Raw Normal View History

// Package service orchestrates the raven email processing pipeline.
// It coordinates intake workers (IMAP monitoring), answer workers (LLM
// processing and reply composition), and handles graceful shutdown.
package service
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
"raven/internal/answer"
"raven/internal/config"
"raven/internal/intake"
"raven/internal/llm"
"raven/internal/tool"
"raven/internal/tracker"
"github.com/emersion/go-imap/v2"
)
// Service is the main application coordinator.
// It owns the intake worker, answer workers, and shared resources like
// the LLM client and message tracker.
type Service struct {
cfg *config.Config
iw *intake.Worker
llm *llm.Client
log *slog.Logger
rw []*answer.Worker
tools *tool.Registry
tracker *tracker.Tracker
// work queues message UIDs for processing by answer workers.
work chan imap.UID
}
// Run starts the service and blocks until shutdown.
// It launches the intake worker to monitor IMAP for new messages and
// multiple answer workers to process them concurrently.
// Shutdown is triggered by SIGINT or SIGTERM.
func (svc *Service) Run(ctx context.Context) error {
svc.log.Info(
"starting raven",
slog.Int("concurrency", svc.cfg.Concurrency),
slog.Group(
"filters",
slog.String(
"body",
strings.Join(svc.cfg.Filters.Body, ","),
),
slog.String("from", svc.cfg.Filters.From),
slog.Any(
"senders",
strings.Join(svc.cfg.Filters.Senders, ","),
),
slog.String("subject", svc.cfg.Filters.Subject),
slog.String("to", svc.cfg.Filters.To),
),
slog.Group(
"imap",
slog.String("server", svc.cfg.IMAP.Address()),
slog.String("user", svc.cfg.IMAP.User),
),
slog.Group(
"intake",
slog.Any("mode", svc.cfg.Intake.Mode),
slog.Any("poll_interval", svc.cfg.Intake.PollInterval),
),
slog.Group(
"llm",
slog.String("url", svc.cfg.LLM.URL),
slog.String("model", svc.cfg.LLM.Model),
),
slog.Group(
"smtp",
slog.String("server", svc.cfg.SMTP.Address()),
slog.String("user", svc.cfg.SMTP.User),
),
slog.Any("shutdown_timeout", svc.cfg.GetShutdownTimeout()),
slog.Group(
"tools",
slog.Int("count", len(svc.tools.List())),
slog.Any(
"names",
strings.Join(svc.tools.List(), ","),
),
),
)
if len(svc.cfg.Filters.Senders) == 0 {
svc.log.Warn("ACCEPTING MAIL FROM ANY SENDER")
}
// Coordinate worker shutdown.
var wg sync.WaitGroup
// Setup signal handling for graceful shutdown.
ctx, cancel := signal.NotifyContext(
ctx,
os.Interrupt, syscall.SIGTERM,
)
defer cancel()
// Start intake worker.
wg.Go(func() {
defer func() {
if r := recover(); r != nil {
svc.log.Error(
"intake worker panicked",
slog.Any("panic", r),
slog.String(
"stack", string(debug.Stack()),
),
)
}
close(svc.work)
cancel()
}()
if err := svc.iw.Run(ctx); err != nil {
svc.log.ErrorContext(
ctx,
"failed to run intake worker",
slog.Any("error", err),
)
}
})
// Start processor workers.
for _, p := range svc.rw {
wg.Go(func() { p.Run(ctx) })
}
// Wait for shutdown signal.
<-ctx.Done()
svc.log.Info("shutdown signal received")
// Wait for graceful shutdown with timeout.
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
svc.log.Info("all workers stopped")
svc.log.Info("terminating")
case <-time.After(svc.cfg.GetShutdownTimeout()):
svc.log.Warn("shutdown timeout reached; workers may still be running")
svc.log.Info("terminating")
}
return nil
}
// New creates a Service from the provided configuration.
// It initializes the tool registry, LLM client, intake worker,
// and the configured number of answer workers.
func New(cfg *config.Config, log *slog.Logger) (*Service, error) {
var svc = &Service{
cfg: cfg,
log: log,
tracker: tracker.New(),
work: make(chan imap.UID),
}
// Setup clients and tools.
registry, err := tool.NewRegistry(cfg.Tools)
if err != nil {
return nil, fmt.Errorf("load tools: %v", err)
}
svc.tools = registry
// Create the LLM client.
llmClient, err := llm.NewClient(cfg.LLM, registry, log)
if err != nil {
return nil, fmt.Errorf("create LLM client: %v", err)
}
svc.llm = llmClient
// Create the IMAP worker.
w, err := intake.NewWorker(
svc.cfg.Intake,
svc.cfg.Filters,
svc.cfg.IMAP,
svc.log,
svc.tracker,
svc.work,
)
if err != nil {
return nil, fmt.Errorf("create intake worker: %v", err)
}
svc.iw = w
// Create processors.
for id := range svc.cfg.Concurrency {
w, err := answer.NewWorker(
id,
svc.cfg.Filters,
svc.cfg.IMAP,
svc.cfg.SMTP,
svc.llm,
svc.tracker,
svc.log,
svc.work,
)
if err != nil {
return nil, fmt.Errorf("create answer worker: %v", err)
}
svc.rw = append(svc.rw, w)
}
return svc, nil
}