Overview
The KYC API uses AWS Cognito for user authentication. This guide shows you how to authenticate users and obtain JWT tokens to make API requests.
Cognito Configuration
Staging Environment
- User Pool ID: us-east-2_oGu06xbPS
- App Client ID: 2raj3isqgeivk0m4b94858hice
- Region: us-east-2
Production Environment
- User Pool ID: us-east-2_L37NAaTbI
- App Client ID: 17motn53rgao4bk26ojguv0ldu
- Region: us-east-2
Authentication Flow
- User provides username and password
- Authenticate with AWS Cognito
- Receive ID token (JWT) and Access token
- Use the ID token in the `Authorization: Bearer <id_token>` header for API requests
Python Authentication
Copy
import boto3
import requests
# Configuration
# Cognito settings for the staging user pool.
STAGING_CONFIG = {
    "region": "us-east-2",
    "user_pool_id": "us-east-2_oGu06xbPS",
    "client_id": "2raj3isqgeivk0m4b94858hice",
}

# Cognito settings for the production user pool.
PROD_CONFIG = {
    "region": "us-east-2",
    "user_pool_id": "us-east-2_L37NAaTbI",
    "client_id": "17motn53rgao4bk26ojguv0ldu",
}
def authenticate_user(username, password, environment="staging"):
    """
    Authenticate user with Cognito and return the issued tokens.

    Args:
        username: Cognito username or email
        password: User password
        environment: "staging" selects STAGING_CONFIG; any other value
            selects PROD_CONFIG.

    Returns:
        dict with 'id_token', 'access_token', 'refresh_token' and
        'expires_in' (the ExpiresIn value reported by Cognito).

    Raises:
        Exception: "Invalid username or password" when Cognito rejects the
            credentials, or "Authentication failed: ..." for any other
            failure, including a sign-in that Cognito answers with a
            challenge instead of tokens.
    """
    config = STAGING_CONFIG if environment == "staging" else PROD_CONFIG
    client = boto3.client('cognito-idp', region_name=config["region"])

    # Only the network call lives in the try block so that the error mapping
    # below applies to Cognito failures and nothing else.
    try:
        response = client.initiate_auth(
            ClientId=config["client_id"],
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': username,
                'PASSWORD': password,
            },
        )
    except client.exceptions.NotAuthorizedException as e:
        raise Exception("Invalid username or password") from e
    except Exception as e:
        raise Exception(f"Authentication failed: {str(e)}") from e

    # When Cognito demands a further step (for example a forced password
    # change or MFA) the response has no AuthenticationResult. Report that
    # explicitly instead of failing on the missing key.
    tokens = response.get('AuthenticationResult')
    if not tokens:
        challenge = response.get('ChallengeName', 'unknown')
        raise Exception(
            f"Authentication failed: Cognito requires an additional challenge ({challenge})"
        )

    return {
        'id_token': tokens['IdToken'],
        'access_token': tokens['AccessToken'],
        'refresh_token': tokens['RefreshToken'],
        'expires_in': tokens['ExpiresIn'],
    }
# Usage
try:
    tokens = authenticate_user("user@example.com", "YourPassword123!", "staging")
    # Use ID token for API requests
    id_token = tokens['id_token']
    # Make API request
    api_url = "https://stg.kyc.legaltalent.ai/kyc"
    headers = {
        "Authorization": f"Bearer {id_token}",
        "Content-Type": "application/json"
    }
    response = requests.post(
        api_url,
        headers=headers,
        json={
            "subject": {"full_name": "John Doe"},
            "list_name": "ofac"
        },
        # requests applies no timeout by default; without one a stalled
        # connection would block this script indefinitely.
        timeout=30,
    )
    # Turn 4xx/5xx responses into an exception instead of printing an error
    # payload as though it were a successful result.
    response.raise_for_status()
    print(response.json())
except Exception as e:
    print(f"Error: {e}")
JavaScript/TypeScript Authentication
Copy
// Using AWS SDK v3
import { CognitoIdentityProviderClient, InitiateAuthCommand } from '@aws-sdk/client-cognito-identity-provider';
// Cognito settings for the staging user pool.
const STAGING_CONFIG = {
  region: 'us-east-2',
  userPoolId: 'us-east-2_oGu06xbPS',
  clientId: '2raj3isqgeivk0m4b94858hice',
};

// Cognito settings for the production user pool.
const PROD_CONFIG = {
  region: 'us-east-2',
  userPoolId: 'us-east-2_L37NAaTbI',
  clientId: '17motn53rgao4bk26ojguv0ldu',
};
/**
 * Authenticate a user with Cognito and return the issued tokens.
 *
 * @param {string} username - Cognito username or email.
 * @param {string} password - User password.
 * @param {string} [environment='staging'] - 'staging' selects STAGING_CONFIG;
 *   any other value selects PROD_CONFIG.
 * @returns {Promise<{idToken: string, accessToken: string, refreshToken: string, expiresIn: number}>}
 * @throws {Error} 'Invalid username or password' when Cognito rejects the
 *   credentials; 'Authentication failed: ...' for any other failure,
 *   including a sign-in that Cognito answers with a challenge.
 */
async function authenticateUser(username, password, environment = 'staging') {
  const config = environment === 'staging' ? STAGING_CONFIG : PROD_CONFIG;
  const client = new CognitoIdentityProviderClient({ region: config.region });

  let response;
  try {
    const command = new InitiateAuthCommand({
      ClientId: config.clientId,
      AuthFlow: 'USER_PASSWORD_AUTH',
      AuthParameters: {
        USERNAME: username,
        PASSWORD: password
      }
    });
    response = await client.send(command);
  } catch (error) {
    if (error.name === 'NotAuthorizedException') {
      throw new Error('Invalid username or password');
    }
    throw new Error(`Authentication failed: ${error.message}`);
  }

  // When Cognito demands a further step (e.g. a forced password change or
  // MFA) the response carries no AuthenticationResult. Report that explicitly
  // rather than failing with "Cannot read properties of undefined".
  const tokens = response.AuthenticationResult;
  if (!tokens) {
    const challenge = response.ChallengeName || 'unknown';
    throw new Error(`Authentication failed: Cognito requires an additional challenge (${challenge})`);
  }

  return {
    idToken: tokens.IdToken,
    accessToken: tokens.AccessToken,
    refreshToken: tokens.RefreshToken,
    expiresIn: tokens.ExpiresIn
  };
}
// Usage
/**
 * Example: authenticate against staging and run a single list check.
 *
 * Logs the parsed JSON response on success. Any failure (authentication,
 * network, or a non-2xx HTTP status) is logged via console.error.
 */
async function makeApiCall() {
  try {
    const tokens = await authenticateUser('user@example.com', 'YourPassword123!', 'staging');
    // Use ID token for API requests
    const apiUrl = 'https://stg.kyc.legaltalent.ai/kyc';
    const response = await fetch(apiUrl, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${tokens.idToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        subject: { full_name: 'John Doe' },
        list_name: 'ofac'
      })
    });
    // fetch() only rejects on network errors; 4xx/5xx responses resolve
    // normally, so the status has to be checked by hand.
    if (!response.ok) {
      throw new Error(`KYC API request failed: ${response.status} ${response.statusText}`);
    }
    const data = await response.json();
    console.log(data);
  } catch (error) {
    console.error('Error:', error.message);
  }
}
Complete Authentication Class
Copy
import boto3
from botocore.exceptions import ClientError
from typing import Optional, Dict
import time
class CognitoAuth:
    """AWS Cognito authentication client for KYC API.

    Signs a user in with the USER_PASSWORD_AUTH flow, keeps the tokens from
    the last successful login(), and refreshes the ID/access tokens shortly
    before they expire.
    """

    STAGING = {
        "region": "us-east-2",
        "user_pool_id": "us-east-2_oGu06xbPS",
        "client_id": "2raj3isqgeivk0m4b94858hice"
    }
    PROD = {
        "region": "us-east-2",
        "user_pool_id": "us-east-2_L37NAaTbI",
        "client_id": "17motn53rgao4bk26ojguv0ldu"
    }

    # get_id_token() refreshes once the token is this close to expiring
    # (5 minutes).
    REFRESH_MARGIN_SECONDS = 300

    def __init__(self, environment="staging"):
        """
        Initialize Cognito client.

        Args:
            environment: "staging" selects the STAGING settings; any other
                value selects PROD.
        """
        self.config = self.STAGING if environment == "staging" else self.PROD
        self.client = boto3.client('cognito-idp', region_name=self.config["region"])
        # Populated by login(): dict with id_token / access_token / refresh_token.
        self.tokens = None
        # time.time()-based timestamp at which the current ID token expires.
        self.token_expiry = None

    def login(self, username: str, password: str) -> Dict[str, str]:
        """
        Authenticate user and store tokens.

        Args:
            username: Cognito username or email
            password: User password

        Returns:
            dict with 'id_token', 'access_token' and 'refresh_token'

        Raises:
            Exception: "Invalid username or password" when Cognito rejects
                the credentials; "Authentication failed: ..." for any other
                Cognito error or when Cognito answers with a challenge
                instead of tokens.
        """
        try:
            response = self.client.initiate_auth(
                ClientId=self.config["client_id"],
                AuthFlow='USER_PASSWORD_AUTH',
                AuthParameters={
                    'USERNAME': username,
                    'PASSWORD': password
                }
            )
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'NotAuthorizedException':
                raise Exception("Invalid username or password") from e
            raise Exception(f"Authentication failed: {str(e)}") from e

        # A challenge response (e.g. forced password change, MFA) has no
        # AuthenticationResult; fail with a readable message instead of a
        # bare KeyError.
        tokens = response.get('AuthenticationResult')
        if not tokens:
            challenge = response.get('ChallengeName', 'unknown')
            raise Exception(
                f"Authentication failed: Cognito requires an additional challenge ({challenge})"
            )

        self.tokens = {
            'id_token': tokens['IdToken'],
            'access_token': tokens['AccessToken'],
            'refresh_token': tokens['RefreshToken']
        }
        self.token_expiry = time.time() + tokens['ExpiresIn']
        return self.tokens

    def get_id_token(self) -> Optional[str]:
        """
        Get current ID token, refreshing if necessary.

        If the stored token is expired or about to expire and a refresh
        token is available, refresh() is called first. Without a refresh
        token the stored (possibly expired) ID token is returned as is.

        Returns:
            ID token string, or None if login() has not succeeded yet
        """
        if not self.tokens:
            return None
        # Refresh if expired or expiring soon
        if time.time() >= (self.token_expiry - self.REFRESH_MARGIN_SECONDS):
            if self.tokens.get('refresh_token'):
                self.refresh()
        return self.tokens.get('id_token')

    def refresh(self) -> Dict[str, str]:
        """
        Refresh tokens using refresh token.

        Replaces the stored ID and access tokens and their expiry; the
        stored refresh token is kept unchanged.

        Returns:
            dict with new tokens

        Raises:
            Exception: "No refresh token available" if there is nothing to
                refresh with; "Token refresh failed: ..." if the Cognito
                call or the response handling fails.
        """
        if not self.tokens or not self.tokens.get('refresh_token'):
            raise Exception("No refresh token available")
        try:
            response = self.client.initiate_auth(
                ClientId=self.config["client_id"],
                AuthFlow='REFRESH_TOKEN_AUTH',
                AuthParameters={
                    'REFRESH_TOKEN': self.tokens['refresh_token']
                }
            )
            tokens = response['AuthenticationResult']
            self.tokens['id_token'] = tokens['IdToken']
            self.tokens['access_token'] = tokens['AccessToken']
            self.token_expiry = time.time() + tokens['ExpiresIn']
            return self.tokens
        except Exception as e:
            raise Exception(f"Token refresh failed: {str(e)}") from e

    def get_authorization_header(self) -> str:
        """
        Get Authorization header value.

        Returns:
            "Bearer <id_token>" string

        Raises:
            Exception: if no ID token is available (login() not called yet).
        """
        id_token = self.get_id_token()
        if not id_token:
            raise Exception("Not authenticated. Call login() first.")
        return f"Bearer {id_token}"
# Usage
# Build a client configured for the staging user pool.
auth = CognitoAuth(environment="staging")
# Login (stores the returned tokens on the client)
auth.login("user@example.com", "YourPassword123!")
# Get authorization header for API requests
headers = {
"Authorization": auth.get_authorization_header(),
"Content-Type": "application/json"
}
# Token auto-refreshes when needed: get_id_token() calls refresh() first
# when the stored token is within 5 minutes of expiry.
id_token = auth.get_id_token()
Making API Requests
Copy
import requests
# After authentication
# Module-level client used by make_kyc_check() and create_watchlist() to
# build their Authorization headers.
auth = CognitoAuth(environment="staging")
auth.login("user@example.com", "YourPassword123!")
# Example: List check
def make_kyc_check(subject_name, list_name="ofac", timeout=30):
    """
    Run a list check for a subject against the staging KYC API.

    Args:
        subject_name: Full name of the subject to check.
        list_name: Name of the list to check against (default "ofac").
        timeout: Seconds to wait for the server before giving up. requests
            applies no timeout by default, so one is always passed here.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    url = "https://stg.kyc.legaltalent.ai/kyc"
    # Uses the module-level `auth` client, which must already be logged in.
    headers = {
        "Authorization": auth.get_authorization_header(),
        "Content-Type": "application/json"
    }
    payload = {
        "subject": {"full_name": subject_name},
        "list_name": list_name
    }
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
# Usage
# Check "John Doe" against the "ofac" list and print the decoded JSON response.
result = make_kyc_check("John Doe", "ofac")
print(result)
# Example: Create watchlist
def create_watchlist(name, lists_to_monitor=None, timeout=30):
    """
    Create a watchlist through the staging KYC API.

    Args:
        name: Name of the watchlist.
        lists_to_monitor: Optional list names to monitor; omitted from the
            request when empty or None.
        timeout: Seconds to wait for the server before giving up. requests
            applies no timeout by default, so one is always passed here.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
    """
    url = "https://stg.kyc.legaltalent.ai/kyc/watchlists"
    # Uses the module-level `auth` client, which must already be logged in.
    headers = {
        "Authorization": auth.get_authorization_header(),
        "Content-Type": "application/json"
    }
    payload = {"name": name}
    if lists_to_monitor:
        payload["lists_to_monitor"] = lists_to_monitor
    response = requests.post(url, json=payload, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()
# Usage
# Create a watchlist named "My Watchlist" that monitors the "ofac" and "un" lists.
watchlist = create_watchlist("My Watchlist", ["ofac", "un"])
Error Handling
Copy
from botocore.exceptions import ClientError
def safe_authenticate(username, password, environment="staging"):
    """
    Authenticate with proper error handling.

    Creates a CognitoAuth client for the given environment, logs in, and
    returns the client. Known Cognito error codes are translated into
    readable messages.

    Args:
        username: Cognito username or email
        password: User password
        environment: Passed through to CognitoAuth ("staging" or "prod")

    Returns:
        The logged-in CognitoAuth instance.

    Raises:
        Exception: with a message specific to the Cognito error code when
            the failure originates from a ClientError, otherwise
            "Unexpected error: ...".
    """
    friendly_messages = {
        'NotAuthorizedException': "Invalid username or password",
        'UserNotFoundException': "User not found",
        'UserNotConfirmedException': "User account not confirmed. Please verify your email.",
        'PasswordResetRequiredException': "Password reset required",
        'TooManyRequestsException': "Too many requests. Please try again later.",
    }
    try:
        auth = CognitoAuth(environment=environment)
        auth.login(username, password)
        return auth
    except Exception as e:
        # CognitoAuth.login() re-raises ClientError as a plain Exception, so
        # catching ClientError directly never matches. Walk the exception
        # chain (explicit cause first, then implicit context) to recover the
        # original botocore error and its code.
        client_error = e
        while client_error is not None and not isinstance(client_error, ClientError):
            client_error = client_error.__cause__ or client_error.__context__
        if client_error is None:
            raise Exception(f"Unexpected error: {str(e)}") from e
        error_code = client_error.response['Error']['Code']
        message = friendly_messages.get(error_code, f"Authentication error: {error_code}")
        raise Exception(message) from e
# Usage
# Report the outcome of a staging login; the success message is printed only
# when safe_authenticate() returns without raising.
try:
    auth = safe_authenticate("user@example.com", "password123", "staging")
except Exception as e:
    print(f"Authentication failed: {e}")
else:
    print("Authentication successful!")
Best Practices
- Store Tokens Securely: Never expose tokens in client-side code or logs
- Implement Token Refresh: Automatically refresh tokens before they expire
- Handle Errors Gracefully: Implement proper error handling for all authentication flows
- Use Environment Variables: Store Cognito configuration in environment variables
- Validate Tokens: Always verify token validity before making API requests
Related References
- Authentication API Reference - Token usage and permissions
- AWS Cognito Documentation - Official AWS Cognito docs
- Python Boto3 Cognito - Boto3 Cognito client docs