// Package answer implements the email response pipeline. // Workers receive message UIDs, fetch and parse messages, query an LLM // for responses, and send replies via SMTP. package answer import ( "context" "fmt" "log/slog" "time" "code.chimeric.al/chimerical/raven/internal/filter" "code.chimeric.al/chimerical/raven/internal/imap" "code.chimeric.al/chimerical/raven/internal/llm" "code.chimeric.al/chimerical/raven/internal/message" "code.chimeric.al/chimerical/raven/internal/smtp" "code.chimeric.al/chimerical/raven/internal/tracker" goimap "github.com/emersion/go-imap/v2" "github.com/emersion/go-message/mail" ) const fallbackResponse = "Your message was received, but I was unable to generate a response. Please try again later or contact the administrator." // Worker processes incoming messages through the response pipeline. // Each worker maintains its own IMAP connection and processes UIDs // received from a shared work channel. The pipeline: // 1. Fetch message from IMAP // 2. Check sender against allowlist // 3. Query LLM for response // 4. Send reply via SMTP // 5. Mark original message as seen type Worker struct { // from is the sender address for outgoing replies. from *mail.Address // ic is the IMAP client for fetching messages and marking seen. ic *imap.Client // id identifies this worker in logs. id int // filters determines which senders are allowed. filters filter.Filters // llm generates response content. llm *llm.Client // log is the worker's logger with worker ID context. log *slog.Logger // smtp sends composed replies. smtp smtp.SMTP // tracker prevents duplicate processing across workers. tracker *tracker.Tracker // work receives UIDs to process. work <-chan goimap.UID } // NewWorker creates a Worker with its own IMAP connection. // The from address is parsed from smtp.From for reply composition. func NewWorker( id int, filters filter.Filters, imapConfig imap.Config, smtp smtp.SMTP, llm *llm.Client, tracker *tracker.Tracker, log *slog.Logger, work <-chan goimap.UID, ) (*Worker, error) { ic, err := imap.NewClient(imapConfig, filters, log) if err != nil { return nil, fmt.Errorf("create imap client: %v", err) } from, err := mail.ParseAddress(smtp.From) if err != nil { return nil, fmt.Errorf("parse from: %v", err) } return &Worker{ from: from, ic: ic, id: id, filters: filters, smtp: smtp, llm: llm, log: log.With(slog.String( "worker", fmt.Sprintf("answer[%d]", id), )), tracker: tracker, work: work, }, nil } // Run processes UIDs from the work channel until ctx is canceled // or the channel is closed. Each UID is processed independently; // errors are logged but do not stop the worker. func (w *Worker) Run(ctx context.Context) { defer w.log.Info("worker stopped", slog.Int("id", w.id)) for { select { case <-ctx.Done(): if err := ctx.Err(); err != nil { w.log.InfoContext( ctx, "context closed", slog.Any("error", err), ) } return case uid, ok := <-w.work: if !ok { w.log.InfoContext( ctx, "channel closed", ) return } w.respond(ctx, uid) w.tracker.Release(uid) } } } // respond handles a single message through the full pipeline. func (w *Worker) respond(ctx context.Context, uid goimap.UID) { defer w.ic.Disconnect(ctx) w.log.InfoContext(ctx, "processing", slog.Any("uid", uid)) // Connect to the IMAP server and retrieve the message. if err := w.ic.Connect(ctx, w.log, nil); err != nil { w.log.ErrorContext( ctx, "failed to connect IMAP client", slog.Any("error", err), slog.Any("uid", uid), ) return } fetchedMail, err := w.ic.Fetch(uid, true) if err != nil { w.log.ErrorContext( ctx, "failed to fetch message", slog.Any("error", err), slog.Any("uid", uid), ) return } msg, err := message.New(fetchedMail, w.log) if err != nil { w.log.ErrorContext( ctx, "skipping: failed to parse message", slog.Any("error", err), slog.Any("uid", uid), ) if err := w.ic.MarkSeen(uid); err != nil { w.log.ErrorContext( ctx, "failed to mark message as seen", slog.Any("error", err), slog.Any("uid", uid), ) } return } // Enforce allowlist. if !w.filters.MatchSender(msg.Envelope.From[0].Addr()) { w.log.InfoContext( ctx, "skipping: sender not in allowlist", slog.String("from", msg.Envelope.From[0].Addr()), slog.Any("uid", uid), ) if err := w.ic.MarkSeen(uid); err != nil { w.log.ErrorContext( ctx, "failed to mark message as seen", slog.Any("error", err), slog.Any("uid", uid), ) } return } // Disconnect during LLM query to avoid IMAP timeout. // LLM queries can take significant time depending on model and load. w.ic.Disconnect(ctx) res, err := w.llm.Query(ctx, msg) if err != nil { w.log.ErrorContext( ctx, "failed to query LLM", slog.Any("error", err), slog.Any("uid", uid), ) res = fallbackResponse } reply, err := msg.ComposeReply(time.Now(), w.from, res) if err != nil { w.log.ErrorContext( ctx, "failed to compose reply", slog.Any("error", err), slog.Any("uid", uid), ) return } to, err := reply.Recipients() if err != nil { w.log.ErrorContext( ctx, "failed to retrieve reply recipients", slog.Any("error", err), slog.Any("uid", uid), ) return } body, err := reply.Bytes() if err != nil { w.log.ErrorContext( ctx, "failed to retrieve reply body", slog.Any("error", err), slog.Any("uid", uid), ) return } if err := w.smtp.Send(to, body); err != nil { w.log.ErrorContext( ctx, "failed to reply", slog.Any("error", err), slog.Any("uid", uid), ) return } // Reconnect to flag the message as processed. if err := w.ic.Connect(ctx, w.log, nil); err != nil { w.log.ErrorContext( ctx, "failed to connect IMAP client", slog.Any("error", err), slog.Any("uid", uid), ) return } if err := w.ic.MarkSeen(uid); err != nil { w.log.ErrorContext( ctx, "failed to mark message as seen", slog.Any("error", err), slog.Any("uid", uid), ) return } w.log.InfoContext(ctx, "completed", slog.Any("uid", uid)) }