Refinitiv Data Platform APIs

How To Identify And Request ESG Bulk Content - Python

Zoya Farberov
Developer Advocate

Introduction

The main points covered in this article:

  • How to authenticate with RDP
  • Approaches to identifying the required ESG Bulk files
  • How to stream the identified files to one's desktop 

Introduction to ESG Dataset and Refinitiv Data Platform Service

ESG stands for Environmental, Social and (Corporate) Governance data.

Refinitiv Data Platform (RDP) provides simple web based API access to a broad range of content, including ESG content and ESG content in bulk.

With growing popularity of socially conscious investing, Refinitiv offers one of the most comprehensive Environmental, Social and Governance (ESG) databases in the industry, covering over 80% of global market cap, across more than 450 different ESG metrics, with history going back to 2002. Customers looking to download our ESG content can do so through our bulk API service in Refinitiv Data Platform (RDP). RDP is a cloud based API that provides a single access point to all Refinitiv content.

ESG data is the first content made available in our bulk API service, known as Client File Store (CFS). This capability allows our customers to download our entire history of ESG coverage. To learn more about how the ESG Bulk service works in Refinitiv Data Platform, please visit:

https://developers.refinitiv.com/refinitiv-data-platform/refinitiv-data-platform-apis/docs

Within the RDP family of services, ESG Bulk is part of the Client File Store (CFS)-based section of the service; find out more at:

https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-platform-apis

Let us now focus on the programmatic interaction with ESG Bulk RDP service.

Python Environment

For the purpose of demonstration, we are going to use Python 3.7 and Jupyter Lab.

Valid Credentials - Replace in Code or Read From File

Valid RDP credentials are required to interact with an RDP service. 

USERNAME = "VALIDUSER"
PASSWORD = "VALIDPASSWORD"
CLIENT_ID = "SELFGENERATEDCLIENTID"

def readCredsFromFile(filePathName):
    ### Read valid credentials from file, one entry per line:
    ###   RDP MACHINE ID, LONG PASSWORD, GENERATED CLIENT ID
    global USERNAME, PASSWORD, CLIENT_ID
    credFile = open(filePathName, "r")
    USERNAME = credFile.readline().rstrip('\n')
    PASSWORD = credFile.readline().rstrip('\n')
    CLIENT_ID = credFile.readline().rstrip('\n')
    credFile.close()

# use forward slashes (or a raw string) so backslashes are not treated as escape sequences
readCredsFromFile("../creds/credFileHuman.txt")

# Uncomment - to make sure that creds are either set in code or read in correctly
#print("USERNAME=" + str(USERNAME))
#print("PASSWORD=" + str(PASSWORD))
#print("CLIENT_ID=" + str(CLIENT_ID))

We include two ways to supply the valid credentials. 

  • One is to replace the placeholders in code ("VALIDUSER", and so on) with the valid personal credential values. To enact this, comment out the call that reads the credentials from file:

        #readCredsFromFile("../creds/credFileHuman.txt")

  • The other way is to store a set of valid RDP credentials in the file "credFileHuman.txt" under the path "../creds" and have the code retrieve the credentials from that file.

        The file is expected to be in a simple format, one entry per line:

VALIDUSER
VALIDPASSWORD
SELFGENERATEDCLIENTID

Define Token Handling and Obtain a Valid Token

Having a valid token is a prerequisite for requesting any RDP content; the token is passed into each of the next steps. For additional information on authorization and tokens, refer to the RDP tutorial: Authorization - All about tokens.

The implementation steps that come next may look familiar, as, with some variation, they come up repeatedly in any RDP service interaction.

import requests
import json
import time

# authorization endpoint constants - values per the RDP Authorization tutorial at the time of writing
RDP_BASE_URL = "https://api.refinitiv.com"
CATEGORY_URL = "/auth/oauth2"
RDP_AUTH_VERSION = "/v1"
ENDPOINT_URL = "/token"
SCOPE = "trapi"
CLIENT_SECRET = ""        # empty for password grant
TOKEN_FILE = "token.txt"

TOKEN_ENDPOINT = RDP_BASE_URL + CATEGORY_URL + RDP_AUTH_VERSION + ENDPOINT_URL

def _requestNewToken(refreshToken):
    if refreshToken is None:
        tData = {
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": SCOPE,
            "takeExclusiveSignOnControl": "true"
        }
    else:
        tData = {
            "refresh_token": refreshToken,
            "grant_type": "refresh_token",
        }

    # Make a REST call to get the latest access token
    response = requests.post(
        TOKEN_ENDPOINT,
        headers={"Accept": "application/json"},
        data=tData,
        auth=(CLIENT_ID, CLIENT_SECRET)
    )

    if response.status_code != 200:
        raise Exception("Failed to get access token {0} - {1}".format(response.status_code, response.text))

    # Return the new token
    return json.loads(response.text)

def saveToken(tknObject):
    tf = open(TOKEN_FILE, "w+")
    print("Saving the new token")
    # Append the expiry time to the token
    tknObject["expiry_tm"] = time.time() + int(tknObject["expires_in"]) - 10
    # Store it in the file
    json.dump(tknObject, tf, indent=4)
    tf.close()

def getToken():
    try:
        print("Reading the token from: " + TOKEN_FILE)
        # Read the token from a file
        tf = open(TOKEN_FILE, "r+")
        tknObject = json.load(tf)

        # Is the access token still valid?
        if tknObject["expiry_tm"] > time.time():
            # return access token
            return tknObject["access_token"]

        print("Token expired, refreshing a new one...")
        tf.close()
        # Get a new token from the refresh token
        tknObject = _requestNewToken(tknObject["refresh_token"])

    except Exception as exp:
        print("Caught exception: " + str(exp))
        print("Getting a new token using Password Grant...")
        tknObject = _requestNewToken(None)

    # Persist this token for future queries
    saveToken(tknObject)
    # Return access token
    return tknObject["access_token"]

accessToken = getToken()
print("Have token now")
print("Token is: " + accessToken)

Request Available FileSets

The purpose of the ESG Bulk service is to provide ESG content in bulk. The content is available as:

  • A full JSON data file containing history for all measures and all organizations.
  • A delta JSON data file that contains only the incremental changes to the universe since the previous week.

A customer can examine the available File Sets that are permissioned to them, and is expected to:

  • Build the initial ESG content set/representation from the full files
  • Apply the deltas (changes) as they become available
  • Fill any gap in the ESG content, if a retrieval was not completed and the content that was missed remains available
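Schematically, that full-then-delta workflow can be sketched as follows. This is a minimal illustration only: the assumption that each JSON line carries an 'id' key to upsert on is hypothetical, for demonstration; consult the ESG Bulk documentation for the actual record layout.

```python
import gzip
import json

def applyBulkFile(path, store=None):
    # Apply a full (init) or delta JSON-lines file to a local store.
    # Assumes, for illustration only, that each line is a JSON object
    # with an 'id' field to key on; delta records overwrite full ones.
    store = {} if store is None else store
    with gzip.open(path, 'rt', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                record = json.loads(line)
                store[record['id']] = record   # upsert
    return store

# usage sketch:
# store = applyBulkFile('RFT-ESG-...-Init-....jsonl.gz')          # build initial state
# store = applyBulkFile('RFT-ESG-...-Delta-....jsonl.gz', store)  # apply weekly delta
```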

This step also serves to verify the permissioned file types, for example:

  • ESGRawFullScheme
  • ESGScoresFull
  • ESGScoresWealthFull
RDP_ESG_BULK_VERSION = '/v1'     # CFS API version at the time of writing
RDP_ESG_BUCKET = 'bulk-ESG'      # the ESG Bulk bucket name

FILESET_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/file-sets?bucket=' + RDP_ESG_BUCKET

def requestFileSets(token, withNext, skipToken, attributes):
    print("Obtaining FileSets in ESG Bucket...")

    # build the request URL locally, so repeated calls do not accumulate parameters
    filesetURL = FILESET_ENDPOINT
    if attributes:
        filesetURL = filesetURL + attributes
    if withNext:
        filesetURL = filesetURL + '&skipToken=' + skipToken

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    print('GET ' + filesetURL)
    response = requests.get(filesetURL, headers=headers)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(filesetURL, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    return ''

jsonFullResp = requestFileSets(accessToken, False, '', '')

Paginate Through the Available FileSets

This step lets us see which FileSets are presently made available, as this can change over time.

i = 1
while "@nextLink" in jsonFullResp:
    print('<<< Iteration: ' + str(i) + ' >>>  More exists: ' + jsonFullResp['@nextLink'] + ', skipToken is: ' + jsonFullResp['@nextLink'][-62:] + '\n')
    jsonFullResp = requestFileSets(accessToken, True, jsonFullResp['@nextLink'][-62:], '')
    print(json.dumps(jsonFullResp, indent=2))
    i += 1
print('Last response without next=')
print(json.dumps(jsonFullResp, indent=2))
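As an aside, taking the last 62 characters of @nextLink assumes the skipToken always has a fixed length. A more robust way, assuming skipToken appears as a query parameter of the @nextLink URL, is to parse it out; a sketch (extractSkipToken is a helper name introduced here):

```python
from urllib.parse import urlparse, parse_qs

def extractSkipToken(nextLink):
    # pull the skipToken query parameter out of an @nextLink URL,
    # rather than relying on a fixed-length slice
    params = parse_qs(urlparse(nextLink).query)
    tokens = params.get('skipToken', [])
    return tokens[0] if tokens else ''
```

Such a helper could then stand in for the `[-62:]` slicing in the pagination loop.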

Retrieve FileSets of Specific File Type (Filter By Attribute)

The file types may change over time; at the time of this writing, the available FileSets are of the types:

  • ESG Raw Full A
  • ESG Raw Full B
  • ESG Raw Current A
  • ESG Raw Current B
  • ESG Sources
  • ESG Raw Wealth Standard
  • Symbology Cusip
  • Symbology SEDOL
  • Symbology Organization
  • Symbology Instrument Quote

So, if we wish to request only "Symbology Cusip" FileSets, we call:

jsonFullResp = requestFileSets(accessToken, False, '', '&attributes=ContentType:Symbology Cusip')

From the output, we select the File Id of the file or files that we are interested in downloading, for example:

...
"files": [ "48c8-c367-10b639d6-9128-0e00b40dea98" ],
...

Or, if we wish to filter by Package Id (this is the recommended approach; enter and use a PACKAGE_ID from the package Ids permissioned to your user Id, as supplied to you by your Refinitiv contact):

jsonFullResp = requestFileSets(accessToken, False, '', '&packageId=' + PACKAGE_ID)  #+'&attributes=ContentType:ESG Sources'

The results are returned as parsed JSON, and can also be viewed in tabular form by structuring them into a pandas DataFrame.
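For instance, the 'value' list of a FileSets response can be flattened into a table (a sketch; fileSetsToDataFrame is a helper name introduced here, and the commented usage illustrates the response shape rather than a guaranteed schema):

```python
import pandas as pd

def fileSetsToDataFrame(fileSetsResp):
    # flatten the 'value' list of a FileSets response into a DataFrame
    return pd.json_normalize(fileSetsResp.get('value', []))

# usage sketch:
# df = fileSetsToDataFrame(jsonFullResp)
# print(df)
```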

Retrieving Complete File Details of a FileSet

Once we have identified the FileSet Id of the FileSet that we are interested in, we request its complete details, so that we can learn the specific File Ids and their corresponding file names.

FILES_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/files?filesetId='

def requestFileDetails(token, fileSetId):
    print("Obtaining File details for FileSet= " + fileSetId + " ...")
    print("(If the result is Response=400, make sure that fileSetId is set to a valid value...)")
    FILES_ENDPOINT = FILES_ENDPOINT_START + fileSetId

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    response = requests.get(FILES_ENDPOINT, headers=headers)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(FILES_ENDPOINT, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    return ''

# set FILESET_ID to a FileSet Id identified in the previous step
jsonFullResp = requestFileDetails(accessToken, FILESET_ID)

There are two ways of downloading the files:

Stream File via FileId using Redirect

This is a single-request approach to obtaining the required file:

import shutil

FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/files/'

# use valid values, obtained from the previous step
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'

def requestFileDownload(token, fileId, fileName):
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }

    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    # when running on Windows, colons are not allowed in filenames
    fileName = fileName.replace(":", ".")

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        print('Processing...')
        with open(fileName, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd)
        print('Look for gzipped file named: ' + fileName + ' in current directory')
        response.connection.close()
    return

# consider below an example only
requestFileDownload(accessToken, exampleFileId, exampleFileName)
#requestFileDownload(accessToken, FILE_ID, FILE_NAME)

Please note, at the end of the code snippet, how we can call the function either with the hard-coded exampleFileId and exampleFileName, or pass in FILE_ID and FILE_NAME via variables. Use whichever approach you prefer.
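Once a file has downloaded, its gzipped JSON-lines content can be spot-checked without unpacking it fully; a small sketch (previewJsonl is a helper name introduced here, not part of the API):

```python
import gzip
import json

def previewJsonl(fileName, maxLines=3):
    # return the first few parsed JSON records of a .jsonl.gz file
    records = []
    with gzip.open(fileName, 'rt', encoding='utf-8') as f:
        for i, line in enumerate(f):
            if i >= maxLines:
                break
            records.append(json.loads(line))
    return records

# usage sketch:
# for rec in previewJsonl(exampleFileName.replace(":", ".")):
#     print(rec)
```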

Or

We can first obtain the direct file download URL (or multiple URLs) and then download the files explicitly from those URLs:

Get File Location (Step 1 of 2)

FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/files/'
DIRECT_URL = ''

def requestFileLocation(token, fileId):
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream?doNotRedirect=true'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }

    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, allow_redirects=False)

    if response.status_code == 401:   # error when token expired
        token = getToken()            # refresh the token and retry once
        headers['Authorization'] = "Bearer " + token
        response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, allow_redirects=False)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=')
        print(json.dumps(jsonFullResp, indent=2))
        print('File Direct URL is: ' + str(jsonFullResp['url']) + '|||')
        return jsonFullResp['url']
    else:
        return 'Error response: ' + response.text

DIRECT_URL = requestFileLocation(accessToken, FILE_ID)

and next we are ready to

Download File From File Location (Step 2 of 2)

from urllib.parse import urlparse, parse_qs

def requestDirectFileDownload(token, directUrl, fileName):
    print("Obtaining File from URL... " + directUrl)

    # Parse out the URL parameters for submission into requests
    url_obj = urlparse(directUrl)
    parsed_params = parse_qs(url_obj.query)
    # extract the URL without query parameters
    parsed_url = url_obj._replace(query=None).geturl()

    # the direct URL is pre-signed and carries its own authorization in the
    # query parameters, so no Bearer token header is required; note that it
    # expires, in which case a fresh URL should be requested via requestFileLocation
    response = requests.get(parsed_url, params=parsed_params, stream=True)

    print('Response code=' + str(response.status_code))

    filename = 'another_' + fileName

    if response.status_code == 200:
        print('Processing...')
        with open(filename, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd)
        print('Look for gzipped file named: ' + filename + ' in current directory')
        response.connection.close()
    return

requestDirectFileDownload(accessToken, DIRECT_URL, FILE_NAME)

Let us now examine a very common use case:

Select the Latest ESG FileSets (Init and Delta) as of Last Sunday per PackageId

import datetime

# determine what date last Sunday was
d = datetime.datetime.today()
sun_offset = (d.weekday() - 6) % 7
sunday = d - datetime.timedelta(days=sun_offset)

# format the Sunday date per ESG Bulk requirements
sunday = sunday.replace(hour=0, minute=0, second=0, microsecond=0)
sunday = str(sunday).replace(' 00:00:00', 'T00:00:00Z')
print("Last Sunday was on", sunday)

# example only - use a Package Id permissioned to your user id
PACKAGE_ID = '4867-9a46-216e838a-9241-8fc3561b51ef'
ESG_FILESET_RESP = requestFileSets(accessToken, False, '', '&packageId=' + PACKAGE_ID + '&availableFrom=' + sunday)
print('Parsed json response=')
print(json.dumps(ESG_FILESET_RESP, indent=2))
# now ESG_FILESET_RESP contains the requisite FileSetIds

Now that we have identified the FileSets that we require, we can iterate over the identified files and request them for download:

Iterate over Latest ESG FileSets and Request the Latest ESG Files (Init and Delta)

print("List of FileSet Ids to be streamed by this step:")
for item in ESG_FILESET_RESP['value']:
    print('\t' + item['id'])
    # Request File details for the FileSet of interest
    jsonFullRespFile = requestFileDetails(accessToken, item['id'])
    print('\t\tList of Files:')
    for item2 in jsonFullRespFile['value']:
        print('File name: ' + item2['filename'])
    # Request download per File Id into its fileName
    print('Starting download ... ')
    for item2 in jsonFullRespFile['value']:
        print('Streaming File: ' + item2['filename'])
        requestFileDownload(accessToken, item2['id'], item2['filename'])

This article is brought to developers in collaboration with ESG Bulk product management team.

 

And to wrap up, here is some relevant ESG Bulk reference information:

Example.RDPAPI.Python.ESGBulkIntroduction

RDP Postman Collection and Tutorial Samples

ESG Bulk - CFS API User Guide

CFS API User Guide

https://www.python.org/

https://jupyter.org/