Add answer worker
Subsystem for message processing: parses messages, generates LLM responses, and replies with SMTP. Introduces: - answer: message processing worker. - llm: OpenAI API compatible client with support for tool execution. - message: message parsing and response logic. - tool: converts YAML configuration into executable subprocesses. - smtp: simple config and client wrapper for sending email.
This commit is contained in:
263
internal/answer/answer.go
Normal file
263
internal/answer/answer.go
Normal file
@@ -0,0 +1,263 @@
|
||||
// Package answer implements the email response pipeline.
|
||||
// Workers receive message UIDs, fetch and parse messages, query an LLM
|
||||
// for responses, and send replies via SMTP.
|
||||
package answer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"raven/internal/filter"
|
||||
"raven/internal/imap"
|
||||
"raven/internal/llm"
|
||||
"raven/internal/message"
|
||||
"raven/internal/smtp"
|
||||
"raven/internal/tracker"
|
||||
|
||||
goimap "github.com/emersion/go-imap/v2"
|
||||
"github.com/emersion/go-message/mail"
|
||||
)
|
||||
|
||||
const fallbackResponse = "Your message was received, but I was unable to generate a response. Please try again later or contact the administrator."
|
||||
|
||||
// Worker processes incoming messages through the response pipeline.
|
||||
// Each worker maintains its own IMAP connection and processes UIDs
|
||||
// received from a shared work channel. The pipeline:
|
||||
// 1. Fetch message from IMAP
|
||||
// 2. Check sender against allowlist
|
||||
// 3. Query LLM for response
|
||||
// 4. Send reply via SMTP
|
||||
// 5. Mark original message as seen
|
||||
type Worker struct {
|
||||
// from is the sender address for outgoing replies.
|
||||
from *mail.Address
|
||||
// ic is the IMAP client for fetching messages and marking seen.
|
||||
ic *imap.Client
|
||||
// id identifies this worker in logs.
|
||||
id int
|
||||
// filters determines which senders are allowed.
|
||||
filters filter.Filters
|
||||
// llm generates response content.
|
||||
llm *llm.Client
|
||||
// log is the worker's logger with worker ID context.
|
||||
log *slog.Logger
|
||||
// smtp sends composed replies.
|
||||
smtp smtp.SMTP
|
||||
// tracker prevents duplicate processing across workers.
|
||||
tracker *tracker.Tracker
|
||||
// work receives UIDs to process.
|
||||
work <-chan goimap.UID
|
||||
}
|
||||
|
||||
// NewWorker creates a Worker with its own IMAP connection.
|
||||
// The from address is parsed from smtp.From for reply composition.
|
||||
func NewWorker(
|
||||
id int,
|
||||
filters filter.Filters,
|
||||
imapConfig imap.Config,
|
||||
smtp smtp.SMTP,
|
||||
llm *llm.Client,
|
||||
tracker *tracker.Tracker,
|
||||
log *slog.Logger,
|
||||
work <-chan goimap.UID,
|
||||
) (*Worker, error) {
|
||||
ic, err := imap.NewClient(imapConfig, filters, log)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create imap client: %v", err)
|
||||
}
|
||||
|
||||
from, err := mail.ParseAddress(smtp.From)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse from: %v", err)
|
||||
}
|
||||
|
||||
return &Worker{
|
||||
from: from,
|
||||
ic: ic,
|
||||
id: id,
|
||||
filters: filters,
|
||||
smtp: smtp,
|
||||
llm: llm,
|
||||
log: log.With(slog.String(
|
||||
"worker", fmt.Sprintf("answer[%d]", id),
|
||||
)),
|
||||
tracker: tracker,
|
||||
work: work,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Run processes UIDs from the work channel until ctx is canceled
|
||||
// or the channel is closed. Each UID is processed independently;
|
||||
// errors are logged but do not stop the worker.
|
||||
func (w *Worker) Run(ctx context.Context) {
|
||||
defer w.log.Info("worker stopped", slog.Int("id", w.id))
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if err := ctx.Err(); err != nil {
|
||||
w.log.InfoContext(
|
||||
ctx,
|
||||
"context closed",
|
||||
slog.Any("error", err),
|
||||
)
|
||||
}
|
||||
return
|
||||
|
||||
case uid, ok := <-w.work:
|
||||
if !ok {
|
||||
w.log.InfoContext(
|
||||
ctx,
|
||||
"channel closed",
|
||||
)
|
||||
return
|
||||
}
|
||||
w.respond(ctx, uid)
|
||||
w.tracker.Release(uid)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// respond handles a single message through the full pipeline.
|
||||
func (w *Worker) respond(ctx context.Context, uid goimap.UID) {
|
||||
defer w.ic.Disconnect(ctx)
|
||||
w.log.InfoContext(ctx, "processing", slog.Any("uid", uid))
|
||||
|
||||
// Connect to the IMAP server and retrieve the message.
|
||||
if err := w.ic.Connect(ctx, w.log, nil); err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to connect IMAP client",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
fetchedMail, err := w.ic.Fetch(uid, true)
|
||||
if err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to fetch message",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := message.New(fetchedMail, w.log)
|
||||
if err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"skipping: failed to parse message",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
if err := w.ic.MarkSeen(uid); err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to mark message as seen",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Enforce allowlist.
|
||||
if !w.filters.MatchSender(msg.Envelope.From[0].Addr()) {
|
||||
w.log.InfoContext(
|
||||
ctx,
|
||||
"skipping: sender not in allowlist",
|
||||
slog.String("from", msg.Envelope.From[0].Addr()),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
if err := w.ic.MarkSeen(uid); err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to mark message as seen",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Disconnect during LLM query to avoid IMAP timeout.
|
||||
// LLM queries can take significant time depending on model and load.
|
||||
w.ic.Disconnect(ctx)
|
||||
|
||||
res, err := w.llm.Query(ctx, msg)
|
||||
if err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to query LLM",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
res = fallbackResponse
|
||||
}
|
||||
|
||||
reply, err := msg.ComposeReply(time.Now(), w.from, res)
|
||||
if err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to compose reply",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
to, err := reply.Recipients()
|
||||
if err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to retrieve reply recipients",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
body, err := reply.Bytes()
|
||||
if err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to retrieve reply body",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
if err := w.smtp.Send(to, body); err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to reply",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
// Reconnect to flag the message as processed.
|
||||
if err := w.ic.Connect(ctx, w.log, nil); err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to connect IMAP client",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
if err := w.ic.MarkSeen(uid); err != nil {
|
||||
w.log.ErrorContext(
|
||||
ctx,
|
||||
"failed to mark message as seen",
|
||||
slog.Any("error", err),
|
||||
slog.Any("uid", uid),
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
w.log.InfoContext(ctx, "completed", slog.Any("uid", uid))
|
||||
}
|
||||
Reference in New Issue
Block a user