Webhooks
实现
完整示例
import { timingSafeEqual } from 'node:crypto';
import express from 'express';
/** Event names a PIX webhook payload can carry. */
type PixWebhookEvent = 'CashIn' | 'CashOut' | 'CashInReversal' | 'CashOutReversal';

/** Status values a PIX webhook payload can carry. */
type PixWebhookStatus = 'PENDING' | 'CONFIRMED' | 'ERROR';

/** Movement type values a PIX webhook payload can carry. */
type PixMovementType = 'CREDIT' | 'DEBIT';

/**
 * JSON body of a PIX webhook request.
 *
 * Nullable fields are always present but may hold `null`; optional fields
 * (`counterpart`, `parentTransaction`) may be absent altogether.
 */
interface PixWebhookPayload {
  // --- classification ---
  event: PixWebhookEvent;
  status: PixWebhookStatus;
  transactionType: 'PIX';
  movementType: PixMovementType;

  // --- identifiers ---
  /** Unique key of the transaction; the examples use it for idempotency. */
  transactionId: string;
  externalId: string | null;
  endToEndId: string;
  pixKey: string | null;

  // --- amounts and timing ---
  feeAmount: number;
  originalAmount: number;
  finalAmount: number;
  processingDate: string;

  // --- error details (null when not set) ---
  errorCode: string | null;
  errorMessage: string | null;

  // --- related entities ---
  counterpart?: Counterpart;
  parentTransaction?: ParentTransaction;

  /** Free-form key/value data; values must be narrowed before use. */
  metadata: Record<string, unknown>;
}
/** Bank details nested under `Counterpart.bank`; every field may be `null`. */
interface CounterpartBank {
  bankISPB: string | null;
  bankName: string | null;
  bankCode: string | null;
  accountBranch: string | null;
  accountNumber: string | null;
}

/** The other party of a transaction: name, document and bank details. */
interface Counterpart {
  name: string;
  document: string;
  bank: CounterpartBank;
}
/**
 * Transaction referenced by a payload through its optional
 * `parentTransaction` field.
 *
 * NOTE(review): presumably the original transaction that a reversal refers
 * to — confirm against the provider's API reference.
 */
interface ParentTransaction {
  // --- identifiers ---
  transactionId: string;
  externalId: string;
  endToEndId: string;

  processingDate: string;

  // --- refund state ---
  wasTotalRefunded: boolean;
  remainingAmountForRefund: number;

  /** Free-form key/value data; values must be narrowed before use. */
  metadata: Record<string, unknown>;
  counterpart: Counterpart;
}
// Express application that hosts the webhook endpoint.
const app = express();
// Parse JSON request bodies so route handlers can read them from req.body.
app.use(express.json());
/**
 * Compares a caller-supplied credential with the expected value without
 * stopping at the first differing byte.
 *
 * @param supplied - Value taken from the request.
 * @param expected - Configured value; `undefined` (unset) never matches.
 * @returns `true` only when both values are byte-for-byte equal.
 */
function credentialMatches(supplied: string, expected: string | undefined): boolean {
  if (expected === undefined) {
    return false;
  }
  const suppliedBytes = Buffer.from(supplied, 'utf8');
  const expectedBytes = Buffer.from(expected, 'utf8');
  // timingSafeEqual throws when lengths differ, so compare lengths first.
  return (
    suppliedBytes.length === expectedBytes.length &&
    timingSafeEqual(suppliedBytes, expectedBytes)
  );
}

/**
 * Basic Auth authentication middleware.
 *
 * Reads `Authorization: Basic <base64(user:pass)>`, compares the credentials
 * with the `WEBHOOK_USER` / `WEBHOOK_PASS` environment variables and calls
 * `next()` on success. Responds with 401 when the header is missing, is not
 * Basic, has no `:` separator, or the credentials do not match.
 */
function validateBasicAuth(
  req: express.Request,
  res: express.Response,
  next: express.NextFunction
) {
  const authHeader = req.headers.authorization;
  if (!authHeader || !authHeader.startsWith('Basic ')) {
    return res.status(401).json({ error: 'Unauthorized' });
  }

  const encoded = authHeader.slice('Basic '.length).trim();
  const decoded = Buffer.from(encoded, 'base64').toString('utf8');

  // Split on the FIRST colon only: the password itself may contain colons.
  const separatorIndex = decoded.indexOf(':');
  if (separatorIndex === -1) {
    return res.status(401).json({ error: 'Invalid credentials' });
  }
  const username = decoded.slice(0, separatorIndex);
  const password = decoded.slice(separatorIndex + 1);

  // Evaluate both comparisons so a wrong username does not skip the second one.
  const userOk = credentialMatches(username, process.env.WEBHOOK_USER);
  const passOk = credentialMatches(password, process.env.WEBHOOK_PASS);
  if (!userOk || !passOk) {
    return res.status(401).json({ error: 'Invalid credentials' });
  }
  next();
}
// Set for idempotency control.
// In-memory only: ids live as long as this process does and are removed
// again only when processing throws.
const processedTransactions = new Set<string>();

/**
 * PIX webhook endpoint.
 *
 * Flow: validate the body, acknowledge with 200, skip transaction ids that
 * were already seen, then dispatch to the handler for the event type.
 * Bodies without a non-empty string `transactionId` are answered with 400
 * and are not processed.
 */
app.post('/webhooks/pix', validateBasicAuth, async (req, res) => {
  // req.body comes from the network: verify the one field the idempotency
  // logic depends on before trusting the declared payload shape.
  const body: unknown = req.body;
  const transactionId =
    typeof body === 'object' && body !== null
      ? (body as { transactionId?: unknown }).transactionId
      : undefined;
  if (typeof transactionId !== 'string' || transactionId === '') {
    res.status(400).json({ error: 'Invalid payload' });
    return;
  }
  // Only transactionId has been verified; the other fields are taken on trust.
  const payload = body as PixWebhookPayload;

  // Respond quickly (webhook requires response within 10s)
  res.status(200).json({ acknowledged: true });

  // Check idempotency
  if (processedTransactions.has(transactionId)) {
    console.log(`Transaction ${transactionId} already processed`);
    return;
  }

  // Mark as processed
  processedTransactions.add(transactionId);

  // Process asynchronously (the 200 response has already been sent)
  try {
    switch (payload.event) {
      case 'CashIn':
        await handleCashIn(payload);
        break;
      case 'CashOut':
        await handleCashOut(payload);
        break;
      case 'CashInReversal':
        await handleCashInReversal(payload);
        break;
      case 'CashOutReversal':
        await handleCashOutReversal(payload);
        break;
      default: {
        // Compile-time exhaustiveness check; at runtime an unknown event
        // name is logged instead of being silently dropped.
        const unexpected: never = payload.event;
        console.warn(`Unhandled webhook event: ${String(unexpected)}`);
      }
    }
  } catch (error) {
    console.error(`Error processing ${payload.event}:`, error);
    // Release the id so a later delivery of the same transaction is not skipped.
    processedTransactions.delete(transactionId);
  }
});
/** Handles a `CashIn` event: logs the payload's final amount. */
async function handleCashIn(payload: PixWebhookPayload): Promise<void> {
  const { finalAmount } = payload;
  console.log(`[CashIn] Received: R$ ${finalAmount}`);
}
/** Handles a `CashOut` event: logs the payload's original amount. */
async function handleCashOut(payload: PixWebhookPayload): Promise<void> {
  const { originalAmount } = payload;
  console.log(`[CashOut] Sent: R$ ${originalAmount}`);
}
/** Handles a `CashInReversal` event: logs the payload's original amount. */
async function handleCashInReversal(payload: PixWebhookPayload): Promise<void> {
  const { originalAmount } = payload;
  console.log(`[CashInReversal] Refunded: R$ ${originalAmount}`);
}
/** Handles a `CashOutReversal` event: logs the payload's final amount. */
async function handleCashOutReversal(payload: PixWebhookPayload): Promise<void> {
  const { finalAmount } = payload;
  console.log(`[CashOutReversal] Returned: R$ ${finalAmount}`);
}
app.listen(3000);
from flask import Flask, request, jsonify
from functools import wraps
import base64
import os
from typing import Dict, Any, Optional
from dataclasses import dataclass
# Flask application that serves the webhook endpoint.
app = Flask(__name__)
# Transaction ids that were already handled; consulted for idempotency checks.
processed_transactions: set = set()
@dataclass
class PixWebhookPayload:
    """Subset of the PIX webhook body kept by this example.

    Instances are normally built with :meth:`from_dict`, which maps the
    camelCase keys of the incoming dict onto these snake_case attributes.
    """

    event: str
    status: str
    transaction_id: str
    external_id: Optional[str]
    end_to_end_id: str
    fee_amount: float
    original_amount: float
    final_amount: float
    counterpart: Optional[Dict[str, Any]]
    parent_transaction: Optional[Dict[str, Any]]

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'PixWebhookPayload':
        """Build a payload from a decoded JSON dict.

        Args:
            data: Mapping with the camelCase keys of the webhook body.

        Returns:
            A new instance. Missing amount keys become ``0``; any other
            missing key becomes ``None``. No validation is performed.
        """
        # attribute -> source key; an absent key yields None
        plain_keys = {
            'event': 'event',
            'status': 'status',
            'transaction_id': 'transactionId',
            'external_id': 'externalId',
            'end_to_end_id': 'endToEndId',
            'counterpart': 'counterpart',
            'parent_transaction': 'parentTransaction',
        }
        # attribute -> source key; an absent key yields 0
        amount_keys = {
            'fee_amount': 'feeAmount',
            'original_amount': 'originalAmount',
            'final_amount': 'finalAmount',
        }
        values = {attr: data.get(key) for attr, key in plain_keys.items()}
        values.update((attr, data.get(key, 0)) for attr, key in amount_keys.items())
        return cls(**values)
def require_basic_auth(f):
    """Decorator that protects a view with HTTP Basic authentication.

    The credentials from the ``Authorization`` header are compared with the
    ``WEBHOOK_USER`` and ``WEBHOOK_PASS`` environment variables. The wrapped
    view runs only when both match.

    Returns (from the wrapper):
        ``({'error': 'Unauthorized'}, 401)`` when the header is missing or
        is not Basic; ``({'error': 'Invalid auth header'}, 401)`` when it
        cannot be decoded or has no ``:`` separator;
        ``({'error': 'Invalid credentials'}, 401)`` on a mismatch; otherwise
        whatever ``f`` returns.
    """
    import hmac  # local import: only this decorator needs it

    def _matches(supplied, expected):
        # An unset environment variable must never authenticate anyone.
        if expected is None:
            return False
        # compare_digest does not stop at the first differing byte.
        return hmac.compare_digest(supplied.encode('utf-8'), expected.encode('utf-8'))

    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization')
        if not auth_header or not auth_header.startswith('Basic '):
            return jsonify({'error': 'Unauthorized'}), 401
        try:
            encoded = auth_header[len('Basic '):].strip()
            credentials = base64.b64decode(encoded).decode('utf-8')
        except ValueError:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            return jsonify({'error': 'Invalid auth header'}), 401
        # Split on the FIRST colon only: the password itself may contain colons.
        username, separator, password = credentials.partition(':')
        if not separator:
            return jsonify({'error': 'Invalid auth header'}), 401
        # Evaluate both so a wrong username does not skip the password check.
        user_ok = _matches(username, os.environ.get('WEBHOOK_USER'))
        pass_ok = _matches(password, os.environ.get('WEBHOOK_PASS'))
        if not (user_ok and pass_ok):
            return jsonify({'error': 'Invalid credentials'}), 401
        return f(*args, **kwargs)
    return decorated
@app.route('/webhooks/pix', methods=['POST'])
@require_basic_auth
def handle_pix_webhook():
    """Receive a PIX webhook, de-duplicate it and log the relevant amount.

    Returns:
        ``({'acknowledged': True}, 200)`` for accepted or already-seen
        transactions, or ``({'error': 'Invalid payload'}, 400)`` when the
        body is not a JSON object with a non-empty string ``transactionId``.
    """
    # silent=True yields None instead of raising when the body is not valid JSON.
    data = request.get_json(silent=True)
    transaction_id = data.get('transactionId') if isinstance(data, dict) else None
    if not isinstance(transaction_id, str) or not transaction_id:
        # Without a usable id the idempotency check below cannot work.
        return jsonify({'error': 'Invalid payload'}), 400

    payload = PixWebhookPayload.from_dict(data)

    # Idempotency: a repeated delivery is acknowledged but not handled again.
    if payload.transaction_id in processed_transactions:
        return jsonify({'acknowledged': True}), 200
    processed_transactions.add(payload.transaction_id)

    if payload.event == 'CashIn':
        print(f"[CashIn] R$ {payload.final_amount:.2f}")
    elif payload.event == 'CashOut':
        print(f"[CashOut] R$ {payload.original_amount:.2f}")
    elif payload.event == 'CashInReversal':
        print(f"[CashInReversal] R$ {payload.original_amount:.2f}")
    elif payload.event == 'CashOutReversal':
        print(f"[CashOutReversal] R$ {payload.final_amount:.2f}")
    return jsonify({'acknowledged': True}), 200
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=3000)
<?php
// Expected Basic Auth credentials, read from the environment.
// There is deliberately no built-in fallback: when a variable is unset or
// empty the value is null, which no submitted credential can equal, so the
// endpoint rejects every request instead of accepting a well-known default.
$WEBHOOK_USER = getenv('WEBHOOK_USER') ?: null;
$WEBHOOK_PASS = getenv('WEBHOOK_PASS') ?: null;
// JSON file holding the list of already-processed transaction ids.
$PROCESSED_FILE = '/tmp/processed_transactions.json';
/**
 * Checks the request's HTTP Basic credentials against the expected pair.
 *
 * @param mixed $user Expected username; a non-string never matches.
 * @param mixed $pass Expected password; a non-string never matches.
 * @return bool True only when the Authorization header carries exactly
 *              these credentials.
 */
function validateBasicAuth($user, $pass): bool {
    if (!is_string($user) || !is_string($pass)) {
        return false;
    }
    $authHeader = $_SERVER['HTTP_AUTHORIZATION'] ?? '';
    if (!is_string($authHeader) || !str_starts_with($authHeader, 'Basic ')) {
        return false;
    }
    $credentials = base64_decode(substr($authHeader, 6));
    // Split on the first colon only, so the password may contain colons.
    $parts = explode(':', $credentials, 2);
    if (count($parts) !== 2) {
        // No separator at all: malformed credentials.
        return false;
    }
    [$u, $p] = $parts;
    // hash_equals compares in constant time; evaluate both before combining
    // so a wrong username does not skip the password comparison.
    $userOk = hash_equals($user, $u);
    $passOk = hash_equals($pass, $p);
    return $userOk && $passOk;
}
/**
 * Tells whether a transaction id is already recorded in $PROCESSED_FILE.
 *
 * @param mixed $txId Transaction id to look up.
 * @return bool True when the id is in the stored list; false when it is
 *              not, or when the file is missing or does not hold a JSON array.
 */
function isProcessed($txId): bool {
    global $PROCESSED_FILE;
    if (!file_exists($PROCESSED_FILE)) return false;
    $processed = json_decode(file_get_contents($PROCESSED_FILE), true);
    if (!is_array($processed)) {
        return false;
    }
    // Strict comparison: loose in_array() treats numeric strings such as
    // '1e3' and '1000' as equal, which would wrongly skip a new transaction.
    return in_array($txId, $processed, true);
}
/**
 * Appends a transaction id to the list stored in $PROCESSED_FILE, keeping
 * only the most recent 10000 entries.
 *
 * The read-modify-write cycle runs under an exclusive file lock so that
 * concurrent requests do not overwrite each other's additions.
 *
 * @param mixed $txId Transaction id to record.
 */
function markProcessed($txId): void {
    global $PROCESSED_FILE;
    // 'c+' opens for read/write and creates the file without truncating it.
    $handle = fopen($PROCESSED_FILE, 'c+');
    if ($handle === false) {
        error_log('Could not open processed-transactions file');
        return;
    }
    try {
        if (!flock($handle, LOCK_EX)) {
            error_log('Could not lock processed-transactions file');
            return;
        }
        $raw = stream_get_contents($handle);
        $processed = is_string($raw) ? json_decode($raw, true) : null;
        if (!is_array($processed)) {
            // Empty, unreadable or non-list content: start a fresh list.
            $processed = [];
        }
        $processed[] = $txId;
        rewind($handle);
        ftruncate($handle, 0);
        fwrite($handle, json_encode(array_slice($processed, -10000)));
        fflush($handle);
    } finally {
        // Closing the handle also releases the lock.
        fclose($handle);
    }
}
// Only POST is accepted on this endpoint.
if ($_SERVER['REQUEST_METHOD'] !== 'POST') {
    http_response_code(405);
    exit;
}

// Reject requests whose Basic Auth credentials do not match.
if (!validateBasicAuth($WEBHOOK_USER, $WEBHOOK_PASS)) {
    http_response_code(401);
    echo json_encode(['error' => 'Unauthorized']);
    exit;
}

$payload = json_decode(file_get_contents('php://input'), true);

// json_decode() returns null for an invalid body. Without a non-empty string
// transactionId the idempotency check below cannot work, so answer 400
// instead of acknowledging and processing the request.
if (
    !is_array($payload)
    || !isset($payload['transactionId'])
    || !is_string($payload['transactionId'])
    || $payload['transactionId'] === ''
) {
    http_response_code(400);
    header('Content-Type: application/json');
    echo json_encode(['error' => 'Invalid payload']);
    exit;
}

// Acknowledge first, then continue working after the response is sent.
http_response_code(200);
header('Content-Type: application/json');
echo json_encode(['acknowledged' => true]);
if (function_exists('fastcgi_finish_request')) {
    // Flushes the response to the client; the script keeps running afterwards.
    fastcgi_finish_request();
}

// Idempotency: skip transactions that were already recorded.
if (isProcessed($payload['transactionId'])) {
    exit;
}
markProcessed($payload['transactionId']);
// Dispatch on the event name; each branch only writes one amount to the
// error log. Event names that match no case are ignored.
switch ($payload['event']) {
case 'CashIn':
// CashIn logs the final amount.
error_log("[CashIn] R$ " . $payload['finalAmount']);
break;
case 'CashOut':
// CashOut logs the original amount.
error_log("[CashOut] R$ " . $payload['originalAmount']);
break;
case 'CashInReversal':
// CashInReversal logs the original amount.
error_log("[CashInReversal] R$ " . $payload['originalAmount']);
break;
case 'CashOutReversal':
// CashOutReversal logs the final amount.
error_log("[CashOutReversal] R$ " . $payload['finalAmount']);
break;
}
幂等性
Webhooks 可能会发送多次(在重试的情况下)。实现幂等性处理以避免重复处理。
使用 transactionId 字段作为唯一键:
// Claim the transaction atomically. SET ... NX writes the key only when it
// does not exist yet, so two concurrent deliveries of the same webhook cannot
// both pass the check (a separate GET followed by SET can). The key expires
// after 86400 seconds (24 hours).
const claimed = await redis.set(
  `webhook:${payload.transactionId}`,
  '1',
  'EX',
  86400,
  'NX'
);
// The reply is 'OK' when the key was written and null when it already existed.
if (claimed !== 'OK') {
  console.log('Webhook already processed, ignoring');
  return;
}
await processWebhook(payload);
最佳实践
重试机制
如果您的端点未在 10 秒内返回 2xx 响应,系统将按以下计划重试:
| 尝试次数 | 间隔 | 累计时间 |
|---|---|---|
| 第 1 次 | 立即 | 0 分钟 |
| 第 2 次(第 1 次重试) | 5 分钟 | 5 分钟 |
| 第 3 次(第 2 次重试) | 5 分钟 | 10 分钟 |
| 第 4 次(第 3 次重试) | 15 分钟 | 25 分钟 |
在 4 次不成功的尝试后(总时间约 25 分钟),webhook 将被移至死信队列(DLQ)。请实现定期轮询作为后备方案,以确保不会遗漏任何交易。
响应码
您的端点应返回适当的 HTTP 状态码:
| 状态码 | 描述 | 系统行为 |
|---|---|---|
| 2xx | 成功 (200, 201, 204 等) | Webhook 已确认,不会重试 |
| 3xx | 重定向 | 视为失败,将重试 |
| 4xx | 客户端错误 | 视为失败,将重试 |
| 5xx | 服务器错误 | 视为失败,将重试 |
系统仅验证 HTTP 状态码。任何 2xx 响应(200-299)都被视为成功,无论响应体内容如何。