Focusing on Significant Developments in News - Part 2

Zoya Farberov
Developer Advocate

Introduction

This article continues the use case discussion from Focusing on Significant Developments in News - Part 1. Next, we are going to look into:

  • Using the real-time streaming service with Refinitiv Real-Time Optimized in Python
  • Using SQS queues in a Python script

Refinitiv Real-Time Optimized - How to Select Significant Developments News

  1. Connect to the Real-Time streaming service
    1. If the service is Refinitiv Real-Time Optimized, authenticate
    2. If the service is Refinitiv Real-Time Optimized, perform Service Discovery
    3. Connect to the most available endpoint
  2. Subscribe to MRN news from Real-Time streaming service
  3. Assemble the complete stories per MRN model, including the metadata
  4. Use the metadata to select only Significant Developments news per our companies of interest

Steps 1, 2 and 3 are already described in detail in many excellent resources, so we omit that discussion here; please see the References section and refer to the code example for the complete implementation. We are going to jump straight to step 4.

We use the story metadata to select only Significant Developments news stories about our companies of interest, which are passed as a parameter into verifyNewsAgainstSigDevReqs:

    	
            

    def verifyNewsAgainstSigDevReqs(self, news_, rics_of_interest):
        # a filter may either have or not have any given filtering stanza
        # news_ is the assembled story; rics_of_interest is a comma-separated string of RICs
        print('>>>NEWS FILTERING<<<')
        print('\t\tFound in news: ' + str(news_['subjects']) + ' provider= ' + str(news_['provider']))
        print('\t\tRequired in filter: ' + rics_of_interest)

        # the story must carry the Significant Developments provider code
        # AND mention at least one of our RICs of interest
        return (self.sigDevCheck(str(news_['provider']))
                and self.ricsInSubjectsCheck(str(news_['subjects']), rics_of_interest))

sigDevCheck is straightforward; it ascertains that the provider includes the Significant Developments code, NS:SIGDEV:

    	
            

    def sigDevCheck(self, _provider_stanza):
        # sig_dev_provider is a module-level name holding the
        # Significant Developments provider code
        return sig_dev_provider in _provider_stanza

ricsInSubjectsCheck, in turn, aims to match at least one of our RICs of interest:

    	
            

    def ricsInSubjectsCheck(self, _subjects_stanza, rics_of_interest):
        # always OR: a single matching RIC is enough
        lRics = rics_of_interest.split(',')
        # RICs are matched in the subjects with the 'R:' prefix
        lSubjectRics = ['R:' + x for x in lRics]
        return any(x in _subjects_stanza for x in lSubjectRics)
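To see the checks working together outside the class, here is a minimal standalone sketch. The sample story dictionaries and the SIG_DEV_PROVIDER constant are assumptions made for illustration; only the 'provider' and 'subjects' keys and the NS:SIGDEV code come from the code above. This variant also compares against the subject codes exactly rather than by substring, which is one possible way to avoid a RIC such as MSFT.O matching a longer code that merely starts with it.

```python
SIG_DEV_PROVIDER = "NS:SIGDEV"  # assumed value of sig_dev_provider


def sig_dev_check(provider):
    """True if the provider stanza carries the Significant Developments code."""
    return SIG_DEV_PROVIDER in provider


def rics_in_subjects_check(subjects, rics_of_interest):
    """True if any RIC of interest appears as an exact 'R:<RIC>' subject code."""
    wanted = {"R:" + ric.strip() for ric in rics_of_interest.split(",") if ric.strip()}
    return not wanted.isdisjoint(subjects)


def verify_news(news, rics_of_interest):
    """Both conditions must hold: SIGDEV provider AND a RIC of interest."""
    return sig_dev_check(str(news["provider"])) and rics_in_subjects_check(
        news["subjects"], rics_of_interest
    )


# Hypothetical stories, for illustration only.
sigdev_story = {"provider": "NS:SIGDEV", "subjects": ["R:MSFT.O"]}
other_story = {"provider": "NS:RTRS", "subjects": ["R:MSFT.O"]}
near_miss = {"provider": "NS:SIGDEV", "subjects": ["R:MSFT.OQ"]}

rics = "AMD.N,MSFT.O,IBM.N"
results = [verify_news(s, rics) for s in (sigdev_story, other_story, near_miss)]
print(results)
```

Only the first story passes: the second has a different provider, and the third carries a subject code that is not an exact match for any RIC of interest.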

The same approach can easily be extended to include more checks, if required. At present, to test, we can use the following command line arguments:

    	
            "--user", "GE-A-XXX", "--password","XXX", "--clientid", "XXX", "--mrn_ric", "MRN_STORY", "--rics", "AMD.N,FB.O,MSFT.O,GOOG.O,IBM.N,AMZN.O"
        
        
    
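As a rough sketch of how a script could accept these arguments, the snippet below uses argparse. The flag names are taken from the command line above; which flags are required, the defaults, and the help texts are assumptions, and the actual example script may parse its arguments differently.

```python
import argparse


def build_parser():
    """Parser for the flags shown above; required/default choices are assumed."""
    parser = argparse.ArgumentParser(
        description="Select Significant Developments news from MRN"
    )
    parser.add_argument("--user", required=True, help="user name")
    parser.add_argument("--password", required=True)
    parser.add_argument("--clientid", required=True)
    parser.add_argument("--mrn_ric", default="MRN_STORY", help="MRN RIC to subscribe to")
    parser.add_argument("--rics", default="", help="comma-separated RICs of interest")
    return parser


# Parse the same placeholder values as in the test command line.
args = build_parser().parse_args([
    "--user", "GE-A-XXX", "--password", "XXX", "--clientid", "XXX",
    "--mrn_ric", "MRN_STORY", "--rics", "AMD.N,FB.O,MSFT.O,GOOG.O,IBM.N,AMZN.O",
])
rics_list = args.rics.split(",")
print(args.mrn_ric, rics_list)
```

The comma-separated --rics value is kept as a single string on the command line, which matches how rics_of_interest is split inside ricsInSubjectsCheck.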

Refinitiv Data Platform with AWS Queues

  1. Subscribe to headlines or stories, filtering on Significant Developments
  2. Preserve the last subscription information
  3. Retrieve the messages from the SQS queue, print and remove them iteratively until the queue is fully exhausted

Therefore, a typical testing sequence for this example can be run via command line parameters:

  1. -d - delete all subscriptions
  2. -s - create a new subscription for Significant Developments
  3. -l - optionally, examine the new subscription
  4. -r - run periodically to read all the messages on the queue and drain the queue

The subscription uses a filter that must include:

  • Attribution NS:SIGDEV
  • PermIDs for any companies that are required; we can include RICs as well

So an example filter will look like:

    	
            

    requestData = {
        "transport": {
            "transportType": "AWS-SQS"
        },
        "filter": {
            "type": "operator",
            "operator": "and",
            "operands": [
                {
                    "type": "attribution",
                    "value": "NS:SIGDEV"
                },
                {
                    "type": "operator",
                    "operator": "or",
                    "operands": [
                        {"type": "newscode", "value": "R:AMZN.O"},
                        {"type": "permid", "value": "P:4295905494"},
                        {"type": "newscode", "value": "R:GOOG.O"},
                        {"type": "permid", "value": "P:5030853586"},
                        {"type": "newscode", "value": "R:META.O"},
                        {"type": "permid", "value": "P:4297297477"},
                        {"type": "newscode", "value": "R:TSLA.O"},
                        {"type": "permid", "value": "P:4297089638"}
                    ]
                }
            ]
        },
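Since the filter grows by two operands per company, it can be convenient to generate it. The helper below is a hypothetical sketch (build_sigdev_filter is not part of the example code) that reproduces the same "filter" structure from lists of RICs and PermIDs; it groups the RIC operands before the PermID operands, which should not matter under an "or" operator.

```python
import json


def build_sigdev_filter(rics, permids):
    """Build NS:SIGDEV attribution AND (any of the given RICs or PermIDs)."""
    company_operands = [{"type": "newscode", "value": "R:" + ric} for ric in rics]
    company_operands += [{"type": "permid", "value": "P:" + str(p)} for p in permids]
    return {
        "type": "operator",
        "operator": "and",
        "operands": [
            {"type": "attribution", "value": "NS:SIGDEV"},
            {"type": "operator", "operator": "or", "operands": company_operands},
        ],
    }


# Same companies as in the example filter above.
sigdev_filter = build_sigdev_filter(
    rics=["AMZN.O", "GOOG.O", "META.O", "TSLA.O"],
    permids=["4295905494", "5030853586", "4297297477", "4297089638"],
)
print(json.dumps(sigdev_filter, indent=2))
```

The resulting dictionary can then be placed under the "filter" key of requestData.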

We simply preserve the last established subscription in a file:

    	
            

    #==============================================
    def preserveLastSubscription(line1, line2, line3):
    #==============================================
        # one value per line; retrieveLastSubscription reads them back as
        # endpoint, cryptography key and subscription ID, in that order
        with open(lastSubscriptionFile, "w") as f:
            f.write(line1)
            f.write("\n")
            f.write(line2)
            f.write("\n")
            f.write(line3)

The file is read by the next retrieve call, which will most likely refer to the last subscription, so the subscription details do not have to be passed in as parameters. Alternatively, endpointPerSubscription and cryptographyKey can be passed explicitly on the command line.

    	
            

    from os.path import exists

    def retrieveLastSubscription():
        global endpoint_with_subscriptionId
        global crypto_key
        global currentSubscriptionID

        if exists(lastSubscriptionFile):
            # read back the three lines written by preserveLastSubscription;
            # the with block closes the file when done
            with open(lastSubscriptionFile, "r") as f:
                endpoint_with_subscriptionId = f.readline().rstrip('\n')
                crypto_key = f.readline().rstrip('\n')
                currentSubscriptionID = f.readline().rstrip('\n')
            return True
        else:
            return False
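The save and restore steps can be exercised together without touching a real subscription. The sketch below is one possible variant, not the example's own code: it takes the file path as a parameter and returns the values instead of setting globals. The file name and all three values are placeholders.

```python
import os
import tempfile


def preserve_last_subscription(path, endpoint, crypto_key, subscription_id):
    """Write the three values one per line, overwriting any previous file."""
    with open(path, "w") as f:
        f.write("\n".join([endpoint, crypto_key, subscription_id]))


def retrieve_last_subscription(path):
    """Return (endpoint, crypto_key, subscription_id), or None if unavailable."""
    if not os.path.exists(path):
        return None
    with open(path) as f:
        lines = f.read().splitlines()
    if len(lines) < 3:
        return None  # incomplete file: treat as no saved subscription
    return tuple(lines[:3])


# Round trip in a temporary directory, using placeholder values.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "last_subscription.txt")
    missing = retrieve_last_subscription(path)
    preserve_last_subscription(path, "https://sqs.example/queue", "key-placeholder", "sub-123")
    restored = retrieve_last_subscription(path)

print(missing, restored)
```

Returning a tuple makes the "no saved subscription" case explicit to the caller, at the cost of changing how the rest of the script would read the values.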

In retrieveAndRemove, we simply loop until the queue is empty:

    	
            

    print('*** Retrieving all messages from queue... ***')
    i = 0
    while True:
        # fetch up to 10 messages per call (the SQS maximum), without waiting
        resp = sqs.receive_message(QueueUrl=endpoint, MaxNumberOfMessages=10, WaitTimeSeconds=0)

        if 'Messages' in resp:
            messages = resp['Messages']
            print(f"&&&Number of messages received: {len(messages)}, iteration {i} &&&")
            # print and remove all the nested messages
            for message in messages:
                mBody = message['Body']
                # decrypt this message
                m = decrypt(cryptographyKey, mBody)
                processPayload(m, callback)
                # remove the processed message from the queue
                sqs.delete_message(QueueUrl=endpoint, ReceiptHandle=message['ReceiptHandle'])
            i += 1
        else:
            print('No more messages on queue... after ', i, ' iterations')
            break
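The drain logic can be tried without AWS credentials by substituting a small in-memory stand-in for the SQS client. In the sketch below, FakeSqs and drain_queue are hypothetical names introduced for illustration; FakeSqs mimics only the receive_message and delete_message calls used in the loop above, and decryption is left out.

```python
class FakeSqs:
    """In-memory stand-in exposing the two client calls used by the loop."""

    def __init__(self, bodies):
        self._pending = [
            {"Body": body, "ReceiptHandle": f"rh-{n}"} for n, body in enumerate(bodies)
        ]

    def receive_message(self, QueueUrl, MaxNumberOfMessages=1, WaitTimeSeconds=0):
        batch = self._pending[:MaxNumberOfMessages]
        return {"Messages": batch} if batch else {}

    def delete_message(self, QueueUrl, ReceiptHandle):
        self._pending = [m for m in self._pending if m["ReceiptHandle"] != ReceiptHandle]


def drain_queue(sqs, queue_url, handle):
    """Receive, handle and delete messages until a receive returns none."""
    iterations = 0
    handled = 0
    while True:
        resp = sqs.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=0
        )
        messages = resp.get("Messages", [])
        if not messages:
            return handled, iterations
        for message in messages:
            handle(message["Body"])
            # delete only after the message has been handled
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
            handled += 1
        iterations += 1


seen = []
fake = FakeSqs([f"message {n}" for n in range(23)])
handled, iterations = drain_queue(fake, "https://sqs.example/queue", seen.append)
print(handled, iterations)
```

With a real queue, a receive with WaitTimeSeconds set to 0 uses short polling, which can return no messages even though some remain; setting a small positive WaitTimeSeconds (long polling) is a common way to make the "queue is empty" exit more reliable.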



Common Filtering Criteria

ES:1 = High Significance

ES:2 = Medium Significance

ES:3 = Low Significance

M:2E1 = Front Page
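As one illustration of how these codes could feed an additional check on the real-time side, the sketch below maps them to labels and inspects a story's subject codes. It assumes, without confirmation from the example code, that these codes appear in the same 'subjects' list as the R: codes; describe_story and the sample lists are hypothetical.

```python
SIGNIFICANCE = {
    "ES:1": "High Significance",
    "ES:2": "Medium Significance",
    "ES:3": "Low Significance",
}
FRONT_PAGE = "M:2E1"


def describe_story(subjects):
    """Summarise significance and front-page status from a list of subject codes."""
    levels = [label for code, label in SIGNIFICANCE.items() if code in subjects]
    return {
        "significance": levels[0] if levels else None,
        "front_page": FRONT_PAGE in subjects,
    }


# Hypothetical subject lists, for illustration only.
high = describe_story(["R:AMZN.O", "ES:1", "M:2E1"])
plain = describe_story(["R:AMZN.O"])
print(high, plain)
```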

References