feat: add Resend inbound webhook endpoint and client (refs #155)
- ResendInboundClient: webhook signature verification via Svix, email fetch/download/parse with mailparser - POST /api/webhooks/resend/inbound endpoint with rawBody, signature verification, idempotency check, queue insertion, async processing - Config: resend_webhook_secret (optional) in secrets schema - Route registration in app.ts following Stripe webhook pattern Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,131 @@
|
||||
/**
|
||||
* @ai-summary Controller for Resend inbound email webhook
|
||||
* @ai-context Verifies signatures, checks idempotency, queues emails for async processing
|
||||
*/
|
||||
|
||||
import { FastifyRequest, FastifyReply } from 'fastify';
|
||||
import { ResendInboundClient } from '../external/resend-inbound.client';
|
||||
import { pool } from '../../../core/config/database';
|
||||
import { logger } from '../../../core/logging/logger';
|
||||
import type { ResendWebhookEvent } from '../domain/email-ingestion.types';
|
||||
|
||||
export class EmailIngestionController {
|
||||
private resendClient: ResendInboundClient;
|
||||
|
||||
constructor() {
|
||||
this.resendClient = new ResendInboundClient();
|
||||
}
|
||||
|
||||
async handleInboundWebhook(request: FastifyRequest, reply: FastifyReply): Promise<void> {
|
||||
try {
|
||||
const rawBody = (request as any).rawBody;
|
||||
if (!rawBody) {
|
||||
logger.error('Missing raw body in Resend webhook request');
|
||||
return reply.status(400).send({ error: 'Missing raw body' });
|
||||
}
|
||||
|
||||
// Extract Svix headers for signature verification
|
||||
const headers: Record<string, string> = {
|
||||
'svix-id': (request.headers['svix-id'] as string) || '',
|
||||
'svix-timestamp': (request.headers['svix-timestamp'] as string) || '',
|
||||
'svix-signature': (request.headers['svix-signature'] as string) || '',
|
||||
};
|
||||
|
||||
// Verify webhook signature
|
||||
let event: ResendWebhookEvent;
|
||||
try {
|
||||
event = this.resendClient.verifyWebhookSignature(rawBody, headers);
|
||||
} catch (error: any) {
|
||||
logger.warn('Invalid Resend webhook signature', { error: error.message });
|
||||
return reply.status(400).send({ error: 'Invalid signature' });
|
||||
}
|
||||
|
||||
const emailId = event.data.email_id;
|
||||
const senderEmail = event.data.from;
|
||||
|
||||
// Idempotency check: reject if email_id already exists in queue
|
||||
const existing = await pool.query(
|
||||
'SELECT id FROM email_ingestion_queue WHERE email_id = $1',
|
||||
[emailId]
|
||||
);
|
||||
|
||||
if (existing.rows.length > 0) {
|
||||
logger.info('Duplicate email webhook received, skipping', { emailId });
|
||||
return reply.status(200).send({ received: true, duplicate: true });
|
||||
}
|
||||
|
||||
// Insert queue record with status=pending
|
||||
// user_id is set to sender_email initially; async processing resolves to Auth0 sub
|
||||
await pool.query(
|
||||
`INSERT INTO email_ingestion_queue
|
||||
(email_id, sender_email, user_id, received_at, subject, status)
|
||||
VALUES ($1, $2, $3, $4, $5, 'pending')`,
|
||||
[
|
||||
emailId,
|
||||
senderEmail,
|
||||
senderEmail,
|
||||
event.data.created_at || new Date().toISOString(),
|
||||
event.data.subject,
|
||||
]
|
||||
);
|
||||
|
||||
logger.info('Inbound email queued for processing', { emailId, senderEmail });
|
||||
|
||||
// Return 200 immediately before processing begins
|
||||
reply.status(200).send({ received: true });
|
||||
|
||||
// Trigger async processing via setImmediate
|
||||
setImmediate(() => {
|
||||
this.processEmailAsync(emailId, event).catch((error) => {
|
||||
logger.error('Async email processing failed', {
|
||||
emailId,
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
});
|
||||
});
|
||||
});
|
||||
} catch (error: any) {
|
||||
logger.error('Resend webhook handler error', {
|
||||
error: error.message,
|
||||
stack: error.stack,
|
||||
});
|
||||
return reply.status(500).send({ error: 'Webhook processing failed' });
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Async email processing stub - full implementation in later sub-issue.
|
||||
* Will handle: sender validation, user lookup, OCR, record creation, notifications.
|
||||
*/
|
||||
private async processEmailAsync(emailId: string, event: ResendWebhookEvent): Promise<void> {
|
||||
try {
|
||||
await pool.query(
|
||||
"UPDATE email_ingestion_queue SET status = 'processing' WHERE email_id = $1",
|
||||
[emailId]
|
||||
);
|
||||
|
||||
logger.info('Async email processing started', {
|
||||
emailId,
|
||||
subject: event.data.subject,
|
||||
attachmentCount: event.data.attachments?.length || 0,
|
||||
});
|
||||
|
||||
// Full processing pipeline will be implemented in subsequent sub-issues:
|
||||
// 1. Sender validation (lookup user by email)
|
||||
// 2. Fetch and parse raw email via ResendInboundClient
|
||||
// 3. OCR attachments via existing OCR service
|
||||
// 4. Classify record type (fuel vs maintenance)
|
||||
// 5. Create record or queue for vehicle association
|
||||
// 6. Send notification emails
|
||||
} catch (error) {
|
||||
const errorMessage = error instanceof Error ? error.message : String(error);
|
||||
logger.error('Email processing pipeline error', { emailId, error: errorMessage });
|
||||
|
||||
await pool.query(
|
||||
"UPDATE email_ingestion_queue SET status = 'failed', error_message = $2 WHERE email_id = $1",
|
||||
[emailId, errorMessage]
|
||||
).catch((dbError) => {
|
||||
logger.error('Failed to update queue status to failed', { emailId, error: dbError });
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
/**
|
||||
* @ai-summary Resend inbound webhook route registration
|
||||
* @ai-context Public endpoint (no JWT auth) with rawBody for signature verification
|
||||
*/
|
||||
|
||||
import { FastifyPluginAsync } from 'fastify';
|
||||
import { EmailIngestionController } from './email-ingestion.controller';
|
||||
|
||||
export const emailIngestionWebhookRoutes: FastifyPluginAsync = async (fastify) => {
|
||||
const controller = new EmailIngestionController();
|
||||
|
||||
// POST /api/webhooks/resend/inbound - PUBLIC endpoint (no JWT auth)
|
||||
// Resend authenticates via webhook signature verification (Svix)
|
||||
// rawBody MUST be enabled for signature verification to work
|
||||
fastify.post(
|
||||
'/webhooks/resend/inbound',
|
||||
{
|
||||
config: {
|
||||
rawBody: true,
|
||||
},
|
||||
},
|
||||
controller.handleInboundWebhook.bind(controller)
|
||||
);
|
||||
};
|
||||
110
backend/src/features/email-ingestion/external/resend-inbound.client.ts
vendored
Normal file
110
backend/src/features/email-ingestion/external/resend-inbound.client.ts
vendored
Normal file
@@ -0,0 +1,110 @@
|
||||
/**
|
||||
* @ai-summary Resend inbound email client for webhook verification and email parsing
|
||||
* @ai-context Verifies Resend webhook signatures via Svix, fetches raw emails, parses with mailparser
|
||||
*/
|
||||
|
||||
import { Webhook } from 'svix';
|
||||
import { simpleParser } from 'mailparser';
|
||||
import { logger } from '../../../core/logging/logger';
|
||||
import type { ResendWebhookEvent } from '../domain/email-ingestion.types';
|
||||
|
||||
export interface ParsedEmailResult {
|
||||
text: string | null;
|
||||
html: string | null;
|
||||
attachments: ParsedEmailAttachment[];
|
||||
}
|
||||
|
||||
export interface ParsedEmailAttachment {
|
||||
filename: string;
|
||||
contentType: string;
|
||||
content: Buffer;
|
||||
size: number;
|
||||
}
|
||||
|
||||
export class ResendInboundClient {
|
||||
private webhookSecret: string | undefined;
|
||||
private apiKey: string;
|
||||
|
||||
constructor() {
|
||||
this.apiKey = process.env['RESEND_API_KEY'] || '';
|
||||
this.webhookSecret = process.env['RESEND_WEBHOOK_SECRET'];
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify Resend webhook signature using Svix
|
||||
* @throws Error if signature is invalid or secret is not configured
|
||||
*/
|
||||
verifyWebhookSignature(rawBody: string | Buffer, headers: Record<string, string>): ResendWebhookEvent {
|
||||
if (!this.webhookSecret) {
|
||||
throw new Error('RESEND_WEBHOOK_SECRET is not configured');
|
||||
}
|
||||
|
||||
const wh = new Webhook(this.webhookSecret);
|
||||
const verified = wh.verify(
|
||||
typeof rawBody === 'string' ? rawBody : rawBody.toString(),
|
||||
{
|
||||
'svix-id': headers['svix-id'] || '',
|
||||
'svix-timestamp': headers['svix-timestamp'] || '',
|
||||
'svix-signature': headers['svix-signature'] || '',
|
||||
}
|
||||
);
|
||||
|
||||
return verified as unknown as ResendWebhookEvent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch email metadata from Resend API including raw download URL
|
||||
*/
|
||||
async getEmail(emailId: string): Promise<{ downloadUrl: string }> {
|
||||
const response = await fetch(`https://api.resend.com/emails/${emailId}`, {
|
||||
headers: { 'Authorization': `Bearer ${this.apiKey}` },
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to fetch email ${emailId}: ${response.status} ${response.statusText}`);
|
||||
}
|
||||
|
||||
const data = await response.json() as { raw?: { download_url?: string } };
|
||||
const downloadUrl = data.raw?.download_url;
|
||||
|
||||
if (!downloadUrl) {
|
||||
throw new Error(`No download URL for email ${emailId}`);
|
||||
}
|
||||
|
||||
logger.info('Fetched email metadata from Resend', { emailId });
|
||||
return { downloadUrl };
|
||||
}
|
||||
|
||||
/**
|
||||
* Download raw RFC 5322 email content from Resend download URL
|
||||
*/
|
||||
async downloadRawEmail(downloadUrl: string): Promise<string> {
|
||||
const response = await fetch(downloadUrl);
|
||||
|
||||
if (!response.ok) {
|
||||
throw new Error(`Failed to download raw email: ${response.status} ${response.statusText}`);
|
||||
}
|
||||
|
||||
const rawEmail = await response.text();
|
||||
logger.info('Downloaded raw email', { size: rawEmail.length });
|
||||
return rawEmail;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse raw RFC 5322 email into structured text/html body and attachments
|
||||
*/
|
||||
async parseEmail(rawEmail: string): Promise<ParsedEmailResult> {
|
||||
const parsed = await simpleParser(rawEmail);
|
||||
|
||||
return {
|
||||
text: parsed.text || null,
|
||||
html: typeof parsed.html === 'string' ? parsed.html : null,
|
||||
attachments: (parsed.attachments || []).map((att) => ({
|
||||
filename: att.filename || 'unnamed',
|
||||
contentType: att.contentType || 'application/octet-stream',
|
||||
content: att.content,
|
||||
size: att.size,
|
||||
})),
|
||||
};
|
||||
}
|
||||
}
|
||||
14
backend/src/features/email-ingestion/index.ts
Normal file
14
backend/src/features/email-ingestion/index.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
/**
|
||||
* @ai-summary Email ingestion feature barrel export
|
||||
* @ai-context Exports webhook routes for Resend inbound email processing
|
||||
*/
|
||||
|
||||
export { emailIngestionWebhookRoutes } from './api/email-ingestion.routes';
|
||||
export { ResendInboundClient } from './external/resend-inbound.client';
|
||||
export type { ParsedEmailResult, ParsedEmailAttachment } from './external/resend-inbound.client';
|
||||
export type {
|
||||
EmailIngestionQueueRecord,
|
||||
EmailIngestionStatus,
|
||||
ResendWebhookEvent,
|
||||
ResendWebhookEventData,
|
||||
} from './domain/email-ingestion.types';
|
||||
Reference in New Issue
Block a user