Use Case Three

Refinitiv Data Platform News Middle Office Compliance Use Cases

Zoya Farberov
Developer Advocate

Introduction to Use Case

How a user can make a streaming subscription for news:

  • Where the messages are tagged with PermIDs (1, 2, 3, 4)
  • And the messages contain topic codes (x, y or z)
  • And how the user can pick up the messages from an SQS queue every 24 hours at a set time (HH:MM:SS) and then delete them from the queue. The script retrieves and removes all received messages on demand, so it can be scheduled to run every 24 hours, or at any other required time, using a standard scheduler
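As a rough illustration of the daily pick-up time, the sketch below computes how long to wait until the next occurrence of HH:MM:SS. The function name `seconds_until_next_run` is hypothetical and is not part of the use case code; a standard scheduler such as cron or Windows Task Scheduler achieves the same result without any code.

```python
from datetime import datetime, timedelta


def seconds_until_next_run(run_at: str, now: datetime) -> float:
    """Return the number of seconds from `now` until the next HH:MM:SS.

    Hypothetical helper for illustration only.
    """
    target_time = datetime.strptime(run_at, "%H:%M:%S").time()
    target = datetime.combine(now.date(), target_time)
    # If today's run time has already passed (or is right now), use tomorrow's.
    if target <= now:
        target += timedelta(days=1)
    return (target - now).total_seconds()
```

A long-running wrapper could sleep for the returned number of seconds and then invoke the retrieve-and-remove step, although delegating this to the operating system scheduler is usually simpler.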

The Functionality

  • Delete
    • The subscription per subscriptionId
    • All subscriptions of type
  • Create
    • Subscription to news headlines
    • Subscription to news stories

and store subscriptionId information

  • List all active subscriptions
  • Retrieve, print out and remove from the queue all queued messages per subscriptionId

The Reuse

This use case implementation:

  • Is built upon Quickstart Python example

https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-platform-apis/download

  • Uses rdpToken.py to authenticate with RDP
  • Uses sqsQueueRetrieveAll.py to work with AWS queue

Connecting Pieces

  • credentials.ini - a file that contains valid RDP credentials, one piece of information per line (username, password and clientId), which enable us to generate the valid authentication token that is required for RDP interactions
  • lastSubscribed.cfg - a file that stores the information about the last subscription that was established, one piece of information per line (endpoint, cryptographyKey, currentSubscriptionID)
  • token.txt - a file where the newly generated RDP authentication token is stored, to enable its reuse during its valid lifespan
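To illustrate the idea of reusing a token during its valid lifespan, here is a minimal sketch. It assumes the token is cached as JSON together with an absolute expiry timestamp; the field names `access_token` and `expires_at`, and the helper names `save_token` and `load_cached_token`, are assumptions made for this example. The actual file format used by rdpToken.py may differ.

```python
import json
import time
from pathlib import Path
from typing import Optional


def save_token(path: str, access_token: str, expires_in: float,
               now: Optional[float] = None) -> None:
    """Cache a token with its absolute expiry time (assumed format)."""
    now = time.time() if now is None else now
    cached = {"access_token": access_token, "expires_at": now + expires_in}
    Path(path).write_text(json.dumps(cached))


def load_cached_token(path: str, now: Optional[float] = None,
                      margin: float = 10.0) -> Optional[str]:
    """Return the cached token if it is still valid, otherwise None."""
    now = time.time() if now is None else now
    file = Path(path)
    if not file.exists():
        return None
    try:
        cached = json.loads(file.read_text())
    except json.JSONDecodeError:
        return None
    if not isinstance(cached, dict):
        return None
    token = cached.get("access_token")
    expiry = cached.get("expires_at")
    # Keep a small safety margin so the token does not expire mid-request.
    if token and isinstance(expiry, (int, float)) and expiry - margin > now:
        return token
    return None
```

When `load_cached_token` returns None, the caller would request a fresh token from RDP and store it again with `save_token`.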

Key Implementation Details

We will omit the description of the RDP and SQS access code, which is described excellently in other materials, and focus only on the key implementation details for the use case:

  • Subscription and filtering
  • Preserve the last subscription information
  • Retrieve the messages from the SQS queue, print and remove them iteratively until the queue is fully exhausted

Subscribe to News

The subscription includes filtering. This is where we define the PermIDs and topic codes that we are interested in.

    	
            

#==============================================
def subscribeToNews():
#==============================================
    if gHeadlines:
        RESOURCE_ENDPOINT = base_URL + category_URL + RDP_version + endpoint_URL_headlines
    else:
        RESOURCE_ENDPOINT = base_URL + category_URL + RDP_version + endpoint_URL_stories

    # Optional filters can be applied to the subscription.
    # Filter by multiple Topic Codes and multiple PermIDs:
    # (G:9 OR G:6J) AND (P:4295905573 OR P:5030853586)
    requestData = {
        "transport": {
            "transportType": "AWS-SQS"
        },
        "filter": {
            "type": "operator",
            "operator": "and",
            "operands": [
                {
                    "operator": "or",
                    "operands": [
                        {"type": "newscode", "value": "G:9"},
                        {"type": "newscode", "value": "G:6J"}
                    ],
                    "type": "operator"
                },
                {
                    "operator": "or",
                    "operands": [
                        {"value": "P:4295905573", "type": "permid"},
                        {"value": "P:5030853586", "type": "permid"}
                    ],
                    "type": "operator"
                }
            ]
        },
        "filterOverrides": {},
        "maxcount": 10,
        "dateFrom": "null",
        "dateTo": "null",
        "relevanceGroup": "all",
        "payloadVersion": "2.0"
    }

    # get the latest access token
    accessToken = rdpToken.getToken()
    hdrs = {
        "Authorization": "Bearer " + accessToken,
        "Content-Type": "application/json"
    }

    dResp = requests.post(RESOURCE_ENDPOINT, headers = hdrs, data = json.dumps(requestData))
    if dResp.status_code != 200:
        raise ValueError("Unable to subscribe. Code %s, Message: %s" % (dResp.status_code, dResp.text))
    else:
        jResp = json.loads(dResp.text)
        # store the queue endpoint, key and subscription ID for later retrieval
        preserveLastSubscription(jResp["transportInfo"]["endpoint"],
            jResp["transportInfo"]["cryptographyKey"], jResp["subscriptionID"])
        return jResp["transportInfo"]["endpoint"], jResp["transportInfo"]["cryptographyKey"], jResp["subscriptionID"]
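The filter in the request is an "and" of two "or" groups, one for topic codes and one for PermIDs. As a sketch of how the same structure could be generated from plain lists rather than written out by hand, consider the helpers below. The names `or_group` and `build_news_filter` are hypothetical and do not appear in the use case code.

```python
import json
from typing import Dict, List


def or_group(kind: str, values: List[str]) -> Dict:
    """Build an "or" operator node over values of one type
    (for example "newscode" or "permid")."""
    return {
        "type": "operator",
        "operator": "or",
        "operands": [{"type": kind, "value": v} for v in values],
    }


def build_news_filter(topic_codes: List[str], perm_ids: List[str]) -> Dict:
    """Match messages carrying any of the topic codes AND any of the PermIDs."""
    return {
        "type": "operator",
        "operator": "and",
        "operands": [
            or_group("newscode", topic_codes),
            or_group("permid", perm_ids),
        ],
    }


news_filter = build_news_filter(
    ["G:9", "G:6J"],
    ["P:4295905573", "P:5030853586"],
)
print(json.dumps(news_filter, indent=2))
```

The resulting dictionary has the same shape as the "filter" entry of requestData, so it could be assigned there before the request is posted.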

Preserve and Retrieve Last Subscription Information

This is a very small but key piece of the implementation. When we are ready to drain the queue of the accumulated filtered messages, we simply read back the subscription information that was previously stored in the file, without having to enter complex and easy-to-mistype parameters.

And yes, there is also an option to enter the queue parameters explicitly and to retrieve and remove messages based on them.

    	
            

#==============================================
def preserveLastSubscription(line1, line2, line3):
#==============================================
    # store endpoint, cryptographyKey and subscriptionID, one per line
    with open(lastSubscriptionFile, "w") as f:
        f.write(line1)
        f.write("\n")
        f.write(line2)
        f.write("\n")
        f.write(line3)
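The retrieval side is the mirror image of the function above. A minimal sketch, assuming the file holds exactly the three values written by preserveLastSubscription, one per line, could look as follows. The name `readLastSubscription` is hypothetical; the complete source on GitHub may implement this step differently.

```python
from typing import Tuple


def readLastSubscription(path: str) -> Tuple[str, str, str]:
    """Read endpoint, cryptographyKey and subscriptionID from the
    last-subscription file, one value per line (hypothetical helper)."""
    with open(path, "r") as f:
        # Ignore blank lines and strip trailing newlines.
        lines = [line.strip() for line in f if line.strip()]
    if len(lines) < 3:
        raise ValueError(
            "Expected 3 lines (endpoint, cryptographyKey, subscriptionID), "
            "found %d" % len(lines)
        )
    endpoint, cryptographyKey, subscriptionID = lines[:3]
    return endpoint, cryptographyKey, subscriptionID
```

The returned values can be passed straight to the queue retrieval step, so nothing has to be typed in by hand.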

Retrieve and Remove Exhaustively

We connect to the queue that we have defined and, while any messages remain:

  • Retrieve them in batches of up to ten
  • Print them out
  • Remove them from the queue

This lays the foundation for a multitude of use cases in which the contents of the queue are processed periodically rather than just printed.

    	
            

#==============================================
def retrieveAndRemove(accessID, secretKey, sessionToken, endpoint, cryptographyKey, callback=None):
#==============================================
    # create an SQS session
    session = boto3.Session(
        aws_access_key_id = accessID,
        aws_secret_access_key = secretKey,
        aws_session_token = sessionToken,
        region_name = REGION
    )

    # SSL and certificate verification are switched off here;
    # the commented-out line uses the boto3 defaults instead
#   sqs = session.client('sqs')
    sqs = session.client('sqs', use_ssl=False, verify=False)

    response = sqs.get_queue_attributes(
        QueueUrl=endpoint,
        AttributeNames=['All']
    )

    print('Queue Attributes are:\n')
    print(json.dumps(response, indent=2))

    print('*** Retrieving all messages from queue... ***')
    i = 0
    while True:
        # request up to 10 messages per call
        resp = sqs.receive_message(QueueUrl = endpoint, MaxNumberOfMessages=10, WaitTimeSeconds = 0)

        if 'Messages' in resp:
            messages = resp['Messages']
            print(f"&&&Number of messages received: {len(messages)}, iteration {i} &&&")
            # process and remove every message in this batch
            for message in messages:
                mBody = message['Body']
                # decrypt this message
                m = decrypt(cryptographyKey, mBody)
                processPayload(m, callback)
                # remove the processed message from the queue
                sqs.delete_message(QueueUrl = endpoint, ReceiptHandle = message['ReceiptHandle'])
            i += 1
        else:
            print('No more messages on queue... after ', i, ' iterations')
            break
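To show how the same receive, process and delete loop can serve use cases other than printing, here is a sketch that takes the queue client and a handler as parameters. The names `drain_queue` and `FakeQueue` are hypothetical; `FakeQueue` is only an in-memory stand-in that mimics the two SQS client calls used above, so the loop can be tried without AWS access. Decryption is left out for brevity.

```python
from typing import Any, Callable, List


def drain_queue(sqs: Any, queue_url: str, handle: Callable[[str], None],
                batch_size: int = 10) -> int:
    """Receive, handle and delete messages until a receive comes back empty.

    Returns the number of messages processed.
    """
    processed = 0
    while True:
        resp = sqs.receive_message(QueueUrl=queue_url,
                                   MaxNumberOfMessages=batch_size,
                                   WaitTimeSeconds=0)
        messages = resp.get("Messages", [])
        if not messages:
            return processed
        for message in messages:
            handle(message["Body"])
            # Delete only after the handler has completed without raising.
            sqs.delete_message(QueueUrl=queue_url,
                               ReceiptHandle=message["ReceiptHandle"])
            processed += 1


class FakeQueue:
    """In-memory stand-in for the SQS client, for illustration only."""

    def __init__(self, bodies: List[str]):
        self._messages = [{"Body": b, "ReceiptHandle": "rh-%d" % n}
                          for n, b in enumerate(bodies)]

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        if not self._messages:
            return {}
        return {"Messages": self._messages[:MaxNumberOfMessages]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self._messages = [m for m in self._messages
                          if m["ReceiptHandle"] != ReceiptHandle]


collected: List[str] = []
count = drain_queue(FakeQueue(["msg %d" % n for n in range(25)]),
                    "https://example.invalid/queue", collected.append)
print(count, "messages processed")
```

Here the handler simply appends each body to a list, but it could equally write to a database or a compliance archive. Note that with WaitTimeSeconds set to 0 a real SQS queue uses short polling, so a single empty response is not a strict guarantee that the queue holds no more messages.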

 

This concludes our brief discussion of the RDP News middle office compliance use case. If you have questions or would like to learn more, get in touch with us on the Refinitiv Developers Forums.

References

  • The complete source code for the use case is available on GitHub: 

Refinitiv-API-Samples/Example.RDP.Python.RDPNewsMiddleOfficeComplianceUseCases (github.com)

  • Quickstart Python example is available on Refinitiv developers portal:

https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-platform-apis/download

  • Refinitiv developers forums - RDP section:

https://community.developers.refinitiv.com/index.html