News and Research messages in Python
Last update: October 2020
Operating system: Any
Interpreter: Python 3.x

Introduction

This tutorial builds on the previous ones and develops a news headline application. It demonstrates how to access the message-services delivery mechanism within RDP. Please read the article that describes the basics of Message Delivery. Although this tutorial is geared towards News, a similar mechanism applies to Research message delivery; a Research sample, researchMessages.py, is included in the downloadable code. The Refinitiv Data Platform Interactive Reference Guide can be used to view the API method signatures and workflow. We will walk through a Python sample, which will:

  1. Authenticate with RDP
  2. Start the News message-services subscription
  3. Get the cloud credentials
  4. Poll and remove messages
  5. Decrypt message
  6. Close open subscription

The following flowchart shows the workflow implemented in the provided code sample, newsMessages.py:

Prerequisites

The tutorial sample requires the following libraries to be installed:

```shell
pip install boto3
pip install pycryptodome
```
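The pycryptodome package is imported as `Crypto`, not under its distribution name, which is easy to miss. As a minimal sketch (the helper `missing_modules` is hypothetical and not part of the tutorial sample), the following check reports which prerequisites cannot be imported before the sample is run:

```python
import importlib.util

# Map of import name -> pip package name for the tutorial prerequisites.
# boto3 imports as "boto3"; the pycryptodome distribution installs "Crypto".
REQUIRED_MODULES = {"boto3": "boto3", "Crypto": "pycryptodome"}


def missing_modules(required=REQUIRED_MODULES):
    """Return the pip package names whose import name cannot be found."""
    return [
        package
        for module, package in required.items()
        if importlib.util.find_spec(module) is None
    ]


if __name__ == "__main__":
    missing = missing_modules()
    if missing:
        print("Please run: pip install " + " ".join(missing))
    else:
        print("All prerequisites are installed")
```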

1. Authenticate with RDP

The Authorization sample in Python tutorial shows how to authenticate with RDP. We will use the rdpToken.py module to get the latest RDP access token whenever it is needed. The access token typically expires every 5 minutes, so the module renews it automatically as required.

```python
import rdpToken

# get the latest access token
accessToken = rdpToken.getToken()
```
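rdpToken.py itself is not reproduced in this tutorial. To illustrate the renewal idea only, the sketch below caches a token and fetches a new one shortly before it expires. `TokenCache`, the `fetch_token` callable and the 30-second safety margin are hypothetical; a real `fetch_token` would call the RDP authentication endpoint (for example with a refresh token) and return the new token with its lifetime.

```python
import time


class TokenCache:
    """Hypothetical helper that caches an access token and renews it near expiry.

    fetch_token() must return (token, expires_in_seconds).
    """

    def __init__(self, fetch_token, margin=30, clock=time.monotonic):
        self._fetch_token = fetch_token
        self._margin = margin      # renew this many seconds before expiry
        self._clock = clock        # injectable clock, handy for testing
        self._token = None
        self._expires_at = 0.0

    def get_token(self):
        now = self._clock()
        # Fetch a new token if none is cached or the cached one is about to expire.
        if self._token is None or now >= self._expires_at - self._margin:
            token, expires_in = self._fetch_token()
            self._token = token
            self._expires_at = now + expires_in
        return self._token
```

With a 300-second token and a 30-second margin, repeated `get_token()` calls reuse the cached token for roughly the first 270 seconds and then trigger a single renewal.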

2. Start the News message subscription

The alert subscription is started by expressing an interest in news headlines or stories. This is implemented in the subscribeToNews() method, which sends the following JSON body in a POST request:

```python
requestData = {
    "transport": {
        "transportType": "AWS-SQS"
    },
    "payloadVersion": "2.0"
}
```

The request is sent to https://api.refinitiv.com/message-services/<RDP_VERSION>/news-headlines/subscriptions. After a basic check for an HTTP 200 status code, the queue endpoint is extracted from the JSON response, along with the cryptographyKey and subscriptionID used in later steps.
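The sample sends this request with the requests library. As a stdlib-only sketch of the same exchange, the code below builds (but does not send) the POST request and pulls the three values out of a response. The helper names are hypothetical, the RDP version is left as a parameter, and the response field names (`transportInfo`, `endpoint`, `cryptographyKey`, `subscriptionID`) are assumptions to verify against the reference guide.

```python
import json
import urllib.request


def build_subscription_request(access_token, rdp_version, request_data):
    """Build (but do not send) the POST request that starts a news subscription."""
    url = (
        "https://api.refinitiv.com/message-services/"
        + rdp_version + "/news-headlines/subscriptions"
    )
    return urllib.request.Request(
        url,
        data=json.dumps(request_data).encode("utf-8"),
        headers={
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json",
        },
        method="POST",
    )


def parse_subscription_response(response_json):
    """Return (endpoint, cryptographyKey, subscriptionID) from a parsed response.

    Assumed layout: the queue details sit under "transportInfo" and the
    subscription ID at the top level.
    """
    transport = response_json["transportInfo"]
    return (
        transport["endpoint"],
        transport["cryptographyKey"],
        response_json["subscriptionID"],
    )
```

Sending the request with `urllib.request.urlopen(req)` and checking for status 200 would mirror the sample's flow.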

The request above subscribes to all English-language news. To receive only top breaking news, the following filter can be used:

```python
requestData = {
    "transport": {
        "transportType": "AWS-SQS"
    },
    "filter": {
        "query": {
            "type": "operator",
            "operator": "and",
            "operands": [{
                "type": "freetext",
                "match": "contains",
                "value": "TOP NEWS"
            }, {
                "type": "language",
                "value": "L:en"
            }]
        }
    }
}
```
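The filter is a small expression tree: an `operator` node combines a list of operand nodes. A minimal sketch of building it programmatically follows; the helper names are hypothetical, and only the "and" operator and the two operand types from the example above are covered (other operand types would need checking in the reference guide).

```python
def freetext(value, match="contains"):
    """Free-text operand, as in the TOP NEWS example."""
    return {"type": "freetext", "match": match, "value": value}


def language(code):
    """Language operand, e.g. "L:en" for English."""
    return {"type": "language", "value": code}


def all_of(*operands):
    """Combine operands with the "and" operator."""
    return {"type": "operator", "operator": "and", "operands": list(operands)}


def build_request_data(query=None):
    """Subscription body for an SQS transport, with a filter only if a query is given."""
    request_data = {"transport": {"transportType": "AWS-SQS"}}
    if query is not None:
        request_data["filter"] = {"query": query}
    return request_data


top_news = build_request_data(all_of(freetext("TOP NEWS"), language("L:en")))
```

`top_news` produces the same dictionary as the filtered requestData shown above.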

3. Get the cloud credentials

As with the message-services subscription, the queue endpoint is used to request the credentials needed to access this queue. getCloudCredentials() follows the same pattern as step 2 and extracts the accessKeyId, secretKey, sessionToken, etc., which enable the application to poll this endpoint.
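A sketch of the parsing side of this step follows. It assumes the credentials arrive under a `credentials` object with `accessKeyId`, `secretKey` and `sessionToken` keys (an assumption to confirm in the reference guide). It also shows one way to derive the boto3 region from the queue URL, whose host name has the form `sqs.<region>.amazonaws.com`; both helper names are hypothetical.

```python
from urllib.parse import urlparse


def parse_cloud_credentials(response_json):
    """Return (accessKeyId, secretKey, sessionToken) from the parsed response.

    Assumed layout: the values sit under a "credentials" object.
    """
    creds = response_json["credentials"]
    return creds["accessKeyId"], creds["secretKey"], creds["sessionToken"]


def region_from_queue_url(queue_url):
    """Derive the AWS region from an SQS queue URL."""
    host = urlparse(queue_url).hostname  # e.g. "sqs.us-east-1.amazonaws.com"
    parts = host.split(".") if host else []
    if len(parts) < 4 or parts[0] != "sqs":
        raise ValueError("not an SQS queue URL: " + queue_url)
    return parts[1]
```

For the queue endpoint in the sample output, `region_from_queue_url()` returns `"us-east-1"`.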

4. Poll and remove messages

At this point, the message service starts pushing events that meet our criteria into the message queue. The application now has to check the queue for available events and remove them. The mechanism for polling and retrieving messages is specific to the queue provider. As per our request, the queue destination is AWS SQS (the only supported provider at the moment). Please refer to this documentation for more details on the SQS service.

The REST interface for SQS is described here. As this documentation shows, there is considerable overhead in building the HTTP requests and in parsing and interpreting the XML responses. A user application can use the REST API directly for the poll, retrieve and delete actions on the queue. AWS also provides value-added libraries in various programming languages, which abstract the heavy lifting of request authentication and XML processing into simple API calls. We will use the AWS SDK for Python (Boto3) for this.

The queue-specific functionality is implemented in sqsQueue.py. This module takes care of polling, decrypting and deleting the items in the SQS queue. The startPolling() method starts by creating a boto3 session from the temporary credentials and the region; the resulting SQS client then polls the queue endpoint, with each receive call blocking for up to 10 seconds while it waits for messages. Each poll returns a Python dictionary. All the nested messages are extracted and sent for decryption, and each processed message is then deleted from the queue.

```python
import boto3

# accessID, secretKey, sessionToken, endpoint, REGION and cryptographyKey come from
# the earlier steps; decrypt() and printPayload() are defined elsewhere in the sample.

# create a SQS session
session = boto3.Session(
    aws_access_key_id=accessID,
    aws_secret_access_key=secretKey,
    aws_session_token=sessionToken,
    region_name=REGION
)

sqs = session.client('sqs')

print('Polling messages from queue...')
while True:
    # long poll: block for up to 10 seconds waiting for messages
    resp = sqs.receive_message(QueueUrl=endpoint, WaitTimeSeconds=10)

    if 'Messages' in resp:
        messages = resp['Messages']
        # print and remove all the nested messages
        for message in messages:
            mBody = message['Body']
            # decrypt this message
            m = decrypt(cryptographyKey, mBody)
            printPayload(m)
            # remove this message
            sqs.delete_message(QueueUrl=endpoint, ReceiptHandle=message['ReceiptHandle'])
```
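The receive/process/delete cycle can also be factored into a function that accepts any object with boto3-style `receive_message` and `delete_message` methods, which makes it testable without AWS. The sketch below is hypothetical (`poll_once` is not part of sqsQueue.py). It deletes a message only after the handler returns without raising, so a message that fails processing becomes visible again once its SQS visibility timeout expires. It also asks for up to 10 messages per call (the SQS maximum) instead of the default of one.

```python
def poll_once(sqs, queue_url, handle, wait_seconds=10):
    """Run one receive/process/delete cycle and return the number of messages handled.

    `sqs` is any object with boto3-style receive_message/delete_message methods,
    for example session.client('sqs').
    """
    resp = sqs.receive_message(
        QueueUrl=queue_url,
        WaitTimeSeconds=wait_seconds,   # long polling
        MaxNumberOfMessages=10,         # SQS allows 1 to 10 per call
    )
    handled = 0
    for message in resp.get("Messages", []):
        # If handle() raises, the loop stops and the remaining messages are not deleted.
        handle(message["Body"])
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
        handled += 1
    return handled
```

In the sample's loop, this could be called as `poll_once(sqs, endpoint, lambda body: printPayload(decrypt(cryptographyKey, body)))` inside `while True:`.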

5. Decrypt message

The received message payload is encrypted and can only be decrypted by the application holding the cryptographyKey, which was provided to our application when the subscription was started. The payload is encrypted using AES-256 in GCM mode and then encoded with base64. See the section Decrypting the message for more information on payload and key lengths. The sample uses the PyCryptodome library because it supports Galois/Counter Mode (GCM). The decrypt() method is implemented as follows:

```python
import base64

from Crypto.Cipher import AES

# constants
GCM_AAD_LENGTH = 16
GCM_TAG_LENGTH = 16
GCM_NONCE_LENGTH = 12


def decrypt(key, source):
    # base64 decode key and text
    key = base64.b64decode(key)
    cipherText = base64.b64decode(source)

    # extract AAD, nonce, tag and encrypted message
    aad = cipherText[:GCM_AAD_LENGTH]
    nonce = aad[-GCM_NONCE_LENGTH:]
    tag = cipherText[-GCM_TAG_LENGTH:]
    encMessage = cipherText[GCM_AAD_LENGTH:-GCM_TAG_LENGTH]

    cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
    cipher.update(aad)
    # decrypt and verify; raises ValueError if the tag does not match
    decMessage = cipher.decrypt_and_verify(encMessage, tag)
    return decMessage
```
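The byte layout that decrypt() relies on can be checked without a key. The stdlib-only sketch below (the helper `split_payload` is hypothetical) applies the same slicing: the first 16 bytes are the AAD, whose last 12 bytes double as the GCM nonce; the last 16 bytes are the authentication tag; everything in between is the ciphertext. Because GCM adds no padding, the ciphertext is exactly as long as the decrypted message.

```python
import base64

GCM_AAD_LENGTH = 16
GCM_TAG_LENGTH = 16
GCM_NONCE_LENGTH = 12


def split_payload(source):
    """Split a base64 message body into (aad, nonce, ciphertext, tag)."""
    raw = base64.b64decode(source)
    # Anything shorter cannot hold both the AAD and the tag.
    if len(raw) < GCM_AAD_LENGTH + GCM_TAG_LENGTH:
        raise ValueError("payload too short: %d bytes" % len(raw))
    aad = raw[:GCM_AAD_LENGTH]
    nonce = aad[-GCM_NONCE_LENGTH:]
    ciphertext = raw[GCM_AAD_LENGTH:-GCM_TAG_LENGTH]
    tag = raw[-GCM_TAG_LENGTH:]
    return aad, nonce, ciphertext, tag
```

A length check like this can reject malformed message bodies before they reach the cipher.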

6. Close open subscription

Any open alert subscription must be closed explicitly to avoid exceeding the subscription limit. Until it is closed, RDP continues to push events into the queue, where they expire automatically after 24 hours if not processed. It is the application's responsibility to close any subscriptions it has opened. Closing a subscription takes a single HTTP DELETE request to the endpoint that was used to create it (https://api.refinitiv.com/message-services/<RDP_VERSION>/news-headlines/subscriptions). If a subscriptionID is provided as a parameter, only that subscription is closed; otherwise, all open subscriptions are closed.

```python
import requests

import rdpToken

# RESOURCE_ENDPOINT and currentSubscriptionID are set when the subscription is created;
# the remaining request arguments (...) are omitted in this excerpt.

# get the latest access token
accessToken = rdpToken.getToken()

if currentSubscriptionID:
    print("Deleting the open message subscription")
    dResp = requests.delete(RESOURCE_ENDPOINT, ..., params={"subscriptionID": currentSubscriptionID})
else:
    print("Deleting ALL open message subscriptions")
    dResp = requests.delete(RESOURCE_ENDPOINT, ...)
```
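For illustration, the stdlib-only sketch below builds (but does not send) the same DELETE request, adding the `subscriptionID` query parameter only when a specific subscription should be closed. The helper name and the Bearer-token header layout are assumptions, and the RDP version is left as a parameter.

```python
import urllib.parse
import urllib.request


def build_delete_request(access_token, rdp_version, subscription_id=None):
    """Build (but do not send) the DELETE that closes news-headline subscriptions.

    With subscription_id, only that subscription is targeted; without it,
    the request targets all open subscriptions.
    """
    url = (
        "https://api.refinitiv.com/message-services/"
        + rdp_version + "/news-headlines/subscriptions"
    )
    if subscription_id:
        url += "?" + urllib.parse.urlencode({"subscriptionID": subscription_id})
    return urllib.request.Request(
        url,
        headers={"Authorization": "Bearer " + access_token},
        method="DELETE",
    )
```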

The Delete subscription call returns no data. In this tutorial sample, we register a shutdown hook so that the application is notified when it is about to exit and can invoke the removeSubscription method before exiting.

```python
import atexit

# unsubscribe before shutting down
atexit.register(removeSubscription)
```
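atexit hooks run on normal interpreter exit, including after an unhandled KeyboardInterrupt, but not when the process is killed by a signal that Python does not handle. A minimal sketch that closes this gap follows; `CleanupOnExit` is hypothetical, not part of the sample. It converts SIGTERM, and SIGBREAK where that exists (Windows), into `sys.exit()` so the atexit hook still runs, and it guards the cleanup so it executes only once even if it is also called explicitly.

```python
import atexit
import signal
import sys


class CleanupOnExit:
    """Run a cleanup callable at most once, on any normal interpreter exit."""

    def __init__(self, cleanup):
        self._cleanup = cleanup
        self._done = False

    def run(self):
        # Guard against running twice (e.g. an explicit call plus the atexit hook).
        if not self._done:
            self._done = True
            self._cleanup()

    def _exit_on_signal(self, signum, frame):
        # Raising SystemExit lets the interpreter shut down normally, so atexit runs.
        sys.exit(128 + signum)

    def install(self):
        atexit.register(self.run)
        for name in ("SIGTERM", "SIGBREAK"):
            sig = getattr(signal, name, None)
            if sig is not None:
                signal.signal(sig, self._exit_on_signal)
```

`CleanupOnExit(removeSubscription).install()` could replace the plain `atexit.register(removeSubscription)` call; note that `signal.signal()` must be called from the main thread.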

Run

The output from the provided sample is shown below. The application keeps polling for messages until the user stops it by pressing Control-Break:

```
>>> python newsMessages.py -s
Subscribing to news headline messages...
Reading the token from: token.txt
  Queue endpoint: https://sqs.us-east-1.amazonaws.com/642157181326/sqs-edsalerts-main-prod-usersqs-a68d1557-414c-4082-8327-049bd27d2a9d
  Subscription ID: a68d1557-414c-4082-8327-049bd27d2a9d
Getting credentials to connect to AWS Queue...
Reading the token from: token.txt
Queue access ID: ASIAZLA4M7GHB6LUGQ4R
Getting news, press BREAK to exit and delete subscription...
Polling messages from queue...
{
  "attributes": [{
      "domain": {
        "type": "string",
        "value": "headline"
      }
    }
  ],
  "envelopeVersion": "1.0",
  "ecpMessageID": "urn:newsml:reuters.com:20190709:nL4N24A2TR:2",
  "sourceSeqNo": 2295970,
  "distributionSeqNo": 4,
  "sourceTimestamp": "2020-10-08T14:27:01.027Z",
  "distributionTimestamp": "2020-10-08T14:27:01.470Z",
  "payloadVersion": "2.0",
  "subscriptionID": "a68d1557-414c-4082-8327-049bd27d2a9d",
  "payload": {
    "newsItem": {
      "_guid": "tag:reuters.com,2019:newsml_L4N24A2TR",
      "itemMeta": {
        "pubStatus": {
          "_qcode": "stat:canceled"
        },
        "fileName": {
          "$": "2019-07-09T142222Z_1_L4N24A2TR_RTRLYNXT_0_TENNIS-WIMBLEDON.XML"
        },
        "role": {
          "_qcode": "itemRole:N"
        },
        "profile": {
          "$": "SNI-Text",
          "_versioninfo": "00.00.01"
        },
        "versionCreated": {
          "$": "2020-10-08T14:27:01.027"
        },
        "generator": [{
            "$": "LYNX:addT:001",
            "_versioninfo": "1.0.0.21"
          }
        ],
...
User requested break, cleaning up...
Reading the token from: token.txt
Deleting the open message subscription
News alerts unsubscribed!
```
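printPayload() in the sample prints the whole decrypted JSON. If only a few fields are of interest, a sketch like the following could be used instead. `summarize_message` is hypothetical, and the field paths are taken from the sample output above rather than from a published schema, so fields that are absent come back as None.

```python
import json


def summarize_message(message_bytes):
    """Decode a decrypted message and pick out a few fields seen in the sample output."""
    message = json.loads(message_bytes)  # json.loads accepts bytes as well as str
    news_item = message.get("payload", {}).get("newsItem", {})
    item_meta = news_item.get("itemMeta", {})
    return {
        "ecpMessageID": message.get("ecpMessageID"),
        "guid": news_item.get("_guid"),
        "pubStatus": item_meta.get("pubStatus", {}).get("_qcode"),
        "versionCreated": item_meta.get("versionCreated", {}).get("$"),
    }
```

For the message shown above, this would report the `stat:canceled` publication status together with the item's GUID and creation time.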