Skip to main content
Session webhooks notify your application in real-time as users progress through KYC onboarding. Receive instant updates when sessions are created, completed, processed, or when their status changes.

Event Types

kyc.session.created

New session created via your backend

kyc.session.completed

User finished all workflow steps

kyc.session.processed

Processing complete with decision

kyc.session.approved

Session manually approved

kyc.session.rejected

Session manually rejected

kyc.session.manual_review

Sent to manual review queue

Session Lifecycle

Flagged vs Manual Review: A flagged session is automatically approved but marked for attention. The session status is APPROVED, but final_decision is flagged. Use this to approve users while tracking risk signals for monitoring.
EventWhen It Fires
kyc.session.createdWhen you create a new session via POST /kyc/sessions
kyc.session.completedWhen the end-user completes the final step in the workflow
kyc.session.processedAfter session processing completes with automation results
kyc.session.approvedWhen a compliance officer manually approves a session
kyc.session.rejectedWhen a compliance officer manually rejects a session
kyc.session.manual_reviewWhen a session is flagged for manual review

Configuration

Enable Session Webhooks

Configure your webhook endpoint in your tenant’s notification settings:
curl -X PATCH https://kyc.legaltalent.ai/kyc/tenants/me/notification-channels \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "webhook_url": "https://your-server.com/webhooks/kyc"
  }'

Response

{
  "status": "success",
  "data": {
    "notification_config": {
      "webhook_url": "https://your-server.com/webhooks/kyc",
      "webhook_secret": "****************************a1b2",
      "notification_emails": [],
      "slack_webhook_url": null,
      "batch_notifications_enabled": true
    }
  }
}
A webhook_secret is automatically generated when you set a webhook_url. Use this secret to verify webhook signatures. The full secret is only shown once - store it securely.

Regenerate Webhook Secret

To generate a new secret (invalidating the old one):
curl -X PATCH https://kyc.legaltalent.ai/kyc/tenants/me/notification-channels \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "webhook_secret": null
  }'
Then set the webhook_url again to generate a new secret.

Webhook Delivery

Request Format

Webhooks are delivered as POST requests with the following structure: Headers:
POST /webhooks/kyc HTTP/1.1
Host: your-server.com
Content-Type: application/json
User-Agent: KYC-Webhooks/1.0
X-Webhook-ID: evt_7f8a9b2c3d4e5f6a7b8c9d0e
X-Webhook-Timestamp: 1735228800
X-Webhook-Signature: sha256=a1b2c3d4e5f6789012345678901234567890abcdef...
Body (Envelope):
{
  "id": "evt_7f8a9b2c3d4e5f6a7b8c9d0e",
  "type": "kyc.session.processed",
  "created": 1735228800,
  "data": {
    // Event-specific payload
  }
}

Headers Reference

HeaderTypeDescription
X-Webhook-IDstringUnique event identifier (evt_<uuid>). Use for idempotency.
X-Webhook-TimestampintegerUnix timestamp when the event was created
X-Webhook-SignaturestringHMAC-SHA256 signature prefixed with sha256=
User-AgentstringAlways KYC-Webhooks/1.0
Content-TypestringAlways application/json

Signature Verification

All webhooks are signed using HMAC-SHA256. Verify the signature to ensure the webhook is authentic and hasn’t been tampered with.

Signature Algorithm

message = "{timestamp}.{payload}"
signature = HMAC-SHA256(webhook_secret, message)
Where:
  • timestamp = Value from X-Webhook-Timestamp header
  • payload = Raw JSON body (the entire envelope)
  • webhook_secret = Your tenant’s webhook secret

Implementation Examples

import hmac
import hashlib
from fastapi import FastAPI, Request, HTTPException

app = FastAPI()
WEBHOOK_SECRET = "whsec_your_secret_here"

def verify_webhook_signature(payload: bytes, timestamp: str, signature: str) -> bool:
    """Verify HMAC-SHA256 webhook signature."""
    # Extract the signature value (remove 'sha256=' prefix)
    if signature.startswith('sha256='):
        signature = signature[7:]
    
    # Construct the message: timestamp.payload
    message = f"{timestamp}.{payload.decode('utf-8')}"
    
    # Calculate expected signature
    expected = hmac.new(
        key=WEBHOOK_SECRET.encode('utf-8'),
        msg=message.encode('utf-8'),
        digestmod=hashlib.sha256
    ).hexdigest()
    
    # Constant-time comparison to prevent timing attacks
    return hmac.compare_digest(expected, signature)

@app.post("/webhooks/kyc")
async def handle_webhook(request: Request):
    # Get raw body
    body = await request.body()
    
    # Get headers
    timestamp = request.headers.get('X-Webhook-Timestamp')
    signature = request.headers.get('X-Webhook-Signature')
    event_id = request.headers.get('X-Webhook-ID')
    
    if not timestamp or not signature:
        raise HTTPException(status_code=400, detail="Missing signature headers")
    
    # Verify signature
    if not verify_webhook_signature(body, timestamp, signature):
        raise HTTPException(status_code=401, detail="Invalid signature")
    
    # Parse and process the event
    payload = await request.json()
    event_type = payload.get('type')
    data = payload.get('data', {})
    
    # Handle different event types
    if event_type == 'kyc.session.processed':
        handle_session_processed(data)
    elif event_type == 'kyc.session.completed':
        handle_session_completed(data)
    
    return {"received": True, "event_id": event_id}

Replay Attack Prevention

To prevent replay attacks, verify that the timestamp is recent:
import time

MAX_TIMESTAMP_AGE = 300  # 5 minutes

def verify_timestamp(timestamp: str) -> bool:
    try:
        ts = int(timestamp)
        now = int(time.time())
        return abs(now - ts) < MAX_TIMESTAMP_AGE
    except ValueError:
        return False

Event Payloads

kyc.session.created

Sent when a new session is created via POST /kyc/sessions.
{
  "id": "evt_abc123def456",
  "type": "kyc.session.created",
  "created": 1735228800,
  "data": {
    "event_type": "kyc.session.created",
    "tenant_id": "tenant_abc",
    "session_id": "sess_123456",
    "workflow_id": "wf_789012",
    "access_link": "https://kyc.legaltalent.ai/public/sessions/abc123token",
    "expires_at": 1735833600,
    "timestamp": "2024-12-26T12:00:00Z"
  }
}
Data Fields:
FieldTypeDescription
event_typestringAlways kyc.session.created
tenant_idstringYour tenant ID
session_idstringUnique session identifier
workflow_idstringID of the workflow used
access_linkstringPublic URL for end-user to complete KYC
expires_atintegerUnix timestamp when session expires
timestampstringISO8601 timestamp of the event

kyc.session.completed

Sent when the end-user completes all workflow steps. The session is now ready for processing.
This event means the user has finished their part. Processing (document extraction, validations, automation rules) happens next - either automatically if auto_process is enabled, or when you call POST /kyc/sessions/{id}/process.
{
  "id": "evt_def456ghi789",
  "type": "kyc.session.completed",
  "created": 1735230600,
  "data": {
    "event_type": "kyc.session.completed",
    "tenant_id": "tenant_abc",
    "session_id": "sess_123456",
    "workflow_id": "wf_789012",
    "current_step_index": 4,
    "total_steps": 5,
    "completed_at": "2024-12-26T12:30:00Z",
    "timestamp": "2024-12-26T12:30:00Z",
    "form_data": {
      "full_name": {
        "field_id": "field_abc123",
        "name": "Full Name",
        "type": "name",
        "value": "John Doe"
      },
      "date_of_birth": {
        "field_id": "field_xyz789",
        "name": "Date of Birth",
        "type": "date",
        "value": "1990-01-15"
      },
      "is_pep": {
        "field_id": "field_pep456",
        "name": "Are you a PEP?",
        "type": "boolean",
        "value": false
      }
    }
  }
}
Data Fields:
FieldTypeDescription
event_typestringAlways kyc.session.completed
tenant_idstringYour tenant ID
session_idstringSession identifier
workflow_idstringWorkflow ID used
current_step_indexintegerIndex of the last completed step (0-based)
total_stepsintegerTotal number of steps in workflow
completed_atstringISO8601 timestamp when user finished
timestampstringISO8601 timestamp of the event
form_dataobjectForm fields filled by the user (see below)

Form Data Object

The form_data object contains all form fields completed by the user, organized by a portable key. This makes it easy to access specific fields programmatically without relying on auto-generated field IDs. Structure:
{
  "<field_key>": {
    "field_id": "field_abc123",
    "name": "Human-readable label",
    "type": "field_type",
    "value": "user_entered_value"
  }
}
PropertyTypeDescription
Key (e.g., full_name)stringThe field_key defined in the workflow, or field_id if no key was set
field_idstringAuto-generated field identifier
namestringHuman-readable field label
typestringField type: text, date, boolean, select, name, email, country, etc.
valueanyThe value entered by the user
Portable Field Keys: Define a field_key (e.g., full_name, date_of_birth) in your workflow to get consistent keys across environments. This eliminates the need to map auto-generated field_id values between staging and production.

kyc.session.processed

Sent after session processing completes. This is the most important webhook for automation - it contains the final decision and all automation rule evaluations. The final_decision can be one of:
  • approved - Session automatically approved, no issues found
  • flagged - Session approved but marked for attention (e.g., high-risk country, unusual patterns)
  • rejected - Session automatically denied (e.g., sanctions match, fraud detected)
  • manual_review - Requires human review before final decision
{
  "id": "evt_ghi789jkl012",
  "type": "kyc.session.processed",
  "created": 1735231200,
  "data": {
    "event_type": "kyc.session.processed",
    "tenant_id": "tenant_abc",
    "session_id": "sess_123456",
    "workflow_id": "wf_789012",
    "status": "manual_review",
    "final_decision": "manual_review",
    "automation_result": {
      "enabled": true,
      "final_action": "manual_review",
      "triggered_rules": [
        {
          "rule_type": "list_match",
          "rule_details": {
            "list_type": "ofac",
            "match_level": "high_confidence"
          },
          "action": "manual_review",
          "severity": "warning",
          "message": "Potential OFAC match found (85% confidence)"
        },
        {
          "rule_type": "country",
          "rule_details": {
            "field": "nationality",
            "countries": ["IR", "KP", "SY"]
          },
          "action": "flag",
          "severity": "info",
          "message": "Nationality requires additional review"
        }
      ],
      "risk_score": 45.5,
      "flags": ["potential_sanctions_match", "high_risk_nationality"],
      "recommendation": "Manual review required due to potential sanctions match",
      "face_dedup_result": {
        "found_matches": false,
        "previous_sessions": 0,
        "approved_sessions": 0,
        "rejected_sessions": 0,
        "pending_sessions": 0,
        "matched_session_ids": [],
        "similarity_scores": []
      },
      "email_verification_result": {
        "email": "user@example.com",
        "is_valid": true,
        "is_disposable": false,
        "is_role_based": false,
        "is_deliverable": true
      },
      "evaluated_at": "2024-12-26T12:35:00Z",
      "evaluation_time_ms": 1250
    },
    "form_data": {
      "full_name": {
        "field_id": "field_abc123",
        "name": "Full Name",
        "type": "name",
        "value": "John Doe"
      },
      "nationality": {
        "field_id": "field_nat456",
        "name": "Nationality",
        "type": "country",
        "value": "IR"
      }
    },
    "timestamp": "2024-12-26T12:35:00Z"
  }
}
Data Fields:
FieldTypeDescription
event_typestringAlways kyc.session.processed
tenant_idstringYour tenant ID
session_idstringSession identifier
workflow_idstringWorkflow ID used
statusstringFinal session status: approved, rejected, manual_review
final_decisionstringDecision value: approved, flagged, rejected, manual_review
automation_resultobjectFull automation evaluation result (see below)
form_dataobjectForm fields filled by the user (same structure as kyc.session.completed)
timestampstringISO8601 timestamp of the event
Understanding status vs final_decision:
  • status is the session state in the system (approved, rejected, manual_review)
  • final_decision is what the automation decided (approved, flagged, rejected, manual_review)
A flagged decision results in status: approved because the session is approved, but the final_decision: flagged tells you it needs monitoring.

Automation Result Object

FieldTypeDescription
enabledbooleanWhether automation rules were evaluated
final_actionstringHighest-priority action: auto_deny, manual_review, flag, auto_approve, no_action
triggered_rulesarrayList of rules that fired (see below)
risk_scorenumberCalculated risk score (0-100)
flagsarrayString flags for UI display
recommendationstringHuman-readable recommendation
face_dedup_resultobjectCross-session face deduplication results
email_verification_resultobjectEmail validation results
evaluated_atstringISO8601 timestamp of evaluation
evaluation_time_msintegerProcessing time in milliseconds

Triggered Rule Object

FieldTypeDescription
rule_typestringType of rule (see table below)
rule_detailsobjectConfiguration details of the rule
actionstringAction triggered: auto_deny, manual_review, flag, auto_approve, no_action
severitystringSeverity level: critical, warning, info
messagestringHuman-readable explanation

Rule Types

Rule TypeDescription
countryNationality, residence, or incorporation country checks
list_matchSanctions/watchlist matching (OFAC, UN, EU, UK, PEP, etc.)
face_dedupCross-session face deduplication detection
time_to_completeSession completion time thresholds
documentDocument age, expiry validation
volumeDeclared transaction volume thresholds
emailEmail validity, disposable, role-based checks
web_validationWebsite reliability, SSL, industry, adverse media
face_matchID photo vs selfie face matching confidence

Action Priority

Actions are evaluated in priority order. The highest-priority triggered action becomes the final_action:
PriorityActionfinal_decisionstatusResult
5 (Highest)auto_denyrejectedrejectedSession rejected automatically
4manual_reviewmanual_reviewmanual_reviewSent to review queue
3flagflaggedapprovedApproved but marked for monitoring
2auto_approveapprovedapprovedSession approved automatically
1 (Lowest)no_actionmanual_reviewmanual_reviewDefault (safety fallback)

Triggered Rules Reference

Each triggered_rule object in the automation_result.triggered_rules array contains a message field with a human-readable explanation. Use this to communicate rejection reasons to your users.
The message field is designed to be user-friendly and can be displayed directly to your customers. The rule_details object contains machine-readable data for programmatic handling.

Sanctions & Watchlist Match (list_match)

Triggered when a name matches sanctions or watchlist databases.
{
  "rule_type": "list_match",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Found 2 match(es) in sanction lists",
  "rule_details": {
    "list_type": "ofac",
    "match_level": "high_confidence",
    "matching_entries": 2,
    "lists_matched": ["OFAC SDN", "UN Consolidated"]
  }
}
Detail FieldTypeDescription
list_typestringRule filter: ofac, un, eu, uk, pep, or any
match_levelstringConfidence filter: exact (≥95%), high_confidence (≥80%), or any
matching_entriesintegerNumber of matches found
lists_matchedarrayNames of lists where matches were found

Country Restriction (country)

Triggered when nationality, residence, or incorporation country matches a blocked list.
{
  "rule_type": "country",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Country KP detected in nationality. Reason: Sanctioned country",
  "rule_details": {
    "country_code": "KP",
    "applies_to": ["all"],
    "matched_fields": ["nationality"]
  }
}
Detail FieldTypeDescription
country_codestringISO 3166-1 alpha-2 country code
applies_toarrayWhich fields to check: nationality, residence, incorporation, tax_residence, birth_country, business_operation, or all
matched_fieldsarrayWhich fields actually matched

Face Match Failed (face_match)

Triggered when the selfie doesn’t match the ID photo.
{
  "rule_type": "face_match",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Face match failed (confidence: 45.00%, threshold: 80.00%)",
  "rule_details": {
    "confidence": 0.45,
    "threshold": 0.8,
    "is_match": false
  }
}
Detail FieldTypeDescription
confidencenumberMatch confidence (0-1)
thresholdnumberRequired threshold (0-1)
is_matchbooleanWhether face matched

Face Deduplication (face_dedup)

Triggered when the same face is detected in previous sessions.
{
  "rule_type": "face_dedup",
  "action": "manual_review",
  "severity": "warning",
  "message": "Face detected in 3 previous session(s) (2 approved, 1 rejected)",
  "rule_details": {
    "min_sessions_required": 1,
    "sessions_found": 3,
    "approved": 2,
    "rejected": 1,
    "pending": 0,
    "matched_session_ids": ["sess_abc", "sess_def", "sess_ghi"]
  }
}
Detail FieldTypeDescription
sessions_foundintegerTotal previous sessions with this face
approvedintegerPreviously approved sessions
rejectedintegerPreviously rejected sessions
pendingintegerPending sessions
matched_session_idsarrayUp to 5 session IDs (for investigation)

Document Expired or Near Expiry (document)

Triggered when documents are expired or about to expire. Expired document:
{
  "rule_type": "document",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Document id_document expired 45 days ago",
  "rule_details": {
    "document_type": "id_document",
    "doc_id": "doc_abc123",
    "check_type": "expired",
    "expiry_date": "2024-11-01",
    "days_expired": 45
  }
}
Document too old (e.g., proof of address):
{
  "rule_type": "document",
  "action": "manual_review",
  "severity": "info",
  "message": "Document proof_of_address issued 120 days ago (max: 90)",
  "rule_details": {
    "document_type": "proof_of_address",
    "doc_id": "doc_def456",
    "check_type": "too_old",
    "issue_date": "2024-09-01",
    "days_since_issue": 120,
    "max_allowed": 90
  }
}
Detail FieldTypeDescription
document_typestringType: id_document, proof_of_address, bank_letter, etc.
check_typestringexpired, near_expiry, too_old, or too_new
expiry_datestringISO date of expiration (for expiry checks)
issue_datestringISO date of issue (for age checks)
days_expiredintegerDays since expiration
days_since_issueintegerDays since document was issued

Invalid Document Type (document_validity)

Triggered when uploaded document doesn’t match expected type.
{
  "rule_type": "document_validity",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Invalid document: expected id_document, got receipt. Document type validation failed",
  "rule_details": {
    "doc_id": "doc_abc123",
    "expected_type": "id_document",
    "detected_type": "receipt",
    "validation_reason": "Document type validation failed"
  }
}
Detail FieldTypeDescription
expected_typestringWhat was expected
detected_typestringWhat was detected
validation_reasonstringExplanation of why validation failed

Email Verification (email)

Triggered when email validation detects issues.
{
  "rule_type": "email",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Email test@tempmail.com is from a disposable/temporary domain",
  "rule_details": {
    "email": "test@tempmail.com",
    "check_type": "disposable"
  }
}
Detail FieldTypeDescription
emailstringThe email address
check_typestringinvalid, disposable, role_based, or unverifiable
Possible check_type values:
ValueDescription
invalidEmail doesn’t exist or is malformed
disposableFrom temporary email service (e.g., tempmail, guerrillamail)
role_basedGeneric address (info@, admin@, support@)
unverifiableValid format but catch-all domain (can’t verify existence)

Transaction Volume (volume)

Triggered when declared transaction volume exceeds thresholds.
{
  "rule_type": "volume",
  "action": "manual_review",
  "severity": "warning",
  "message": "Transaction volume: volume $150,000.00/month",
  "rule_details": {
    "monthly_volume": 150000,
    "monthly_transactions": 500,
    "rule_min_volume": 100000,
    "rule_max_volume": null,
    "rule_min_transactions": null,
    "rule_max_transactions": null
  }
}
Detail FieldTypeDescription
monthly_volumenumberDeclared monthly volume
monthly_transactionsintegerDeclared monthly transaction count
rule_min_volumenumberRule’s minimum threshold
rule_max_volumenumberRule’s maximum threshold

Time to Complete (time_to_complete)

Triggered when session completion time is suspicious. Too fast (possible bot):
{
  "rule_type": "time_to_complete",
  "action": "manual_review",
  "severity": "warning",
  "message": "Session completed in 15s (threshold: 60s)",
  "rule_details": {
    "completion_seconds": 15,
    "min_seconds": null,
    "max_seconds": 60
  }
}
Detail FieldTypeDescription
completion_secondsintegerActual time to complete (seconds)
min_secondsintegerMaximum allowed time (slow detection)
max_secondsintegerMinimum required time (bot detection)

Form Field Declaration (form_field)

Triggered when user declares high-risk information in forms.
{
  "rule_type": "form_field",
  "action": "manual_review",
  "severity": "warning",
  "message": "User declared PEP status (field 'pep_status' = 'Former PEP')",
  "rule_details": {
    "field_id": "pep_status",
    "field_value": "Former PEP",
    "operator": "in",
    "rule_value": ["PEP", "Former PEP", "Related to PEP"],
    "reason": "User declared PEP status"
  }
}
Detail FieldTypeDescription
field_idstringForm field identifier
field_valueanyValue submitted by user
operatorstringComparison: equals, in, is_true, gt, etc.
rule_valueanyValue(s) the rule checks against
reasonstringHuman-readable reason

Web Validation Rules

Multiple rule types for website validation:

SSL Certificate (web_ssl)

{
  "rule_type": "web_ssl",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Website example.com has no SSL certificate",
  "rule_details": { "url": "example.com", "has_ssl": false }
}

Blocked Industry (web_industry)

{
  "rule_type": "web_industry",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Website example.com industry 'gambling' is blocked",
  "rule_details": { "url": "example.com", "industry": "gambling", "allowed": false }
}

Adverse Media (web_adverse_media)

{
  "rule_type": "web_adverse_media",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Website example.com has HIGH_RISK adverse media (score: 85)",
  "rule_details": { "url": "example.com", "decision": "HIGH_RISK", "risk_score": 85 }
}

Domain Sanctioned (web_sanction)

{
  "rule_type": "web_sanction",
  "action": "auto_deny",
  "severity": "critical",
  "message": "Website example.com found on sanction lists",
  "rule_details": { "url": "example.com", "matches": ["OFAC"] }
}

Extraction Validation (extraction_validation)

Triggered when extracted document data fails validation. Sanctions match on extracted name:
{
  "rule_type": "extraction_validation",
  "action": "manual_review",
  "severity": "critical",
  "message": "Extracted full_name 'John Doe' matched in sanctions lists",
  "rule_details": {
    "validation_type": "sanctions_lists",
    "extracted_field": "full_name",
    "extracted_value": "John Doe",
    "doc_id": "doc_abc",
    "doc_type": "id_document"
  }
}
Form data mismatch:
{
  "rule_type": "extraction_validation",
  "action": "flag",
  "severity": "warning",
  "message": "Extracted full_name 'JOHN D. DOE' differs from form value 'John Doe' (similarity: 85%)",
  "rule_details": {
    "validation_type": "cross_validate_form",
    "extracted_field": "full_name",
    "extracted_value": "JOHN D. DOE",
    "form_field": "full_name",
    "form_value": "John Doe",
    "similarity": 0.85,
    "threshold": 0.8
  }
}
Detail FieldTypeDescription
validation_typestringsanctions_lists, adverse_media, cross_validate_form, or pep_check
extracted_fieldstringField name from document
extracted_valuestringValue extracted from document
form_fieldstringCorresponding form field (for cross-validation)
form_valuestringValue from form (for cross-validation)
similaritynumberMatch similarity (0-1)

Using Triggered Rules in Your Application

Displaying Rejection Reasons

function getCustomerFacingMessage(triggeredRules) {
  // Get the most critical rule
  const criticalRule = triggeredRules.find(r => r.severity === 'critical');
  
  if (criticalRule) {
    // Map rule types to user-friendly messages
    const messages = {
      'list_match': 'Your information could not be verified against our security databases.',
      'country': 'Unfortunately, we cannot accept applications from your country at this time.',
      'face_match': 'The photo on your ID could not be matched with your selfie. Please try again.',
      'document': 'Your document appears to be expired. Please upload a valid document.',
      'document_validity': 'The uploaded document type could not be verified. Please upload a valid ID.',
      'email': 'The email address provided could not be verified.',
    };
    
    return messages[criticalRule.rule_type] || criticalRule.message;
  }
  
  return 'Your application requires additional review.';
}

Logging for Compliance

def log_session_decision(webhook_data):
    automation = webhook_data['data'].get('automation_result', {})
    
    for rule in automation.get('triggered_rules', []):
        logger.info({
            'event': 'automation_rule_triggered',
            'session_id': webhook_data['data']['session_id'],
            'rule_type': rule['rule_type'],
            'action': rule['action'],
            'severity': rule['severity'],
            'message': rule['message'],
            'details': rule['rule_details']
        })

kyc.session.approved

Sent when a compliance officer manually approves a session.
{
  "id": "evt_jkl012mno345",
  "type": "kyc.session.approved",
  "created": 1735238400,
  "data": {
    "event_type": "kyc.session.approved",
    "tenant_id": "tenant_abc",
    "session_id": "sess_123456",
    "status": "approved",
    "notes": "Verified via phone call with customer. Documentation confirmed.",
    "updated_by": "compliance@yourcompany.com",
    "timestamp": "2024-12-26T14:00:00Z"
  }
}
Data Fields:
FieldTypeDescription
event_typestringAlways kyc.session.approved
tenant_idstringYour tenant ID
session_idstringSession identifier
statusstringAlways approved
notesstringOptional notes from the reviewer
updated_bystringEmail/ID of the compliance officer
timestampstringISO8601 timestamp

kyc.session.rejected

Sent when a compliance officer manually rejects a session.
{
  "id": "evt_mno345pqr678",
  "type": "kyc.session.rejected",
  "created": 1735238400,
  "data": {
    "event_type": "kyc.session.rejected",
    "tenant_id": "tenant_abc",
    "session_id": "sess_123456",
    "status": "rejected",
    "notes": "Document appears to be fraudulent. Inconsistent information provided.",
    "updated_by": "compliance@yourcompany.com",
    "timestamp": "2024-12-26T14:00:00Z"
  }
}
Data Fields:
FieldTypeDescription
event_typestringAlways kyc.session.rejected
tenant_idstringYour tenant ID
session_idstringSession identifier
statusstringAlways rejected
notesstringOptional rejection reason
updated_bystringEmail/ID of the compliance officer
timestampstringISO8601 timestamp

kyc.session.manual_review

Sent when a session is flagged for manual review (either automatically or manually).
{
  "id": "evt_pqr678stu901",
  "type": "kyc.session.manual_review",
  "created": 1735238400,
  "data": {
    "event_type": "kyc.session.manual_review",
    "tenant_id": "tenant_abc",
    "session_id": "sess_123456",
    "status": "manual_review",
    "notes": "Additional documentation required",
    "updated_by": "system",
    "timestamp": "2024-12-26T14:00:00Z"
  }
}

Handling Webhooks

Example: Full Webhook Handler

from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
import hmac
import hashlib
import time
import json
import logging

app = FastAPI()
logger = logging.getLogger(__name__)

WEBHOOK_SECRET = "whsec_your_secret"
MAX_TIMESTAMP_AGE = 300  # 5 minutes

async def process_session_created(data: dict):
    """Handle new session - maybe send link to user."""
    session_id = data['session_id']
    access_link = data['access_link']
    logger.info(f"New session created: {session_id}")
    # Send email to user with access_link
    
async def process_session_completed(data: dict):
    """Handle completed session - user finished their part."""
    session_id = data['session_id']
    logger.info(f"Session completed by user: {session_id}")
    # Notify internal team that documents are ready

async def process_session_processed(data: dict):
    """Handle processed session - main decision webhook."""
    session_id = data['session_id']
    decision = data['final_decision']
    automation = data.get('automation_result', {})
    risk_score = automation.get('risk_score', 0)
    flags = automation.get('flags', [])
    
    logger.info(f"Session {session_id} processed: {decision} (risk: {risk_score})")
    
    if decision == 'approved':
        # Activate user account immediately
        await activate_user(session_id)
    elif decision == 'flagged':
        # Activate user but add to monitoring queue
        await activate_user(session_id)
        await add_to_monitoring_queue(session_id, flags, risk_score)
    elif decision == 'rejected':
        # Send rejection notification
        await notify_rejection(session_id, automation.get('recommendation'))
    else:  # manual_review
        # Queue for compliance team
        await queue_for_review(session_id, automation)

async def process_status_update(data: dict):
    """Handle manual status updates."""
    session_id = data['session_id']
    status = data['status']
    notes = data.get('notes', '')
    
    logger.info(f"Session {session_id} status changed to: {status}")
    
    if status == 'approved':
        await activate_user(session_id)
    elif status == 'rejected':
        await notify_rejection(session_id, notes)

EVENT_HANDLERS = {
    'kyc.session.created': process_session_created,
    'kyc.session.completed': process_session_completed,
    'kyc.session.processed': process_session_processed,
    'kyc.session.approved': process_status_update,
    'kyc.session.rejected': process_status_update,
    'kyc.session.manual_review': process_status_update,
}

@app.post("/webhooks/kyc")
async def handle_kyc_webhook(request: Request, background_tasks: BackgroundTasks):
    # Get raw body for signature verification
    body = await request.body()
    
    # Get headers
    timestamp = request.headers.get('X-Webhook-Timestamp')
    signature = request.headers.get('X-Webhook-Signature')
    event_id = request.headers.get('X-Webhook-ID')
    
    # Validate headers
    if not timestamp or not signature:
        raise HTTPException(400, "Missing signature headers")
    
    # Verify timestamp freshness
    try:
        ts = int(timestamp)
        if abs(time.time() - ts) > MAX_TIMESTAMP_AGE:
            raise HTTPException(401, "Timestamp too old")
    except ValueError:
        raise HTTPException(400, "Invalid timestamp")
    
    # Verify signature
    sig = signature.replace('sha256=', '')
    message = f"{timestamp}.{body.decode('utf-8')}"
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        message.encode(),
        hashlib.sha256
    ).hexdigest()
    
    if not hmac.compare_digest(expected, sig):
        raise HTTPException(401, "Invalid signature")
    
    # Parse payload
    payload = json.loads(body)
    event_type = payload.get('type')
    data = payload.get('data', {})
    
    # Check idempotency (optional but recommended)
    if await is_already_processed(event_id):
        return {"received": True, "status": "already_processed"}
    
    # Find and queue handler
    handler = EVENT_HANDLERS.get(event_type)
    if handler:
        background_tasks.add_task(handler, data)
        await mark_as_processed(event_id)
    else:
        logger.warning(f"Unknown event type: {event_type}")
    
    return {"received": True, "event_id": event_id}

Testing Webhooks

Using the Staging Environment

Always test webhooks in staging first:
  1. Configure your staging webhook URL
  2. Create test sessions
  3. Complete the flow to trigger events
  4. Verify your handler processes them correctly

Local Development with ngrok

# Start your local server
python -m uvicorn main:app --port 8000

# In another terminal, expose it
ngrok http 8000

# Use the ngrok URL as your webhook_url
# https://abc123.ngrok.io/webhooks/kyc

Webhook Payload Generator

For testing your signature verification, generate test payloads:
import hmac
import hashlib
import time
import json

def generate_test_webhook(secret: str, event_type: str, data: dict) -> dict:
    """Generate a test webhook payload with valid signature."""
    timestamp = int(time.time())
    
    envelope = {
        "id": f"evt_test_{timestamp}",
        "type": event_type,
        "created": timestamp,
        "data": data
    }
    
    payload = json.dumps(envelope)
    message = f"{timestamp}.{payload}"
    signature = hmac.new(secret.encode(), message.encode(), hashlib.sha256).hexdigest()
    
    return {
        "headers": {
            "X-Webhook-ID": envelope["id"],
            "X-Webhook-Timestamp": str(timestamp),
            "X-Webhook-Signature": f"sha256={signature}",
            "Content-Type": "application/json"
        },
        "body": envelope
    }

# Generate test webhook
test = generate_test_webhook(
    secret="whsec_test123",
    event_type="kyc.session.processed",
    data={
        "event_type": "kyc.session.processed",
        "tenant_id": "test_tenant",
        "session_id": "test_session",
        "status": "approved",
        "final_decision": "approved"
    }
)
print(json.dumps(test, indent=2))

Error Handling

Your Endpoint Should:

  1. Return 2xx quickly - Within 10 seconds
  2. Process asynchronously - Use background tasks
  3. Be idempotent - Handle duplicate deliveries
  4. Log failures - For debugging

Common Issues

IssueSolution
Signature mismatchVerify you’re using the raw body, not parsed JSON
Timestamp validation failsCheck server clock synchronization
Missing eventsEnsure endpoint returns 2xx, check logs
Duplicate processingImplement idempotency using X-Webhook-ID

Rate Limits

Webhooks are not subject to API rate limits, but your endpoint should handle bursts:
  • Peak: Up to 100 webhooks/minute during high activity
  • Timeout: 10 seconds per webhook delivery
  • No retries: Failed webhooks are not automatically retried
For high-volume scenarios, consider using a queue-based architecture (SQS, Redis, RabbitMQ) to buffer incoming webhooks.

See Also