Skip to main content

Overview

Adverse media checks use AI-powered analysis to search for negative news, criminal records, sanctions, and other risk factors about entities. This guide provides ready-to-use code snippets for integrating adverse media checks into your application.

Supported Entity Types

| Type | Description | Use Case |
| --- | --- | --- |
| person | Individual analysis (default) | Background checks, KYC |
| company | Company/organization analysis | Corporate due diligence |
| website | Website/domain reputation | Merchant verification |
import requests

# Base URL that every snippet in this guide prefixes to its endpoint path.
# NOTE(review): the "stg" host prefix looks like a staging environment —
# confirm the correct host before using these snippets in production.
BASE_URL = "https://stg.kyc.legaltalent.ai"
# Headers sent with every request. "YOUR_TOKEN" is a placeholder and must be
# replaced with a real bearer token.
headers = {
    "Authorization": "Bearer YOUR_TOKEN",
    "Content-Type": "application/json"
}

Basic Adverse Media Check

# Simple adverse media check
def check_adverse_media(name, entity_type="person", country=None, age=None, additional_info=None):
    """POST an adverse media check for one entity and return the decoded JSON body.

    ``country``, ``age`` and ``additional_info`` are added to the request body
    only when they are truthy. A 4xx/5xx response raises through
    ``raise_for_status()``.
    """
    optional_fields = {
        "country": country,
        "age": age,
        "additional_info": additional_info,
    }
    body = {"name": name, "entity_type": entity_type}
    # Falsy optional values are left out of the request entirely.
    body.update({key: value for key, value in optional_fields.items() if value})

    reply = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=body,
        headers=headers,
        timeout=30,
    )
    reply.raise_for_status()
    return reply.json()

# Usage
# Calls the helper with only a name, so entity_type falls back to its
# default ("person") and no optional fields are sent. The three keys below
# are read directly from the response dict.
result = check_adverse_media("John Doe")
print(f"Risk Score: {result['final_risk_score']}")
print(f"Decision: {result['decision']}")
print(f"Summary: {result['summary']}")

Person Check with Context

# Check person with additional context for better accuracy
def check_person_detailed(name, country, age, position=None, company=None):
    """Run a person adverse media check with country, age and optional job context.

    The optional ``position`` / ``company`` pair is folded into the free-text
    ``additional_info`` field:

    * both given   -> "<position> of <company>"
    * company only -> "Works at <company>"
    * position only -> "<position>"

    Returns the decoded JSON response. A 4xx/5xx response raises through
    ``raise_for_status()``.
    """
    url = f"{BASE_URL}/kyc/adverse-media"
    payload = {
        "name": name,
        "entity_type": "person",
        "country": country,
        "age": age
    }

    if position and company:
        payload["additional_info"] = f"{position} of {company}"
    elif company:
        payload["additional_info"] = f"Works at {company}"
    elif position:
        # A position supplied without a company used to be dropped silently;
        # send it on its own so the caller's context is not lost.
        payload["additional_info"] = position

    response = requests.post(url, json=payload, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()

# Usage
# Passes both position and company, so additional_info becomes
# "CEO of Tech Corp".
result = check_person_detailed(
    "John Doe",
    country="US",
    age=45,
    position="CEO",
    company="Tech Corp"
)

# Process results
# Sources are listed only for a HIGH_RISK decision. 'sources' and each
# source's 'id', 'title' and 'url' are indexed directly, so a missing key
# raises KeyError.
if result['decision'] == 'HIGH_RISK':
    print("HIGH RISK - Manual review required")
    for source in result['sources']:
        print(f"- [{source['id']}] {source['title']}: {source['url']}")

Company Analysis

# Check company for adverse media
def check_company(company_name, country=None, additional_info=None):
    """Submit a company adverse media check and return the decoded JSON body.

    ``country`` and ``additional_info`` are included only when truthy. A
    4xx/5xx response raises through ``raise_for_status()``.
    """
    body = {"name": company_name, "entity_type": "company"}
    for field, value in (("country", country), ("additional_info", additional_info)):
        if value:
            body[field] = value

    endpoint = f"{BASE_URL}/kyc/adverse-media"
    reply = requests.post(endpoint, json=body, headers=headers, timeout=30)
    reply.raise_for_status()
    return reply.json()

# Usage
result = check_company(
    "Tech Corp Inc",
    country="US",
    additional_info="Technology company, founded 2010"
)

# Headline fields, read directly from the response dict.
print(f"Company: {result['entity_name']}")
print(f"Risk Score: {result['final_risk_score']}")
print(f"Decision: {result['decision']}")
print(f"Summary: {result['summary']}")

# Source details are printed whenever the list is non-empty, regardless of
# the decision. 'sources' is indexed directly, so it must be present.
if result['sources']:
    print(f"\nFound {len(result['sources'])} adverse sources:")
    for source in result['sources']:
        print(f"- [{source['id']}] {source['title']}")
        print(f"  URL: {source['url']}")
        print(f"  Summary: {source['summary']}")

Website/Domain Analysis

Analyze websites for fraud reports, security issues, blacklists, and consumer complaints.
# Check website/domain reputation
def check_website(url_or_domain, additional_info=None):
    """Submit a website adverse media check and return the decoded JSON body.

    The site is sent in the ``name`` field with ``entity_type`` "website";
    ``additional_info`` is included only when truthy. A 4xx/5xx response
    raises through ``raise_for_status()``.
    """
    body = dict(name=url_or_domain, entity_type="website")
    if additional_info:
        body["additional_info"] = additional_info

    reply = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=body,
        headers=headers,
        timeout=30,
    )
    reply.raise_for_status()
    return reply.json()

# Usage - Check an e-commerce website
result = check_website(
    "example-shop.com",
    additional_info="E-commerce platform selling electronics"
)

# Headline fields, read directly from the response dict.
print(f"Website: {result['entity_name']}")
print(f"Risk Score: {result['final_risk_score']}")
print(f"Decision: {result['decision']}")
print(f"Summary: {result['summary']}")

# Check decision
# Both MEDIUM_RISK and HIGH_RISK trigger the manual-review message and the
# per-source listing.
if result['decision'] in ['MEDIUM_RISK', 'HIGH_RISK']:
    print("\n⚠️ Website requires manual review!")
    for source in result['sources']:
        print(f"\n  [{source['id']}] {source['title']}")
        print(f"  {source['summary']}")

Using Different LLM Providers

# Check using OpenAI provider
def check_adverse_media_openai(name, entity_type="person", country=None, model="gpt-4o-mini"):
    """Run an adverse media check with ``provider`` set to "openai".

    ``model`` is forwarded as given; ``country`` is included only when truthy.
    Returns the decoded JSON body. A 4xx/5xx response raises through
    ``raise_for_status()``.
    """
    body = dict(
        name=name,
        entity_type=entity_type,
        provider="openai",
        model=model,
    )
    if country:
        body["country"] = country

    endpoint = f"{BASE_URL}/kyc/adverse-media"
    reply = requests.post(endpoint, json=body, headers=headers, timeout=30)
    reply.raise_for_status()
    return reply.json()

# Usage with OpenAI
# The model argument is omitted, so the helper's default "gpt-4o-mini" is sent.
result = check_adverse_media_openai("John Doe", country="US")

# Default provider (Bedrock with Claude 3.5 Haiku)
def check_adverse_media_bedrock(name, entity_type="person", country=None):
    """Run an adverse media check without naming a provider or model.

    No ``provider`` key is sent, so the service's default applies.
    ``country`` is included only when truthy. Returns the decoded JSON body.
    A 4xx/5xx response raises through ``raise_for_status()``.
    """
    body = {"name": name, "entity_type": entity_type}
    if country:
        body["country"] = country
    # provider defaults to "bedrock"

    reply = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=body,
        headers=headers,
        timeout=30,
    )
    reply.raise_for_status()
    return reply.json()

# Usage with Bedrock (default)
# No provider or model is passed; the request carries only name,
# entity_type and country.
result = check_adverse_media_bedrock("John Doe", country="US")

Analyzing Results

# Comprehensive result analysis
def analyze_adverse_media_result(result):
    """Print a readable report for one check result and return a short summary.

    Reads ``final_risk_score``, ``decision``, ``summary``, ``entity_name`` and
    ``entity_type`` directly (a missing key raises ``KeyError``); ``sources``
    and ``processing_time_ms`` are optional.

    Returns a dict with ``risk_score``, ``decision``, ``source_count`` and
    ``requires_review`` (True for MEDIUM_RISK or HIGH_RISK).
    """
    score = result['final_risk_score']
    verdict = result['decision']
    overview = result['summary']
    findings = result.get('sources', [])

    print(f"Entity: {result['entity_name']} ({result['entity_type']})")
    print(f"Risk Score: {score}/100")
    print(f"Decision: {verdict}")
    print(f"\nSummary:\n{overview}\n")

    # One message per known decision; an unrecognised decision prints nothing.
    verdict_messages = (
        ('CLEAR', "✅ No adverse media found - entity is clear"),
        ('LOW_RISK', "ℹ️ Low risk - minor findings, monitor if needed"),
        ('MEDIUM_RISK', "⚠️ Medium risk - review recommended"),
        ('HIGH_RISK', "🚨 HIGH RISK - immediate review required"),
    )
    for label, message in verdict_messages:
        if verdict == label:
            print(message)
            break

    if findings:
        print(f"\nFound {len(findings)} adverse source(s):")
        for item in findings:
            print(f"\n  [{item['id']}] {item['title']}")
            print(f"  URL: {item['url']}")
            print(f"  Summary: {item['summary']}")

    print(f"\nProcessing time: {result.get('processing_time_ms', 0)}ms")

    needs_review = verdict in ('MEDIUM_RISK', 'HIGH_RISK')
    return {
        'risk_score': score,
        'decision': verdict,
        'source_count': len(findings),
        'requires_review': needs_review
    }

# Usage
result = check_adverse_media("John Doe", country="US", age=45)
analysis = analyze_adverse_media_result(result)

# requires_review is True when the decision is MEDIUM_RISK or HIGH_RISK.
if analysis['requires_review']:
    print("\n⚠️ Manual compliance review required")

Risk-Based Filtering

# Filter results by risk level
def filter_by_risk_level(result, min_risk_score=None, risk_decisions=None):
    """Return True when ``result`` passes every filter that is switched on.

    A falsy ``min_risk_score`` or ``risk_decisions`` switches that filter
    off. Otherwise the result is rejected when its score is below
    ``min_risk_score`` or its decision is not in ``risk_decisions``.
    """
    score = result['final_risk_score']
    verdict = result['decision']

    # Short-circuit order matches the two sequential checks: the decision
    # filter is only consulted when the score filter did not reject.
    rejected = (
        (min_risk_score and score < min_risk_score)
        or (risk_decisions and verdict not in risk_decisions)
    )
    return not rejected

# Check multiple entities and filter
def check_multiple_entities(entities, min_risk_score=41):
    """Check each entity and collect those that pass the risk-score filter.

    Every item in ``entities`` is a dict with a required ``name`` and
    optional ``entity_type`` (defaults to 'person'), ``country``, ``age`` and
    ``additional_info``. A failure for one entity is printed and skipped so
    the rest of the batch still runs.

    Returns a list of ``{'entity', 'entity_type', 'result'}`` dicts.
    """
    flagged = []

    for candidate in entities:
        try:
            outcome = check_adverse_media(
                candidate['name'],
                entity_type=candidate.get('entity_type', 'person'),
                country=candidate.get('country'),
                age=candidate.get('age'),
                additional_info=candidate.get('additional_info')
            )

            if not filter_by_risk_level(outcome, min_risk_score=min_risk_score):
                continue

            flagged.append({
                'entity': candidate['name'],
                'entity_type': outcome['entity_type'],
                'result': outcome
            })
        except Exception as exc:
            # Best-effort batch: report the failure and move on.
            print(f"Error checking {candidate['name']}: {exc}")

    return flagged

# Usage
# One entry per supported entity type; only "name" is required, the other
# keys are read with .get() by check_multiple_entities.
entities = [
    {"name": "John Doe", "entity_type": "person", "country": "US", "age": 45},
    {"name": "Tech Corp Inc", "entity_type": "company", "country": "US"},
    {"name": "example-shop.com", "entity_type": "website"}
]

# Entities scoring below 41 are dropped from the returned list.
flagged_entities = check_multiple_entities(entities, min_risk_score=41)
print(f"Found {len(flagged_entities)} entities requiring review")

Merchant Onboarding Workflow

Check both a company and their website before onboarding as a merchant partner.
# Complete merchant onboarding check
def merchant_onboarding_check(company_name, website, country=None):
    """Run a company check and a website check, then derive one onboarding decision.

    Args:
        company_name: Name sent for the "company" adverse media check.
        website: Domain or URL sent for the "website" adverse media check.
        country: Optional country, added to the company check only.

    Returns:
        A dict with ``company_check`` and ``website_check`` (the two decoded
        JSON responses) and ``overall_decision``, one of 'REJECTED',
        'MANUAL_REVIEW', 'REVIEW_RECOMMENDED' or 'APPROVED'.

    Raises:
        requests.HTTPError: if either request returns a 4xx/5xx status.
        KeyError: if a response has no 'decision' field.
    """
    results = {
        'company_check': None,
        'website_check': None,
        'overall_decision': 'PENDING'
    }

    # 1. Company adverse media check
    company_payload = {
        "name": company_name,
        "entity_type": "company"
    }
    if country:
        company_payload["country"] = country

    company_response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=company_payload,
        headers=headers,
        timeout=30
    )
    # Surface HTTP errors here instead of letting an error body fail later
    # with an unrelated KeyError on 'decision'.
    company_response.raise_for_status()
    results['company_check'] = company_response.json()

    # 2. Website reputation check
    website_payload = {
        "name": website,
        "entity_type": "website",
        "additional_info": f"Website for {company_name}"
    }

    website_response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=website_payload,
        headers=headers,
        timeout=30
    )
    website_response.raise_for_status()
    results['website_check'] = website_response.json()

    # 3. Determine overall decision (the worse of the two checks wins)
    risks = (
        results['company_check']['decision'],
        results['website_check']['decision'],
    )
    known_decisions = ('CLEAR', 'LOW_RISK', 'MEDIUM_RISK', 'HIGH_RISK')
    has_unknown = any(risk not in known_decisions for risk in risks)

    if 'HIGH_RISK' in risks:
        results['overall_decision'] = 'REJECTED'
    elif 'MEDIUM_RISK' in risks or has_unknown:
        # Fail safe: an unrecognised decision value goes to a human rather
        # than being approved by falling through to the final branch.
        results['overall_decision'] = 'MANUAL_REVIEW'
    elif 'LOW_RISK' in risks:
        results['overall_decision'] = 'REVIEW_RECOMMENDED'
    else:
        results['overall_decision'] = 'APPROVED'

    return results

# Usage
onboarding = merchant_onboarding_check(
    "Example Shop Inc",
    "example-shop.com",
    country="US"
)

# Per-check decisions come from the raw responses; overall_decision is the
# combined outcome computed by merchant_onboarding_check.
print(f"Company Risk: {onboarding['company_check']['decision']}")
print(f"Website Risk: {onboarding['website_check']['decision']}")
print(f"Overall Decision: {onboarding['overall_decision']}")

# Anything other than APPROVED is treated as needing further review.
if onboarding['overall_decision'] != 'APPROVED':
    print("⚠️ Additional review required before merchant onboarding")

Customer Onboarding Workflow

Combine watchlist check and adverse media check for comprehensive KYC.
# Complete customer onboarding check combining watchlist and adverse media
def customer_onboarding_check(name, country=None, age=None, document_id=None):
    """Run a watchlist check and an adverse media check, then derive one decision.

    Args:
        name: Full name of the customer, used for both checks.
        country: Optional; sent as ``nationality`` to the watchlist check and
            as ``country`` to the adverse media check.
        age: Optional; sent to the adverse media check only.
        document_id: Optional; sent to the watchlist check only.

    Returns:
        A dict with ``watchlist_check`` and ``adverse_media_check`` (the two
        decoded JSON responses) and ``overall_decision``, one of 'REJECTED',
        'REVIEW_REQUIRED', 'MANUAL_REVIEW' or 'APPROVED'.

    Raises:
        requests.HTTPError: if either request returns a 4xx/5xx status.
        KeyError: if the adverse media response has no 'decision' field.
    """
    results = {
        'watchlist_check': None,
        'adverse_media_check': None,
        'overall_decision': 'PENDING'
    }

    # 1. Watchlist check
    watchlist_payload = {
        "subject": {"full_name": name}
    }
    if country:
        watchlist_payload["subject"]["nationality"] = country
    if document_id:
        watchlist_payload["subject"]["document_id"] = document_id

    watchlist_response = requests.post(
        f"{BASE_URL}/kyc",
        json=watchlist_payload,
        headers=headers,
        timeout=30
    )
    # Without this, an error body would read as "no match" below because
    # is_match defaults to False.
    watchlist_response.raise_for_status()
    results['watchlist_check'] = watchlist_response.json()

    # 2. Adverse media check
    adverse_media_payload = {
        "name": name,
        "entity_type": "person"
    }
    if country:
        adverse_media_payload["country"] = country
    if age:
        adverse_media_payload["age"] = age

    adverse_response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=adverse_media_payload,
        headers=headers,
        timeout=30
    )
    adverse_response.raise_for_status()
    results['adverse_media_check'] = adverse_response.json()

    # 3. Determine overall decision
    watchlist_match = results['watchlist_check'].get('result', {}).get('is_match', False)
    adverse_risk = results['adverse_media_check']['decision']

    if watchlist_match:
        results['overall_decision'] = 'REJECTED'
    elif adverse_risk == 'HIGH_RISK':
        results['overall_decision'] = 'REVIEW_REQUIRED'
    elif adverse_risk == 'MEDIUM_RISK':
        results['overall_decision'] = 'MANUAL_REVIEW'
    elif adverse_risk in ('CLEAR', 'LOW_RISK'):
        results['overall_decision'] = 'APPROVED'
    else:
        # Fail safe: an unrecognised decision value goes to a human rather
        # than being approved by default.
        results['overall_decision'] = 'MANUAL_REVIEW'

    return results

# Usage
# document_id is forwarded to the watchlist check only; age is forwarded to
# the adverse media check only.
onboarding_result = customer_onboarding_check(
    "John Doe",
    country="US",
    age=45,
    document_id="P123456"
)

print(f"Onboarding Decision: {onboarding_result['overall_decision']}")
# Anything other than APPROVED is treated as needing further review.
if onboarding_result['overall_decision'] != 'APPROVED':
    print("⚠️ Additional review required")

Error Handling and Retries

import time
from requests.exceptions import RequestException, Timeout

def check_adverse_media_with_retry(name, entity_type="person", country=None, max_retries=3, timeout=30):
    """Run an adverse media check, retrying on timeouts and transient failures.

    Args:
        name: Entity name to check.
        entity_type: Entity type sent to the API (default "person").
        country: Optional country, included only when truthy.
        max_retries: Total number of attempts. Values below 1 are treated as
            1, so the function always returns a result or raises.
        timeout: Per-request timeout in seconds.

    Returns:
        The decoded JSON response of the first successful attempt.

    Raises:
        Exception: "Request timed out after all retries" when the last
            attempt times out (chained to the underlying Timeout).
        requests.RequestException: the last non-timeout failure, or
            immediately for a 4xx response that a retry cannot fix.
    """
    url = f"{BASE_URL}/kyc/adverse-media"
    payload = {
        "name": name,
        "entity_type": entity_type
    }
    if country:
        payload["country"] = country

    # Always make at least one attempt; otherwise the loop body would never
    # run and the caller would silently receive None.
    attempts = max(1, max_retries)

    for attempt in range(attempts):
        is_last_attempt = attempt == attempts - 1
        try:
            response = requests.post(
                url,
                json=payload,
                headers=headers,
                timeout=timeout
            )
            response.raise_for_status()
            return response.json()

        except Timeout as exc:
            if is_last_attempt:
                # Keep the original exception type and message for callers,
                # but chain the cause so the traceback shows the real timeout.
                raise Exception("Request timed out after all retries") from exc
            wait_time = 2 ** (attempt + 1)  # Exponential backoff: 2s, 4s, 8s, ...
            print(f"Timeout on attempt {attempt + 1}, retrying in {wait_time}s...")
            time.sleep(wait_time)

        except RequestException as exc:
            # HTTP errors carry the response; connection errors do not.
            status = getattr(getattr(exc, "response", None), "status_code", None)
            # 4xx means the request itself is wrong (bad token, bad payload);
            # repeating it cannot help. 408 and 429 are the transient exceptions.
            is_client_error = (
                status is not None
                and 400 <= status < 500
                and status not in (408, 429)
            )
            if is_last_attempt or is_client_error:
                raise
            print(f"Request failed on attempt {attempt + 1}: {exc}, retrying...")
            time.sleep(2)

# Usage with retry logic
# The broad except is the top-level boundary here: both the final-timeout
# Exception and any re-raised RequestException end up in this handler.
try:
    result = check_adverse_media_with_retry("John Doe", country="US")
    print(f"Risk Score: {result['final_risk_score']}")
except Exception as e:
    print(f"Failed to check adverse media: {e}")

Risk Score Thresholds

# Risk-based decision making
def get_risk_category(risk_score):
    """Map a numeric risk score to its category label.

    Upper bounds are inclusive: up to 20 is "CLEAR", up to 40 is "LOW_RISK",
    up to 70 is "MEDIUM_RISK", and anything higher is "HIGH_RISK".
    """
    bands = (
        (20, "CLEAR"),
        (40, "LOW_RISK"),
        (70, "MEDIUM_RISK"),
    )
    for upper_bound, category in bands:
        if risk_score <= upper_bound:
            return category
    return "HIGH_RISK"

def should_require_review(result, risk_threshold=41):
    """Return True when a check result should go to manual review.

    Review is required when ``final_risk_score`` is at or above
    ``risk_threshold``, or when the result lists at least one adverse source.
    Only ``final_risk_score`` is mandatory; a missing, empty or null
    ``sources`` counts as no sources.
    """
    if result['final_risk_score'] >= risk_threshold:
        return True

    # Also review if there are any adverse sources
    return bool(result.get('sources'))

# Automated decision workflow
def process_adverse_media_check(name, entity_type="person", country=None, auto_approve_threshold=20):
    """Run one check and turn the result into a workflow status dict.

    * score at or below ``auto_approve_threshold`` -> 'AUTO_APPROVED'
    * otherwise, if ``should_require_review`` says so -> 'REVIEW_REQUIRED'
      (includes the decision and the sources)
    * otherwise -> 'MANUAL_REVIEW'
    """
    outcome = check_adverse_media(name, entity_type=entity_type, country=country)

    score = outcome['final_risk_score']
    verdict = outcome['decision']

    # Low scores are approved without looking at sources.
    if score <= auto_approve_threshold:
        return {
            'status': 'AUTO_APPROVED',
            'risk_score': score,
            'entity_type': outcome['entity_type'],
            'reason': 'Risk score below threshold'
        }

    if not should_require_review(outcome):
        return {
            'status': 'MANUAL_REVIEW',
            'risk_score': score,
            'entity_type': outcome['entity_type'],
            'reason': 'Standard review process'
        }

    return {
        'status': 'REVIEW_REQUIRED',
        'risk_score': score,
        'decision': verdict,
        'entity_type': outcome['entity_type'],
        'sources': outcome.get('sources', []),
        'reason': f'Risk score {score} exceeds threshold'
    }