Overview
Adverse media checks use AI-powered analysis to search for negative news, criminal records, sanctions, and other risk factors about entities. This guide provides ready-to-use code snippets for integrating adverse media checks into your application.

Supported Entity Types
| Type | Description | Use Case |
|---|---|---|
| person | Individual analysis (default) | Background checks, KYC |
| company | Company/organization analysis | Corporate due diligence |
| website | Website/domain reputation | Merchant verification |
Copy
import requests
# Root URL that every snippet in this guide prefixes onto its endpoint path.
BASE_URL = "https://stg.kyc.legaltalent.ai"

# Headers sent with each request; replace YOUR_TOKEN with a real bearer token.
headers = {
    "Authorization": "Bearer YOUR_TOKEN",
    "Content-Type": "application/json",
}
Basic Adverse Media Check
Copy
# Simple adverse media check
def check_adverse_media(name, entity_type="person", country=None, age=None, additional_info=None):
    """Run one adverse media check and return the decoded JSON response.

    ``name`` and ``entity_type`` are always sent. ``country``, ``age`` and
    ``additional_info`` are added to the request body only when truthy.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    optional_fields = (
        ("country", country),
        ("age", age),
        ("additional_info", additional_info),
    )
    payload = {"name": name, "entity_type": entity_type}
    payload.update({field: value for field, value in optional_fields if value})

    response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
# Usage: check a person with the default settings and print the headline fields
result = check_adverse_media(name="John Doe")
print(f"Risk Score: {result['final_risk_score']}")
print(f"Decision: {result['decision']}")
print(f"Summary: {result['summary']}")
Person Check with Context
Copy
# Check person with additional context for better accuracy
def check_person_detailed(name, country, age, position=None, company=None):
    """Run a person check with country, age and optional employment context.

    ``position`` and ``company`` are folded into the free-text
    ``additional_info`` field:

    * both given    -> ``"<position> of <company>"``
    * company only  -> ``"Works at <company>"``
    * position only -> ``"<position>"`` (this case used to be dropped silently)

    Returns the decoded JSON response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    url = f"{BASE_URL}/kyc/adverse-media"
    payload = {
        "name": name,
        "entity_type": "person",
        "country": country,
        "age": age,
    }

    if position and company:
        payload["additional_info"] = f"{position} of {company}"
    elif company:
        payload["additional_info"] = f"Works at {company}"
    elif position:
        # A job title on its own is still useful disambiguating context.
        payload["additional_info"] = position

    response = requests.post(url, json=payload, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()
# Usage
result = check_person_detailed(
    name="John Doe",
    country="US",
    age=45,
    position="CEO",
    company="Tech Corp",
)

# Process results: list every cited source when the person is flagged high risk
flagged_high_risk = result['decision'] == 'HIGH_RISK'
if flagged_high_risk:
    print("HIGH RISK - Manual review required")
    for source in result['sources']:
        print(f"- [{source['id']}] {source['title']}: {source['url']}")
Company Analysis
Copy
# Check company for adverse media
def check_company(company_name, country=None, additional_info=None):
    """Run an adverse media check with ``entity_type`` fixed to ``"company"``.

    ``country`` and ``additional_info`` are included in the request body only
    when truthy. Returns the decoded JSON response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    payload = {"name": company_name, "entity_type": "company"}
    for field, value in (("country", country), ("additional_info", additional_info)):
        if value:
            payload[field] = value

    response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
# Usage
result = check_company(
    company_name="Tech Corp Inc",
    country="US",
    additional_info="Technology company, founded 2010",
)

print(f"Company: {result['entity_name']}")
print(f"Risk Score: {result['final_risk_score']}")
print(f"Decision: {result['decision']}")
print(f"Summary: {result['summary']}")

# Only print the source listing when the API returned at least one source
if result['sources']:
    print(f"\nFound {len(result['sources'])} adverse sources:")
    for source in result['sources']:
        print(f"- [{source['id']}] {source['title']}")
        print(f" URL: {source['url']}")
        print(f" Summary: {source['summary']}")
Website/Domain Analysis
Analyze websites for fraud reports, security issues, blacklists, and consumer complaints.
Copy
# Check website/domain reputation
def check_website(url_or_domain, additional_info=None):
    """Run an adverse media check with ``entity_type`` fixed to ``"website"``.

    The URL or domain is sent as the ``name`` field; ``additional_info`` is
    included only when truthy. Returns the decoded JSON response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    extras = {"additional_info": additional_info} if additional_info else {}
    payload = {"name": url_or_domain, "entity_type": "website", **extras}

    response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
# Usage - Check an e-commerce website
result = check_website(
    url_or_domain="example-shop.com",
    additional_info="E-commerce platform selling electronics",
)

print(f"Website: {result['entity_name']}")
print(f"Risk Score: {result['final_risk_score']}")
print(f"Decision: {result['decision']}")
print(f"Summary: {result['summary']}")

# Check decision: medium and high risk both call for a human look
needs_manual_review = result['decision'] in ('MEDIUM_RISK', 'HIGH_RISK')
if needs_manual_review:
    print("\n⚠️ Website requires manual review!")
    for source in result['sources']:
        print(f"\n [{source['id']}] {source['title']}")
        print(f" {source['summary']}")
Using Different LLM Providers
Copy
# Check using OpenAI provider
def check_adverse_media_openai(name, entity_type="person", country=None, model="gpt-4o-mini"):
    """Run an adverse media check that explicitly requests the OpenAI provider.

    Sends ``provider="openai"`` and the chosen ``model`` alongside the name and
    entity type; ``country`` is included only when truthy. Returns the decoded
    JSON response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    country_field = {"country": country} if country else {}
    payload = {
        "name": name,
        "entity_type": entity_type,
        "provider": "openai",
        "model": model,
        **country_field,
    }

    response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
# Usage with OpenAI (model left at the function's default)
result = check_adverse_media_openai(name="John Doe", country="US")
# Default provider (Bedrock with Claude 3.5 Haiku)
def check_adverse_media_bedrock(name, entity_type="person", country=None):
    """Run an adverse media check without naming a provider.

    No ``provider`` field is sent, so the service picks its own default.
    ``country`` is included only when truthy. Returns the decoded JSON response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    country_field = {"country": country} if country else {}
    # provider defaults to "bedrock"
    payload = {"name": name, "entity_type": entity_type, **country_field}

    response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=payload,
        headers=headers,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
# Usage with Bedrock (default) - no provider or model is passed
result = check_adverse_media_bedrock(name="John Doe", country="US")
Analyzing Results
Copy
# Comprehensive result analysis
def analyze_adverse_media_result(result):
    """Print a readable report for one check result and return a short digest.

    Reads ``final_risk_score``, ``decision``, ``summary``, ``entity_name`` and
    ``entity_type`` from ``result`` (all required), plus the optional
    ``sources`` list and ``processing_time_ms``.

    Returns a dict with ``risk_score``, ``decision``, ``source_count`` and
    ``requires_review`` (True for MEDIUM_RISK or HIGH_RISK).
    """
    # One line of guidance per known decision; other values print nothing.
    decision_messages = (
        ('CLEAR', "✅ No adverse media found - entity is clear"),
        ('LOW_RISK', "ℹ️ Low risk - minor findings, monitor if needed"),
        ('MEDIUM_RISK', "⚠️ Medium risk - review recommended"),
        ('HIGH_RISK', "🚨 HIGH RISK - immediate review required"),
    )

    risk_score = result['final_risk_score']
    decision = result['decision']
    summary = result['summary']
    sources = result.get('sources', [])

    print(f"Entity: {result['entity_name']} ({result['entity_type']})")
    print(f"Risk Score: {risk_score}/100")
    print(f"Decision: {decision}")
    print(f"\nSummary:\n{summary}\n")

    for known_decision, message in decision_messages:
        if decision == known_decision:
            print(message)
            break

    if sources:
        print(f"\nFound {len(sources)} adverse source(s):")
        for source in sources:
            print(f"\n [{source['id']}] {source['title']}")
            print(f" URL: {source['url']}")
            print(f" Summary: {source['summary']}")

    print(f"\nProcessing time: {result.get('processing_time_ms', 0)}ms")

    return {
        'risk_score': risk_score,
        'decision': decision,
        'source_count': len(sources),
        'requires_review': decision in ('MEDIUM_RISK', 'HIGH_RISK'),
    }
# Usage: run a check, print the report, then act on the digest
result = check_adverse_media(name="John Doe", country="US", age=45)
analysis = analyze_adverse_media_result(result)

review_needed = analysis['requires_review']
if review_needed:
    print("\n⚠️ Manual compliance review required")
Risk-Based Filtering
Copy
# Filter results by risk level
def filter_by_risk_level(result, min_risk_score=None, risk_decisions=None):
    """Return True when ``result`` passes both optional filters.

    A falsy ``min_risk_score`` (None or 0) disables the score filter, and a
    falsy ``risk_decisions`` (None or empty) disables the decision filter.
    ``result`` must contain ``final_risk_score`` and ``decision``.
    """
    score, verdict = result['final_risk_score'], result['decision']

    below_minimum = bool(min_risk_score) and score < min_risk_score
    if below_minimum:
        return False

    return not (risk_decisions and verdict not in risk_decisions)
# Check multiple entities and filter
def check_multiple_entities(entities, min_risk_score=41):
    """Check each entity and return only those at or above ``min_risk_score``.

    Each item in ``entities`` is a dict with a required ``name`` and optional
    ``entity_type`` (defaults to ``"person"``), ``country``, ``age`` and
    ``additional_info``.

    The batch is best-effort: a failure on one entity is printed and skipped so
    the remaining entities are still checked.

    Returns a list of ``{'entity', 'entity_type', 'result'}`` dicts.
    """
    results = []
    for entity in entities:
        # Resolve the label up front with .get(): the error handler used to
        # re-read entity['name'], so an entry without a name raised a second
        # KeyError inside the handler and aborted the whole batch.
        label = entity.get('name', '<missing name>')
        try:
            result = check_adverse_media(
                entity['name'],
                entity_type=entity.get('entity_type', 'person'),
                country=entity.get('country'),
                age=entity.get('age'),
                additional_info=entity.get('additional_info'),
            )
            if filter_by_risk_level(result, min_risk_score=min_risk_score):
                results.append({
                    'entity': entity['name'],
                    'entity_type': result['entity_type'],
                    'result': result,
                })
        except Exception as e:
            # Deliberately broad: one bad entity must not stop the batch.
            print(f"Error checking {label}: {e}")
    return results
# Usage: one entity of each supported type
entities = [
    dict(name="John Doe", entity_type="person", country="US", age=45),
    dict(name="Tech Corp Inc", entity_type="company", country="US"),
    dict(name="example-shop.com", entity_type="website"),
]

flagged_entities = check_multiple_entities(entities, min_risk_score=41)
print(f"Found {len(flagged_entities)} entities requiring review")
Merchant Onboarding Workflow
Check both a company and its website before onboarding it as a merchant partner.
Copy
# Complete merchant onboarding check
def merchant_onboarding_check(company_name, website, country=None):
    """Check a company and its website, then combine the two decisions.

    The worse of the two decisions determines ``overall_decision``:

    * either HIGH_RISK   -> ``'REJECTED'``
    * either MEDIUM_RISK -> ``'MANUAL_REVIEW'``
    * either LOW_RISK    -> ``'REVIEW_RECOMMENDED'``
    * otherwise          -> ``'APPROVED'``

    Returns a dict with ``company_check``, ``website_check`` (the raw JSON
    responses) and ``overall_decision``.

    Raises:
        requests.HTTPError: if either call answers with a 4xx/5xx status.
    """
    endpoint = f"{BASE_URL}/kyc/adverse-media"
    results = {
        'company_check': None,
        'website_check': None,
        'overall_decision': 'PENDING',
    }

    # 1. Company adverse media check
    company_payload = {
        "name": company_name,
        "entity_type": "company",
    }
    if country:
        company_payload["country"] = country
    company_response = requests.post(
        endpoint,
        json=company_payload,
        headers=headers,
        timeout=30,
    )
    # Fail loudly on an HTTP error instead of parsing the error body and
    # hitting a confusing KeyError on 'decision' further down.
    company_response.raise_for_status()
    results['company_check'] = company_response.json()

    # 2. Website reputation check
    website_payload = {
        "name": website,
        "entity_type": "website",
        "additional_info": f"Website for {company_name}",
    }
    website_response = requests.post(
        endpoint,
        json=website_payload,
        headers=headers,
        timeout=30,
    )
    website_response.raise_for_status()
    results['website_check'] = website_response.json()

    # 3. Determine overall decision
    company_risk = results['company_check']['decision']
    website_risk = results['website_check']['decision']
    if company_risk == 'HIGH_RISK' or website_risk == 'HIGH_RISK':
        results['overall_decision'] = 'REJECTED'
    elif company_risk == 'MEDIUM_RISK' or website_risk == 'MEDIUM_RISK':
        results['overall_decision'] = 'MANUAL_REVIEW'
    elif company_risk == 'LOW_RISK' or website_risk == 'LOW_RISK':
        results['overall_decision'] = 'REVIEW_RECOMMENDED'
    else:
        results['overall_decision'] = 'APPROVED'
    return results
# Usage
onboarding = merchant_onboarding_check(
    company_name="Example Shop Inc",
    website="example-shop.com",
    country="US",
)

print(f"Company Risk: {onboarding['company_check']['decision']}")
print(f"Website Risk: {onboarding['website_check']['decision']}")
print(f"Overall Decision: {onboarding['overall_decision']}")

# Anything other than an outright approval needs a follow-up
approved = onboarding['overall_decision'] == 'APPROVED'
if not approved:
    print("⚠️ Additional review required before merchant onboarding")
Customer Onboarding Workflow
Combine a watchlist check and an adverse media check for comprehensive KYC.
Copy
# Complete customer onboarding check combining watchlist and adverse media
def customer_onboarding_check(name, country=None, age=None, document_id=None):
    """Run a watchlist check and an adverse media check for one person.

    ``overall_decision`` is derived as follows:

    * watchlist match          -> ``'REJECTED'``
    * adverse media HIGH_RISK  -> ``'REVIEW_REQUIRED'``
    * adverse media MEDIUM_RISK -> ``'MANUAL_REVIEW'``
    * otherwise                -> ``'APPROVED'``

    Returns a dict with ``watchlist_check``, ``adverse_media_check`` (the raw
    JSON responses) and ``overall_decision``.

    Raises:
        requests.HTTPError: if either call answers with a 4xx/5xx status.
    """
    results = {
        'watchlist_check': None,
        'adverse_media_check': None,
        'overall_decision': 'PENDING',
    }

    # 1. Watchlist check
    watchlist_payload = {
        "subject": {"full_name": name},
    }
    if country:
        watchlist_payload["subject"]["nationality"] = country
    if document_id:
        watchlist_payload["subject"]["document_id"] = document_id
    watchlist_response = requests.post(
        f"{BASE_URL}/kyc",
        json=watchlist_payload,
        headers=headers,
        timeout=30,
    )
    # Without this, an HTTP error body has no result.is_match, which the
    # lookup below would read as "no match" and could wrongly approve.
    watchlist_response.raise_for_status()
    results['watchlist_check'] = watchlist_response.json()

    # 2. Adverse media check
    adverse_media_payload = {
        "name": name,
        "entity_type": "person",
    }
    if country:
        adverse_media_payload["country"] = country
    if age:
        adverse_media_payload["age"] = age
    adverse_response = requests.post(
        f"{BASE_URL}/kyc/adverse-media",
        json=adverse_media_payload,
        headers=headers,
        timeout=30,
    )
    adverse_response.raise_for_status()
    results['adverse_media_check'] = adverse_response.json()

    # 3. Determine overall decision
    watchlist_match = results['watchlist_check'].get('result', {}).get('is_match', False)
    adverse_risk = results['adverse_media_check']['decision']
    if watchlist_match:
        results['overall_decision'] = 'REJECTED'
    elif adverse_risk == 'HIGH_RISK':
        results['overall_decision'] = 'REVIEW_REQUIRED'
    elif adverse_risk == 'MEDIUM_RISK':
        results['overall_decision'] = 'MANUAL_REVIEW'
    else:
        results['overall_decision'] = 'APPROVED'
    return results
# Usage
onboarding_result = customer_onboarding_check(
    name="John Doe",
    country="US",
    age=45,
    document_id="P123456",
)

print(f"Onboarding Decision: {onboarding_result['overall_decision']}")

# Anything other than an outright approval needs a follow-up
approved = onboarding_result['overall_decision'] == 'APPROVED'
if not approved:
    print("⚠️ Additional review required")
Error Handling and Retries
Copy
import time
from requests.exceptions import RequestException, Timeout
def check_adverse_media_with_retry(name, entity_type="person", country=None, max_retries=3, timeout=30):
    """Run an adverse media check, retrying on timeouts and request errors.

    Timeouts back off exponentially (2s, 4s, 8s, ...); any other
    ``RequestException`` waits a fixed 2s before the next attempt.

    Args:
        max_retries: total number of attempts; must be at least 1.
        timeout: per-request timeout in seconds, passed to ``requests.post``.

    Returns the decoded JSON response of the first successful attempt.

    Raises:
        ValueError: if ``max_retries`` is less than 1 (previously this
            returned ``None`` without making any request).
        requests.exceptions.Timeout: if the final attempt times out.
        requests.exceptions.RequestException: if the final attempt fails for
            any other request-level reason, including HTTP 4xx/5xx.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    url = f"{BASE_URL}/kyc/adverse-media"
    payload = {
        "name": name,
        "entity_type": entity_type,
    }
    if country:
        payload["country"] = country

    last_attempt = max_retries - 1
    for attempt in range(max_retries):
        try:
            response = requests.post(
                url,
                json=payload,
                headers=headers,
                timeout=timeout,
            )
            response.raise_for_status()
            return response.json()
        except Timeout as exc:
            if attempt == last_attempt:
                # Timeout is still an Exception subclass, so existing
                # `except Exception` callers keep working; chaining keeps
                # the original cause in the traceback.
                raise Timeout("Request timed out after all retries") from exc
            wait_time = 2 ** (attempt + 1)  # Exponential backoff: 2s, 4s, 8s, ...
            print(f"Timeout on attempt {attempt + 1}, retrying in {wait_time}s...")
            time.sleep(wait_time)
        except RequestException as e:
            if attempt == last_attempt:
                raise
            print(f"Request failed on attempt {attempt + 1}: {e}, retrying...")
            time.sleep(2)
# Usage with retry logic: any failure, after all retries, ends up in the except
try:
    result = check_adverse_media_with_retry(name="John Doe", country="US")
    print(f"Risk Score: {result['final_risk_score']}")
except Exception as e:
    print(f"Failed to check adverse media: {e}")
Risk Score Thresholds
Copy
# Risk-based decision making
def get_risk_category(risk_score):
    """Map a numeric risk score onto its decision label.

    Bands are inclusive at the top: up to 20 is CLEAR, up to 40 is LOW_RISK,
    up to 70 is MEDIUM_RISK, and anything higher is HIGH_RISK.
    """
    upper_bounds = (
        (20, "CLEAR"),
        (40, "LOW_RISK"),
        (70, "MEDIUM_RISK"),
    )
    for upper_bound, category in upper_bounds:
        if risk_score <= upper_bound:
            return category
    return "HIGH_RISK"
def should_require_review(result, risk_threshold=41):
    """Return True when a result needs human review.

    Review is required when ``final_risk_score`` is at or above
    ``risk_threshold``, or when the result lists any adverse sources. A
    missing or ``None`` ``sources`` entry counts as no sources. Only
    ``final_risk_score`` is required; ``decision`` is no longer read.
    """
    if result['final_risk_score'] >= risk_threshold:
        return True
    # Also review if there are any adverse sources
    return bool(result.get('sources'))
# Automated decision workflow
def process_adverse_media_check(name, entity_type="person", country=None, auto_approve_threshold=20, review_threshold=41):
    """Run a check and route the result to one of three outcomes.

    * ``AUTO_APPROVED``   - score is at or below ``auto_approve_threshold``.
    * ``REVIEW_REQUIRED`` - ``should_require_review`` flags the result, either
      because the score reached ``review_threshold`` or because adverse
      sources were returned.
    * ``MANUAL_REVIEW``   - everything in between.

    Args:
        auto_approve_threshold: highest score that is approved automatically.
        review_threshold: score from which review is required; forwarded to
            ``should_require_review`` so the reported reason matches the rule
            that actually fired.

    Returns a dict with ``status``, ``risk_score``, ``entity_type`` and
    ``reason``; ``REVIEW_REQUIRED`` results also carry ``decision`` and
    ``sources``.
    """
    result = check_adverse_media(name, entity_type=entity_type, country=country)
    risk_score = result['final_risk_score']
    decision = result['decision']

    if risk_score <= auto_approve_threshold:
        return {
            'status': 'AUTO_APPROVED',
            'risk_score': risk_score,
            'entity_type': result['entity_type'],
            'reason': 'Risk score below threshold',
        }

    if should_require_review(result, risk_threshold=review_threshold):
        # The review can be triggered by the score or by the sources alone;
        # the reason used to blame the score in both cases.
        if risk_score >= review_threshold:
            reason = f'Risk score {risk_score} exceeds threshold'
        else:
            reason = 'Adverse sources found'
        return {
            'status': 'REVIEW_REQUIRED',
            'risk_score': risk_score,
            'decision': decision,
            'entity_type': result['entity_type'],
            'sources': result.get('sources', []),
            'reason': reason,
        }

    return {
        'status': 'MANUAL_REVIEW',
        'risk_score': risk_score,
        'entity_type': result['entity_type'],
        'reason': 'Standard review process',
    }
Related References
- Adverse Media API Reference - Complete API documentation
- List Check API - Traditional watchlist checks
- Validate Person or Company - Entity validation guide
- Watchlists Guide - Continuous monitoring with watchlists