Subsystem for message processing: parses messages, generates LLM responses, and replies with SMTP. Introduces: - answer: message processing worker. - llm: OpenAI API compatible client with support for tool execution. - message: message parsing and response logic. - tool: converts YAML configuration into executable subprocesses. - smtp: simple config and client wrapper for sending email.
264 lines
5.9 KiB
Go
264 lines
5.9 KiB
Go
// Package answer implements the email response pipeline.
|
|
// Workers receive message UIDs, fetch and parse messages, query an LLM
|
|
// for responses, and send replies via SMTP.
|
|
package answer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"raven/internal/filter"
|
|
"raven/internal/imap"
|
|
"raven/internal/llm"
|
|
"raven/internal/message"
|
|
"raven/internal/smtp"
|
|
"raven/internal/tracker"
|
|
|
|
goimap "github.com/emersion/go-imap/v2"
|
|
"github.com/emersion/go-message/mail"
|
|
)
|
|
|
|
const fallbackResponse = "Your message was received, but I was unable to generate a response. Please try again later or contact the administrator."
|
|
|
|
// Worker processes incoming messages through the response pipeline.
|
|
// Each worker maintains its own IMAP connection and processes UIDs
|
|
// received from a shared work channel. The pipeline:
|
|
// 1. Fetch message from IMAP
|
|
// 2. Check sender against allowlist
|
|
// 3. Query LLM for response
|
|
// 4. Send reply via SMTP
|
|
// 5. Mark original message as seen
|
|
type Worker struct {
|
|
// from is the sender address for outgoing replies.
|
|
from *mail.Address
|
|
// ic is the IMAP client for fetching messages and marking seen.
|
|
ic *imap.Client
|
|
// id identifies this worker in logs.
|
|
id int
|
|
// filters determines which senders are allowed.
|
|
filters filter.Filters
|
|
// llm generates response content.
|
|
llm *llm.Client
|
|
// log is the worker's logger with worker ID context.
|
|
log *slog.Logger
|
|
// smtp sends composed replies.
|
|
smtp smtp.SMTP
|
|
// tracker prevents duplicate processing across workers.
|
|
tracker *tracker.Tracker
|
|
// work receives UIDs to process.
|
|
work <-chan goimap.UID
|
|
}
|
|
|
|
// NewWorker creates a Worker with its own IMAP connection.
|
|
// The from address is parsed from smtp.From for reply composition.
|
|
func NewWorker(
|
|
id int,
|
|
filters filter.Filters,
|
|
imapConfig imap.Config,
|
|
smtp smtp.SMTP,
|
|
llm *llm.Client,
|
|
tracker *tracker.Tracker,
|
|
log *slog.Logger,
|
|
work <-chan goimap.UID,
|
|
) (*Worker, error) {
|
|
ic, err := imap.NewClient(imapConfig, filters, log)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create imap client: %v", err)
|
|
}
|
|
|
|
from, err := mail.ParseAddress(smtp.From)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parse from: %v", err)
|
|
}
|
|
|
|
return &Worker{
|
|
from: from,
|
|
ic: ic,
|
|
id: id,
|
|
filters: filters,
|
|
smtp: smtp,
|
|
llm: llm,
|
|
log: log.With(slog.String(
|
|
"worker", fmt.Sprintf("answer[%d]", id),
|
|
)),
|
|
tracker: tracker,
|
|
work: work,
|
|
}, nil
|
|
}
|
|
|
|
// Run processes UIDs from the work channel until ctx is canceled
|
|
// or the channel is closed. Each UID is processed independently;
|
|
// errors are logged but do not stop the worker.
|
|
func (w *Worker) Run(ctx context.Context) {
|
|
defer w.log.Info("worker stopped", slog.Int("id", w.id))
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
if err := ctx.Err(); err != nil {
|
|
w.log.InfoContext(
|
|
ctx,
|
|
"context closed",
|
|
slog.Any("error", err),
|
|
)
|
|
}
|
|
return
|
|
|
|
case uid, ok := <-w.work:
|
|
if !ok {
|
|
w.log.InfoContext(
|
|
ctx,
|
|
"channel closed",
|
|
)
|
|
return
|
|
}
|
|
w.respond(ctx, uid)
|
|
w.tracker.Release(uid)
|
|
}
|
|
}
|
|
}
|
|
|
|
// respond handles a single message through the full pipeline.
|
|
func (w *Worker) respond(ctx context.Context, uid goimap.UID) {
|
|
defer w.ic.Disconnect(ctx)
|
|
w.log.InfoContext(ctx, "processing", slog.Any("uid", uid))
|
|
|
|
// Connect to the IMAP server and retrieve the message.
|
|
if err := w.ic.Connect(ctx, w.log, nil); err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to connect IMAP client",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
fetchedMail, err := w.ic.Fetch(uid, true)
|
|
if err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to fetch message",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
|
|
msg, err := message.New(fetchedMail, w.log)
|
|
if err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"skipping: failed to parse message",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
if err := w.ic.MarkSeen(uid); err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to mark message as seen",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Enforce allowlist.
|
|
if !w.filters.MatchSender(msg.Envelope.From[0].Addr()) {
|
|
w.log.InfoContext(
|
|
ctx,
|
|
"skipping: sender not in allowlist",
|
|
slog.String("from", msg.Envelope.From[0].Addr()),
|
|
slog.Any("uid", uid),
|
|
)
|
|
if err := w.ic.MarkSeen(uid); err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to mark message as seen",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Disconnect during LLM query to avoid IMAP timeout.
|
|
// LLM queries can take significant time depending on model and load.
|
|
w.ic.Disconnect(ctx)
|
|
|
|
res, err := w.llm.Query(ctx, msg)
|
|
if err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to query LLM",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
res = fallbackResponse
|
|
}
|
|
|
|
reply, err := msg.ComposeReply(time.Now(), w.from, res)
|
|
if err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to compose reply",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
to, err := reply.Recipients()
|
|
if err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to retrieve reply recipients",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
body, err := reply.Bytes()
|
|
if err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to retrieve reply body",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
|
|
if err := w.smtp.Send(to, body); err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to reply",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
|
|
// Reconnect to flag the message as processed.
|
|
if err := w.ic.Connect(ctx, w.log, nil); err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to connect IMAP client",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
if err := w.ic.MarkSeen(uid); err != nil {
|
|
w.log.ErrorContext(
|
|
ctx,
|
|
"failed to mark message as seen",
|
|
slog.Any("error", err),
|
|
slog.Any("uid", uid),
|
|
)
|
|
return
|
|
}
|
|
|
|
w.log.InfoContext(ctx, "completed", slog.Any("uid", uid))
|
|
}
|