Introduction

This article shows how to write an Elektron WebSocket API application in Python that consumes Machine Readable News (MRN) from the Thomson Reuters Enterprise Platform (TREP), then assembles and decodes MRN textual news messages to display the news in a console or Jupyter Notebook.

Refinitiv Machine Readable News (MRN) is an advanced service for automating the consumption and systematic analysis of news. It delivers deep historical news archives, ultra-low latency structured news and news analytics directly to your applications. This enables algorithms to exploit the power of news to seize opportunities, capitalize on market inefficiencies and manage event risk.

MRN Overview

MRN is published over Elektron using an Open Message Model (OMM) envelope in News Text Analytics domain messages. The Real-time News content set is made available over the MRN_STORY RIC. The content data is contained in a FRAGMENT field that has been compressed, and potentially fragmented across multiple messages, to reduce bandwidth and message size.

The FRAGMENT field's data type depends on the connection type:

  • RSSL connection (ESDK C++ and Java editions): BUFFER type
  • WebSocket connection: Base64-encoded ASCII string

The data goes through the following series of transformations:

  1. The core content data is a UTF-8 JSON string
  2. This JSON string is compressed using gzip
  3. The compressed JSON is split into a number of fragments (BUFFER or Base64 ASCII string), each of which fits into a single update message
  4. The data fragments are added to an update message as the FRAGMENT field value in a FieldList envelope

Therefore, in order to parse the core content data, the application will need to reverse this process.
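The four transformations above, and their reversal, can be sketched in a few lines of standard-library Python. This is a simulation for illustration only: publish_fragments and reassemble are hypothetical helper names, and the 16-byte fragment size is arbitrary (real fragment sizes are chosen by the publisher).

```python
import base64
import gzip
import json
import zlib


def publish_fragments(story, max_fragment_bytes):
    """Simulate the publisher: UTF-8 JSON -> gzip -> split -> Base64 strings."""
    raw = json.dumps(story).encode("utf-8")              # 1. UTF-8 JSON string
    compressed = gzip.compress(raw)                      # 2. gzip compression
    chunks = [compressed[i:i + max_fragment_bytes]       # 3. split into fragments
              for i in range(0, len(compressed), max_fragment_bytes)]
    # 4. over WebSocket, each fragment travels as a Base64 ASCII string
    return len(compressed), [base64.b64encode(c).decode("ascii") for c in chunks]


def reassemble(tot_size, b64_fragments):
    """Reverse the process: Base64 decode -> concatenate -> size check -> gunzip -> JSON."""
    data = b"".join(base64.b64decode(f) for f in b64_fragments)
    if len(data) != tot_size:
        raise ValueError("expected %d bytes, got %d" % (tot_size, len(data)))
    return json.loads(zlib.decompress(data, zlib.MAX_WBITS | 32).decode("utf-8"))


story = {"headline": "Example headline", "body": "Example body text"}
tot_size, fragments = publish_fragments(story, max_fragment_bytes=16)
print(len(fragments), "fragments; round trip ok:", reassemble(tot_size, fragments) == story)
```

The returned compressed length plays the role of TOT_SIZE: it counts the gzip bytes, not the Base64 characters.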

Data Model

Five fields, as well as the RIC itself, are necessary to determine whether the entire item has been received in its various fragments and how to concatenate the fragments to reconstruct the item:

  • MRN_SRC: identifier of the scoring/processing system that published the FRAGMENT;
  • GUID: globally unique identifier for the data item, all messages for this data item will have the same GUID value;
  • FRAGMENT: compressed data item fragment;
  • TOT_SIZE: total size in bytes of the fragmented data;
  • FRAG_NUM: sequence number of fragments within a data item, which is set to 1 for the first fragment of each item published and is incremented for each subsequent fragment for the same item.

A single MRN data item publication is uniquely identified by the combination of RIC name, MRN_SRC, and GUID.
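As a minimal sketch, that identifying combination can be held in a small named tuple and used as a dictionary key. MrnItemKey and item_key are hypothetical names; the sketch assumes each decoded Update message carries the Key object with the RIC name, as the example Update message in this article does.

```python
from typing import NamedTuple


class MrnItemKey(NamedTuple):
    """Hypothetical key identifying one MRN data item publication."""
    ric: str
    mrn_src: str
    guid: str


def item_key(message_json):
    """Build the (RIC, MRN_SRC, GUID) key from a decoded WebSocket Update message."""
    fields = message_json["Fields"]
    return MrnItemKey(message_json["Key"]["Name"], fields["MRN_SRC"], fields["GUID"])


msg = {"Key": {"Name": "MRN_STORY"},
       "Fields": {"MRN_SRC": "HK1_PRD_A",
                  "GUID": "RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1"}}
print(item_key(msg))
```

Because named tuples are hashable, two fragments with the same GUID but a different MRN_SRC map to different keys and are never merged.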

Fragmentation

For a given RIC/MRN_SRC/GUID combination, when a data item requires only a single message, TOT_SIZE equals the length in bytes of the FRAGMENT and FRAG_NUM is 1.

When multiple messages are required, the data item can be deemed fully received once the sum of the byte lengths of all FRAGMENTs equals TOT_SIZE. The consumer will also observe that the FRAG_NUM values run from 1 to the number of fragments, with no intermediate integers skipped. In other words, a data item transmitted over three messages will contain FRAG_NUM values of 1, 2 and 3.
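The two completeness conditions can be combined into one check. This sketch assumes the consumer has collected the decoded fragments in a dictionary keyed by FRAG_NUM; is_complete is a hypothetical helper, not part of any API.

```python
def is_complete(fragments, tot_size):
    """Return True when the received fragments form a complete MRN data item.

    fragments: hypothetical dict mapping FRAG_NUM -> decoded FRAGMENT bytes.
    """
    if not fragments:
        return False
    # FRAG_NUM values must run 1..n with no gaps
    if sorted(fragments) != list(range(1, len(fragments) + 1)):
        return False
    # The summed byte lengths must equal TOT_SIZE
    return sum(len(chunk) for chunk in fragments.values()) == tot_size


print(is_complete({1: b"abc", 2: b"de"}, 5))   # True
print(is_complete({1: b"abc", 3: b"de"}, 5))   # False: FRAG_NUM 2 is missing
print(is_complete({1: b"abc"}, 5))             # False: only 3 of 5 bytes received
```

The single-message case falls out naturally: one fragment with FRAG_NUM 1 whose length equals TOT_SIZE.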

The WebSocket application also needs to convert the FRAGMENT field data from a Base64 string to bytes before processing this field further; TOT_SIZE counts the decoded bytes, not the Base64 characters. A Python application can use the base64 module to decode the Base64 string into bytes.
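A minimal sketch of the Base64 step using the standard base64 module; decode_fragment is a hypothetical helper name. Passing validate=True makes b64decode reject characters outside the Base64 alphabet instead of silently discarding them.

```python
import base64
import binascii


def decode_fragment(fragment_b64):
    """Decode a WebSocket FRAGMENT value (Base64 ASCII string) into raw bytes."""
    try:
        # validate=True raises on characters outside the Base64 alphabet
        return base64.b64decode(fragment_b64, validate=True)
    except binascii.Error as exc:
        raise ValueError("FRAGMENT is not valid Base64: %s" % exc) from exc


# The FRAGMENT in the example Update message in this article starts with "H4sI",
# which decodes to the gzip magic bytes 1f 8b followed by 08 (deflate method).
print(decode_fragment("H4sI").hex())   # 1f8b08
```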

Compression

The FRAGMENT field is compressed with gzip, so the consumer must decompress it to reveal the JSON plain-text data in that FID.

When an MRN data item is sent in multiple messages, all the messages must be received and their FRAGMENTs concatenated before being decompressed. In other words, the FRAGMENTs should not be decompressed independently of each other.

The decompressed output is encoded in UTF-8 and formatted as JSON. A Python application can use the zlib module to decompress the data into a JSON string.
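The sketch below, using made-up data, shows both points: the reassembled payload decompresses cleanly, while a fragment decompressed on its own fails because the gzip stream is incomplete. decompress_news is a hypothetical helper; gzip.decompress(payload) would work equally well for gzip input.

```python
import gzip
import json
import zlib


def decompress_news(payload):
    """Decompress a fully reassembled FRAGMENT payload and parse the JSON news item."""
    # MAX_WBITS | 32 lets zlib auto-detect the gzip (or zlib) header
    text = zlib.decompress(payload, zlib.MAX_WBITS | 32).decode("utf-8")
    return json.loads(text)


payload = gzip.compress(json.dumps({"headline": "Example"}).encode("utf-8"))
first, second = payload[:10], payload[10:]   # pretend the item arrived in two fragments
print(decompress_news(first + second))       # {'headline': 'Example'}

try:
    zlib.decompress(first, zlib.MAX_WBITS | 32)   # one fragment on its own
except zlib.error as exc:
    print("cannot decompress a lone fragment:", exc)
```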

How to process MRN data

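Once the application has established a connection and requested the MRN RIC from TREP, it processes each incoming MRN Update message by decoding the Base64 FRAGMENT, collecting the fragments of the same news item (identified by GUID and MRN_SRC) until their combined length equals TOT_SIZE, and then decompressing the result and parsing it as JSON.

Please see the Code Walkthrough section below for more details.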
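As a rough sketch of where this flow plugs in: the example result later in this article shows incoming WebSocket data as a JSON array of messages, so an application can loop over the array and route each message by its Type. dispatch is a hypothetical function, and the two callbacks stand in for the article's processRefresh and processMRNUpdate.

```python
import json


def dispatch(payload, on_refresh, on_update):
    """Route each MRN message in one WebSocket payload to the matching handler.

    on_refresh and on_update are hypothetical callbacks; in the article's code they
    would wrap processRefresh and processMRNUpdate.
    """
    for message in json.loads(payload):  # the payload is a JSON array of messages
        if message.get("Domain") != "NewsTextAnalytics":
            continue  # Login responses and other messages are left to the rest of the app
        if message.get("Type") == "Refresh":
            on_refresh(message)   # static feed fields only, no news content
        elif message.get("Type") == "Update":
            on_update(message)    # carries FRAGMENT data to be reassembled
```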

Code Walkthrough

Requesting data

The application can subscribe to MRN data using the NewsTextAnalytics domain and one of the following MRN-specific RIC names:

  • MRN_STORY: Real-time News
  • MRN_TRNA: News Analytics: Company and C&E assets
  • MRN_TRNA_DOC: News Analytics: Macroeconomic News & events
  • MRN_TRSI: News Sentiment Indices
import json

# Global Default Variables
mrn_domain = 'NewsTextAnalytics'
mrn_item = 'MRN_STORY'

def send_mrn_request(ws):
    """ Create and send MRN request """
    mrn_req_json = {
        'ID': 2,
        "Domain": mrn_domain,
        'Key': {
            'Name': mrn_item
        }
    }

    ws.send(json.dumps(mrn_req_json))
    print("SENT:")
    print(json.dumps(mrn_req_json, sort_keys=True, indent=2, separators=(',', ':')))
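To subscribe to more than one of the MRN RICs listed above, each request needs its own stream ID. The sketch below builds one request per RIC; build_mrn_requests is a hypothetical helper, and starting the IDs at 2 simply mirrors send_mrn_request.

```python
import json

MRN_RICS = ["MRN_STORY", "MRN_TRNA", "MRN_TRNA_DOC", "MRN_TRSI"]


def build_mrn_requests(rics, first_id=2):
    """Build one NewsTextAnalytics request per RIC, each on its own stream ID."""
    return [
        {"ID": stream_id, "Domain": "NewsTextAnalytics", "Key": {"Name": ric}}
        for stream_id, ric in enumerate(rics, start=first_id)
    ]


for request in build_mrn_requests(MRN_RICS):
    print(json.dumps(request))  # in the application: ws.send(json.dumps(request))
```

Note that the article's _news_envelopes lookup matches on GUID only, so when several RICs are open it is safer to key pending items by RIC, MRN_SRC and GUID, as described in the Data Model section.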

Handle Refresh Message

The MRN Refresh response message does not contain any news or fragment data. It contains the relevant feed-related or other static fields. The application simply prints each incoming field to the console for informational purposes.

def processRefresh(ws, message_json):

    print("RECEIVED: Refresh Message")
    decodeFieldList(message_json["Fields"])

def decodeFieldList(fieldList_dict):
    for key, value in fieldList_dict.items():
        print("Name = %s: Value = %s" % (key, value))

Example output for an MRN Refresh response message:

RECEIVED: Refresh Message
Name = PROD_PERM: Value = 10001
Name = ACTIV_DATE: Value = 2019-07-20
Name = RECORDTYPE: Value = 30
Name = RDN_EXCHD2: Value = MRN
Name = TIMACT_MS: Value = 37708132
Name = GUID: Value = None
Name = CONTEXT_ID: Value = 3752
Name = DDS_DSO_ID: Value = 4232
Name = SPS_SP_RIC: Value = .[SPSML2L1
Name = MRN_V_MAJ: Value = 2
Name = MRN_TYPE: Value = STORY
Name = MRN_V_MIN: Value = 10
Name = MRN_SRC: Value = HK1_PRD_A
Name = FRAG_NUM: Value = 1
Name = TOT_SIZE: Value = 0
Name = FRAGMENT: Value = None

Handle Update Messages

The Update response messages contain news information and fragment data. First, the application gets the FRAGMENT, FRAG_NUM, GUID and MRN_SRC field values from the Update message. We use Python's base64 module to decode the FRAGMENT field value from an ASCII string into bytes.

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    fields_data = message_json["Fields"]
    # declare variables
    tot_size = 0
    guid = None

    # Get data for all required fields
    fragment = base64.b64decode(fields_data["FRAGMENT"])
    frag_num = int(fields_data["FRAG_NUM"])
    guid = fields_data["GUID"]
    mrn_src = fields_data["MRN_SRC"]
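Because Update messages with FRAG_NUM > 1 carry fewer fields than the first fragment, a slightly more defensive extraction reads TOT_SIZE with dict.get() so that a later fragment without it does not raise a KeyError. This is a sketch; extract_fragment_fields is a hypothetical helper, and like the code above it assumes FRAGMENT, FRAG_NUM, GUID and MRN_SRC are present on every Update.

```python
import base64


def extract_fragment_fields(fields_data):
    """Pull the fragment-related values out of an Update message's "Fields" object."""
    tot_size = fields_data.get("TOT_SIZE")  # may be absent on later fragments
    return {
        "fragment": base64.b64decode(fields_data["FRAGMENT"]),  # Base64 string -> bytes
        "frag_num": int(fields_data["FRAG_NUM"]),
        "guid": fields_data["GUID"],
        "mrn_src": fields_data["MRN_SRC"],
        "tot_size": int(tot_size) if tot_size is not None else None,
    }
```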

Process the first fragment/single fragment news

Next, we check whether FRAG_NUM is 1, which means this is the first fragment of the news message. The application then checks whether the FRAGMENT byte length equals the TOT_SIZE field value.

  • If they are equal, this is a single-fragment news item and the news message is complete.
  • If not, this is the first fragment of a multi-fragment news item. We store all of this Update message's field data in the _news_envelopes list and wait for the next fragment(s).
def processMRNUpdate(ws, message_json):  # process incoming News Update messages
    ...
    # Get data for all required fields
    ...
    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details
        pass
    else:  # FRAG_NUM = 1 The first fragment
        tot_size = int(fields_data["TOT_SIZE"])
        print("FRAGMENT length = %d" % len(fragment))
        # The news is not complete yet: store this fragment in a new envelope and wait for more
        if tot_size != len(fragment):
            print("Add new fragments to news envelope for guid %s" % guid)
            _news_envelopes.append({  # each envelope is a dict holding the GUID and the fragment state
                "guid": guid,
                "data": {
                    "fragment": fragment,
                    "mrn_src": mrn_src,
                    "frag_num": frag_num,
                    "tot_size": tot_size
                }
            })
            return None

Process multi-fragments news

When the application receives an Update message with FRAG_NUM > 1, the message is part of a multi-fragment news item. We retrieve the previous fragment data from the _news_envelopes list using the GUID. Please note that Update messages with FRAG_NUM > 1 contain fewer fields, as the metadata has already been included in the first Update message (FRAG_NUM = 1) for that news item.

The application also needs to check the validity of the received fragment by comparing its MRN_SRC and FRAG_NUM order against the previous fragments. If the received fragment is valid, the application appends the received FRAGMENT bytes to the previous fragments and compares the total byte length with the TOT_SIZE value.

  • If they are equal, this multi-fragment news message is complete.
  • If not, we update the news envelope in the _news_envelopes list with the current Update message's fields and wait for more fragment(s).
def processMRNUpdate(ws, message_json):  # process incoming News Update messages
    ...
    # Get data for all required fields
    ...
    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details
        guid_index = next((index for (index, d) in enumerate(
                _news_envelopes) if d["guid"] == guid), None)
        # No envelope exists for this GUID if, for example, its first fragment was missed
        envelop = _news_envelopes[guid_index] if guid_index is not None else None

        if envelop and envelop["data"]["mrn_src"] == mrn_src and frag_num == envelop["data"]["frag_num"] + 1:
            print("process multiple fragments for guid %s" % envelop["guid"])

            # Merge incoming data into the existing news envelope and copy FRAGMENT and TOT_SIZE to local variables
            fragment = envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment
            envelop["data"]["frag_num"] = frag_num
            tot_size = envelop["data"]["tot_size"]
            print("TOT_SIZE = %d" % tot_size)
            print("Current FRAGMENT length = %d" % len(fragment))

            # The multi-fragment news is not complete yet, keep waiting
            if tot_size != len(fragment):
                return None
            # The multi-fragment news is complete, delete the associated GUID envelope
            del _news_envelopes[guid_index]
        else:
            # Unknown GUID, different MRN_SRC or out-of-order FRAG_NUM: ignore this fragment
            print("Invalid fragment for guid %s, ignored" % guid)
            return None
    else:  # FRAG_NUM = 1 The first fragment
        ...
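The list search with next() scans _news_envelopes on every fragment. As an alternative sketch (not the article's implementation), a dictionary keyed by the (RIC, MRN_SRC, GUID) combination from the Data Model section gives direct lookups and keeps items from different RICs or sources apart. MrnAssembler is a hypothetical class; unlike the code above, it discards a partially received item when a fragment arrives out of order, on the assumption that the missing fragment will not be re-sent.

```python
import base64
import gzip
import json
import zlib


class MrnAssembler:
    """Hypothetical reassembler that keys pending items by (RIC, MRN_SRC, GUID)."""

    def __init__(self):
        # key -> {"tot_size": int, "next": expected FRAG_NUM, "data": bytearray}
        self._pending = {}

    def add(self, ric, fields):
        """Feed one Update message's Fields; return the decompressed bytes once complete."""
        key = (ric, fields["MRN_SRC"], fields["GUID"])
        frag_num = int(fields["FRAG_NUM"])
        chunk = base64.b64decode(fields["FRAGMENT"])

        if frag_num == 1:
            # First fragment: start a new item (TOT_SIZE is read only here)
            entry = {"tot_size": int(fields["TOT_SIZE"]), "next": 2, "data": bytearray(chunk)}
            self._pending[key] = entry
        else:
            entry = self._pending.get(key)
            if entry is None or frag_num != entry["next"]:
                # Unknown item or out-of-order fragment: drop any partial data for this key
                self._pending.pop(key, None)
                return None
            entry["data"] += chunk
            entry["next"] += 1

        if len(entry["data"]) < entry["tot_size"]:
            return None  # still waiting for more fragments
        del self._pending[key]
        if len(entry["data"]) > entry["tot_size"]:
            return None  # more bytes than TOT_SIZE: treat the item as corrupt
        return zlib.decompress(bytes(entry["data"]), zlib.MAX_WBITS | 32)


# Usage: split a gzip payload into 8-byte fragments and feed them in order
story = json.dumps({"headline": "Test", "body": "abc" * 50}).encode("utf-8")
compressed = gzip.compress(story)
chunks = [compressed[i:i + 8] for i in range(0, len(compressed), 8)]
assembler = MrnAssembler()
for n, chunk in enumerate(chunks, start=1):
    fields = {"MRN_SRC": "HK1_PRD_A", "GUID": "example-guid", "FRAG_NUM": n,
              "FRAGMENT": base64.b64encode(chunk).decode("ascii")}
    if n == 1:
        fields["TOT_SIZE"] = len(compressed)
    result = assembler.add("MRN_STORY", fields)
print(json.loads(result)["headline"])  # Test
```

Dropping the partial item frees memory for news that can never complete; keeping it, as the article's code does, tolerates a duplicate or stray fragment instead.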

Handle a completed news FRAGMENT message

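To decompress the content and obtain the JSON news message, we use Python's zlib module to decompress the gzip bytes, then parse the result as JSON. Passing zlib.MAX_WBITS | 32 as the wbits argument tells zlib to detect the gzip header automatically.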

def processMRNUpdate(ws, message_json):  # process incoming News Update messages
    ...
    # Get data for all required fields
    ...
    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details
        ...
    else:  # FRAG_NUM = 1 The first fragment
        ...

    # News Fragment(s) completed, decompress and print data as JSON to console
    if tot_size == len(fragment):
        print("decompress News FRAGMENT(s) for GUID  %s" % guid)
        decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
        print("News = %s" % json.loads(decompressed_data))
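Once the story is decoded into a Python dictionary, individual fields can be read directly. The keys used below (headline, language, versionCreated, subjects) are taken from the example News output in this article; summarize_news is a hypothetical helper, and treating subject codes with the R: prefix as RICs is an assumption based on values such as R:HAST.L in that example.

```python
def summarize_news(news):
    """Return a one-line summary of a decoded MRN story (a Python dict)."""
    # Assumption: subject codes prefixed with "R:" carry RICs
    rics = [s[2:] for s in news.get("subjects", []) if s.startswith("R:")]
    # .get() guards against stories where a key is absent
    return "%s [%s] %s (%s)" % (
        news.get("versionCreated", "?"),
        news.get("language", "?"),
        news.get("headline", "(no headline)"),
        ", ".join(rics),
    )


news = {"headline": "REG - Henderson Alt Strat  - Director Declaration",
        "language": "en", "versionCreated": "2019-08-14T08:53:50.910Z",
        "subjects": ["B:125", "R:HAST.L", "R:JHG.N", "N2:REG"]}
print(summarize_news(news))
# 2019-08-14T08:53:50.910Z [en] REG - Henderson Alt Strat  - Director Declaration (HAST.L, JHG.N)
```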

Example Result

The example application supports console, Docker and classic Jupyter Notebook environments. Please check the README.md file in the source code project for more details on how to run the application in each environment.

Note: The news message is a UTF-8 JSON string. Some news messages that contain special Unicode characters may not display in a Windows console (cmd, Git Bash, PowerShell, etc.) due to an OS limitation. Those messages are printed as a "UnicodeEncodeError exception. Cannot decode unicode character message" notice in the console instead.

RECEIVED: 
[
  {
    "DoNotCache":true,
    "DoNotConflate":true,
    "DoNotRipple":true,
    "Domain":"NewsTextAnalytics",
    "Fields":{
      "ACTIV_DATE":"2019-08-14",
      "FRAGMENT":"H4sIAAAAAAAC/41W727iRhD/3qcY+UN1lRJigzFg6dQScAg5Y6jtkN6VqlrwAtvYa253HYKqvkufpU/WWa+T0JPaXqQws7Mzs/Pnt7P+3SK5mmaWb/E4iex2t3NHrAuLVBmjfEOl5f9sRQs/TIJojPKGtX65sNZldkKzm1LAmkoFgsoqVxKOe8rhIBhXjO9A7ZkEwnlZobeCcnUBh5wSSWGTs80jlBxyxh/RRV4e/RXfK3Xwr64O2bYlaKWokK1NWVztVZFzepSvTIvIw/fsvduxu4NNZ721aafXdZ1vq/eV4L7WKHL/zIPftp2B3Xdc/y3PFV/xOEogqoo1FeBDLV7xW8ozNMPQhjnac6LYE4VECaIgxUwX4WjFHReG1a7Cpfa84n/9af5vsVBBnMwjmEbLIElnQZTCzX00TiCczqZpMD7XGYZpEEfDdLoMIEnjYRpMpkECaXyfpOackO5IDgFWU51gmmEF2ZZR4UPb6fRt+84Lw/lo2Bndj2+8szC+DO8/cqI7RiWkQisf8g28W1lqT2FUFgfCTyvruxUf7QnfUVAljJmgG4U9n/JtKQp0U3Jz4pQD2WxKkRHsNByZ2kPIZA2CuMopDFpey3Ev4Mz3KzAkSrG4MwETQfakWPF5nonylKGfPEdwADkcSoQUzYAgnoCX/JI+001Vp5G9BFVu4Xo2h0lerrFoSUHynAoMvz5OZ6nzq0Oj2y2awFaUBTgwR2sNAV2r1lsRNba3lcCIBbC3fN8gjP79pu00p5zALRFP9GQsCc80vNd0T/IthnbehIRuBFVEMB0mFU9M1yBkBcMMV/xl96Qr/vWdW/EUozjsS041lm3o9Z0+5tTumhhDUgkC6b4siNT9esJrq69k42ERw4xwstMVuyO8kmdHG+VSyH85w+t4uKPv+lmdAJcHUT6xDNu2PgHeNdN+fT1BmrR1z7QsLHmmC6NKnArB86ZGXEvbaDfYfnRk3GjtG8YRZrp6I7SrsJPDSu1LoS8J1oygoMbJQrBC1/EMrSirQ9ItrX3dc111+IBIzcqiBSkVhay7t0HfTNtInG45MQOtrI0qbL9WyRDigq2r2nOdyhc1KMhJR5+fWvC/cCq5wshXXHD5Qy7pTg8ueFcQlqvSPxfilQR09sQkQ/Xj8djCXaPdDNAz2RVqN6DGCb5lQqqRoIgePfU14i/t/qXjpnbf73b8rt0aOPYn1NxTkuFspqgVBxO4/CcSm3mI4teRMKabnIg6JzRn2v3rrP3VTN9296fPd/chG39y6cfQO8nlLpuxeed5xx7j43IcTj5MqaOtuVR6kMj5Vj9Bcew7tu3pF2jqB/fxVTCM61coR5hUiFk8i+pTCyolLtPTAUVtXLOiWViKPqur58tDThi/3LJnzP/CavApcD9K9NumZdU6UURV+PhZGIXyK0nWOcUdWa1/w1zrV/Had9pdlCF1e4Z6A6dm2l5DezbSid+pf3t3NRni78x3fhwb+hDVtNM25LYmuvwL320Puv2B53l6L/Zvh0naCuudjus6rtdz3V6vXndt/ed5g/7bstdtu9ow8J36172uiZcasqxJYIJY2oZ+6jW031ATXPu6DuDudtLSgqjtL+IgSWLDX8+TxDHsLJpMk6RRWYSz+fJFZRiPG51kOhnNX0yHo9RwN9MosfuGx4czPeffNAw3QqHhHoJ7w0yuDUVozBfNOfFy1BhEwUNzIAK58TFbRB8NG5rvGUUeaUI/V/qzx/KdC6sSO+TxG6dzYT0h7hHWX3Fv/vjmb4qautlTCQAA",
      "FRAG_NUM":1,
      "GUID":"RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1",
      "MRN_SRC":"HK1_PRD_A",
      "MRN_TYPE":"STORY",
      "MRN_V_MAJ":"2",
      "MRN_V_MIN":"10",
      "TIMACT_MS":35640745,
      "TOT_SIZE":1335
    },
    "ID":2,
    "Key":{
      "Name":"MRN_STORY",
      "Service":"ELEKTRON_DD"
    },
    "PermData":"AwhCEBVM",
    "SeqNumber":13678,
    "Type":"Update",
    "UpdateType":"Unspecified"
  }
]
FRAGMENT length = 1335
decompress News FRAGMENT(s) for GUID  RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1
News = {'altId': 'nRSN0253Ja', 'audiences': ['NP:LSEND', 'NP:LSEN'], 'body': 'For best results when printing this announcement, please click on link below:\nhttp://pdf.reuters.com/htmlnews/htmlnews.asp?i=43059c3bf0e37541&u=urn:newsml:reuters.com:20190814:nRSN0253Ja\n\nRNS Number : 0253J\nHenderson Alternative Strat Tst PLC\n14 August 2019\n\xa0\n\xa0\nHENDERSON INVESTMENT FUNDS LIMITED\nHENDERSON ALTERNATIVE STRATEGIES TRUST PLC\nLegal Entity Identifier: 213800J6LLOCA3CUDF69\n\xa0\n\xa0\n14 August 2019\nHenderson Alternative Strategies Trust plc ("the Company")\nChange to Director Information\n\xa0\nIn accordance with Listing Rule 9.6.14, the Company announces that Mr Graham\nOldroyd will be appointed as a non-executive director of BMO Global Smaller\nCompanies plc with effect from 1 October 2019.\n\xa0\n\xa0\nFor further information, please call:\n\xa0\nHelena Harvey\nFor and on behalf of\nHenderson Secretarial Services Limited\nSecretary to Henderson Alternative Strategies Trust plc\nTelephone: 020 7818 2025\n\xa0\nLaura Thomas\nInvestment Trust PR Manager\nJanus Henderson Investors\nTelephone: 020 7818 2636\nThis information is provided by RNS, the news service of the London Stock Exchange. RNS is approved by the Financial Conduct Authority to act as a Primary Information Provider in the United Kingdom. Terms and conditions relating to the use and distribution of this information may apply. 
For further information, please contact\nrns@lseg.com (mailto:rns@lseg.com)\n or visit\nwww.rns.com (http://www.rns.com/)\n.\n\xa0\n', 'firstCreated': '2019-08-14T08:53:50.910Z', 'headline': 'REG - Henderson Alt Strat  - Director Declaration', 'id': 'RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1', 'instancesOf': ['RR:1006', 'NI:EUR/EARN'], 'language': 'en', 'messageType': 2, 'mimeType': 'text/x-plain-fixed', 'provider': 'NS:LSE', 'pubStatus': 'stat:usable', 'subjects': ['B:125', 'B:147', 'B:1691', 'B:261', 'B:270', 'G:3', 'G:7J', 'G:A', 'M:1QD', 'M:1WN', 'M:32', 'M:3H', 'M:Z', 'P:4295896662', 'R:HAST.L', 'P:34414674477', 'P:5000006698', 'P:5000075242', 'E:1', 'E:4B', 'E:6T', 'E:6V', 'E:E', 'M:1V0', 'M:1Z7', 'M:1Z8', 'M:1ZN', 'M:2B', 'R:JHG.N', 'N2:PRESSR', 'N2:BOSS1', 'N2:MNGISS', 'N2:PPLMOV', 'N2:BOARD1', 'N2:SIGCOR', 'N2:BACT', 'N2:FINS08', 'N2:INVT08', 'N2:INVT', 'N2:FINS', 'N2:CINV', 'N2:WEU', 'N2:GB', 'N2:EUROP', 'N2:SRVCS', 'N2:NEWR', 'N2:REG', 'N2:CMPNY', 'N2:LEN'], 'takeSequence': 1, 'urgency': 3, 'versionCreated': '2019-08-14T08:53:50.910Z'}
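For the Windows console limitation described in the note above, one possible alternative to reporting an exception is to escape the characters the console cannot encode. safe_print is a hypothetical helper built on the standard errors="backslashreplace" codec handler; on Python 3.7+, calling sys.stdout.reconfigure(errors="backslashreplace") once has a similar effect for the whole process.

```python
import io
import sys


def safe_print(text, stream=None):
    """Print text, escaping characters that the stream's encoding cannot represent."""
    if stream is None:
        stream = sys.stdout
    try:
        print(text, file=stream)
    except UnicodeEncodeError:
        encoding = stream.encoding or "ascii"
        # Replace unencodable characters with escapes such as \xe9
        escaped = text.encode(encoding, errors="backslashreplace").decode(encoding)
        print(escaped, file=stream)


# Usage: simulate a console that can only encode ASCII
buffer = io.BytesIO()
ascii_console = io.TextIOWrapper(buffer, encoding="ascii", newline="\n")
safe_print("caf\u00e9", ascii_console)   # written as caf\xe9
ascii_console.flush()
```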

References

For any questions related to this example or the Elektron WebSocket API, please use the Developer Community Q&A Forum.