1. Home
  2. Article Catalog
  3. Getting Swiss Portfolio periodic tax returns using RDP Libraries

Getting Swiss Portfolio periodic tax returns using RDP Libraries

Raksina Samasiri
Developer Advocate Developer Advocate
Maysa Wittayanontawet
Senior Data Scientist Senior Data Scientist
Hathaipath Inlub
Data Scientist Data Scientist
Kittinee Thamruengthong
Data Scientist Data Scientist

Overview

Why do we need to get the Swiss Tax Data attributes?

A Swiss Portfolio Manager is required to make periodic tax returns to the Swiss Federal Tax Authority. In order to calculate the appropriate taxation rates on these tax returns, the portfolio manager needs some key attributes alongside the basic instrument reference data that they already have.
In this article, we're going to retrieve the Swiss Tax Data attributes provided by Refinitiv, for each instrument of a mixed-asset portfolio (bonds, equities, and funds) of 2,000 instruments

  •  Instrument ID
  •  Issuer Domicile Text
  •  Swiss Stamp Duty Flag
  •  Swiss Stamp Tax Rate
  •  Taxation Comment
  •  Taxation Type

We're using Refinitiv Data Platform Libraries (RDP Libraries) to retrieve the data from these RDP APIs

  •  Symbology API: To convert the identifiers supplied in the instrument list into PermIDs
  •  Data Store GraphQL API: To request Swiss Tax Data for the instruments requested

Prerequisite

The code can be run on Jupyter Notebook that has Refinitiv Data Platform (RDP) Python library installed and ready to be used. To run examples in this article,
        -  Access Credentials are required to use RDP libraries. For more detail, please check this page RDP libraries - Access Credentials. In this article, the authentication is being done using Platform session that allows you to access content directly within the cloud utilizes the OAuth 2.0 specification to ensure secure communication.  While the libraries will shield the user from OAuth token management, the following credentials will be required to access content using the Platform Session:

User ID / Machine ID A User ID or Machine ID provided to you.
Password The User / Machine Password provided to you.
App Key An Application Key used to monitor the application. Users can generate/manage their application ID's here.

To acquire your Platform credentials, you will need to reach out to your Refinitiv Account Manager.

        -  Python version 3.9.6
        -  Required python libraries:
                -  refinitiv.dataplatform==1.0.0a10
                -  pandas==1.3.5
                -  logging==0.5.1.2
                -  asyncio==3.4.3
                -  json==2.0.9
                -  datetime==4.3
                -  copy
                -  collections
                -  time
                -  configparser
                -  functools
                -  contextlib
                -  traceback
                -  os
        -  Input files
                1) credentials.ini:
 credentials file that contains RDP credentials

    	
            [RDP]
username = #RDP_USER_ID/MACHINE_ID#
password = #RDP_PASSWORD#
clientId = #RDP_APP_KEY#
uuid = 

                2) Portfolio.csv CSV file containing the instrument types (Isin, Wpk, ValorenNumber, Sedol) and instrument codes
The example of Portfolio.csv file can be found in this GitHub Repository.

To find how to form an API call, search for an endpoint in RDP API Playground then check Playground tab and Reference tab

These 2 endpoints of RDP APIs are being used to retrieve the data

1) Symbology API (/discovery/symbology/v1/lookup) to retrieve PermIDs of input instruments (in various instrument types) or check Symbology User Guide
2) With output of the Symbology API, calling Data Store GraphQL API (/data-store/v1/graphql) to get Swiss Tax Data.

5 steps to retrieve the data

Step 1) Preparation

1.1 ) Import necessary Python libraries
1.2) Create function read_input_file to import input CSV file (Portfolio.csv) and convert it to Python dictionary with instrument type as a key and code as a value

    	
            # ==============================================
def read_input_file(input_file_name):
# ==============================================
    """Import input file (Portfolio.csv) and convert it to Python dictionary with instrument type as a key and code as a value"""
    port = pd.read_csv(f".\\{input_file_name}", header=None)
    port.rename(columns={0: "type", 1: "id"}, inplace=True)
    port = port.drop_duplicates()
    
    # map instrument type
    id_dict = defaultdict(list)
    for row in port.itertuples():
        id_dict[row[1]].append(row[2])
    return id_dict

Step 2) Authentication

Read RDP credetials from file credentials.ini and open the platform session with function authenticate

    	
            CREDENTIALS_FILE = "credentials.ini"
# ==============================================
def authenticate():
# ==============================================
    """Read RDP credetials from CREDENTIALS_FILE file (credentials.ini) and open the platform session"""
    global session

    try:
        print("Read credentials from file")

        config = configparser.ConfigParser()
        config.read(CREDENTIALS_FILE)

        APP_KEY = config["RDP"]["clientId"]
        RDP_LOGIN = config["RDP"]["username"]
        RDP_PASSWORD = config["RDP"]["password"]

        grant = rdp.GrantPassword(username=RDP_LOGIN, password=RDP_PASSWORD)
        session = rdp.open_platform_session(APP_KEY, grant)
        session.set_log_level(logging.WARNING)
        
    except Exception as e:
        print(
            f"""Error message : {e}
            Cannot get credentials from a file, please create the credentials file as 'credentials.ini' with the format below, 
                [RDP]
                username = YOUR_RDP_USER@EMAIL_DOMAIN.com
                password = YOUR_PASSWORD
                clientId = YOUR_APP_KEY"""
        )

Step 3) Convert Symbology

3.1 ) Generate API Payload with get_payloads function, which dividing the instruments into batches with 1,500 instruments per each batch due to Symbology API input limit (maximum 1,500 instruments per call)
3.2) With the payload from previous step, call Symbology API to get instruments' PermID with get_symbols function

    	
            # ==============================================
def get_payloads(id_dict_ori, limit):
# ==============================================
    """Generate API Payload
    dividing the instruments into batches with 1,500 instruments per each batch
    due to Symbology API input limit (maximum 1,500 instruments per call)"""
    id_dict = deepcopy(id_dict_ori)
    inputs = []
    payloads = []
    count = 0
    initial = 0
    while any(len(value) for value in id_dict.values()):
        for key, value in id_dict.items():
            count += len(value)
            if count > limit:
                ids = value[initial : len(value) + limit - count]
                id_dict[key] = value[len(value) + limit - count :]
                initial += limit
                count = 0
                input_dict = {"identifierTypes": [key], "values": ids}
                inputs.append(input_dict)
                payloads.append(inputs)
                inputs = []
                break
            else:
                input_dict = {"identifierTypes": [key], "values": value}
                inputs.append(input_dict)
                id_dict[key] = value[len(value) :]
    payloads.append(inputs)
    return payloads

# ==============================================
def get_symbols(payloads):
# ==============================================
    """Call Symbology API to get instruments' PermID from an output of get_payloads function"""
    symbols = []
    for payload in payloads:
        endpoint = rdp.Endpoint(
            session=rdp.get_default_session(),
            url="https://api.refinitiv.com/discovery/symbology/v1/lookup",
        )
        response = endpoint.send_request(
            method=rdp.Endpoint.RequestMethod.POST,
            body_parameters={
                "from": payload,
                "to": [
                    {"objectTypes": ["anyinstrument"], "identifierTypes": ["PermID"]}
                ],
                "type": "auto",
            },
        )
        symbols.extend(response.data.raw["data"])
    return symbols

Step 4) Data Store GraphQL API - retrieve Swiss Tax Data

These function are being used
2.1 ) convert_symbols_to_dict: convert the Symbology API output to Python dictionary data type and create a dataframe of its input/output
2.2) gen_qraphql_payload_get_data: generate GraphQL payload for calling the endpoint
        2.2.1) get_graphql_data_rdp_async: call the endpoint as a batch with 200 instruments per each batch due to the Data Store GraphQL API input limit (maximum 200 instruments per call)
2.3) format_graphql_response_thread: convert an output to a dataframe and merge it with the dataframe of Symbology API's input/output
2.4) save_as_csv: save an output dataframe as a CSV file named swiss_tax_result_{current_time}.csv in the folder swiss_tax_result

    	
            gql_limit = 200
# ==============================================
def convert_symbols_to_dict(all_symbols):
# ==============================================
    """Convert the Symbology API output to Python dictionary data type and create a dataframe of its input/output"""
    # response symbology
    symbol = []
    for index, value in enumerate(all_symbols):
        if value["output"]:
            symbol.append(value)
    response_df = pd.DataFrame()
    for index, data in enumerate(symbol):
        input_list = []
        output_list = []
        data["input"][0]["instrument"] = data["input"][0].pop("value")
        if len(data["output"]) > 1:
            for row in range(len(data["output"])):
                input = pd.DataFrame.from_dict(data["input"][0], orient="index").T
                input_list.append(input)
            raw_input_df = pd.concat(input_list, ignore_index=True, sort=False)
        else:
            raw_input_df = pd.DataFrame.from_dict(data["input"][0], orient="index").T
        if len(data["output"]) > 1:
            for row in range(len(data["output"])):
                output = pd.DataFrame.from_dict(data["output"][row], orient="index").T
                output_list.append(output)
            raw_output_df = pd.concat(output_list, ignore_index=True, sort=False)

        else:
            raw_output_df = pd.DataFrame.from_dict(data["output"][0], orient="index").T
        raw_df = pd.concat([raw_input_df, raw_output_df], axis=1)
        response_df = response_df.append(raw_df, ignore_index=True, sort=False)
    # logging.info(f'response dataframe \n{response_df}\n')
    object_id_dict = defaultdict(list)
    for row in response_df.itertuples():
        object_id_dict[row[5]].append(row[3])
    
    column_names = response_df.columns.values
    column_names[0] = "Input ID type"
    response_df.columns = column_names
    io_mapping = response_df.loc[:,["Input ID type", "instrument", "value"]]
    io_mapping.rename(columns={"instrument": "Input ID", "value": "PermID"}, inplace=True)
    
    return object_id_dict, io_mapping

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i : i + n]
    	
            # ==============================================
def gen_qraphql_payload_get_data(object_id_dict):
# ==============================================
    """Generate GraphQL payload for calling the endpoint"""
    gql_output = {}
    gql_class_mapping = {
        "FundShareClass": "DerivedContentLipperFundClass",
        "EDInstrument": "DerivedContentEDFInstrument",
        "GovCorpInstrument": "DerivedContentGovCorpBonds",
    }
    cnt = 0
    for query_type, class_name in gql_class_mapping.items():
        several_tasks = asyncio.gather(
            *[
                get_graphql_data_rdp_async(
                    class_name, json.dumps(chunk_instruments), cnt
                )
                for chunk_instruments in chunks(object_id_dict[query_type], gql_limit)
            ]
        )
        loop = asyncio.get_event_loop()
        gql_output[class_name] = loop.run_until_complete(several_tasks)

        cnt += 1
    return gql_output
    	
            # ==============================================
def format_graphql_response_thread(gql_output,io_mapping):
# ==============================================
    """Convert an output to a dataframe and merge it with the dataframe of Symbology API's input/output"""
    raw_output_df = pd.DataFrame()
    response_list = []
    for gql_class in gql_output:
        for response, class_name in gql_output[gql_class]:
            raw_response = response["data"]
            for raw in raw_response[class_name]:
                output = pd.DataFrame.from_dict(raw, orient="index")
                output["class_name"] = class_name
                response_list.append(output)
            raw_output_df = pd.concat(response_list, ignore_index=True, sort=False)
            
    raw_output_df = io_mapping.merge(raw_output_df, left_on="PermID", right_on="InstrumentId", how="inner")
    raw_output_df = raw_output_df.drop(columns=["InstrumentId"])
    raw_output_df = raw_output_df.drop_duplicates()
    
    return raw_output_df
    	
            # ==============================================
def save_as_csv(df, folder_name, file_name):
# ==============================================
    """Save an output dataframe as a CSV file named swiss_tax_result_{current_time}.csv in the folder swiss_tax_result"""
    print(f"Save output dataframe as CSV file named: {file_name}")
    if not os.path.exists(folder_name):
        os.makedirs(folder_name)
    df.to_csv(os.path.join(folder_name,file_name), index=False)

Step 5) Run the defined functions

    	
            # using now() to get current time for output file name
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
input_file_name = 'Portfolio.csv'
swiss_tax_folder_name = "swiss_tax_result"
swiss_tax_file_name = f"swiss_tax_result_{current_time}.csv"

try:
    # setting up
    authenticate()
    start = time.time()
    
    # read input file
    id_dict = read_input_file(input_file_name)

    # request symbology
    payloads = get_payloads(id_dict, limit=1500)
    all_symbols = get_symbols(payloads)

    # call graphql endpoint
    object_id_dict, io_mapping = convert_symbols_to_dict(all_symbols)
    gql_output = gen_qraphql_payload_get_data(object_id_dict)
    swiss_tax_df = format_graphql_response_thread(gql_output, io_mapping)

    # show runtime
    end = time.time()
    print(f"Runtime of the app is {end - start}")
    session.close()

    # save to csv file
    save_as_csv(swiss_tax_df, swiss_tax_folder_name, swiss_tax_file_name)

    display(swiss_tax_df)

except Exception as e:
    session.close()
    logging.error(f'App failed with error message : {e}')
    print(traceback.print_exc())

Here's the output dataframe of Swiss Tax Data

Output CSV files are saved with date and timestamp of the run in swiss_tax_result folder

Conclusion

From this code, we can retrieve Swiss Tax data from the input instruments provided in a CSV file using RDP Libraries.
The full code can be found in this GitHub Repository.