Requesting ESG Bulk PIT Content Set - Python

Zoya Farberov
Developer Advocate

Introduction

ESG stands for Environmental, Social and (Corporate) Governance data.

Refinitiv Data Platform (RDP) provides simple web-based API access to a broad range of content, including ESG content and ESG content in bulk.

Point-in-Time (PIT) content is newly available on RDP. This article reviews the recommended approach to requesting the complete ESG Bulk PIT content set and keeping it up to date.

For information on how to get started with RDP, including how to generate a CLIENT_ID (which is the same as an AppKey), please refer to the Quickstart guide for Refinitiv Data Platform listed in the References section.

Programmatic Approach

Every Monday at 9 am UTC, both full and delta ESG Bulk PIT filesets are published.

There are currently 100 constituent initialization (full) files; as more years are added, this number will increase in increments of 5.

At present, there are 5 delta (change) files.
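To decide whether a local store is out of date, it can help to compute the most recent scheduled publication time. The sketch below assumes the Monday 09:00 UTC schedule described above is exact and that timestamps are timezone-aware; the function names are hypothetical and are not part of the companion code.

```python
from datetime import datetime, timedelta, timezone

def latest_publication(now):
    """Return the most recent Monday 09:00 UTC at or before `now` (timezone-aware)."""
    now = now.astimezone(timezone.utc)
    # weekday() is 0 for Monday, so this steps back to Monday of the current week
    candidate = (now - timedelta(days=now.weekday())).replace(
        hour=9, minute=0, second=0, microsecond=0)
    if candidate > now:
        # It is Monday before 09:00 UTC: the latest slot is last week's
        candidate -= timedelta(days=7)
    return candidate

def local_store_is_stale(last_refresh, now):
    """True if a scheduled publication has happened since the last local refresh."""
    return last_refresh < latest_publication(now)

# Wednesday 20 Jan 2021, 12:00 UTC
example_now = datetime(2021, 1, 20, 12, 0, tzinfo=timezone.utc)
print(latest_publication(example_now))
```

In practice the files may appear slightly after the scheduled time, so a real job would probably also confirm that a new fileset exists before downloading.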

There are two ways to stay up to date:

  • Every week, re-initialize the local ESG Bulk PIT store or database by requesting the full set of initialization ESG Bulk PIT files (.F).
  • In the first week, initialize the local store by requesting the full set of initialization ESG Bulk PIT files (.F). In each subsequent week, request the delta ESG Bulk PIT files (.I), which contain all the changes applied to the database since the previous week's ESG Bulk PIT .F files.

Therefore, at the beginning of any week, depending on the approach selected, we need to request either the full set of initialization files or all the delta/change files; we never need to request both types of files.
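The choice between the two approaches can be expressed as a small decision function. This is only an illustrative sketch: the strategy names 'full' and 'delta' and the function name are hypothetical labels for the two approaches listed above.

```python
def file_type_to_request(strategy, store_initialized):
    """Return 'F' (initialization files) or 'I' (delta files) for this week's run.

    strategy: 'full'  -> re-initialize the local store every week
              'delta' -> initialize once, then apply weekly delta files
    store_initialized: whether the local store already holds a full load
    """
    if strategy == "full":
        return "F"
    if strategy == "delta":
        # The very first run still needs the full initialization set
        return "I" if store_initialized else "F"
    raise ValueError("unknown strategy: " + repr(strategy))

print(file_type_to_request("delta", store_initialized=False))
print(file_type_to_request("delta", store_initialized=True))
```

Either way, a single run requests only one type of file, which matches the rule stated above.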

Requirements and Setup to Access RDP ESG Bulk

To interact with RDP, we require valid RDP credentials and the following setup:

  1. Import required libraries and define RDP endpoint paths and constants.
  2. Load valid RDP credentials that are permissioned for RDP ESG Bulk access.
  3. Authenticate with RDP using the credentials to obtain a valid token.

These steps are included in the companion code examples hosted on GitHub (see the References section) and are described in detail in many RDP articles, for example https://developers.refinitiv.com/en/article-catalog/article/exploring-news-metadata-refinitiv-data-platform-and-python. We therefore omit a detailed discussion of these steps here and focus solely on requesting RDP ESG Bulk PIT content.

Request Available ESG Bulk PIT File Sets per Package ID

We define the bucket to use as a variable. At this time, the bucket in use is:

RDP_ESG_PIT_BUCKET = 'bulk-esg'

The package ID assigned to the PIT content set should be known in advance; at this time it is '4173-aec7-8a0b0ac9-96f9-48e83ddbd2ad'.
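To show how the bucket and package ID end up in the request URL, here is a minimal sketch that assembles the query string with the standard library. The values of RDP_BASE_URL and RDP_ESG_BULK_VERSION are assumptions (the article takes them from the companion setup code), and build_filesets_url is a hypothetical helper, not part of that code.

```python
from urllib.parse import urlencode

RDP_BASE_URL = "https://api.refinitiv.com"   # assumption: defined in the setup step
RDP_ESG_BULK_VERSION = "/v1"                 # assumption: defined in the setup step
RDP_ESG_PIT_BUCKET = "bulk-esg"              # bucket named in this article

def build_filesets_url(package_id=None, skip_token=None):
    """Build the file-sets URL, adding optional packageId and skipToken parameters."""
    params = {"bucket": RDP_ESG_PIT_BUCKET}
    if package_id:
        params["packageId"] = package_id
    if skip_token:
        params["skipToken"] = skip_token
    # urlencode escapes any characters that are not safe inside a query string
    return (RDP_BASE_URL + "/file-store" + RDP_ESG_BULK_VERSION
            + "/file-sets?" + urlencode(params))

print(build_filesets_url("4173-aec7-8a0b0ac9-96f9-48e83ddbd2ad"))
```

Building the query from a dictionary avoids having to remember the leading '&' when parameters are concatenated by hand.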

We are going to define a helper function that accepts the package ID as a parameter and populates this information into the attributes of the request:

jsonFullResp = requestFileSets(accessToken, False, '', '&packageId=' + packageIdPIT)

The helper function works with a package ID that is valid at the time of the request and has been communicated in advance.

packageIdPIT = '4173-aec7-8a0b0ac9-96f9-48e83ddbd2ad'

FILESET_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/file-sets?bucket=' + RDP_ESG_PIT_BUCKET

def requestFileSets(token, withNext, skipToken, attributes):
    print("Obtaining FileSets in ESG Bucket...")

    # Build the request URL in a local variable instead of mutating the global constant
    filesetEndpoint = FILESET_ENDPOINT
    if attributes:
        filesetEndpoint = filesetEndpoint + attributes
    if withNext:
        filesetEndpoint = filesetEndpoint + '&skipToken=' + skipToken

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    print('GET ' + filesetEndpoint)
    response = requests.get(filesetEndpoint, headers=headers)

    if response.status_code == 401:   # error when token expired
        accessToken = getToken()      # refresh the token and retry once
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.get(filesetEndpoint, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return response.json()
    else:
        return ''   # empty string signals that the request failed

jsonFullResp = requestFileSets(accessToken, False, '', '&packageId=' + packageIdPIT)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
dfPIT = pd.json_normalize(jsonFullResp['value'])
dfPIT

Let us observe that more than one fileset ID is available.

Select ESG Fileset ID

We are going to select the fileset ID of the most recent fileset, that is, the one with the maximum "created" timestamp:

dfPITlast = dfPIT[dfPIT.created == dfPIT.created.max()]
FILESET_ID = dfPITlast["id"].iloc[0]
print('FILESET_ID selected is: ' + FILESET_ID)
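The same selection can be done without pandas. The sketch below works on a list of dictionaries shaped like the 'value' array of the response; the IDs and timestamps are made-up sample values, and it assumes that "created" is an ISO 8601 UTC string in a uniform format, so that string comparison matches chronological order.

```python
def latest_fileset_id(filesets):
    """Return the id of the fileset with the greatest 'created' timestamp."""
    if not filesets:
        raise ValueError("no filesets returned for this package ID")
    # ISO 8601 timestamps in the same format sort chronologically as plain strings
    newest = max(filesets, key=lambda fs: fs["created"])
    return newest["id"]

# Hypothetical sample data, not real fileset IDs
sample_filesets = [
    {"id": "fileset-week-1", "created": "2021-01-10T09:00:00Z"},
    {"id": "fileset-week-2", "created": "2021-01-17T09:00:00Z"},
]
print(latest_fileset_id(sample_filesets))
```

Raising an error on an empty list makes an invalid package ID visible early, instead of failing later with an index error.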

Next, we are ready to request the file IDs for the selected fileset ID.

Request File IDs per selected Fileset ID

We are going to define a helper function:

FILES_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/files?filesetId='

def requestFileDetails(token, fileSetId, attributes, withNext, skipToken):
    print("Obtaining File details for FileSet= " + fileSetId + " ...")
    print("(If result is Response=400, make sure that fileSetId is set with a valid value...)")

    if withNext:
        # skipToken holds the '@nextLink' path returned with the previous page
        FILES_ENDPOINT = RDP_BASE_URL + skipToken
    else:
        FILES_ENDPOINT = FILES_ENDPOINT_START + fileSetId
    if attributes:
        FILES_ENDPOINT = FILES_ENDPOINT + attributes

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    response = requests.get(FILES_ENDPOINT, headers=headers)

    if response.status_code == 401:   # error when token expired
        accessToken = getToken()      # refresh the token and retry once
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.get(FILES_ENDPOINT, headers=headers)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return response.json()
    else:
        return ''   # empty string signals that the request failed

We will call the helper function with a pageSize of 100 (the maximum) to obtain the first 100 available results, together with a skipToken (nextLink) that we then use to obtain the remaining available results.

jsonFullResp = requestFileDetails(accessToken, FILESET_ID, '&pageSize=100', False, '')

print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
dfPart1 = pd.json_normalize(jsonFullResp['value'])
dfPart1

# Link to the next page of results
skipToken = jsonFullResp['@nextLink']
skipToken

jsonFullRespRemainder = requestFileDetails(accessToken, FILESET_ID, '&pageSize=100', True, skipToken)

print('Parsed json response=')
print(json.dumps(jsonFullRespRemainder, indent=2))
dfPart2 = pd.json_normalize(jsonFullRespRemainder['value'])
dfPart2

# Put the two results together (DataFrame.append was removed in pandas 2.0)
dfAll = pd.concat([dfPart1, dfPart2])
dfAll
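The two explicit requests above cover the current file count. As the number of files grows, a loop that keeps following '@nextLink' until it is absent may be more robust. The following is a sketch under that assumption (that the last page simply omits '@nextLink'); fetch_all_files and the fake pages are hypothetical and are used only to show the control flow without calling RDP.

```python
def fetch_all_files(fetch_page):
    """Collect the 'value' records from every page.

    fetch_page(next_link) must return the parsed JSON of one page;
    next_link is None for the first page.
    """
    records = []
    next_link = None
    while True:
        page = fetch_page(next_link)
        if not page:
            break                      # request failed or returned nothing
        records.extend(page.get("value", []))
        next_link = page.get("@nextLink")
        if not next_link:
            break                      # no further pages
    return records

# Fake two-page response standing in for the real API
fake_pages = {
    None: {"value": [{"id": "a"}, {"id": "b"}], "@nextLink": "/page-2"},
    "/page-2": {"value": [{"id": "c"}]},
}
all_files = fetch_all_files(lambda link: fake_pages[link])
print([f["id"] for f in all_files])
```

With the helper defined in this article, fetch_page could wrap requestFileDetails, passing withNext=True and the link as skipToken whenever a link is present.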

Identify All the Latest Initialization Files

Select the .F files:

# raw string, so the backslashes reach the regular expression unchanged
dfFull = dfAll.loc[dfAll['filename'].str.contains(r'\.F\.')]
dfFull

Note that initialization (full) files always contain '.F.' in the file name.

Identify the Latest Delta Files

Select the .I files:

# raw string, so the backslashes reach the regular expression unchanged
dfDelta = dfAll.loc[dfAll['filename'].str.contains(r'\.I\.')]
dfDelta

Note that delta (incremental) files always contain '.I.' in the file name.
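The '.F.' and '.I.' markers can also be checked with a small classifier that needs no pandas. This is a sketch only: the file names below are invented to illustrate the marker and are not real ESG Bulk PIT file names.

```python
import re

# Matches the literal marker '.F.' or '.I.' anywhere in the file name
FILE_TYPE_PATTERN = re.compile(r"\.(F|I)\.")

def classify(filename):
    """Return 'full', 'delta', or 'unknown' based on the marker in the name."""
    match = FILE_TYPE_PATTERN.search(filename)
    if match is None:
        return "unknown"
    return "full" if match.group(1) == "F" else "delta"

# Hypothetical file names, for illustration only
for name in ["example.F.part01.jsonl.gz", "example.I.part01.jsonl.gz", "example.jsonl.gz"]:
    print(name, "->", classify(name))
```

Escaping the dots matters: an unescaped '.' would match any character, so a name containing 'xFy' would be misclassified as a full file.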

Next, we define a helper function to download a file by its file ID, following the redirect.

Download File via File ID with Redirect

import shutil

FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_ESG_BULK_VERSION + '/files/'

# use valid values, obtained from the previous step
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'

def requestFileDownload(token, fileId, fileName):
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }

    # stream=True avoids loading the whole file into memory; redirects are followed
    response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    if response.status_code == 401:   # error when token expired
        accessToken = getToken()      # refresh the token and retry once
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.get(FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        print('Processing...')
        with open(fileName, 'wb') as fd:
            # copy the raw (still gzipped) bytes straight to disk
            shutil.copyfileobj(response.raw, fd)
        print('Look for gzipped file named: ' + fileName + ' in current directory')

    response.close()   # release the connection
    return
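Once a file is on disk, it can be read back record by record. The sketch below assumes the downloaded files are gzip-compressed JSON Lines, as the '.jsonl.gz' extension in the example file name suggests; iter_records is a hypothetical helper, and the demo writes its own small sample file rather than using real ESG data.

```python
import gzip
import json
import os
import tempfile

def iter_records(path):
    """Yield one parsed JSON object per non-empty line of a .jsonl.gz file."""
    with gzip.open(path, "rt", encoding="utf-8") as fh:
        for line in fh:
            line = line.strip()
            if line:
                yield json.loads(line)

# Demo with a tiny made-up file, so the sketch runs without any download
with tempfile.TemporaryDirectory() as tmp:
    sample_path = os.path.join(tmp, "sample.jsonl.gz")
    with gzip.open(sample_path, "wt", encoding="utf-8") as fh:
        fh.write('{"id": 1}\n{"id": 2}\n')
    records = list(iter_records(sample_path))

print(records)
```

Reading line by line keeps memory use flat, which is useful because bulk files can be large.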

We call this function repeatedly to download the latest PIT delta files.

Download the Latest PIT Delta Files

# Windows-style path; the PITfiles folder must exist before downloading
for index, row in dfDelta.iterrows():
    print(index, row["id"], row["filename"])
    requestFileDownload(accessToken, row["id"], '.\\PITfiles\\' + row["filename"])

We call the same function repeatedly to download the latest PIT full files.

Download the Latest PIT Full Files

# Windows-style path; the PITfiles folder must exist before downloading
for index, row in dfFull.iterrows():
    print(index, row["id"], row["filename"])
    requestFileDownload(accessToken, row["id"], '.\\PITfiles\\' + row["filename"])
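With around a hundred full files per run, it may be useful to skip files that are already present locally, for example when a run is restarted. The sketch below is one possible approach, not part of the companion code; pending_downloads is a hypothetical helper, and it treats any non-empty local file as complete, which would not detect a partially downloaded file.

```python
import os
import tempfile

def pending_downloads(files, target_dir):
    """Return (file_id, local_path) pairs for files not yet present in target_dir.

    files: iterable of (file_id, filename) pairs
    """
    pending = []
    for file_id, filename in files:
        path = os.path.join(target_dir, filename)
        if os.path.exists(path) and os.path.getsize(path) > 0:
            continue                   # already downloaded, skip it
        pending.append((file_id, path))
    return pending

# Demo with made-up IDs and names in a temporary folder
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "already-here.gz"), "wb") as fd:
        fd.write(b"x")
    todo = pending_downloads(
        [("id-1", "already-here.gz"), ("id-2", "missing.gz")], tmp)
    todo_names = [(fid, os.path.basename(p)) for fid, p in todo]

print(todo_names)
```

Each remaining pair could then be passed to requestFileDownload as the file ID and the target file name.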

Downloading ESG Bulk Files Without Redirect

The code to request ESG Bulk files without redirect is included at the bottom of the companion example on GitHub:

  1. Request File Location (Step 1 of 2)
  2. Download File From File Location (Step 2 of 2)
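The two steps can be sketched roughly as follows. Several details here are assumptions to be checked against the companion example: the 'doNotRedirect=true' query parameter, the 'url' field in the JSON reply, the base URL and version values, and the idea that the returned location needs no bearer token. The HTTP call is injected as http_get so the flow can be shown with a fake in place of real network access.

```python
import json
import os
import tempfile

RDP_BASE_URL = "https://api.refinitiv.com"   # assumption: defined in the setup step
RDP_ESG_BULK_VERSION = "/v1"                 # assumption: defined in the setup step

def download_without_redirect(token, file_id, file_name, http_get):
    """http_get(url, headers) -> bytes; a hypothetical injected HTTP function."""
    # Step 1 of 2: ask for the file location instead of following a redirect
    location_request = (RDP_BASE_URL + "/file-store" + RDP_ESG_BULK_VERSION
                        + "/files/" + file_id + "/stream?doNotRedirect=true")
    reply = http_get(location_request, {"Authorization": "Bearer " + token})
    direct_url = json.loads(reply)["url"]     # assumed field name
    # Step 2 of 2: download from the returned location (assumed to need no token)
    content = http_get(direct_url, {})
    with open(file_name, "wb") as fd:
        fd.write(content)
    return direct_url

def fake_http_get(url, headers):
    # Stand-in for real HTTP calls, for illustration only
    if "doNotRedirect=true" in url:
        return json.dumps({"url": "https://storage.example/file.gz"}).encode()
    return b"fake-gzip-bytes"

with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "sample.jsonl.gz")
    used_url = download_without_redirect("token", "file-id", target, fake_http_get)
    with open(target, "rb") as fd:
        saved = fd.read()

print(used_url)
```

This sketch holds the whole file in memory for brevity; a real implementation would stream the second request to disk, as the redirect-based helper does.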

 

References

Thanks to our subject matter expert, product manager Anita Varma, for sharing her knowledge and insight.

Example code on GitHub: Refinitiv-API-Samples/Example.RDPAPI.Python.ESGBulkPIT

RDP APIs on the developer portal: Refinitiv Data Platform APIs

RDP Quickstart guide: Quickstart guide for Refinitiv Data Platform

RDP ESG Bulk PIT User Guide on the developer portal: RDP ESG Bulk PIT User Guide