Overview
Watchlists enable continuous monitoring of entities against sanctions lists. This guide provides ready-to-use code snippets for common watchlist operations. Key features:
- Tags: Organize watchlists and subjects with custom tags for filtering and categorization
- Session linking: Connect subjects to onboarding sessions via session_id
- TTL-based expiration: Subjects automatically expire based on tenant configuration
Copy
import requests
# Root URL that every snippet in this guide prefixes to its endpoint path.
# NOTE(review): the "stg" host looks like a staging environment — confirm the
# correct host before using these snippets against production.
BASE_URL = "https://stg.kyc.legaltalent.ai"
# Headers sent with every request; replace YOUR_TOKEN with a real bearer token.
headers = {
    "Authorization": "Bearer YOUR_TOKEN",
    "Content-Type": "application/json"
}
Create Watchlist
Copy
# Create a basic watchlist with tags
def create_watchlist(name, lists_to_monitor=None, check_frequency="daily", tags=None):
    """Create a watchlist and return the decoded JSON response.

    Args:
        name: Name of the new watchlist.
        lists_to_monitor: Optional list identifiers to monitor (the examples
            use values such as "ofac" and "un"). Left out of the request
            when empty or None.
        check_frequency: Value sent as "check_frequency"; defaults to "daily".
        tags: Optional list of tags. Left out of the request when empty or None.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists"
    payload = {
        "name": name,
        "check_frequency": check_frequency,
    }
    if lists_to_monitor:
        payload["lists_to_monitor"] = lists_to_monitor
    if tags:
        payload["tags"] = tags
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage
# Monitor the "ofac" and "un" lists; check_frequency keeps its "daily" default.
watchlist = create_watchlist(
    "High Risk Customers",
    ["ofac", "un"],
    tags=["compliance", "q4-review"]
)
# The new watchlist's ID is read from the "data" object of the response.
print(f"Created watchlist: {watchlist['data']['watchlist_id']}")
# Create watchlist with initial subjects (including tags and session_id)
def create_watchlist_with_subjects(name, subjects, lists_to_monitor=None, tags=None):
    """Create a watchlist pre-populated with subjects and return the JSON response.

    Args:
        name: Name of the new watchlist.
        subjects: List of subject dicts sent as-is under "subjects". The
            examples use the keys "full_name", "identifier",
            "identifier_type", "tags" and "session_id".
        lists_to_monitor: Optional list identifiers to monitor. Left out of
            the request when empty or None.
        tags: Optional watchlist-level tags. Left out when empty or None.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists"
    payload = {
        "name": name,
        "subjects": subjects,
        "check_frequency": "daily",
    }
    if lists_to_monitor:
        payload["lists_to_monitor"] = lists_to_monitor
    if tags:
        payload["tags"] = tags
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage with subjects including tags and session_id
# Two initial subjects: the first carries a session_id, the second does not.
subjects = [
    {
        "full_name": "John Doe",
        "identifier": "12345678",
        "identifier_type": "document",
        "tags": ["vip", "high-risk"],
        "session_id": "sess_abc123"
    },
    {
        "full_name": "Acme Corporation",
        "identifier": "TAX-987654",
        "identifier_type": "tax_id",
        "tags": ["vendor"]
    }
]
# lists_to_monitor is not passed here, so it is left out of the request body.
watchlist = create_watchlist_with_subjects(
    "Vendor Monitoring",
    subjects,
    tags=["vendors", "2024"]
)
Create Watchlist with Alerts
Notification channels (webhooks, emails) are configured at the tenant level. The alert_config only controls when to send alerts, not where. Contact support to configure your notification channels.
Copy
# Create watchlist with alert configuration
def create_watchlist_with_alerts(name, on_new_match=True, on_status_change=True):
    """Create a daily watchlist on "ofac", "un" and "eu" with an alert configuration.

    Args:
        name: Name of the new watchlist.
        on_new_match: Value sent as alert_config.on_new_match.
        on_status_change: Value sent as alert_config.on_status_change.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists"
    payload = {
        "name": name,
        "check_frequency": "daily",
        "lists_to_monitor": ["ofac", "un", "eu"],
        "alert_config": {
            "on_new_match": on_new_match,
            "on_status_change": on_status_change,
        },
    }
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage
# Both flags are spelled out for clarity; they match the function's defaults.
watchlist = create_watchlist_with_alerts(
    "Executive Monitoring",
    on_new_match=True,
    on_status_change=True
)
List All Watchlists
Copy
# List all watchlists with optional filtering
def list_watchlists(active_only=False, tags=None):
    """List watchlists, optionally filtered, and return the decoded JSON response.

    Args:
        active_only: When true, sends the query parameter active="true".
        tags: Optional list of tags, sent as one comma-separated "tags"
            query parameter. Left out when empty or None.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists"
    params = {}
    if active_only:
        params["active"] = "true"
    if tags:
        params["tags"] = ",".join(tags)
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(url, headers=headers, params=params, timeout=30)
    return response.json()
# Usage - get all watchlists
watchlists = list_watchlists()
# .get() with defaults keeps the loop safe when "data" or "watchlists" is absent.
for wl in watchlists.get("data", {}).get("watchlists", []):
    print(f"{wl['name']} - {wl['watchlist_id']} ({wl['status']}) - Tags: {wl.get('tags', [])}")
# Get only active watchlists
active_watchlists = list_watchlists(active_only=True)
print(f"Active watchlists: {active_watchlists['data']['count']}")
# Filter by tags
compliance_watchlists = list_watchlists(tags=["compliance", "high-priority"])
print(f"Compliance watchlists: {compliance_watchlists['data']['count']}")
Get Watchlist Details
Copy
# Get watchlist details
def get_watchlist(watchlist_id):
    """Fetch a single watchlist by ID and return the decoded JSON response.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}"
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(url, headers=headers, timeout=30)
    return response.json()
# Usage
watchlist = get_watchlist("550e8400-e29b-41d4-a716-446655440000")
print(f"Name: {watchlist['data']['name']}")
print(f"Subjects: {len(watchlist['data']['subjects'])}")
# .get() is used because "last_checked_at" may be missing from the response.
print(f"Last checked: {watchlist['data'].get('last_checked_at')}")
Add Subjects
Copy
# Add single subject with tags and session_id
def add_subject(watchlist_id, full_name, identifier=None, identifier_type=None, tags=None, session_id=None):
    """Add one subject to a watchlist and return the decoded JSON response.

    Args:
        watchlist_id: ID of the target watchlist, interpolated into the URL path.
        full_name: Subject name; always sent.
        identifier: Optional identifier value. Left out when empty or None.
        identifier_type: Optional identifier kind (the examples use
            "document", "tax_id" and "wallet"). Left out when empty or None.
        tags: Optional list of tags. Left out when empty or None.
        session_id: Optional session ID to link. Left out when empty or None.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects"
    payload = {"full_name": full_name}
    if identifier:
        payload["identifier"] = identifier
    if identifier_type:
        payload["identifier_type"] = identifier_type
    if tags:
        payload["tags"] = tags
    if session_id:
        payload["session_id"] = session_id
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage
result = add_subject(
    "550e8400-e29b-41d4-a716-446655440000",
    "Jane Smith",
    identifier="98765432",
    identifier_type="document",
    tags=["priority", "manual-add"],
    session_id="sess_xyz789"
)
# Response includes expiration info
# (these lookups assume the call succeeded and "data" carries all three keys)
print(f"Subject ID: {result['data']['subject_id']}")
print(f"Expires at: {result['data']['expires_at']}")
print(f"Duration: {result['data']['duration_days']} days")
# Add multiple subjects (batch) with tags
def add_subjects_batch(watchlist_id, subjects):
    """Add several subjects to a watchlist in one request.

    Args:
        watchlist_id: ID of the target watchlist, interpolated into the URL path.
        subjects: List of subject dicts sent as-is under "subjects".

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects/batch"
    payload = {"subjects": subjects}
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage with tags and session_id
# Three subjects with different identifier types; only the last has a session_id.
subjects = [
    {
        "full_name": "John Doe",
        "identifier": "12345678",
        "identifier_type": "document",
        "tags": ["batch-import", "priority"]
    },
    {
        "full_name": "Acme Corporation",
        "identifier": "TAX-987654",
        "identifier_type": "tax_id",
        "tags": ["vendor", "verified"]
    },
    {
        "full_name": "Crypto Wallet Owner",
        "identifier": "0x742d35Cc6634C0532925a3b844Bc9e7595f0bEb",
        "identifier_type": "wallet",
        "tags": ["crypto"],
        "session_id": "sess_crypto001"
    }
]
result = add_subjects_batch("550e8400-e29b-41d4-a716-446655440000", subjects)
# The number of subjects added is read from data.added_count in the response.
print(f"Added {result['data']['added_count']} subjects")
List Subjects
Copy
# List subjects from a watchlist with optional tag filtering
def list_subjects(watchlist_id, tags=None):
    """List the subjects of a watchlist, optionally filtered by tags.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.
        tags: Optional list of tags, sent as one comma-separated "tags"
            query parameter. Left out when empty or None.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects"
    params = {}
    if tags:
        params["tags"] = ",".join(tags)
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(url, headers=headers, params=params, timeout=30)
    return response.json()
# Usage - get all subjects
result = list_subjects("550e8400-e29b-41d4-a716-446655440000")
for subject in result["data"]["subjects"]:
    # .get() is used because a subject may come back without a "tags" key.
    print(f"{subject['full_name']} - Tags: {subject.get('tags', [])}")
# Filter by tags
priority_subjects = list_subjects(
    "550e8400-e29b-41d4-a716-446655440000",
    tags=["priority", "vip"]
)
print(f"Found {priority_subjects['data']['count']} priority subjects")
Update Subject Tags
Copy
# Update tags for a subject
def update_subject_tags(watchlist_id, subject_id, tags):
    """Send a new tag list for one subject via PATCH and return the JSON response.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.
        subject_id: ID of the subject, interpolated into the URL path.
        tags: Tag list sent as-is under "tags" (an empty list is sent too).

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects/{subject_id}"
    payload = {"tags": tags}
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.patch(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage - replace subject tags
# Arguments are, in order: watchlist ID, subject ID, new tag list.
result = update_subject_tags(
    "550e8400-e29b-41d4-a716-446655440000",
    "660e8400-e29b-41d4-a716-446655440001",
    ["reviewed", "cleared", "low-risk"]
)
print(f"Updated tags: {result['data']['tags']}")
Remove Subject
Copy
# Remove subject from watchlist
def remove_subject(watchlist_id, subject_id):
    """Remove one subject from a watchlist.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.
        subject_id: ID of the subject, interpolated into the URL path.

    Returns:
        The response body decoded from JSON, or an empty dict when the
        server replies with no body.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects/{subject_id}"
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.delete(url, headers=headers, timeout=30)
    # A DELETE may be answered with an empty body (e.g. 204 No Content);
    # decoding that as JSON would raise, so report it as an empty result.
    if not response.content:
        return {}
    return response.json()
# Usage
# Arguments are, in order: watchlist ID, subject ID.
result = remove_subject(
    "550e8400-e29b-41d4-a716-446655440000",
    "660e8400-e29b-41d4-a716-446655440001"
)
Update Watchlist
Copy
# Update watchlist configuration
def update_watchlist(watchlist_id, name=None, check_frequency=None,
                     lists_to_monitor=None, status=None, alert_config=None, tags=None):
    """Partially update a watchlist via PATCH and return the decoded JSON response.

    Only the fields that are supplied are sent. name, check_frequency,
    lists_to_monitor, status and alert_config are skipped when falsy
    (None, "", [] or {}). tags is sent whenever it is not None, so an empty
    list is transmitted as well.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.
        name: New name.
        check_frequency: New check frequency (the examples use "weekly").
        lists_to_monitor: New list identifiers to monitor.
        status: New status (the examples use "paused" and "active").
        alert_config: Dict with "on_new_match" / "on_status_change" flags.
        tags: New tag list; pass [] to send an empty list.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}"
    payload = {}
    if name:
        payload["name"] = name
    if check_frequency:
        payload["check_frequency"] = check_frequency
    if lists_to_monitor:
        payload["lists_to_monitor"] = lists_to_monitor
    if status:
        payload["status"] = status
    if alert_config:
        payload["alert_config"] = alert_config
    if tags is not None:
        payload["tags"] = tags
    # Use PATCH, not PUT. Without a timeout, requests waits indefinitely on
    # an unresponsive server.
    response = requests.patch(url, json=payload, headers=headers, timeout=30)
    return response.json()
# Usage examples: each call sends a PATCH containing only the field shown.
# Update name
update_watchlist("550e8400-e29b-41d4-a716-446655440000", name="Updated Name")
# Change frequency
update_watchlist("550e8400-e29b-41d4-a716-446655440000", check_frequency="weekly")
# Pause watchlist
update_watchlist("550e8400-e29b-41d4-a716-446655440000", status="paused")
# Resume watchlist
update_watchlist("550e8400-e29b-41d4-a716-446655440000", status="active")
# Update tags
update_watchlist("550e8400-e29b-41d4-a716-446655440000", tags=["reviewed", "priority"])
# Update alert config (only when to alert, not where)
update_watchlist(
    "550e8400-e29b-41d4-a716-446655440000",
    alert_config={
        "on_new_match": True,
        "on_status_change": True
    }
)
Trigger Monitoring
Run an immediate screening check on all subjects in a watchlist without waiting for the scheduled check.
Copy
# Trigger monitoring immediately
def trigger_monitoring(watchlist_id):
    """Request an immediate monitoring run for a watchlist.

    Sends a POST with no body to the watchlist's "monitor" endpoint.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.

    Returns:
        The response body decoded from JSON.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/monitor"
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, headers=headers, timeout=30)
    return response.json()
# Usage
result = trigger_monitoring("550e8400-e29b-41d4-a716-446655440000")
# The summary fields are only read when the call reports success.
if result["status"] == "success":
    data = result["data"]
    print(f"Checked {data['subjects_checked']} subjects")
    print(f"Found {data['new_matches_count']} new matches")
    if data["has_changes"]:
        print("⚠️ Match status changed since last check!")
Delete Watchlist
Copy
# Delete watchlist
def delete_watchlist(watchlist_id):
    """Delete a watchlist by ID.

    Args:
        watchlist_id: ID of the watchlist, interpolated into the URL path.

    Returns:
        The response body decoded from JSON, or an empty dict when the
        server replies with no body.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}"
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.delete(url, headers=headers, timeout=30)
    # A DELETE may be answered with an empty body (e.g. 204 No Content);
    # decoding that as JSON would raise, so report it as an empty result.
    if not response.content:
        return {}
    return response.json()
# Usage
# Delete the watchlist by ID; the decoded response is kept in result.
result = delete_watchlist("550e8400-e29b-41d4-a716-446655440000")
Common Workflows
Complete Watchlist Management
Copy
class WatchlistManager:
    """Small client for the watchlist endpoints under /kyc/watchlists.

    Every public method sends a single HTTP request with the bearer-token
    headers built in __init__ and returns the decoded JSON body.
    """

    def __init__(self, base_url, token, timeout=30):
        """Store the connection settings.

        Args:
            base_url: API root URL. A trailing slash is stripped so that the
                joined endpoint paths never contain "//".
            token: Bearer token placed in the Authorization header.
            timeout: Seconds passed to requests as the per-request timeout,
                so a stalled server cannot block the caller forever.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def _request(self, method, path, **kwargs):
        """Send one request to base_url + path and return the decoded JSON body.

        An empty response body yields {} instead of a JSON decode error.
        """
        response = requests.request(
            method,
            f"{self.base_url}{path}",
            headers=self.headers,
            timeout=self.timeout,
            **kwargs,
        )
        if not response.content:
            return {}
        return response.json()

    @staticmethod
    def _tag_params(tags):
        """Build the tag-filter query parameters (comma-joined; empty when no tags)."""
        return {"tags": ",".join(tags)} if tags else {}

    def create(self, name, lists_to_monitor=None, tags=None):
        """Create a watchlist with check_frequency "daily"; optional fields are skipped when empty."""
        payload = {"name": name, "check_frequency": "daily"}
        if lists_to_monitor:
            payload["lists_to_monitor"] = lists_to_monitor
        if tags:
            payload["tags"] = tags
        return self._request("POST", "/kyc/watchlists", json=payload)

    def get(self, watchlist_id):
        """Fetch a single watchlist by ID."""
        return self._request("GET", f"/kyc/watchlists/{watchlist_id}")

    def add_subject(self, watchlist_id, full_name, identifier=None, identifier_type=None, tags=None, session_id=None):
        """Add one subject to a watchlist; optional fields are skipped when empty."""
        payload = {"full_name": full_name}
        if identifier:
            payload["identifier"] = identifier
        if identifier_type:
            payload["identifier_type"] = identifier_type
        if tags:
            payload["tags"] = tags
        if session_id:
            payload["session_id"] = session_id
        return self._request("POST", f"/kyc/watchlists/{watchlist_id}/subjects", json=payload)

    def list_subjects(self, watchlist_id, tags=None):
        """List the subjects of a watchlist, optionally filtered by tags."""
        return self._request(
            "GET",
            f"/kyc/watchlists/{watchlist_id}/subjects",
            params=self._tag_params(tags),
        )

    def update_subject_tags(self, watchlist_id, subject_id, tags):
        """Send a new tag list for one subject via PATCH."""
        return self._request(
            "PATCH",
            f"/kyc/watchlists/{watchlist_id}/subjects/{subject_id}",
            json={"tags": tags},
        )

    def list_all(self, tags=None):
        """List watchlists, optionally filtered by tags."""
        return self._request("GET", "/kyc/watchlists", params=self._tag_params(tags))

    def delete(self, watchlist_id):
        """Delete a watchlist by ID."""
        return self._request("DELETE", f"/kyc/watchlists/{watchlist_id}")
# Usage
manager = WatchlistManager(BASE_URL, "YOUR_TOKEN")
# Create watchlist with tags
watchlist = manager.create("Customer Monitoring", ["ofac", "un"], tags=["compliance"])
# The ID from the create response is reused by every call below.
watchlist_id = watchlist["data"]["watchlist_id"]
# Add subjects with tags and session_id
manager.add_subject(watchlist_id, "John Doe", "12345678", "document", tags=["vip"], session_id="sess_001")
manager.add_subject(watchlist_id, "Jane Smith", "87654321", "document", tags=["priority"])
# Get details
details = manager.get(watchlist_id)
print(f"Monitoring {len(details['data']['subjects'])} subjects")
# List subjects filtered by tags
vip_subjects = manager.list_subjects(watchlist_id, tags=["vip"])
print(f"VIP subjects: {vip_subjects['data']['count']}")
Bulk Import Subjects
Copy
# Import multiple subjects from a list with tags
def import_subjects_to_watchlist(watchlist_id, subjects_list, batch_size=50):
    """Import subjects into a watchlist in fixed-size batches.

    The list is sliced into consecutive chunks of at most batch_size items
    and each chunk is posted to the batch endpoint in order.

    Args:
        watchlist_id: ID of the target watchlist, interpolated into the URL path.
        subjects_list: List of subject dicts to import.
        batch_size: Maximum number of subjects per request (default 50).

    Returns:
        A list with one decoded JSON response per batch, in request order.
        Empty when subjects_list is empty.

    Raises:
        ValueError: If batch_size is smaller than 1.
        requests.exceptions.RequestException: If a request fails or times out.
    """
    # A non-positive step would make range() raise or silently yield nothing.
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects/batch"
    results = []
    for start in range(0, len(subjects_list), batch_size):
        batch = subjects_list[start:start + batch_size]
        # Without a timeout, requests waits indefinitely on an unresponsive server.
        response = requests.post(url, json={"subjects": batch}, headers=headers, timeout=30)
        results.append(response.json())
    return results
# Usage with tags
# Build 100 sample subjects ("Person 1" .. "Person 100") sharing the same tags.
subjects_to_import = [
    {
        "full_name": f"Person {i}",
        "identifier": f"ID{i}",
        "identifier_type": "document",
        "tags": ["bulk-import", "batch-1"]
    }
    for i in range(1, 101)
]
results = import_subjects_to_watchlist("550e8400-e29b-41d4-a716-446655440000", subjects_to_import)
# Check results
for batch_result in results:
    if batch_result["status"] == "success":
        print(f"Added {batch_result['data']['added_count']} subjects")
Error Handling
Copy
import requests
from requests.exceptions import RequestException
def safe_watchlist_operation(operation_func, *args, **kwargs):
    """Call operation_func and turn any failure into a printed message and None.

    Args:
        operation_func: Callable to run; it receives *args and **kwargs.

    Returns:
        Whatever operation_func returned, unless that result contains an
        "error" key or an exception was raised, in which case None.
    """
    try:
        outcome = operation_func(*args, **kwargs)
        # The membership test stays inside the try block so that a result
        # which does not support "in" is reported below rather than raised.
        if "error" not in outcome:
            return outcome
        print(f"API Error: {outcome['error']}")
    except RequestException as exc:
        print(f"Request failed: {exc}")
    except Exception as exc:
        print(f"Unexpected error: {exc}")
    return None
# Usage
def create_watchlist_safe(name):
    """Create a watchlist named name through safe_watchlist_operation.

    The request uses a 30-second timeout and raise_for_status(), so HTTP
    error statuses surface as exceptions that the wrapper reports.

    Returns:
        The decoded JSON response, or None when the wrapper reports a failure.
    """
    def _post_new_watchlist(watchlist_name):
        endpoint = f"{BASE_URL}/kyc/watchlists"
        body = {"name": watchlist_name}
        reply = requests.post(endpoint, json=body, headers=headers, timeout=30)
        reply.raise_for_status()
        return reply.json()

    return safe_watchlist_operation(_post_new_watchlist, name)
Handling Seat Limits
When adding subjects, the API validates available seats. Handle 402 errors gracefully:
Copy
def add_subject_with_seat_check(watchlist_id, full_name, identifier=None, tags=None):
    """Add a subject and handle the "no seats available" (HTTP 402) reply.

    Args:
        watchlist_id: ID of the target watchlist, interpolated into the URL path.
        full_name: Subject name; always sent.
        identifier: Optional identifier value. Left out when empty or None.
        tags: Optional list of tags. Left out when empty or None.

    Returns:
        None when the server answers 402; otherwise the decoded JSON response.

    Raises:
        requests.exceptions.RequestException: If the request fails or times out.
    """
    url = f"{BASE_URL}/kyc/watchlists/{watchlist_id}/subjects"
    payload = {"full_name": full_name}
    if identifier:
        payload["identifier"] = identifier
    if tags:
        payload["tags"] = tags
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.post(url, json=payload, headers=headers, timeout=30)
    # Check the status code before decoding: a 402 reply is not guaranteed to
    # carry a JSON body, and decoding it first could raise before the seat
    # warning is ever printed.
    if response.status_code == 402:
        print("⚠️ No seats available! Please purchase more seats.")
        return None
    result = response.json()
    # .get() so a body without a "status" key is returned instead of raising.
    if result.get("status") == "success":
        data = result["data"]
        # Subject expires at this timestamp (Unix timestamp)
        expires_at = data["expires_at"]
        duration_days = data["duration_days"]
        subject_id = data["subject_id"]
        print(f"Subject {subject_id} added. Expires in {duration_days} days (at {expires_at})")
    return result
Related References
- Watchlists API Reference - Full API documentation
- Create Watchlist - Create watchlist endpoint
- Add Subjects - Add subjects to watchlist
- Trigger Monitoring - On-demand screening
- List Check API - One-time checks
- Validate Person or Company - Individual entity validation guide