Python Jupyter Notebook

Tick History Request - Parametrize and Parallelize

Zoya Farberov
Developer Advocate

Introduction

This article discusses a quick and easy wrapper approach for running Tick History requests in parallel.  We are going to use TickHistoryTimeAndSalesExtractionRequest as the example request, while the analogous steps can be used with other Tick History and DataScope Select requests.

The companion example code is a Python Jupyter notebook* and is based on the Tick History Python Code Samples*.

With this in mind, let us embark on a little coding voyage.

Approach in Brief

  •  Authentication - token request
  •  Parametrize request (TickHistoryTimeAndSalesRequest)
  •  On Demand extraction request 
    • Define
    • Extraction status polling
    • Extraction notes retrieval
    • Parallelize
  •  Data retrieval and save to disk (includes AWS download)
    • Define
    • Parallelize

* see References section for the links to download example code

Import Required Modules and Define Required Parameters

This includes the definition of a "cookie-cutter" request:


...

requestTSBodyStarter = {      # to be parametrized with instruments and date range
    "ExtractionRequest": {
        "@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.TickHistoryTimeAndSalesExtractionRequest",
        "ContentFieldNames": [
            "Trade - Exchange/Contributor ID",
            "Trade - Price",
            "Trade - Volume",
            "Trade - Qualifiers",
            "Trade - Sequence Number",
            "Trade - Exchange Time",
            "Trade - Open",
            "Trade - High",
            "Trade - Low",
            "Quote - Bid Price",
            "Quote - Bid Size",
            "Quote - Ask Price",
            "Quote - Ask Size"
        ],
        "IdentifierList": {
            "@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.InstrumentIdentifierList",
            "InstrumentIdentifiers": [
                { "Identifier": "CARR.PA", "IdentifierType": "Ric" }  # placeholder
            ],
            "ValidationOptions": {
                "AllowHistoricalInstruments": "true"
            },
            "UseUserPreferencesForValidationOptions": "false"
        },
        "Condition": {
            "MessageTimeStampIn": "GmtUtc",
            "ApplyCorrectionsAndCancellations": "true",
            "ReportDateRangeType": "Range",
            "QueryStartDate": "2021-08-04T00:00:00.000",  # placeholder
            "QueryEndDate": "2021-08-04T23:59:59.999",    # placeholder
            "DateRangeTimeZone": "Local Exchange Time Zone",
            "DisplaySourceRIC": "false"
        }
    }
}

...

Read Credentials from File

How the credentials are acquired is not essential: they can be entered as input parameters or hard-coded.  What matters is that we have them at hand for the required authentication step.


myUsername = "Valid"
myPassword = "ValidOnly"

# comment out the next lines, if the creds are hard-coded instead
def readCredsFromFile(filePathName):
    global myUsername, myPassword
    with open(filePathName, "r") as credFile:    # one credential per line
        myUsername = credFile.readline().rstrip('\n')
        myPassword = credFile.readline().rstrip('\n')

readCredsFromFile(r"..\creds\credsDSS.txt")    # raw string, so the backslashes are not escapes
#print(myUsername)

Authentication - Datascope Token Request


def requestToken(dssUsername, dssPassword):
    requestUrl = reqStart + ".refinitiv.com/RestApi/v1/Authentication/RequestToken"

    requestHeaders = {
        "Prefer": "respond-async",
        "Content-Type": "application/json"
    }

    requestBody = {
        "Credentials": {
            "Username": dssUsername,
            "Password": dssPassword
        }
    }

    r1 = requests.post(requestUrl, json=requestBody, headers=requestHeaders)

    if r1.status_code == 200:
        jsonResponse = json.loads(r1.text.encode('ascii', 'ignore'))
        token = jsonResponse["value"]
    #    print('Authentication token (valid 24 hours):')
    #    print(token)
    else:
        print('Replace myUsername and myPassword with valid credentials, then repeat the request')
        token = 'None'

    return token

tokenValid = requestToken(myUsername, myPassword)
print("Authentication token (valid 24 hours): " + tokenValid)

Parametrize Our Request


#  RICs (instruments) and date ranges for request parametrization
request1Parameters = {
    "RICs": ["CARP.PA", "IBM.N", "MSFT.O", "VOD.L"],
    "QueryStartDate": "2022-01-04T00:00:00.000",
    "QueryEndDate": "2022-01-04T03:59:59.999"
}

request2Parameters = {
    "RICs": ["JPY=", "EUR=", "U30YT=RR"],
    "QueryStartDate": "2021-01-04T00:00:00.000",
    "QueryEndDate": "2021-01-04T03:59:59.999"
}

def parametrizeRequest(requestBodyStarter, requestParameters):
    requestBody = copy.deepcopy(requestBodyStarter)
    for i, ric in enumerate(requestParameters["RICs"]):
        if i == 0:
            # overwrite the placeholder identifier
            requestBody["ExtractionRequest"]["IdentifierList"]["InstrumentIdentifiers"][i]["Identifier"] = ric
            requestBody["ExtractionRequest"]["IdentifierList"]["InstrumentIdentifiers"][i]["IdentifierType"] = "Ric"
        else:
            requestBody["ExtractionRequest"]["IdentifierList"]["InstrumentIdentifiers"].append({
                'Identifier': ric,
                'IdentifierType': 'Ric'})
    requestBody["ExtractionRequest"]["Condition"]["QueryStartDate"] = requestParameters["QueryStartDate"]
    requestBody["ExtractionRequest"]["Condition"]["QueryEndDate"] = requestParameters["QueryEndDate"]

    return requestBody

request1Body = parametrizeRequest(requestTSBodyStarter, request1Parameters)
print("requestBody ready to submit: \n" + json.dumps(request1Body, indent=2))
request2Body = parametrizeRequest(requestTSBodyStarter, request2Parameters)
print("requestBody ready to submit: \n" + json.dumps(request2Body, indent=2))

resulting in the required requests being ready to go:

requestBody ready to submit:
{
  "ExtractionRequest": {
    "@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.TickHistoryTimeAndSalesExtractionRequest",
    "ContentFieldNames": [
      "Trade - Exchange/Contributor ID",
      "Trade - Price",
      "Trade - Volume",
      "Trade - Qualifiers",
      "Trade - Sequence Number",
      "Trade - Exchange Time",
      "Trade - Open",
      "Trade - High",
      "Trade - Low",
      "Quote - Bid Price",
      "Quote - Bid Size",
      "Quote - Ask Price",
      "Quote - Ask Size"
    ],
    "IdentifierList": {
      "@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.InstrumentIdentifierList",
      "InstrumentIdentifiers": [
        {
          "Identifier": "CARP.PA",
          "IdentifierType": "Ric"
        },
        {
          "Identifier": "IBM.N",
          "IdentifierType": "Ric"
        },
        {
          "Identifier": "MSFT.O",
          "IdentifierType": "Ric"
        },
        {
          "Identifier": "VOD.L",
          "IdentifierType": "Ric"
        }
      ],
      "ValidationOptions": {
        "AllowHistoricalInstruments": "true"
      },
      "UseUserPreferencesForValidationOptions": "false"
    },
    "Condition": {
      "MessageTimeStampIn": "GmtUtc",
      "ApplyCorrectionsAndCancellations": "true",
      "ReportDateRangeType": "Range",
      "QueryStartDate": "2022-01-04T00:00:00.000",
      "QueryEndDate": "2022-01-04T03:59:59.999",
      "DateRangeTimeZone": "Local Exchange Time Zone",
      "DisplaySourceRIC": "false"
    }
  }
}

Send On-demand Extraction Request - Define

We define a helper function to issue a request, which can subsequently be called in parallel:


from functools import partial

async def submitRequest(token, requestBody, requestMarker):
    requestUrl = reqStart + '.refinitiv.com/RestApi/v1/Extractions/ExtractRaw'

    requestHeaders = {
        "Prefer": "respond-async",
        "Content-Type": "application/json",
        "Authorization": "token " + token
    }

    loop = asyncio.get_event_loop()

    print("posting request " + json.dumps(requestBody))
    # requests is a blocking library, so run the post in an executor thread
    requestP = partial(requests.post, requestUrl, json=requestBody, headers=requestHeaders)
    r2 = await loop.run_in_executor(None, requestP)

    # Display the HTTP status of the response.
    # The initial response status (after approximately 30 seconds wait) is usually 202.
    status_code = r2.status_code
    print(requestMarker, "HTTP status of the response: " + str(status_code))

    # If the status is 202, the extraction is not complete; display the location URL
    # we received, which we will poll for the status of the extraction request:
    if status_code == 202:
        requestUrl = r2.headers["location"]
        print(requestMarker, 'Extraction is not complete, we shall poll the location URL:')
        print(str(requestUrl))

    # As long as the status of the request is 202, the extraction is not finished;
    # we must wait, and poll the status until it is no longer 202:
    r3 = ""
    while status_code == 202:
        print(requestMarker, 'As we received a 202, we wait 30 seconds, then poll again (until we receive a 200)')
        # asyncio.sleep, not time.sleep: a blocking sleep would stall the event
        # loop and prevent the other request from progressing in parallel
        await asyncio.sleep(30)
        requestG = partial(requests.get, requestUrl, headers=requestHeaders)
        r3 = await loop.run_in_executor(None, requestG)
        status_code = r3.status_code
        print(requestMarker, 'HTTP status of the response: ' + str(status_code))

    if r3 == "":
        r3 = r2

    # When the status of the request is 200, the extraction is complete;
    # we retrieve and display the jobId and the extraction notes
    # (it is recommended to analyse their content):
    if status_code == 200:
        r3Json = json.loads(r3.text.encode('ascii', 'ignore'))
        jobId = r3Json["JobId"]
        print(requestMarker, '\njobId: ' + jobId + '\n')
        notes = r3Json["Notes"]
        print(requestMarker, 'Extraction notes:\n' + notes[0])
    else:
        # Any status other than 200 at this point means an error occurred:
        jobId = -1
        print(requestMarker, 'An error occurred.\n')

    return jobId

#job1Id = submitRequest(tokenValid, request1Body, 'request#1')

Send On-demand Extraction Request - Parallelize

This example code shows 2 requests issued in parallel.  We can issue more requests in parallel using the same pre-defined helper function.  However, for TickHistoryTimeAndSalesRequest, per the Tick History REST API User Guide*, at the time of this writing the number of concurrently processed requests is 2, the same as for most other Tick History templates, and the maximum number of requests that can be submitted is 50.

Therefore, by design, we should never submit more than 50 requests, the maximum accepted.  If we wish to only submit requests that will be immediately accepted for processing, we should submit 2 requests at a time.  If we prefer to submit more and have some requests awaiting their turn, we can submit up to the maximum of 50.


tasks = asyncio.gather(
    submitRequest(tokenValid, request1Body, 'request#1'),
    submitRequest(tokenValid, request2Body, 'request#2')
)

if we are running in a Jupyter notebook, the event loop is already running and our tasks start executing in parallel, for example:

            posting request {"ExtractionRequest": {"@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.TickHistoryTimeAndSalesExtractionRequest", "ContentFieldNames": ["Trade - Exchange/Contributor ID", "Trade - Price", "Trade - Volume", "Trade - Qualifiers", "Trade - Sequence Number", "Trade - Exchange Time", "Trade - Open", "Trade - High", "Trade - Low", "Quote - Bid Price", "Quote - Bid Size", "Quote - Ask Price", "Quote - Ask Size"], "IdentifierList": {"@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.InstrumentIdentifierList", "InstrumentIdentifiers": [{"Identifier": "CARP.PA", "IdentifierType": "Ric"}, {"Identifier": "IBM.N", "IdentifierType": "Ric"}, {"Identifier": "MSFT.O", "IdentifierType": "Ric"}, {"Identifier": "VOD.L", "IdentifierType": "Ric"}], "ValidationOptions": {"AllowHistoricalInstruments": "true"}, "UseUserPreferencesForValidationOptions": "false"}, "Condition": {"MessageTimeStampIn": "GmtUtc", "ApplyCorrectionsAndCancellations": "true", "ReportDateRangeType": "Range", "QueryStartDate": "2022-01-04T00:00:00.000", "QueryEndDate": "2022-01-04T03:59:59.999", "DateRangeTimeZone": "Local Exchange Time Zone", "DisplaySourceRIC": "false"}}}
posting request {"ExtractionRequest": {"@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.TickHistoryTimeAndSalesExtractionRequest", "ContentFieldNames": ["Trade - Exchange/Contributor ID", "Trade - Price", "Trade - Volume", "Trade - Qualifiers", "Trade - Sequence Number", "Trade - Exchange Time", "Trade - Open", "Trade - High", "Trade - Low", "Quote - Bid Price", "Quote - Bid Size", "Quote - Ask Price", "Quote - Ask Size"], "IdentifierList": {"@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.InstrumentIdentifierList", "InstrumentIdentifiers": [{"Identifier": "JPY=", "IdentifierType": "Ric"}, {"Identifier": "EUR=", "IdentifierType": "Ric"}, {"Identifier": "U30YT=RR", "IdentifierType": "Ric"}], "ValidationOptions": {"AllowHistoricalInstruments": "true"}, "UseUserPreferencesForValidationOptions": "false"}, "Condition": {"MessageTimeStampIn": "GmtUtc", "ApplyCorrectionsAndCancellations": "true", "ReportDateRangeType": "Range", "QueryStartDate": "2021-01-04T00:00:00.000", "QueryEndDate": "2021-01-04T03:59:59.999", "DateRangeTimeZone": "Local Exchange Time Zone", "DisplaySourceRIC": "false"}}}
request#1 HTTP status of the response: 202
request#1 Extraction is not complete, we shall poll the location URL:
https://selectapi.datascope.refinitiv.com/RestApi/v1/Extractions/ExtractRawResult(ExtractionId='0x07e4d6b531fd8d94')
request#1 As we received a 202, we wait 30 seconds, then poll again (until we receive a 200)
request#2 HTTP status of the response: 202
request#2 Extraction is not complete, we shall poll the location URL:
https://selectapi.datascope.refinitiv.com/RestApi/v1/Extractions/ExtractRawResult(ExtractionId='0x07e4ecf094ed8df6')
request#2 As we received a 202, we wait 30 seconds, then poll again (until we receive a 200)

all that we need to do is await their completion:


#asyncio.get_event_loop().run_until_complete(tasks)
print(await tasks)
job1Id, job2Id = tasks.result()
print(job1Id, job2Id)
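Outside a notebook (in a plain Python script) there is no already-running event loop, so a top-level `await` is not available; that is what the commented-out `run_until_complete` line hints at.  A minimal sketch of the script-friendly pattern, using `asyncio.run` and a stand-in `fakeSubmit` coroutine in place of the notebook's `submitRequest`:

```python
import asyncio

# Stand-in for submitRequest: it just returns a marker-derived "jobId"
async def fakeSubmit(marker):
    await asyncio.sleep(0.01)
    return marker + "-jobId"

async def main():
    # Gather the submissions inside a coroutine and let asyncio.run
    # create and drive the event loop to completion
    return await asyncio.gather(fakeSubmit("request#1"),
                                fakeSubmit("request#2"))

job1Id, job2Id = asyncio.run(main())
```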

Retrieve Results and Save to File - Define a Helper Function

  • Request the extraction results, using the known jobId
  • Save the compressed data as GZIP
  • Display a few lines of the result

async def retrieveResults(token, jobId, resultMarker):
    requestUrl = reqStart + ".refinitiv.com/RestApi/v1/Extractions/RawExtractionResults" + "('" + jobId + "')" + "/$value"

    requestHeaders = {
        "Prefer": "respond-async",
        "Content-Type": "text/plain",
        "Accept-Encoding": "gzip",
        "Authorization": "token " + token
    }
    # AWS requires an additional header: X-Direct-Download
    if useAws:
        requestHeaders["X-Direct-Download"] = "true"

    # requests is blocking, so run the download in an executor thread,
    # as we did for the extraction request itself
    loop = asyncio.get_event_loop()
    requestG = partial(requests.get, requestUrl, headers=requestHeaders, stream=True)
    r5 = await loop.run_in_executor(None, requestG)

    # Ensure we do not automatically decompress the data on the fly:
    r5.raw.decode_content = False
    if useAws:
        # AWS does not set the header Content-Encoding="gzip".
        print('Content response headers (AWS server): type: ' + r5.headers["Content-Type"] + '\n')
    else:
        print('Content response headers (TRTH server): type: ' + r5.headers["Content-Type"] + ' - encoding: ' + r5.headers["Content-Encoding"] + '\n')

    fileName = filePath + fileNameRoot + resultMarker + ".csv.gz"
    print(resultMarker, 'Saving compressed data to file:' + fileName + ' ... please be patient')

    chunk_size = 1024
    with open(fileName, 'wb') as fd:
        shutil.copyfileobj(r5.raw, fd, chunk_size)

    print('Finished saving compressed data to file:' + fileName + '\n')

    # Now let us read and decompress the file we just created.
    # For the demo we limit the treatment to a few lines:
    maxLines = 10
    print('Read data from file, and decompress at most ' + str(maxLines) + ' lines of it:')

    uncompressedData = ""
    count = 0
    with gzip.open(fileName, 'rb') as fd:
        for line in fd:
            dataLine = line.decode("utf-8")
            # Do something with the data:
            print(dataLine)
            uncompressedData = uncompressedData + dataLine
            count += 1
            if count >= maxLines:
                break

#retrieveResults(tokenValid, job1Id, 'result1')

Retrieve Results and Save to File - Parallelize

Now all that is left for us to do is obtain the results: 


tasks = asyncio.gather(
    retrieveResults(tokenValid, job1Id, 'result#1'),
    retrieveResults(tokenValid, job2Id, 'result#2')
)

#asyncio.get_event_loop().run_until_complete(tasks)
print(await tasks)
print("<<<All done>>>")

that should look something like:


...

result#2 Saving compressed data to file:.\downloads\RTH.result#2.csv.gz ... please be patient
Finished saving compressed data to file:.\downloads\RTH.result#2.csv.gz

Read data from file, and decompress at most 10 lines of it:
#RIC,Domain,Date-Time,GMT Offset,Type,Ex/Cntrb.ID,Price,Volume,Bid Price,Bid Size,Ask Price,Ask Size,Qualifiers,Seq. No.,Exch Time,Open,High,Low

EUR=,Market Price,2021-01-04T00:00:00.343730591Z,+0,Quote,,,,1.2248,,1.2252,,,,,,,

EUR=,Market Price,2021-01-04T00:00:02.247725868Z,+0,Quote,,,,1.2248,,1.2251,,,,,,,

EUR=,Market Price,2021-01-04T00:00:02.260425617Z,+0,Quote,,,,1.2247,,1.2251,,,,,,,

EUR=,Market Price,2021-01-04T00:00:02.668642880Z,+0,Quote,,,,1.2247,,1.2251,,,,,,,

EUR=,Market Price,2021-01-04T00:00:03.560722752Z,+0,Quote,,,,1.2246,,1.225,,,,,,,

EUR=,Market Price,2021-01-04T00:00:04.659753809Z,+0,Quote,,,,1.2246,,1.225,,,,,,,

EUR=,Market Price,2021-01-04T00:00:04.995753384Z,+0,Quote,,,,1.2247,,1.225,,,,,,,

EUR=,Market Price,2021-01-04T00:00:05.656512860Z,+0,Quote,,,,1.2246,,1.225,,,,,,,

EUR=,Market Price,2021-01-04T00:00:06.659741562Z,+0,Quote,,,,1.2246,,1.225,,,,,,,

[None, None]
<<<All done>>>

This concludes our discussion for the moment.  A good next step is downloading and running the example code listed in the References section.