Skip to main content

Overview

The KYC API uses AWS Cognito for user authentication. This guide shows you how to authenticate users and obtain JWT tokens to make API requests.

Cognito Configuration

Staging Environment

  • User Pool ID: us-east-2_oGu06xbPS
  • App Client ID: 2raj3isqgeivk0m4b94858hice
  • Region: us-east-2

Production Environment

  • User Pool ID: us-east-2_L37NAaTbI
  • App Client ID: 17motn53rgao4bk26ojguv0ldu
  • Region: us-east-2

Authentication Flow

  1. User provides username and password
  2. Authenticate with AWS Cognito
  3. Receive ID token (JWT) and Access token
  4. Use ID token in Authorization: Bearer <id_token> header for API requests

Python Authentication

import boto3
import requests

# Configuration
STAGING_CONFIG = {
    "region": "us-east-2",
    "user_pool_id": "us-east-2_oGu06xbPS",
    "client_id": "2raj3isqgeivk0m4b94858hice"
}

PROD_CONFIG = {
    "region": "us-east-2",
    "user_pool_id": "us-east-2_L37NAaTbI",
    "client_id": "17motn53rgao4bk26ojguv0ldu"
}

def authenticate_user(username, password, environment="staging"):
    """
    Authenticate user with Cognito and return ID token.
    
    Args:
        username: Cognito username or email
        password: User password
        environment: "staging" or "prod"
    
    Returns:
        dict with 'id_token', 'access_token', and 'refresh_token'
    """
    config = STAGING_CONFIG if environment == "staging" else PROD_CONFIG
    
    client = boto3.client('cognito-idp', region_name=config["region"])
    
    try:
        response = client.initiate_auth(
            ClientId=config["client_id"],
            AuthFlow='USER_PASSWORD_AUTH',
            AuthParameters={
                'USERNAME': username,
                'PASSWORD': password
            }
        )
        
        tokens = response['AuthenticationResult']
        return {
            'id_token': tokens['IdToken'],
            'access_token': tokens['AccessToken'],
            'refresh_token': tokens['RefreshToken'],
            'expires_in': tokens['ExpiresIn']
        }
    except client.exceptions.NotAuthorizedException:
        raise Exception("Invalid username or password")
    except Exception as e:
        raise Exception(f"Authentication failed: {str(e)}")

# Usage
try:
    tokens = authenticate_user("user@example.com", "YourPassword123!", "staging")
    
    # Use ID token for API requests
    id_token = tokens['id_token']
    
    # Make API request
    api_url = "https://stg.kyc.legaltalent.ai/kyc"
    headers = {
        "Authorization": f"Bearer {id_token}",
        "Content-Type": "application/json"
    }
    
    response = requests.post(
        api_url,
        headers=headers,
        json={
            "subject": {"full_name": "John Doe"},
            "list_name": "ofac"
        }
    )
    
    print(response.json())
    
except Exception as e:
    print(f"Error: {e}")

JavaScript/TypeScript Authentication

// Using AWS SDK v3
import { CognitoIdentityProviderClient, InitiateAuthCommand } from '@aws-sdk/client-cognito-identity-provider';

const STAGING_CONFIG = {
  region: 'us-east-2',
  userPoolId: 'us-east-2_oGu06xbPS',
  clientId: '2raj3isqgeivk0m4b94858hice'
};

const PROD_CONFIG = {
  region: 'us-east-2',
  userPoolId: 'us-east-2_L37NAaTbI',
  clientId: '17motn53rgao4bk26ojguv0ldu'
};

async function authenticateUser(username, password, environment = 'staging') {
  const config = environment === 'staging' ? STAGING_CONFIG : PROD_CONFIG;
  
  const client = new CognitoIdentityProviderClient({ region: config.region });
  
  try {
    const command = new InitiateAuthCommand({
      ClientId: config.clientId,
      AuthFlow: 'USER_PASSWORD_AUTH',
      AuthParameters: {
        USERNAME: username,
        PASSWORD: password
      }
    });
    
    const response = await client.send(command);
    const tokens = response.AuthenticationResult;
    
    return {
      idToken: tokens.IdToken,
      accessToken: tokens.AccessToken,
      refreshToken: tokens.RefreshToken,
      expiresIn: tokens.ExpiresIn
    };
  } catch (error) {
    if (error.name === 'NotAuthorizedException') {
      throw new Error('Invalid username or password');
    }
    throw new Error(`Authentication failed: ${error.message}`);
  }
}

// Usage
async function makeApiCall() {
  try {
    const tokens = await authenticateUser('user@example.com', 'YourPassword123!', 'staging');
    
    // Use ID token for API requests
    const apiUrl = 'https://stg.kyc.legaltalent.ai/kyc';
    const response = await fetch(apiUrl, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${tokens.idToken}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        subject: { full_name: 'John Doe' },
        list_name: 'ofac'
      })
    });
    
    const data = await response.json();
    console.log(data);
  } catch (error) {
    console.error('Error:', error.message);
  }
}

Complete Authentication Class

import boto3
from botocore.exceptions import ClientError
from typing import Optional, Dict
import time

class CognitoAuth:
    """AWS Cognito authentication client for KYC API."""
    
    STAGING = {
        "region": "us-east-2",
        "user_pool_id": "us-east-2_oGu06xbPS",
        "client_id": "2raj3isqgeivk0m4b94858hice"
    }
    
    PROD = {
        "region": "us-east-2",
        "user_pool_id": "us-east-2_L37NAaTbI",
        "client_id": "17motn53rgao4bk26ojguv0ldu"
    }
    
    def __init__(self, environment="staging"):
        """
        Initialize Cognito client.
        
        Args:
            environment: "staging" or "prod"
        """
        self.config = self.STAGING if environment == "staging" else self.PROD
        self.client = boto3.client('cognito-idp', region_name=self.config["region"])
        self.tokens = None
        self.token_expiry = None
    
    def login(self, username: str, password: str) -> Dict[str, str]:
        """
        Authenticate user and store tokens.
        
        Args:
            username: Cognito username or email
            password: User password
        
        Returns:
            dict with tokens
        """
        try:
            response = self.client.initiate_auth(
                ClientId=self.config["client_id"],
                AuthFlow='USER_PASSWORD_AUTH',
                AuthParameters={
                    'USERNAME': username,
                    'PASSWORD': password
                }
            )
            
            tokens = response['AuthenticationResult']
            self.tokens = {
                'id_token': tokens['IdToken'],
                'access_token': tokens['AccessToken'],
                'refresh_token': tokens['RefreshToken']
            }
            self.token_expiry = time.time() + tokens['ExpiresIn']
            
            return self.tokens
            
        except ClientError as e:
            error_code = e.response['Error']['Code']
            if error_code == 'NotAuthorizedException':
                raise Exception("Invalid username or password")
            raise Exception(f"Authentication failed: {str(e)}")
    
    def get_id_token(self) -> Optional[str]:
        """
        Get current ID token, refreshing if necessary.
        
        Returns:
            ID token string or None
        """
        if not self.tokens:
            return None
        
        # Refresh if expired or expiring soon (within 5 minutes)
        if time.time() >= (self.token_expiry - 300):
            if self.tokens.get('refresh_token'):
                self.refresh()
        
        return self.tokens.get('id_token')
    
    def refresh(self) -> Dict[str, str]:
        """
        Refresh tokens using refresh token.
        
        Returns:
            dict with new tokens
        """
        if not self.tokens or not self.tokens.get('refresh_token'):
            raise Exception("No refresh token available")
        
        try:
            response = self.client.initiate_auth(
                ClientId=self.config["client_id"],
                AuthFlow='REFRESH_TOKEN_AUTH',
                AuthParameters={
                    'REFRESH_TOKEN': self.tokens['refresh_token']
                }
            )
            
            tokens = response['AuthenticationResult']
            self.tokens['id_token'] = tokens['IdToken']
            self.tokens['access_token'] = tokens['AccessToken']
            self.token_expiry = time.time() + tokens['ExpiresIn']
            
            return self.tokens
            
        except Exception as e:
            raise Exception(f"Token refresh failed: {str(e)}")
    
    def get_authorization_header(self) -> str:
        """
        Get Authorization header value.
        
        Returns:
            "Bearer <id_token>" string
        """
        id_token = self.get_id_token()
        if not id_token:
            raise Exception("Not authenticated. Call login() first.")
        return f"Bearer {id_token}"

# Usage
auth = CognitoAuth(environment="staging")

# Login
auth.login("user@example.com", "YourPassword123!")

# Get authorization header for API requests
headers = {
    "Authorization": auth.get_authorization_header(),
    "Content-Type": "application/json"
}

# Token auto-refreshes when needed
id_token = auth.get_id_token()

Making API Requests

import requests

# After authentication
auth = CognitoAuth(environment="staging")
auth.login("user@example.com", "YourPassword123!")

# Example: List check
def make_kyc_check(subject_name, list_name="ofac"):
    url = "https://stg.kyc.legaltalent.ai/kyc"
    headers = {
        "Authorization": auth.get_authorization_header(),
        "Content-Type": "application/json"
    }
    
    payload = {
        "subject": {"full_name": subject_name},
        "list_name": list_name
    }
    
    response = requests.post(url, json=payload, headers=headers)
    response.raise_for_status()
    return response.json()

# Usage
result = make_kyc_check("John Doe", "ofac")
print(result)

# Example: Create watchlist
def create_watchlist(name, lists_to_monitor=None):
    url = "https://stg.kyc.legaltalent.ai/kyc/watchlists"
    headers = {
        "Authorization": auth.get_authorization_header(),
        "Content-Type": "application/json"
    }
    
    payload = {"name": name}
    if lists_to_monitor:
        payload["lists_to_monitor"] = lists_to_monitor
    
    response = requests.post(url, json=payload, headers=headers)
    response.raise_for_status()
    return response.json()

# Usage
watchlist = create_watchlist("My Watchlist", ["ofac", "un"])

Error Handling

from botocore.exceptions import ClientError

def safe_authenticate(username, password, environment="staging"):
    """
    Authenticate with proper error handling.
    """
    try:
        auth = CognitoAuth(environment=environment)
        tokens = auth.login(username, password)
        return auth
        
    except ClientError as e:
        error_code = e.response['Error']['Code']
        
        if error_code == 'NotAuthorizedException':
            raise Exception("Invalid username or password")
        elif error_code == 'UserNotFoundException':
            raise Exception("User not found")
        elif error_code == 'UserNotConfirmedException':
            raise Exception("User account not confirmed. Please verify your email.")
        elif error_code == 'PasswordResetRequiredException':
            raise Exception("Password reset required")
        elif error_code == 'TooManyRequestsException':
            raise Exception("Too many requests. Please try again later.")
        else:
            raise Exception(f"Authentication error: {error_code}")
    
    except Exception as e:
        raise Exception(f"Unexpected error: {str(e)}")

# Usage
try:
    auth = safe_authenticate("user@example.com", "password123", "staging")
    print("Authentication successful!")
except Exception as e:
    print(f"Authentication failed: {e}")

Best Practices

  1. Store Tokens Securely: Never expose tokens in client-side code or logs
  2. Implement Token Refresh: Automatically refresh tokens before they expire
  3. Handle Errors Gracefully: Implement proper error handling for all authentication flows
  4. Use Environment Variables: Store Cognito configuration in environment variables
  5. Validate Tokens: Always verify token validity before making API requests