// Source: raven/internal/answer/answer.go (Go, 264 lines, 6.1 KiB)

// Package answer implements the email response pipeline.
// Workers receive message UIDs, fetch and parse messages, query an LLM
// for responses, and send replies via SMTP.
package answer
import (
"context"
"fmt"
"log/slog"
"time"
// Project-internal and third-party packages.
"code.chimeric.al/chimerical/raven/internal/filter"
"code.chimeric.al/chimerical/raven/internal/imap"
"code.chimeric.al/chimerical/raven/internal/llm"
"code.chimeric.al/chimerical/raven/internal/message"
"code.chimeric.al/chimerical/raven/internal/smtp"
"code.chimeric.al/chimerical/raven/internal/tracker"
goimap "github.com/emersion/go-imap/v2"
"github.com/emersion/go-message/mail"
)
// fallbackResponse is the reply text used in place of an LLM-generated
// answer when the LLM query returns an error.
const fallbackResponse = "Your message was received, but I was unable to generate a response. " +
	"Please try again later or contact the administrator."
// Worker processes incoming messages through the response pipeline.
// Each worker maintains its own IMAP connection and processes UIDs
// received from a shared work channel. The pipeline:
// 1. Fetch message from IMAP
// 2. Check sender against allowlist
// 3. Query LLM for response
// 4. Send reply via SMTP
// 5. Mark original message as seen
type Worker struct {
// from is the sender address for outgoing replies.
from *mail.Address
// ic is the IMAP client for fetching messages and marking seen.
ic *imap.Client
// id identifies this worker in logs.
id int
// filters determines which senders are allowed.
filters filter.Filters
// llm generates response content.
llm *llm.Client
// log is the worker's logger with worker ID context.
log *slog.Logger
// smtp sends composed replies.
smtp smtp.SMTP
// tracker prevents duplicate processing across workers.
tracker *tracker.Tracker
// work receives UIDs to process.
work <-chan goimap.UID
}
// NewWorker creates a Worker with its own IMAP client.
//
// The reply sender address is parsed from smtp.From. The supplied logger
// is extended with a "worker" attribute of the form "answer[<id>]" so
// that every line logged by the worker identifies it.
//
// An error is returned when the IMAP client cannot be created or when
// smtp.From is not a valid mail address. The underlying error is wrapped
// with %w, so callers can inspect it with errors.Is and errors.As.
func NewWorker(
	id int,
	filters filter.Filters,
	imapConfig imap.Config,
	smtp smtp.SMTP,
	llm *llm.Client,
	tracker *tracker.Tracker,
	log *slog.Logger,
	work <-chan goimap.UID,
) (*Worker, error) {
	ic, err := imap.NewClient(imapConfig, filters, log)
	if err != nil {
		return nil, fmt.Errorf("create imap client: %w", err)
	}

	from, err := mail.ParseAddress(smtp.From)
	if err != nil {
		return nil, fmt.Errorf("parse from: %w", err)
	}

	return &Worker{
		from:    from,
		ic:      ic,
		id:      id,
		filters: filters,
		smtp:    smtp,
		llm:     llm,
		log: log.With(slog.String(
			"worker", fmt.Sprintf("answer[%d]", id),
		)),
		tracker: tracker,
		work:    work,
	}, nil
}
// Run consumes UIDs from the work channel one at a time and returns as
// soon as ctx is canceled or the channel is closed. UIDs are handled
// independently of each other: a failure is logged and the loop moves on
// to the next UID.
func (w *Worker) Run(ctx context.Context) {
	// The arguments are evaluated here; the line is logged when Run returns.
	defer w.log.Info("worker stopped", slog.Int("id", w.id))

	for {
		var (
			next goimap.UID
			open bool
		)

		select {
		case <-ctx.Done():
			if cause := ctx.Err(); cause != nil {
				w.log.InfoContext(ctx, "context closed", slog.Any("error", cause))
			}
			return
		case next, open = <-w.work:
		}

		if !open {
			w.log.InfoContext(ctx, "channel closed")
			return
		}

		w.respond(ctx, next)
		// Release the UID whether or not a reply was sent.
		w.tracker.Release(next)
	}
}
// respond handles a single message through the full pipeline: connect to
// IMAP, fetch and parse the message, enforce the sender allowlist, query
// the LLM, send the reply via SMTP, and mark the original as seen.
//
// Nothing is returned; every failure is logged together with the UID and
// ends processing of this message only. Messages that are skipped on
// purpose (unparseable, no From address, sender not in the allowlist) are
// marked seen. All other failures return without marking the message seen.
//
// When the LLM query fails, fallbackResponse is sent instead, unless ctx
// is already done, in which case no reply is sent at all.
func (w *Worker) respond(ctx context.Context, uid goimap.UID) {
	// Deferred before Connect so Disconnect runs on every exit path.
	defer w.ic.Disconnect(ctx)

	w.log.InfoContext(ctx, "processing", slog.Any("uid", uid))

	// Connect to the IMAP server and retrieve the message.
	if err := w.ic.Connect(ctx, w.log, nil); err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to connect IMAP client",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	fetchedMail, err := w.ic.Fetch(uid, true)
	if err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to fetch message",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	msg, err := message.New(fetchedMail, w.log)
	if err != nil {
		w.log.ErrorContext(
			ctx,
			"skipping: failed to parse message",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		w.markSeen(ctx, uid)
		return
	}

	// Without a From address there is no sender to check; indexing
	// From[0] on an empty list would panic, so skip the message instead.
	if len(msg.Envelope.From) == 0 {
		w.log.ErrorContext(
			ctx,
			"skipping: message has no From address",
			slog.Any("uid", uid),
		)
		w.markSeen(ctx, uid)
		return
	}

	// Enforce allowlist.
	sender := msg.Envelope.From[0].Addr()
	if !w.filters.MatchSender(sender) {
		w.log.InfoContext(
			ctx,
			"skipping: sender not in allowlist",
			slog.String("from", sender),
			slog.Any("uid", uid),
		)
		w.markSeen(ctx, uid)
		return
	}

	// Disconnect during LLM query to avoid IMAP timeout.
	// LLM queries can take significant time depending on model and load.
	w.ic.Disconnect(ctx)

	res, err := w.llm.Query(ctx, msg)
	if err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to query LLM",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		// A finished ctx (canceled or past its deadline) is not an LLM
		// failure worth reporting to the sender. Stop here rather than
		// mailing the fallback text; the reconnect below uses the same
		// ctx, so the message would likely stay unseen and be answered
		// a second time later.
		if ctx.Err() != nil {
			return
		}
		res = fallbackResponse
	}

	reply, err := msg.ComposeReply(time.Now(), w.from, res)
	if err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to compose reply",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	to, err := reply.Recipients()
	if err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to retrieve reply recipients",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	body, err := reply.Bytes()
	if err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to retrieve reply body",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	if err := w.smtp.Send(to, body); err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to reply",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	// Reconnect to flag the message as processed.
	if err := w.ic.Connect(ctx, w.log, nil); err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to connect IMAP client",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return
	}

	if !w.markSeen(ctx, uid) {
		return
	}

	w.log.InfoContext(ctx, "completed", slog.Any("uid", uid))
}

// markSeen flags the message identified by uid as seen on the IMAP
// client, logging any failure. It reports whether the flag was set.
func (w *Worker) markSeen(ctx context.Context, uid goimap.UID) bool {
	if err := w.ic.MarkSeen(uid); err != nil {
		w.log.ErrorContext(
			ctx,
			"failed to mark message as seen",
			slog.Any("error", err),
			slog.Any("uid", uid),
		)
		return false
	}
	return true
}