Common Use Case - With Emphasis on Ease of Use

Snapshotting and Streaming 2K Instrument List with Refinitiv Data Library Python

Zoya Farberov
Developer Advocate

Introduction

Refinitiv Data Library for Python provides a set of ease-of-use interfaces offering coders uniform access to the breadth and depth of financial data and services available on the Refinitiv Data Platform.  It has recently become available to developers in early release, and it is the successor to RDP Library for Python.

When developers are new to Refinitiv Data Library for Python, the first piece of code that we look to get working usually involves a single instrument or just a few instruments.  Once that is accomplished, we look to implement the real-world requirements, which typically involve lists of instruments.

Next, we are going to discuss how Refinitiv Data Library for Python allows for EASE OF USE in implementing a common use case: SNAPSHOTTING (getting all fields once) or STREAMING (getting all fields once, plus any subsequent updates) of a list of 2000 INSTRUMENTS.

For requirements larger than 2K, the partitioning approach that we are going to discuss can be extended by chunking those requirements into batches of 2K (or another preferred batch size) and requesting them batch by batch, which is easily configurable and tunable to the requirement at hand.

We are going to focus on the functionality that is available and ready to use at the time of this writing.

Refinitiv Data Library Python Resources at a Glance

It may be helpful to discuss right away how to obtain RD Library Python and how to access the developer resources for RD Lib Python:

Refinitiv Data Library for Python - Quickstart Guide

Refinitiv Data Library for Python - Concept Guide and Documentation

Data Library for Python - Examples on GitHub

Requesting 2K Pricing Snapshots

  • Using stream definition without updates
  • Using get_data (chunked)

The example code discussed in this article is available for download (see the References section at the end). It is based on the EX-2.02.01-Pricing-Snapshot example, which is part of the Refinitiv Data Library for Python example deck.

A little setup is required before we can get started.

Import Required Libraries

    	
            

import os

os.environ["RD_LIB_CONFIG_PATH"] = "../../../Configuration"

import refinitiv.data as rd

from refinitiv.data.content import pricing

import pandas as pd

Read File with 2k RICs

Our RIC file has a simple format: one RIC per line.

    	
            

defaultRICs =  ['IBM.N', 'GBP=', 'JPY=', 'CAD=']  

def readRICFile(filename):

    if filename != '':

        # The with-statement closes the file even if reading fails

        with open(filename, "r") as f:

            ricList = f.read().splitlines()

        print('Read '+str(len(ricList))+' RICs from file '+filename)

    else:

        ricList = defaultRICs

    return ricList
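
Hand-edited or exported RIC files often carry blank lines, stray whitespace, or repeated entries. As a minimal sketch, assuming the same one-RIC-per-line format, a more defensive reader might look like the following. The function name `read_ric_file_clean` and the de-duplication behaviour are illustrative assumptions, not part of the article's example code.

```python
import os
import tempfile

def read_ric_file_clean(filename, default_rics=('IBM.N', 'GBP=', 'JPY=', 'CAD=')):
    """Read one RIC per line, dropping blanks and duplicates (order preserved)."""
    if not filename:
        return list(default_rics)
    with open(filename, "r") as f:
        stripped = (line.strip() for line in f)
        # dict.fromkeys keeps the first occurrence of each RIC in file order
        return list(dict.fromkeys(ric for ric in stripped if ric))

# Usage: write a small sample file (hypothetical content) and read it back
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("IBM.N\n\n GBP= \nIBM.N\nJPY=\n")
sample_rics = read_ric_file_clean(tmp.name)
os.remove(tmp.name)
print(sample_rics)
```

Removing duplicates up front keeps the number of requested items, and therefore the number of chunks, no larger than it needs to be.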

Requesting 2k Snapshots with Stream

Desktop session, RDP Platform session (if the user is permissioned for streaming) and Deployed session all support requesting a snapshot over a stream.

Different session type options are included (see comments). We can select the required session configuration while keeping the other available session configs commented out:

    	
            

#rd.open_session('platform.rdpMachine')

#rd.open_session('platform.deployed')

rd.open_session(name='desktop.workspace',config_name='../../../Configuration/refinitiv-data.config.json')

Getting a stream from a desktop or platform session:

    	
            

non_streaming = rd.content.pricing.Definition(

    myRics,

    fields=['BID', 'ASK']

).get_stream()

or, if a deployed session is used, a service definition may be required, corresponding to the service name as it is configured on the local RTDS:

    	
            

non_streaming = rd.content.pricing.Definition(

    myRics,

    fields=['BID', 'ASK'],

    service = 'ELEKTRON_DD'

).get_stream()

    	
            

# Open the stream in non-streaming mode

non_streaming.open(with_updates=False)

 

# Snapshot the prices at the time of the open() call

non_streaming.get_snapshot()

non_streaming is now available for any required access, for example:

Iterate on Instruments and Fields

    	
            

for instrument in non_streaming:

    print(instrument.name)

    for field_name, field_value in instrument:

        print(f"\t{field_name} : {field_value}")

Simplified Form Based on Stream

The simplified form can be used with a single call:

    	
            

data = rd.get_data(myRics, fields=['BID', 'ASK'])

data

Let us go over another way

Requesting Snapshots with Get Data

Depending on the permissions assigned to the user, this approach is suitable for an RDP platform session, machineId or human. The scope trapi.data.pricing.read is required.

A 2K list submitted in a single request can result in '414 Request-URI Too Large', and so we

Chunk RIC List - Define a Helper Function

    	
            

def list_to_chunks(long_list, chunk_size):

    chunked_list = list()

    for i in range(0, len(long_list), chunk_size):

        chunked_list.append(long_list[i:i+chunk_size])

    return chunked_list
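
To make the helper's behaviour concrete, here is a small self-contained check. It repeats the helper in a compact form so that the snippet runs on its own, and it uses placeholder instrument names (the `RIC0000`-style names are made up for illustration and are not real RICs).

```python
def list_to_chunks(long_list, chunk_size):
    """Split long_list into consecutive sub-lists of at most chunk_size items."""
    return [long_list[i:i + chunk_size] for i in range(0, len(long_list), chunk_size)]

# Placeholder names standing in for a list slightly larger than 2K
sample_rics = [f"RIC{n:04d}" for n in range(2050)]
chunks = list_to_chunks(sample_rics, 100)

print(len(chunks))       # 21 requests would be issued
print(len(chunks[-1]))   # 50: the last chunk holds the remainder
```

The list length does not have to be a multiple of the chunk size; the final chunk is simply shorter.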

and now we are ready to

Get 2K Price Snapshots as Chunked

and assemble results into a single result DataFrame.  

We avoid appending or concatenating each chunk result to the overall result on every iteration. This would work as well, but it would be costly, because every concat builds a new DataFrame and copies the data accumulated so far.  Instead, we append the chunk results to a list and build the result DataFrame from the list only once, when all chunks are completed.

    	
            

dataAll = []

myRicChunks = list_to_chunks(myRics, 100)

for i in range(len(myRicChunks)):

    response = rd.content.pricing.Definition(

        myRicChunks[i],

        fields=['BID', 'ASK']

    ).get_data()

 

    print("Chunk #", i)

    dataAll.append(response.data.df)

#    print(response.data.df)

dfAll = pd.concat(dataAll,ignore_index=True)

print('Displaying full result:')

display(dfAll)   

We like to track the progress, so the loop prints each chunk number, and when completed we display the complete result.

Next, we go over:

Requesting 2K Pricing Streaming Events

This example code is based on the EX-2.02.04-Pricing-StreamingEvents example, which is part of the Refinitiv Data Library for Python example deck.

A little setup is required

  • Import Required Libraries
  • Read File with RICs

See code for these steps included at the beginning of the article and in the example on GitHub (see References section)

Open Session

Supported sessions are:

  • Desktop
  • RDP platform human or machineId if permissioned for streaming pricing
  • Deployed RTDS

Again, just uncomment the session configuration that matches the requirement.

    	
            

#rd.open_session()

#rd.open_session('platform.rdpMachine')

rd.open_session(name='platform.rdpMachine',config_name='../../../Configuration/refinitiv-data.config.json')

#rd.open_session(name='platform.deployed',config_name='../../../Configuration/refinitiv-data.config.json')

#rd.open_session(name='desktop.workspace',config_name='../../../Configuration/refinitiv-data.config.json')

#rd.open_session(name='platform.rdp',config_name='../../../Configuration/refinitiv-data.config.json')

Define Callbacks to Capture Incoming Events

    	
            

import datetime

def display_refreshed_fields(pricing_stream, instrument_name, fields):

    current_time = datetime.datetime.now().time()

    print(current_time, "- Refresh received for", instrument_name, ":", fields)   

def display_updated_fields(pricing_stream, instrument_name, fields):

    current_time = datetime.datetime.now().time()

    print(current_time, "- Update received for", instrument_name, ":", fields)    

...
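
Printing every event becomes hard to follow at 2K instruments. As one possible alternative, the sketch below keeps the latest field values per instrument in a dictionary instead of printing them. It assumes the callback signature shown above (stream, instrument name, fields dictionary). The `LatestValues` class and its method names are hypothetical and not part of the library, and the lock is there on the assumption that callbacks may be invoked from a library thread.

```python
import threading

class LatestValues:
    """Hypothetical helper: caches the most recent fields per instrument."""

    def __init__(self):
        self._lock = threading.Lock()
        self._cache = {}

    def on_refresh(self, pricing_stream, instrument_name, fields):
        # A refresh carries the full image, so it replaces what is cached
        with self._lock:
            self._cache[instrument_name] = dict(fields)

    def on_update(self, pricing_stream, instrument_name, fields):
        # An update carries only the changed fields, so it is merged in
        with self._lock:
            self._cache.setdefault(instrument_name, {}).update(fields)

    def snapshot(self):
        # Return a copy so callers cannot modify the cache by accident
        with self._lock:
            return {name: dict(values) for name, values in self._cache.items()}

# Simulated events with made-up prices; with a real stream the bound methods
# would be registered via stream.on_refresh(...) and stream.on_update(...)
latest = LatestValues()
latest.on_refresh(None, "GBP=", {"BID": 1.25, "ASK": 1.26})
latest.on_update(None, "GBP=", {"BID": 1.27})
print(latest.snapshot())
```

The cache can then be inspected at any time without scrolling through the event output.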

Create a Pricing Stream and Register Event Callbacks

Note (in code, commented out) that for a Deployed session we may need to set the service to use explicitly.

If the service name does not match, the error may look similar to:

...'Stream': 'Closed', 'Data': 'Suspect', 'Code': 'SourceUnknown', 'Text': 'A18: Unknown service.'...

    	
            

stream = rd.content.pricing.Definition(

    myRics,

    fields=['BID', 'ASK'],

#    service = 'ELEKTRON_DD'

).get_stream()

 

 

stream.on_refresh(display_refreshed_fields)

stream.on_update(display_updated_fields)

stream.on_status(display_status)

stream.on_complete(display_complete_snapshot)

Open the Stream

The stream is closed the same way, with stream.close(), when we are done.

    	
            stream.open()
        
        
    

We should be seeing refreshes and updates for the valid instruments, and perhaps a few Status messages such as "{'Stream': 'Closed', 'Data': 'Suspect', 'Code': 'NotFound', 'Text': '**The record could not be found'}" for invalid instruments, if any are included in the list.
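
With 2K instruments it can be useful to collect the rejected names rather than read them off the console. The sketch below assumes the status payload is a dictionary shaped like the message quoted above. The helper name `is_rejected` and the sample data (including the "Open" status for IBM.N) are illustrative, and how the status reaches a callback depends on the display_status signature used in the example code.

```python
def is_rejected(status):
    """True when a status message reports a closed item stream."""
    return status.get("Stream") == "Closed"

# Made-up sample status messages in the shape quoted above
sample_statuses = {
    "IBM.N": {"Stream": "Open", "Data": "Ok", "Code": "None", "Text": "All is well"},
    "BAD.RIC": {"Stream": "Closed", "Data": "Suspect", "Code": "NotFound",
                "Text": "**The record could not be found"},
}

# Keep the name and the reason code of every rejected instrument
rejected = {
    name: status["Code"]
    for name, status in sample_statuses.items()
    if is_rejected(status)
}
print(rejected)
```

A list of rejected names gathered this way can be used to clean the RIC file before the next run.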

Next, let us look at generalizing the ease-of-use approach for larger instrument list requirements:

Request 10k List of Streaming Data

Partitioning requests into convenient batches

Chunk RIC List - Define a Helper Function

    	
            

def list_to_chunks(long_list, chunk_size):

    chunked_list = list()

    for i in range(0, len(long_list), chunk_size):

        chunked_list.append(long_list[i:i+chunk_size])

    return chunked_list

Partition the List into 2k Sub-Lists (Batches)

    	
            my2kRicChunks = list_to_chunks(my10kRics, 2000)
        
        
    

Create a Stream for Each Batch (Sub-List) with Callbacks

We minimize the output so that the messages reporting completion of each batch remain visible: the refresh callback can be just "pass".

We also store the stream handles in a list.

    	
            

def donot_display_refreshed_fields(pricing_stream, instrument_name, fields):

    pass

      

#def donot_display_updated_fields(pricing_stream, instrument_name, fields):

#    pass

 

 

streams_list = list()

for i in range(len(my2kRicChunks)):

    stream = rd.content.pricing.Definition(

        my2kRicChunks[i],

        fields=['BID', 'ASK']

    ).get_stream()

 

    stream.on_refresh(donot_display_refreshed_fields)

    stream.on_update(display_updated_fields)

    stream.on_status(display_status)

    stream.on_complete(display_complete_snapshot)

    streams_list.append(stream)

In this example we go with the simplest approach:

Open Streams Sequentially

    	
            

for i in range(len(streams_list)):

    streams_list[i].open()

    print("Stream #", i)

the diminished output allows us to track the progress:

Close Streams Sequentially (at any time)

Once all streams are closed, all the updates should cease.

    	
            

for i in range(len(streams_list)):

    streams_list[i].close()

    print("Stream closed #", i)
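
If an exception interrupts the work done while the streams are open, some of them may be left open. A minimal sketch of one way to guard against that wraps the open and close loops in `try`/`finally`. `FakeStream` and `run_with_streams` below are hypothetical names, defined only so the snippet runs without a session; the sketch assumes nothing about the library beyond the `open()` and `close()` calls used above.

```python
class FakeStream:
    """Stand-in for a pricing stream, exposing only open() and close()."""

    def __init__(self):
        self.is_open = False

    def open(self):
        self.is_open = True

    def close(self):
        self.is_open = False

def run_with_streams(streams, work):
    """Open streams one by one, run work(), and always close what was opened."""
    opened = []
    try:
        for i, stream in enumerate(streams):
            stream.open()
            opened.append(stream)
            print("Stream #", i)
        work()
    finally:
        # Runs on success and on failure alike
        for i, stream in enumerate(opened):
            stream.close()
            print("Stream closed #", i)

def failing_work():
    raise RuntimeError("simulated failure while streaming")

demo_streams = [FakeStream() for _ in range(5)]
try:
    run_with_streams(demo_streams, failing_work)
except RuntimeError as err:
    print("Recovered from:", err)
```

Tracking only the streams that were actually opened means a failure part-way through the open loop still closes exactly those streams.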

This wraps up our present discussion. More relevant information is listed in the References section.

References

https://github.com/Refinitiv-API-Samples/Article.RDLibrary.Python.2KUseCase - Article Code

Refinitiv Data Library for Python - Quickstart Guide

Refinitiv Data Library for Python - Concept Guide and Documentation

Data Library for Python - Examples on GitHub

To discuss these topics further, or with other questions about Refinitiv APIs usage, get in touch with us on Refinitiv Developers Forums.