Real-time Serverless AI news sentiment analysis using Kinesis Data Streams, Lambda and SageMaker

Dr. Marios Skevofylakas
Data Scientist
Jason Ramchandani
Lead Developer Advocate
Dr. Haykaz Aramyan
Developer Advocate

Overview

In this Blueprint we present stage II of our real-time serverless news sentiment AI inference prototype, which integrates with the first stage of our AWS architecture. This is depicted as the highlighted section in the diagram below:

Here, we discuss how Kinesis, which we have already connected to the Amazon EC2 instance presented in the Real-time news ingestion using Amazon EC2 and Kinesis Data Streams Blueprint, can trigger an AWS Lambda function. We use the Lambda function to connect and send news headlines to an Amazon SageMaker FinBERT endpoint, which returns AI inference in the form of the calculated sentiment and its score for the news item. Next, we showcase how the Lambda function wraps the news item with this new information and stores it in an Amazon DynamoDB table. Finally, we present how to create all these AWS services, their access control, and their integrations fully automatically through Terraform (infrastructure-as-code) configuration scripts.

The Blueprint presents both the Terraform scripts for automated creation of the AWS services and the Lambda function, including the Python script that connects to these services. The Terraform configurations and their respective parts of the AWS Lambda function are introduced in a stepwise approach. First, we show how Lambda handles the Kinesis response, then we add the Amazon SageMaker invocation and response handling, and finally the section responsible for storing the AI-enhanced data items in Amazon DynamoDB.

AWS Lambda

To start with, let's define a Terraform configuration for the AWS Lambda function that will invoke the SageMaker service. For that, we need two separate Terraform resources: one for the function itself and another for the event source mapping. The resource for the function looks as follows:

resource "aws_lambda_function" "my_nlp_lambda_function" {

    

  function_name = "NLPLambda"

  s3_bucket = aws_s3_bucket.my_nlp_bucket.bucket

  s3_key    = aws_s3_object.lambda_triggering_sagemaker.key

  runtime = "python3.7"

  handler = "lambda.lambda_handler"

  source_code_hash = data.archive_file.lambda_triggering_sagemaker.output_base64sha256

  role = aws_iam_role.nlp_application_role.arn

    

}

In this aws_lambda_function resource we provide the function name, define the runtime as python3.7 and name the function that acts as the event handler; lambda.lambda_handler refers to the lambda_handler function inside lambda.py. Additionally, we specify the S3 bucket and the corresponding key of the object holding the zipped Lambda code. More about the S3 configuration is discussed in the following section. We also provide a source_code_hash argument, a base64sha256 hash of the package, which allows Terraform to track any local changes that need to be synced to AWS. Finally, we grant the Lambda function an execution policy which enables permission to access certain AWS services.

resource "aws_iam_role_policy_attachment" "lambda_execution_policy" {

    role = aws_iam_role.nlp_application_role.name

    policy_arn = "arn:aws:iam::aws:policy/AWSLambda_FullAccess"

}

We note that in our use case we gave the Lambda role full access; more granular permissions should be considered in production environments. With regards to the Lambda event source mapping, we have provided the following Terraform configuration:

resource "aws_lambda_event_source_mapping" "sagemaker_lambda_event_source" {

  batch_size        = 1

  event_source_arn  = aws_kinesis_stream.nlp_kinesis_stream.arn

  enabled           = true

  function_name     = aws_lambda_function.my_nlp_lambda_function.function_name

  starting_position = "LATEST"

}

The batch size is a critical parameter that defines the maximum number of items to be ingested simultaneously from Kinesis. We set it to 1 as we have designed single-item logic within our Lambda; higher batch sizes should be considered to maximise performance and lower costs in production. In event_source_arn we provide the Amazon Resource Name (ARN) of the Kinesis stream and in function_name we refer to the Lambda function we defined above. This is how we connect the Lambda function to the Kinesis stream. The position in the stream where AWS Lambda should start reading is set to "LATEST", meaning the Lambda function will always try to ingest the most recent item in the Kinesis queue.
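If the batch size were increased in production, the handler would need to loop over every record in the incoming event rather than reading only the first one. Below is a minimal sketch of that change, reusing the same decoding logic shown later in this Blueprint:

import base64
import json

def lambda_handler(event, context):
    # With batch_size > 1 the event can carry several Kinesis records,
    # so we decode and process each one instead of only Records[0]
    for record in event['Records']:
        payload = base64.b64decode(record['kinesis']['data'])
        data_dict = {"inputs": payload.decode('utf-8')}
        print(data_dict)  # inference and storage logic would follow here

    return {'statusCode': 200}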

Amazon S3 

As mentioned earlier, the Lambda function code is stored in an S3 bucket which we configure using several Terraform resources. AWS Lambda expects to find the code in S3 in zipped format; to accomplish that through Terraform we define an archive_file data source. Its parameters include the output type set to zip, the source directory containing the code, and the output path.

data "archive_file" "lambda_triggering_sagemaker" { 

  type = "zip" 

  source_dir  = " " # provide source of the object location 

  output_path = " " # provide output path including the file format (e.g. lambda.zip) 

In the Terraform resources below, we specify the bucket name and its Access Control List (ACL), which enables us to manage access to buckets and objects. In our case we set the S3 ACL to "private", which gives the owner FULL_CONTROL and doesn't allow access to any other users.

resource "aws_s3_bucket" "my_nlp_bucket" {

  bucket = "my-nlp-bucket"

}

 

resource "aws_s3_bucket_acl" "nlp_bucket_acl" {

  bucket = aws_s3_bucket.my_nlp_bucket.bucket

  acl    = "private"

}

Finally, we define the AWS S3 object by specifying the ID of the S3 bucket created above, the key which becomes the name of the object once in the bucket, the source path of the zipped archive, and the expected file hash in the Terraform "etag" argument.

resource "aws_s3_object" "lambda_triggering_sagemaker" {

  bucket = aws_s3_bucket.my_nlp_bucket.id

  key    = "sagemaker_lambda.zip"

  source = data.archive_file.lambda_triggering_sagemaker.output_path

  etag   = filemd5(data.archive_file.lambda_triggering_sagemaker.output_path)

}

Building the Lambda function: Step 1 – Handling the Kinesis event

Having built the first set of AWS services, Kinesis, Lambda and the S3 bucket, let's now start implementing the Lambda function script. Below, we define the lambda_handler Python function, which we build up throughout this Blueprint by adding the respective building blocks: handling the Kinesis response, invoking the SageMaker endpoint and creating the DynamoDB entry. To begin, let's define the lambda_handler to deal with the Kinesis response only:

import base64
import json

def lambda_handler(event, context):
    print(event)

    # Decode the base64-encoded payload of the Kinesis record and wrap it
    # in the JSON structure expected by the SageMaker endpoint
    output = base64.b64decode(event['Records'][0]['kinesis']['data'])
    data = json.dumps({"inputs": str(output, 'utf-8')})
    data_dict = json.loads(data)
    print(data_dict)

    return {
        'statusCode': 200,
    }

The lambda_handler function above receives two parameters: the record put on the Kinesis stream, in the form of an event, and the context. Its output is the event itself and the decoded news headline we have pushed to Kinesis. Below is a screenshot of the event registered in Amazon CloudWatch.

In the event output above we can also see the attached metadata, amongst which, in the scope of this application, we are mainly interested in the data we have pushed to Kinesis. The value under the "data" key is base64-encoded and therefore needs to be decoded before it can be sent to the FinBERT SageMaker endpoint. Decoding is accomplished through the "output" variable, which is then dumped into a JSON structure with the key required for SageMaker endpoint invocation. The final data looks like the following:
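For readers who want to exercise this step without the full pipeline, the handler can also be invoked with a Kinesis-style test event from the Lambda console. The snippet below is only an illustration with hypothetical values; it shows the shape of the record and of the decoded payload:

import base64
import json

# Hypothetical test event mimicking the structure Kinesis hands to Lambda;
# the data field is a base64-encoded news headline
test_event = {
    "Records": [{
        "kinesis": {
            "data": base64.b64encode(b"Example Corp beats quarterly earnings estimates").decode("utf-8")
        }
    }]
}

decoded = base64.b64decode(test_event['Records'][0]['kinesis']['data'])
print(json.dumps({"inputs": decoded.decode('utf-8')}))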

Amazon SageMaker

To invoke the SageMaker endpoint and enhance our stream with AI inference, we first need to implement and deploy a SageMaker AI model. As mentioned in the overview article for this architecture, to enhance the news items with sentiment inference we deploy a pre-trained, publicly available FinBERT model from Hugging Face. The following Terraform script creates the endpoint and deploys the FinBERT model to it. We will be invoking this endpoint from our Lambda function.

module "huggingface_sagemaker" {

  source               = "philschmid/sagemaker-huggingface/aws"

  version              = "0.5.0"

  name_prefix          = "distilbert"

  pytorch_version      = "1.9.1"

  transformers_version = "4.12.3"

  instance_type        = "ml.m5.large"

  instance_count       = 1

  hf_model_id          = "yiyanghkust/finbert-tone"

  hf_task              = "text-classification"

}

This module, configured following the instructions in the Terraform documentation, will create all the necessary resources to deploy the FinBERT model on Amazon SageMaker. The FinBERT model itself is a pre-trained NLP sentiment model tailor-made to analyse financial documents. It is built by training the BERT language model on the finance domain, using the Reuters TRC2 financial corpus, and then fine-tuning it for financial sentiment classification. After the model is adapted to the domain-specific language, it is trained with labeled data to solve the sentiment classification task.
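Once Terraform has created the endpoint, it can be useful to sanity-check the deployment before wiring it into Lambda. The snippet below is only a sketch, not part of the pipeline; the endpoint name is an assumption derived from the name_prefix above, so replace it with the actual name shown in the SageMaker console:

import json
import boto3

runtime = boto3.client('sagemaker-runtime')

# "distilbert-endpoint" is an assumed name based on the module's name_prefix;
# verify the real endpoint name in the SageMaker console before running
response = runtime.invoke_endpoint(
    EndpointName="distilbert-endpoint",
    ContentType="application/json",
    Body=json.dumps({"inputs": "Stocks rallied after the central bank announcement"})
)
print(response['Body'].read().decode('utf-8'))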

Building the Lambda function: Step 2 – Invoking SageMaker endpoint

We can now add the SageMaker block to the lambda_handler function we defined above, to get a sentiment-enhanced news headline by invoking the SageMaker endpoint. To do that, we first need to create a SageMaker runtime client using boto3, outside the handler so that it is reused across invocations:

import boto3

runtime = boto3.client(service_name='sagemaker-runtime')

def lambda_handler(event, context):
    ...
    # Invoke the SageMaker endpoint with the JSON payload built from the Kinesis record
    response = runtime.invoke_endpoint(EndpointName="distilbert-endpoint",
                                       ContentType='application/json', Body=data)
    print(response)

    # The Body field is a streaming object: read it, decode it and strip the list brackets
    response_body = response['Body']
    response_str = response_body.read().decode('utf-8')
    response_str = response_str.replace('[', '').replace(']', '')
    response_dict = json.loads(response_str)

    # Bundle the original headline, a unique id and the stringified score with the inference result
    response_dict['headline'] = data_dict['inputs']
    response_dict['id'] = context.aws_request_id
    response_dict['score'] = str(response_dict['score'])
    print(response_dict)

    ...

This part of the lambda_handler function invokes the SageMaker endpoint, receives the response, then decodes and unpacks the data:

In the response output above we can again see several items of metadata; for our use case, we are interested in the Body field. The "Body" key is a boto3 streaming object which we read and decode into the response_str variable. Finally, we load the extracted response string into a JSON structure and add the headline from the Kinesis output, the ID from the context argument of the lambda_handler, and the score returned by the endpoint converted to a string. The bundled output looks like:
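For reference, the bundled item is a flat dictionary along the following lines. The values shown here are purely illustrative, not actual output from the prototype:

# Illustrative shape only - keys mirror what the handler assembles above
response_dict = {
    "label": "Positive",      # sentiment class returned by the FinBERT endpoint
    "score": "0.97",          # confidence converted to a string
    "headline": "Example Corp beats quarterly earnings estimates",
    "id": "3f1c9a2e-0000-0000-0000-000000000000"  # Lambda request id, used as the DynamoDB key
}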

Amazon DynamoDB

The final step is to store our enhanced news item in a database; in our case, we will be using Amazon DynamoDB, a serverless key-value NoSQL database able to support high-performance applications at scale. To automate the creation process, we use an aws_dynamodb_table resource in Terraform.

resource "aws_dynamodb_table" "nlp_news_headlines_table" { 

  name             = "nlp_news_headlines_table" 

  billing_mode     = "PROVISIONED" 

  read_capacity    = 20 

  write_capacity   = 20 

  hash_key         = "id" 

  stream_enabled   = true 

  stream_view_type = "NEW_IMAGE" 

  attribute { 

    name = "id" 

    type = "S" 

  } 

In this resource, we provide the name of the table as well as the billing mode and read/write capacities. The latter are mandatory if billing_mode is set to "PROVISIONED". This billing mode is more cost efficient than on-demand billing and is suited for use cases with a priori traffic expectations, as in our case. Additionally, we provide a hash_key, which is similar to the primary key in other databases. According to the Terraform documentation, we are required to define the hash_key as an attribute in our aws_dynamodb_table resource. Finally, we enable DynamoDB Streams, which will allow us to capture new item events from a Lambda function and thereby implement DynamoDB triggering. This new Lambda layer and its integration with Amazon MQ is the topic of our next Blueprint in the series.
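Because id is the table's hash key, it acts as the primary key for lookups. As a quick illustration (a sketch only, with a placeholder id value), a single enhanced headline could later be read back with boto3 like this:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('nlp_news_headlines_table')

# Look up one item by its hash key; the id below is a placeholder,
# in practice it is the aws_request_id assigned by Lambda
item = table.get_item(Key={'id': 'replace-with-a-real-request-id'}).get('Item')
print(item)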

Building the Lambda function: Step 3 – Pushing enhanced data to DynamoDB

It is now time to add the relevant DynamoDB code to our lambda_handler function so that it stores the AI-enhanced items in the DynamoDB table. To accomplish that, we need to create a DynamoDB resource and a Table object using boto3.

dynamodb = boto3.resource('dynamodb')
nlp_news_table = dynamodb.Table('nlp_news_headlines_table')

def lambda_handler(event, context):
    ...

    # Store the AI-enhanced news item in the DynamoDB table
    nlp_news_table.put_item(Item=response_dict)

    ...

As we can see from the snippet above, the only operation we need to store the data in the table is a call to the put_item function on the nlp_news_table object we created. In the function parameters, we provide the response_dict we have already assembled using the response from the SageMaker endpoint.
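One detail worth noting: the boto3 DynamoDB resource layer does not accept Python float values, which is likely why the handler converts the score to a string before calling put_item. An alternative sketch, reusing the response_dict and nlp_news_table from above and keeping the score numeric by converting it to a Decimal, would be:

from decimal import Decimal

# DynamoDB numbers must be Decimal rather than float; converting via str()
# avoids binary floating point representation issues
response_dict['score'] = Decimal(str(response_dict['score']))
nlp_news_table.put_item(Item=response_dict)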

Testing the structure so far

We now have everything set up for real-time serverless AI news sentiment analysis using AWS Kinesis, Lambda and SageMaker. Let's apply the Terraform configurations, launch the services we have presented so far and observe the results. To do so, we need to place all the .tf and .py scripts in a single folder, open a terminal or console, navigate to that folder and run the terraform init command, which initializes a working directory containing all the necessary Terraform configuration files, followed by terraform apply to create the resources on AWS.

terraform init

terraform apply

Above we can see all the resources that have been created through Terraform, along with the time it took to launch them on AWS. In total the process should take around three minutes, with the longest action being the deployment of the SageMaker model and endpoint.

To test that everything we have created is working, we can connect to the EC2 instance, navigate to the folder containing our ec2_main_stream.py file and run the script:

cd ../home/ubuntu
python3.8 ec2_main_stream.py

What we expect to see is the AI-enhanced news headlines stored in the DynamoDB table we have created. To verify this, we can navigate to the DynamoDB service in the AWS Management Console, where we can see the following table which includes our AI-enhanced data.
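As an alternative to the console, a quick boto3 scan can also confirm that the enhanced items are arriving. This is only a sketch for a small test table, assuming the table name defined earlier; scans read the entire table and should be avoided on large production tables:

import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('nlp_news_headlines_table')

# Scan the (small) test table and print the enhanced headlines
for item in table.scan().get('Items', []):
    print(item.get('headline'), item.get('label'), item.get('score'))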

Conclusion

This second Blueprint in the series brings the prototype to a state where we can ingest real-time news headlines through our Amazon EC2 instance and push the stream to Kinesis, which in turn triggers a Lambda function. The Lambda function connects to the SageMaker endpoint, retrieves the AI inference and bundles it with the source data item. Finally, the enhanced news headline item is stored in a DynamoDB table. Everything is automatically configured and integrated through a series of Terraform scripts.

In our next Blueprint we finalise the architecture by listening to new insertion events in DynamoDB and pushing the items to a RabbitMQ broker hosted in Amazon MQ. Once the items are queued in the broker, we present a typical consumer that ingests them to close the lifecycle of the application. You are now ready to move to Real-time streaming using DynamoDB streams – Lambdas and Amazon MQ.

For more information and resources on this prototype you can also visit the AWS Machine Learning Blog:
Enriching real-time news streams with the Refinitiv Data Library, AWS services, and Amazon SageMaker.