Divide and Conquer in Action

Downloading a Large Tick History File with Python

Zoya Farberov
Developer Advocate

Introduction

There are many approaches and techniques that can be used to download files.  The most obvious and simple approach is to request the file, receive it in full, and store it locally. 

In this article, we are going to discuss an approach to downloading very large files from the RDP CFS Tick History service that is robust and minimizes the chances of failure.

The two primary aspects that should enable us to download our large file with a high degree of reliability, and that we will focus on next, are:

  • Streaming the file via the requests module, rather than downloading it as a whole
  • Partitioning a large requirement into smaller pieces that can also serve as checkpoints, and requesting only one piece at a time by using the HTTP request header Range (see the sketch below)  
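
To make the Range idea concrete, here is a minimal, self-contained sketch of a ranged request (the URL is a placeholder; a 206 Partial Content status indicates the server honored the requested byte range):

import requests

# Placeholder URL; any server that supports HTTP range requests will do
url = 'https://example.com/large-file.gz'

# Request only the first 1000 bytes of the file
response = requests.get(url, headers={'Range': 'bytes=0-999'}, stream=True)
print(response.status_code)   # 206 Partial Content if the range was honored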

Let us introduce the concepts that will be part of the discussion:

  • Refinitiv Data Platform (RDP)
  • Client File Store (CFS)
  • Tick History (TH)
  • Python Jupyter Notebook - a Python interface to run, document, share, and learn Python code, which will be our vehicle 

Authentication Requirement

The RDP CFS Tick History service that is used to obtain Tick History files requires authentication with valid RDP credentials.  In order to keep the focus of this discussion on downloading large Tick History files, while still fulfilling the authentication requirement, and to include a complete working example that is available for download on GitHub (see References), authentication is implemented in a separate AuthTokenHandling notebook.  We install the module ipynb and reference the notebook from the same directory: 


from ipynb.fs.full.AuthTokenHandling import getToken

 
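
The complete notebook also relies on a few standard imports and two endpoint constants used by the helper functions below; here is a minimal setup sketch (the constant values are assumptions based on the public production RDP endpoint, so verify them against the GitHub example):

import requests
import json
import shutil
import pandas as pd

# Assumed values for the production RDP endpoint; adjust if your environment differs
RDP_BASE_URL = 'https://api.refinitiv.com'
RDP_CFS_VERSION = '/v1'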

Next, we are ready to call the defined function:

Authenticate - Call getToken 


accessToken = getToken()
print("Have token now")

Request File Sets - Define a Helper Function

We are going to identify the required FileSet and the required File ID.


def requestFileSets(token, withNext, skipToken, bucket, attributes):
    global FILESET_ENDPOINT
    print("Obtaining FileSets in " + bucket + " Bucket...")

    FILESET_ENDPOINT = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/file-sets?bucket=' + bucket

    querystring = {}
    payload = ""

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    if attributes:
        FILESET_ENDPOINT = FILESET_ENDPOINT + attributes
    if withNext:
        FILESET_ENDPOINT = FILESET_ENDPOINT + '&skipToken=' + skipToken

    response = requests.request("GET", FILESET_ENDPOINT, data=payload, headers=headers, params=querystring)

    if response.status_code != 200:
        if response.status_code == 401:   # token expired
            accessToken = getToken()      # refresh the token and retry once
            headers['Authorization'] = "Bearer " + accessToken
            response = requests.request("GET", FILESET_ENDPOINT, data=payload, headers=headers, params=querystring)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        return jsonFullResp
    else:
        return ''
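
The withNext and skipToken parameters exist because the service pages its results when a bucket holds more FileSets than fit into one response. Here is a hedged sketch of a pagination loop, assuming the response carries a @nextLink field containing the skip token (the field name is an assumption, so verify it against an actual response) and using the RDP_TH_BUCKET constant defined in the next step:

# Hypothetical pagination loop over all FileSets in a bucket
resp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET, '')
while resp and '@nextLink' in resp:
    # ... process resp['value'] here ...
    skipToken = resp['@nextLink'].split('skipToken=')[-1]
    resp = requestFileSets(accessToken, True, skipToken, RDP_TH_BUCKET, '')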

With this helper function in place, we can next

Request FileSets Containing a Large Tick History File

The file we are going to use as an example is FRA-2020-02-25-NORMALIZEDMP-Data-1-of-1.

We request Tick History FileSets for venue FRA, limited to the dates 2020-02-25 to 2020-02-29; the first FileSet should contain the File ID of the file that we require.


RDP_TH_BUCKET = "TICKHISTORY_VBD_UNLIMITED"
CFS_VBD_VENUE = "FRA"
CFS_VBD_VIEW = "normalised"

jsonFullResp = requestFileSets(accessToken, False, '', RDP_TH_BUCKET,
                               '&attributes=venue:' + CFS_VBD_VENUE + ',view:' + CFS_VBD_VIEW +
                               '&contentFrom=2020-02-25T00:00:00Z&contentTo=2020-02-29T00:00:00Z')

print('Same response, tabular view')
df = pd.json_normalize(jsonFullResp['value'])
df

Select File ID

We are going to select the first File ID in the FileSet, using the 2020-02-25 FRA normalized file as an example of a large file.  Another large file can be handled analogously; we just need its File ID to proceed.


FILE_ID = df.iloc[0]['files'][0]
print('FILE_ID selected is: ' + FILE_ID)

Get TH File Details by File ID - Define a Helper Function


FILES_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files/'

def requestFileDetails(token, fileId):

    print("Obtaining File details for File= " + fileId + " ...")
    print("(If result is Response=400, make sure that fileId is set with a valid value...)")
    FILES_ENDPOINT = FILES_ENDPOINT_START + fileId

    querystring = {}
    payload = ""

    headers = {
        'Content-Type': "application/json",
        'Authorization': "Bearer " + token,
        'cache-control': "no-cache"
    }

    response = requests.request("GET", FILES_ENDPOINT, data=payload, headers=headers, params=querystring)

    if response.status_code != 200:
        if response.status_code == 401:   # token expired
            accessToken = getToken()      # refresh the token and retry once
            headers['Authorization'] = "Bearer " + accessToken
            response = requests.request("GET", FILES_ENDPOINT, data=payload, headers=headers, params=querystring)

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        return jsonFullResp
    else:
        return ''

which we can next use to

Request File Details of a Large File

  • Verify the size of the file to be retrieved

  • Store its file name and file size


jsonFullResp = requestFileDetails(accessToken, FILE_ID)

print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
FILE_NAME = jsonFullResp['filename']
print('FILE_NAME selected is: ' + FILE_NAME)
FILE_SIZE_BYTES = jsonFullResp['fileSizeInBytes']
print('FILE_SIZE_BYTES selected is: ' + str(FILE_SIZE_BYTES))

With what we have learned from the file details, we are ready to download in two steps, without redirect:

  1. Obtain the complete file location (URL)
  2. Stream and download the file in chunks, copying each chunk to disk as soon as it becomes ready

Get File Location (Step 1 of 2)


FILES_STREAM_ENDPOINT_START = RDP_BASE_URL + '/file-store' + RDP_CFS_VERSION + '/files/'

def requestFileLocation(token, fileId):

    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId + '/stream?doNotRedirect=true'

    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    directUrl = ''   # remains empty if the request fails

    headers = {
        'Authorization': 'Bearer ' + token,
        'cache-control': "no-cache",
        'Accept': '*/*'
    }

    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)

    if response.status_code != 200:
        if response.status_code == 401:   # token expired
            accessToken = getToken()      # refresh the token and retry once
            headers['Authorization'] = "Bearer " + accessToken
            response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        directUrl = jsonFullResp['url']
        print('File Direct URL is: ' + str(directUrl))

    return directUrl


DIRECT_URL = requestFileLocation(accessToken, FILE_ID)

and

Download File From File Location in Chunks (Step 2 of 2)

The chunk size can be tuned for the specific environment and the required download size; chunkSize=3000000000 (3 GB) worked well in my environment for the 38 GB file that was downloaded.
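
As a quick sanity check on what that setting implies, the number of ranged requests is simply the ceiling of the file size divided by the chunk size:

import math

fileSizeBytes = 38_000_000_000   # roughly the 38 GB example file
chunkSize = 3_000_000_000        # 3 GB per ranged request
print(math.ceil(fileSizeBytes / chunkSize))   # -> 13 requests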


from urllib.parse import urlparse, parse_qs

def requestDirectFileDownload(token, fileURL, fileName, fileSizeBytes, chunkSize):

    print("Obtaining File from URL... " + fileURL + '... to file name=' + fileName + ', its size is ' + str(fileSizeBytes))

    headers_ = {
        'Accept': '*/*',
    }

    # Parse out URL parameters for submission into requests
    url_obj = urlparse(fileURL)
    parsed_params = parse_qs(url_obj.query)
    # extract the URL without query parameters
    parsed_url = url_obj._replace(query=None).geturl()

    filename = 'downloaded_' + fileName

    for i in range(0, fileSizeBytes, chunkSize):
        rangeEnd = i + chunkSize - 1
        if rangeEnd > (fileSizeBytes - 1):
            rangeEnd = fileSizeBytes - 1
        rangeExpression = 'bytes=' + str(i) + '-' + str(rangeEnd)
        print('Processing rangeExpression=' + rangeExpression)
        headers_['Range'] = rangeExpression

        # to skip verifying the SSL certificate (example only), add 'verify=False';
        # otherwise configure the security certificate
        response = requests.get(parsed_url, headers=headers_, params=parsed_params, stream=True)

        print('Response code=' + str(response.status_code) + ' text=' + str(response.reason))

        if response.status_code != 206:   # 206 Partial Content is expected for a Range request
            if response.status_code == 401:   # token expired
                accessToken = getToken()      # refresh the token and retry once
                headers_['Authorization'] = "Bearer " + accessToken
                response = requests.get(parsed_url, params=parsed_params, headers=headers_, stream=True)
            elif response.status_code == 400:   # bad request
                break

        if response.status_code == 206:
            print('Processing into file ' + filename + ' ...')
            # 'wb' on the first chunk so re-runs start fresh; 'ab' appends subsequent chunks
            with open(filename, 'wb' if i == 0 else 'ab') as fd:
                shutil.copyfileobj(response.raw, fd)

    print('Look for gzipped file named: ' + filename + ' in current directory')

    response.connection.close()

    return


requestDirectFileDownload(accessToken, DIRECT_URL, FILE_NAME, FILE_SIZE_BYTES, 3000000000)


Now our large file is ready to unzip and use as required.  
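
For instance, assuming the download is a gzipped CSV, as Tick History venue-by-day files typically are, pandas can read it without a separate unzip step; 'downloaded_' + FILE_NAME matches the name written by requestDirectFileDownload above:

import pandas as pd

# Peek at the first rows; pandas decompresses the gzip stream on the fly
df = pd.read_csv('downloaded_' + FILE_NAME, compression='gzip', nrows=10)
print(df.head())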

To learn more, visit us on LSEG Developers Forums

References