Japanese Investor Briefs in News - Part 2

Zoya Farberov
Developer Advocate

Introduction

We continue the use case discussion from Japanese Investor Briefs in News - Part 1, and next we are going to look into:

  • Using the real-time streaming service with Refinitiv Real-Time Optimized in Python
  • Using AWS SQS queues in a Python script

Refinitiv Real-Time Optimized - How to Select Japanese Investor Briefs News

  1. Connect to the Real-Time streaming service
    1. If the service is Refinitiv Real-Time Optimized, authenticate
    2. If the service is Refinitiv Real-Time Optimized, perform Service Discovery
    3. Connect to the endpoint with the highest availability
  2. Subscribe to MRN news from the Real-Time streaming service
  3. Assemble the complete stories, including the metadata, per the MRN data model
  4. Use the metadata to select only Japanese Briefs news for our companies of interest

Steps 1, 2 and 3 are already described in detail in many excellent resources, so we omit that discussion here; please see the References section, and refer to the code example for the complete implementation. We jump straight to step 4.

We use the metadata to select only Japanese Investor Briefs news stories for our companies of interest, which are passed in as a parameter to verifyNewsAgainstJapaneseBriefsReqs:


def verifyNewsAgainstJapaneseBriefsReqs(self, news_, rics_of_interest):
    # a filter may either have or not have any given filtering stanza
    # product_japbrief, language_japanese and newscode_us are module-level
    # constants defined elsewhere in the full example
    print('>>>NEWS FILTERING<<<')
    print('\t\tFound in news: ' + str(news_['subjects']) + ' audiences= ' + str(news_['audiences']) + ' language= ' + str(news_['language']))
    print('\t\tRequired in filter: ' + product_japbrief + ' ' + language_japanese + ' ' + newscode_us)
    # the story qualifies only if the audience, language and subject checks all pass
    return (self.multiCheck(str(news_['audiences']), [product_japbrief])
            and self.singleCheck(str(news_['language']), language_japanese)
            and self.multiCheck(str(news_['subjects']), [newscode_us]))

singleCheck is very straightforward and we will use it to verify the required language:


def singleCheck(self, _stanza, match_string):
    # exact match of a single metadata value
    return _stanza == match_string


multiCheck is very straightforward as well; we use it to verify that all required audiences and subjects are present:


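def multiCheck(self, _stanza, match_list):
    # _stanza is the str() of a metadata list, so "x in _stanza" is a
    # substring test; every required code must appear somewhere in it
    return all(x in _stanza for x in match_list)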
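To try the same checks outside the streaming application, here is a minimal self-contained sketch. It assumes the story metadata is a dict whose "audiences" and "subjects" are lists of codes and whose "language" is a string, as in the excerpt above. The function names, the REQUIRED values and the sample story are hypothetical; the codes are borrowed from the RDP filter later in this article, and whether MRN metadata uses exactly these forms is an assumption. Unlike multiCheck, which searches the str() of a list, this version tests list membership directly, so a code cannot match merely as part of a longer code.

```python
# Hypothetical required values for illustration; the full example defines its
# own constants (product_japbrief, language_japanese, newscode_us).
REQUIRED = {
    "audiences": ["NP:RINBX"],
    "subjects": ["G:6J"],
    "language": "ja",
}


def single_check(value, expected):
    """True if a scalar metadata field equals the expected value."""
    return value == expected


def multi_check(values, required):
    """True if every required code is an element of the metadata list."""
    return all(code in values for code in required)


def is_japanese_brief(news, required=REQUIRED):
    """Apply all three checks; a missing list field is treated as empty."""
    return (
        multi_check(news.get("audiences", []), required["audiences"])
        and single_check(news.get("language"), required["language"])
        and multi_check(news.get("subjects", []), required["subjects"])
    )


# Hypothetical story metadata, for illustration only.
story = {"audiences": ["NP:RINBX", "NP:OTHER"], "subjects": ["G:6J", "M:OTHER"], "language": "ja"}
print(is_japanese_brief(story))                        # True
print(is_japanese_brief({**story, "language": "en"}))  # False
```

Adding another criterion is then a matter of adding a key to REQUIRED and one more check to is_japanese_brief.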

The same approach can easily be extended to include more checks, if required. To test, we can currently use the following command-line arguments:

    --user GE-A-XXX --password XXX --clientid XXX --mrn_ric MRN_STORY

Refinitiv Data Platform with AWS Queues

  1. Subscribe to headlines or stories, filtering on Japanese Briefs
  2. Preserve the last subscription information
  3. Retrieve the messages from the SQS queue, then print and remove them iteratively until the queue is fully drained

Therefore, a typical test sequence for this example can be run via command-line parameters:

  1. -d - delete all subscriptions
  2. -s - create a new subscription for Japanese Briefs
  3. -l - optionally, examine the new subscription
  4. -r - run periodically to read all messages on the queue and drain it
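One way to wire up these four actions is an argparse mutually exclusive group, so that each run performs exactly one step of the sequence. This is a minimal sketch: the long option names (--delete, --subscribe, --list, --retrieve) and the selected_action helper are hypothetical, and the full example may parse its arguments differently.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(description="RDP news queue test actions")
    # Exactly one action per run, matching the test sequence above.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("-d", "--delete", action="store_true", help="delete all subscriptions")
    group.add_argument("-s", "--subscribe", action="store_true", help="create a new Japanese Briefs subscription")
    group.add_argument("-l", "--list", action="store_true", help="examine the current subscription")
    group.add_argument("-r", "--retrieve", action="store_true", help="read and drain the queue")
    return parser


def selected_action(argv):
    """Return the name of the single action requested on the command line."""
    args = build_parser().parse_args(argv)
    for name in ("delete", "subscribe", "list", "retrieve"):
        if getattr(args, name):
            return name
    # Not reachable in practice: the group is required, so argparse exits first.
    raise AssertionError("no action selected")


print(selected_action(["-s"]))  # subscribe
print(selected_action(["-r"]))  # retrieve
```

Passing two actions at once, for example -d -s, makes argparse print a usage error and exit, which keeps the test sequence one step at a time.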

As an example, we will use a subscription filter that must include:

  • product NP:RINBX
  • language L:ja
  • newscode G:6J

So an example filter looks like this:

"filter": {
  "operator": "and",
  "operands": [
    {
      "type": "product",
      "value": "NP:RINBX"
    },
    {
      "type": "language",
      "value": "L:ja"
    },
    {
      "type": "newscode",
      "value": "G:6J"
    }
  ],
  "type": "operator"
},
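If the filter is built programmatically rather than hard-coded, a small helper can assemble the same "and" stanza from (type, value) pairs. This is a minimal sketch with a hypothetical helper name, build_and_filter; it reproduces only the filter fragment shown above, not the full subscription request body.

```python
import json


def build_and_filter(criteria):
    """Build an 'and' operator filter from (type, value) pairs."""
    return {
        "operator": "and",
        "operands": [{"type": t, "value": v} for t, v in criteria],
        "type": "operator",
    }


japanese_briefs = build_and_filter([
    ("product", "NP:RINBX"),
    ("language", "L:ja"),
    ("newscode", "G:6J"),
])
print(json.dumps({"filter": japanese_briefs}, indent=2))
```

Adding a criterion, such as a second newscode, is then one more tuple in the list.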

We preserve the last established subscription in a file:


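#==============================================
def preserveLastSubscription(line1, line2, line3):
#==============================================
    # Write the endpoint (with subscription ID), the cryptography key and the
    # subscription ID, one per line, to lastSubscriptionFile (a module-level
    # file name defined elsewhere in the full example)
    with open(lastSubscriptionFile, "w") as f:
        f.write(line1 + "\n" + line2 + "\n" + line3)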

The file is then read by the next retrieve call, which will usually refer to the last subscription, so the subscription does not have to be passed in as a parameter. Alternatively, endpointPerSubscription and cryptographyKey can be passed explicitly on the command line.


from os.path import exists

def retrieveLastSubscription():
    # Restore the values written by preserveLastSubscription, in the same order
    global endpoint_with_subscriptionId
    global crypto_key
    global currentSubscriptionID
    if not exists(lastSubscriptionFile):
        return False
    with open(lastSubscriptionFile, "r") as f:
        endpoint_with_subscriptionId = f.readline().rstrip('\n')
        crypto_key = f.readline().rstrip('\n')
        currentSubscriptionID = f.readline().rstrip('\n')
    return True
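The three-line file works, but it relies on line order and stores the values in globals. A minimal alternative sketch, assuming the same three values (endpoint with subscription ID, cryptography key, subscription ID), keeps them as named fields in a JSON file and returns them instead of setting globals. The file name, the function names and the placeholder values are hypothetical.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical default location; the full example uses its own lastSubscriptionFile.
LAST_SUBSCRIPTION_FILE = Path("last_subscription.json")


def save_subscription(endpoint, crypto_key, subscription_id, path=LAST_SUBSCRIPTION_FILE):
    """Persist the last subscription's details as named JSON fields."""
    data = {"endpoint": endpoint, "cryptoKey": crypto_key, "subscriptionID": subscription_id}
    Path(path).write_text(json.dumps(data), encoding="utf-8")


def load_subscription(path=LAST_SUBSCRIPTION_FILE):
    """Return (endpoint, crypto_key, subscription_id), or None if nothing was saved."""
    path = Path(path)
    if not path.exists():
        return None
    data = json.loads(path.read_text(encoding="utf-8"))
    return data["endpoint"], data["cryptoKey"], data["subscriptionID"]


# Usage with placeholder values, in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "last_subscription.json"
    missing = load_subscription(state_file)  # nothing saved yet
    save_subscription("https://sqs.example.invalid/queue", "placeholder-key", "sub-123", state_file)
    restored = load_subscription(state_file)

print(missing)   # None
print(restored)  # ('https://sqs.example.invalid/queue', 'placeholder-key', 'sub-123')
```

Named fields make the file self-describing, and returning a tuple lets the caller decide whether to fall back to values passed on the command line.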

In retrieveAndRemove, we simply loop until the queue is empty:


# Excerpt from retrieveAndRemove; sqs (a boto3 SQS client), endpoint,
# cryptographyKey, callback, decrypt and processPayload come from the full example
print('*** Retrieving all messages from queue... ***')
i = 0
while True:
    # WaitTimeSeconds=0 is short polling: a response may come back empty even
    # when messages remain, so an empty response is not a strict guarantee
    resp = sqs.receive_message(QueueUrl=endpoint, MaxNumberOfMessages=10, WaitTimeSeconds=0)
    if 'Messages' in resp:
        messages = resp['Messages']
        print(f"&&&Number of messages received: {len(messages)}, iteration {i} &&&")
        # decrypt, process and then delete each message in the batch
        for message in messages:
            mBody = message['Body']
            m = decrypt(cryptographyKey, mBody)
            processPayload(m, callback)
            sqs.delete_message(QueueUrl=endpoint, ReceiptHandle=message['ReceiptHandle'])
        i += 1
    else:
        print('No more messages on queue... after ', i, ' iterations')
        break
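To test the drain logic without AWS, the loop can be factored into a function that accepts any object with boto3-style receive_message and delete_message methods, plus a handler for each message body. This is a minimal sketch: drain_queue, FakeSQS and the queue URL are hypothetical, and decryption is left to the handler. It defaults to a positive WaitTimeSeconds, which enables SQS long polling and makes an empty response a more reliable sign that the queue is drained than short polling with WaitTimeSeconds=0.

```python
def drain_queue(sqs, queue_url, handle, wait_seconds=1):
    """Receive, handle and delete messages until a receive returns none.

    Returns the total number of messages processed.
    """
    processed = 0
    while True:
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=wait_seconds,
        )
        messages = resp.get("Messages", [])
        if not messages:
            return processed
        for message in messages:
            handle(message["Body"])
            # Delete only after the handler returns, so a failure leaves the
            # message on the queue for a later attempt.
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"])
            processed += 1


class FakeSQS:
    """Hypothetical in-memory stand-in for a boto3 SQS client, for testing."""

    def __init__(self, bodies):
        self._messages = {f"rh-{i}": body for i, body in enumerate(bodies)}

    def receive_message(self, QueueUrl, MaxNumberOfMessages, WaitTimeSeconds):
        batch = list(self._messages.items())[:MaxNumberOfMessages]
        if not batch:
            return {}
        return {"Messages": [{"Body": body, "ReceiptHandle": rh} for rh, body in batch]}

    def delete_message(self, QueueUrl, ReceiptHandle):
        del self._messages[ReceiptHandle]


received = []
fake = FakeSQS([f"story {n}" for n in range(23)])
count = drain_queue(fake, "https://example.invalid/queue", received.append)
print(count)  # 23
```

With a real queue, sqs would be the boto3 SQS client the full example already creates, and handle would wrap decrypt and processPayload.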