Introduction to Machine Readable News with WebSocket API 

Wasin Waeosri
Developer Advocate Developer Advocate

Introduction

Update: December 2021

This article shows how developers may use the Websocket API for Pricing Streaming and Real-Time Service aka Websocket API application to subscribe Machine Readable News (MRN) from Refinitiv Real-Time Distribution System (Refinitiv Real-Time Advanced Data Hub and Refinitiv Real-Time Advanced Distribution Server), then assemble and decode MRN textual news message to display news in a console/Jupyter Notebook.

Refinitiv Machine Readable News (MRN) is an advanced service for automating the consumption and systematic analysis of news. It delivers deep historical news archives, ultra-low latency structured news, and news analytics directly to your applications. This enables algorithms to exploit the power of news to seize opportunities, capitalize on market inefficiencies, and manage event risk.

Update (As of December 2021): The GitHub example's Main/Master branch now supports the Refinitiv Real-Time -- Optimized (RTO - formerly known as ERT in Cloud) connection. You do not need to switch to the ERT-in-Cloud branch anymore.

  • The RTO console example: Please check my colleague's Refinitiv-API-Samples/Example.WebSocketAPI.Python.MRN.RTO GitHub Repository.
  • The RTO notebook example: mrn_notebook_app_rto.ipynb notebook file.
  • The deployed Refinitiv Real-Time Distribution System (RTDS) examples are mrn_console_app.py console application and mrn_notebook_app.ipynb notebook files.

WebSocket API Overview and Prerequisite

The WebSocket API is direct access that enables easy integration into a multitude of client technology environments such as scripting and web.  This API runs directly on your Refinitiv Real-Time Distribution System infrastructure or the on the cloud and presents data in an open (JSON) readable format.

This example is focusing on the Refinitiv Machine Readable News (MRN) data processing only. I highly recommend you check the WebSocket API Tutorials page if you are not familiar with WebSocket API.

The Tutorials page provides a step-by-step guide (connect, log in, request data, parse data, etc) for developers who are interested in developing a WebSocket application to consume real-time data from Refinitiv Real-Time. The Tutorials also cover both Refinitiv Real-Time Optimized (formerly known as ERT in Cloud) and deployed Refinitiv Real-Time Distribution System infrastructure connection scenarios. 

MRN Overview

MRN is published over Refinitiv Real-Time using an Open Message Model (OMM) envelope in News Text Analytics domain messages. The Real-time News content set is made available over MRN_STORY RIC. The content data is contained in a FRAGMENT field that has been compressed and potentially fragmented across multiple messages, to reduce bandwidth and message size.

A FRAGMENT field has a different data type based on a connection type:

  • RSSL connection (RTSDK C++/Java): BUFFER type
  • WebSocket connection: Base64 ASCII string

The data goes through the following series of transformations:

  1. The core content data is a UTF-8 JSON string
  2. This JSON string is compressed using gzip
  3. The compressed JSON is split into several fragments (BUFFER or Base64 ASCII string) which each fit into a single update message
  4. The data fragments are added to an update message as the FRAGMENT field value in a FieldList envelope

Therefore, to parse the core content data, the application will need to reverse this process.

Data Model

Five fields, as well as the RIC itself, are necessary to determine whether the entire item has been received in its various fragments and how to concatenate the fragments to reconstruct the item:

  • MRN_SRC: identifier of the scoring/processing system that published the FRAGMENT;
  • GUID: globally unique identifier for the data item, all messages for this data item will have the same GUID value;
  • FRAGMENT: compressed data item fragment;
  • TOT_SIZE: total size in bytes of the fragmented data;
  • FRAG_NUM: sequence number of fragments within a data item, which is set to 1 for the first fragment of each item published

and is incremented for each subsequent fragment for the same item. A single MRN data item publication is uniquely identified by the combination of RIC name, MRN_SRC, and GUID.

Fragmentation

For a given RIC/MRN_SRC/GUID combination, when a data item requires only a single message, then TOT_SIZE will equal the number of bytes length in the FRAGMENT and FRAG_NUM will be 1.

When multiple messages are required, then the data item can be deemed as fully received once the sum of the number of bytes length of each FRAGMENT equals TOT_SIZE. The consumer will also observe that all FRAG_NUM range from 1 to the number of the fragment, with no intermediate integers skipped. In other words, a data item transmitted over three messages will contain FRAG_NUM values of 1, 2, and 3.

The WebSocket application also needs to convert the FRAGMENT field data from Base64 string to bytes data before further process this field. Python application can use the base64 module to decode Base64 string to bytes data.

Compression

The FRAGMENT field is compressed with gzip compression, thus requiring the consumer to decompress to reveal the JSON plain-text data in that FID.

When an MRN data item is sent in multiple messages, all the messages must be received and their FRAGMENTs concatenated before being decompressed. In other words, the FRAGMENTs should not be decompressed independently of each other.

The decompressed output is encoded in UTF-8 and formatted as JSON. Python application can uses zlib module to decompress JSON string.

How to process MRN data

Once the application has established a connection and requested MRN RIC data from Refinitiv Real-Time Distribution System, the application can process incoming MRN data message with the following flow:

Please see more detail in Code Walkthrough section below.

 

Code Walkthrough

Requesting data

The application can subscribe MRN data with the NewsTextAnalytics domain and the following MRN-specific RIC name:

  • MRN_STORY: Real-time News
  • MRN_TRNA: News Analytics: Company and C&E assets
  • MRN_TRNA_DOC: News Analytics: Macroeconomic News & events
  • MRN_TRSI: News Sentiment Indices
    	
            

# Global Default Variables

mrn_domain = 'NewsTextAnalytics'

mrn_item = 'MRN_STORY'

 

def send_mrn_request(ws):

    """ Create and send MRN request """

    mrn_req_json = {

        'ID': 2,

        "Domain": mrn_domain,

        'Key': {

            'Name': mrn_item

        }

    }

 

    ws.send(json.dumps(mrn_req_json))

    print("SENT:")

    print(json.dumps(mrn_req_json, sort_keys=True, indent=2, separators=(',', ':')))

Handle Refresh Message

The MRN Refresh response message does not contain any news or fragment information. It contains the relevant feed-related or other static Fields. The application just prints out each incoming field data in a console for informational purposes.

    	
            

def processRefresh(ws, message_json):

 

    print("RECEIVED: Refresh Message")

    decodeFieldList(message_json["Fields"])

 

def decodeFieldList(fieldList_dict):

    for key, value in fieldList_dict.items():

        print("Name = %s: Value = %s" % (key, value))

Example result of MRN Refresh response message is following:

    	
            

RECEIVED: Refresh Message

Name = PROD_PERM: Value = 10001

Name = ACTIV_DATE: Value = 2019-07-20

Name = RECORDTYPE: Value = 30

Name = RDN_EXCHD2: Value = MRN

Name = TIMACT_MS: Value = 37708132

Name = GUID: Value = None

Name = CONTEXT_ID: Value = 3752

Name = DDS_DSO_ID: Value = 4232

Name = SPS_SP_RIC: Value = .[SPSML2L1

Name = MRN_V_MAJ: Value = 2

Name = MRN_TYPE: Value = STORY

Name = MRN_V_MIN: Value = 10

Name = MRN_SRC: Value = HK1_PRD_A

Name = FRAG_NUM: Value = 1

Name = TOT_SIZE: Value = 0

Name = FRAGMENT: Value = None

Handle Update Messages

The Update response messages contain news information and fragment(s) data. Firstly, the application gets FRAGMENT, FRAG_NUM, GUID, and MRN_SRC fields values from the Update message. We use Python's base64 module to decode a FRAGMENT field value from ASCII string to bytes data.

    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

 

    fields_data = message_json["Fields"]

    # declare variables

    tot_size = 0

    guid = None

 

    # Get data for all requried fields

    fragment = base64.b64decode(fields_data["FRAGMENT"])

    frag_num = int(fields_data["FRAG_NUM"])

    guid = fields_data["GUID"]

    mrn_src = fields_data["MRN_SRC"]

Process the first fragment/single fragment news

Next, we check if a FRAG_NUM = 1 which means it is the first fragment of this news message. The application then checks if a FRAGMENT bytes length = TOT_SIZE field.

  • If equal, this is a piece of single fragment news and this news message is completed
  • If not, this is the first fragment of multi-fragments news. We store this Update message's all fields data to the _news_envelopes list object and waiting for the next fragment(s).
    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    ...

    # Get data for all requried fields

    ...

    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details

        pass

    else:  # FRAG_NUM = 1 The first fragment

            tot_size = int(fields_data["TOT_SIZE"])

            print("FRAGMENT length = %d" % len(fragment))

            # The fragment news is not completed, waiting and add this news data to envelop object.

            if tot_size != len(fragment):

                print("Add new fragments to news envelop for guid %s" % guid)

                _news_envelopes.append({  # the envelop object is a Python dictionary with GUID as a key and other fields are data

                    "guid": guid,

                    "data": {

                        "fragment": fragment,

                        "mrn_src": mrn_src,

                        "frag_num": frag_num,

                        "tot_size": tot_size

                    }

                })

                return None

Process multi-fragments news

When the application receives Update message with FRAG_NUM > 1, it means this message is multi-fragments news. We get the previous fragment(s) data from _news_envelopes list object via a GUID. Please note that Update messages with FRAG_NUM >1 will contain fewer fields as the metadata has been included in the first Update message (FRAG_NUM=1) for that news message.

The application also needs to check the validity of the received fragment by checking the MRN_SRC and FRAG_NUM order with previous fragments. If the received fragment is valid, the application assembles a received FRAGMENT bytes data with previous fragments and compares a total fragments byte length value with TOT_SIZE value.

  • If equal, it means this multi-fragments news message is completed
  • If not, we update current news Update message fields to the _news_envelopes list object and waiting for more fragment(s).
    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    ...

    # Get data for all requried fields

    ...

    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details

        guid_index = next((index for (index, d) in enumerate(

                _news_envelopes) if d["guid"] == guid), None)

        envelop = _news_envelopes[guid_index]

        

        if envelop and envelop["data"]["mrn_src"] == mrn_src and frag_num == envelop["data"]["frag_num"] + 1:

            print("process multiple fragments for guid %s" % envelop["guid"])

            

            # Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables

            fragment = envelop["data"]["fragment"] = envelop["data"]["fragment"] + fragment

            envelop["data"]["frag_num"] = frag_num

            tot_size = envelop["data"]["tot_size"]

            print("TOT_SIZE = %d" % tot_size)

            print("Current FRAGMENT length = %d" % len(fragment))

 

            # The multiple fragments news are not completed, waiting.

            if tot_size != len(fragment):

                return None

            # The multiple fragments news are completed, delete assoiclate GUID envelop

            elif tot_size == len(fragment):

                del _news_envelopes[guid_index]

    else:  # FRAG_NUM = 1 The first fragment

        ...

Handle a completed news FRAGMENT message

To unzip the content and get JSON string news message, we use Python's zlib module to unzip the gzip bytes data, then we parse it to JSON message.

    	
            

def processMRNUpdate(ws, message_json):  # process incoming News Update messages

    ...

    # Get data for all requried fields

    ...

    if frag_num > 1:  # We are now processing more than one part of an envelope - retrieve the current details

        ...

    else:  # FRAG_NUM = 1 The first fragment

        ...

 

    # News Fragment(s) completed, decompress and print data as JSON to console

    if tot_size == len(fragment):

        print("decompress News FRAGMENT(s) for GUID  %s" % guid)

        decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)

        print("News = %s" % json.loads(decompressed_data))

Example Result

The example application supports console, Docker and classic Jupyter Notebook environments. Please check the README.md file in the source code project for more detail regarding how to run the application in each environment.

Note: The news message is in UTF-8 JSON string format. Some news messages that contain special Unicode characters may not be able to show in Windows OS console (cmd, git bash, powershell, etc) due to the OS limitation. Those messages will be print as UnicodeEncodeError exception. Cannot decode Unicode character message in a console instead.

    	
            

RECEIVED:

[

  {

    "DoNotCache":true,

    "DoNotConflate":true,

    "DoNotRipple":true,

    "Domain":"NewsTextAnalytics",

    "Fields":{

      "ACTIV_DATE":"2019-08-14",

      "FRAGMENT":"H4sIAAAAAAAC/41W727iRhD/3qcY+UN1lRJigzFg6dQScAg5Y6jtkN6VqlrwAtvYa253HYKqvkufpU/WWa+T0JPaXqQws7Mzs/Pnt7P+3SK5mmaWb/E4iex2t3NHrAuLVBmjfEOl5f9sRQs/TIJojPKGtX65sNZldkKzm1LAmkoFgsoqVxKOe8rhIBhXjO9A7ZkEwnlZobeCcnUBh5wSSWGTs80jlBxyxh/RRV4e/RXfK3Xwr64O2bYlaKWokK1NWVztVZFzepSvTIvIw/fsvduxu4NNZ721aafXdZ1vq/eV4L7WKHL/zIPftp2B3Xdc/y3PFV/xOEogqoo1FeBDLV7xW8ozNMPQhjnac6LYE4VECaIgxUwX4WjFHReG1a7Cpfa84n/9af5vsVBBnMwjmEbLIElnQZTCzX00TiCczqZpMD7XGYZpEEfDdLoMIEnjYRpMpkECaXyfpOackO5IDgFWU51gmmEF2ZZR4UPb6fRt+84Lw/lo2Bndj2+8szC+DO8/cqI7RiWkQisf8g28W1lqT2FUFgfCTyvruxUf7QnfUVAljJmgG4U9n/JtKQp0U3Jz4pQD2WxKkRHsNByZ2kPIZA2CuMopDFpey3Ev4Mz3KzAkSrG4MwETQfakWPF5nonylKGfPEdwADkcSoQUzYAgnoCX/JI+001Vp5G9BFVu4Xo2h0lerrFoSUHynAoMvz5OZ6nzq0Oj2y2awFaUBTgwR2sNAV2r1lsRNba3lcCIBbC3fN8gjP79pu00p5zALRFP9GQsCc80vNd0T/IthnbehIRuBFVEMB0mFU9M1yBkBcMMV/xl96Qr/vWdW/EUozjsS041lm3o9Z0+5tTumhhDUgkC6b4siNT9esJrq69k42ERw4xwstMVuyO8kmdHG+VSyH85w+t4uKPv+lmdAJcHUT6xDNu2PgHeNdN+fT1BmrR1z7QsLHmmC6NKnArB86ZGXEvbaDfYfnRk3GjtG8YRZrp6I7SrsJPDSu1LoS8J1oygoMbJQrBC1/EMrSirQ9ItrX3dc111+IBIzcqiBSkVhay7t0HfTNtInG45MQOtrI0qbL9WyRDigq2r2nOdyhc1KMhJR5+fWvC/cCq5wshXXHD5Qy7pTg8ueFcQlqvSPxfilQR09sQkQ/Xj8djCXaPdDNAz2RVqN6DGCb5lQqqRoIgePfU14i/t/qXjpnbf73b8rt0aOPYn1NxTkuFspqgVBxO4/CcSm3mI4teRMKabnIg6JzRn2v3rrP3VTN9296fPd/chG39y6cfQO8nlLpuxeed5xx7j43IcTj5MqaOtuVR6kMj5Vj9Bcew7tu3pF2jqB/fxVTCM61coR5hUiFk8i+pTCyolLtPTAUVtXLOiWViKPqur58tDThi/3LJnzP/CavApcD9K9NumZdU6UURV+PhZGIXyK0nWOcUdWa1/w1zrV/Had9pdlCF1e4Z6A6dm2l5DezbSid+pf3t3NRni78x3fhwb+hDVtNM25LYmuvwL320Puv2B53l6L/Zvh0naCuudjus6rtdz3V6vXndt/ed5g/7bstdtu9ow8J36172uiZcasqxJYIJY2oZ+6jW031ATXPu6DuDudtLSgqjtL+IgSWLDX8+TxDHsLJpMk6RRWYSz+fJFZRiPG51kOhnNX0yHo9RwN9MosfuGx4czPeffNAw3QqHhHoJ7w0yuDUVozBfNOfFy1BhEwUNzIAK58TFbRB8NG5rvGUUeaUI/V/qzx/KdC6sSO+TxG6dzYT0h7hHWX3Fv/vjmb4qautlTCQAA",

      "FRAG_NUM":1,

      "GUID":"RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1",

      "MRN_SRC":"HK1_PRD_A",

      "MRN_TYPE":"STORY",

      "MRN_V_MAJ":"2",

      "MRN_V_MIN":"10",

      "TIMACT_MS":35640745,

      "TOT_SIZE":1335

    },

    "ID":2,

    "Key":{

      "Name":"MRN_STORY",

      "Service":"ELEKTRON_DD"

    },

    "PermData":"AwhCEBVM",

    "SeqNumber":13678,

    "Type":"Update",

    "UpdateType":"Unspecified"

  }

]

FRAGMENT length = 1335

decompress News FRAGMENT(s) for GUID  RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1

News = {'altId': 'nRSN0253Ja', 'audiences': ['NP:LSEND', 'NP:LSEN'], 'body': 'For best results when printing this announcement, please click on link below:\nhttp://pdf.reuters.com/htmlnews/htmlnews.asp?i=43059c3bf0e37541&u=urn:newsml:reuters.com:20190814:nRSN0253Ja\n\nRNS Number : 0253J\nHenderson Alternative Strat Tst PLC\n14 August 2019\n\xa0\n\xa0\nHENDERSON INVESTMENT FUNDS LIMITED\nHENDERSON ALTERNATIVE STRATEGIES TRUST PLC\nLegal Entity Identifier: 213800J6LLOCA3CUDF69\n\xa0\n\xa0\n14 August 2019\nHenderson Alternative Strategies Trust plc ("the Company")\nChange to Director Information\n\xa0\nIn accordance with Listing Rule 9.6.14, the Company announces that Mr Graham\nOldroyd will be appointed as a non-executive director of BMO Global Smaller\nCompanies plc with effect from 1 October 2019.\n\xa0\n\xa0\nFor further information, please call:\n\xa0\nHelena Harvey\nFor and on behalf of\nHenderson Secretarial Services Limited\nSecretary to Henderson Alternative Strategies Trust plc\nTelephone: 020 7818 2025\n\xa0\nLaura Thomas\nInvestment Trust PR Manager\nJanus Henderson Investors\nTelephone: 020 7818 2636\nThis information is provided by RNS, the news service of the London Stock Exchange. RNS is approved by the Financial Conduct Authority to act as a Primary Information Provider in the United Kingdom. Terms and conditions relating to the use and distribution of this information may apply. For further information, please contact\nrns@lseg.com (mailto:rns@lseg.com)\n or visit\nwww.rns.com (http://www.rns.com/)\n.\n\xa0\n', 'firstCreated': '2019-08-14T08:53:50.910Z', 'headline': 'REG - Henderson Alt Strat  - Director Declaration', 'id': 'RSN0253Ja_19081425XqJULiDZ4eYL6ysVgdMiO3xgikRwVDLGKIe1', 'instancesOf': ['RR:1006', 'NI:EUR/EARN'], 'language': 'en', 'messageType': 2, 'mimeType': 'text/x-plain-fixed', 'provider': 'NS:LSE', 'pubStatus': 'stat:usable', 'subjects': ['B:125', 'B:147', 'B:1691', 'B:261', 'B:270', 'G:3', 'G:7J', 'G:A', 'M:1QD', 'M:1WN', 'M:32', 'M:3H', 'M:Z', 'P:4295896662', 'R:HAST.L', 'P:34414674477', 'P:5000006698', 'P:5000075242', 'E:1', 'E:4B', 'E:6T', 'E:6V', 'E:E', 'M:1V0', 'M:1Z7', 'M:1Z8', 'M:1ZN', 'M:2B', 'R:JHG.N', 'N2:PRESSR', 'N2:BOSS1', 'N2:MNGISS', 'N2:PPLMOV', 'N2:BOARD1', 'N2:SIGCOR', 'N2:BACT', 'N2:FINS08', 'N2:INVT08', 'N2:INVT', 'N2:FINS', 'N2:CINV', 'N2:WEU', 'N2:GB', 'N2:EUROP', 'N2:SRVCS', 'N2:NEWR', 'N2:REG', 'N2:CMPNY', 'N2:LEN'], 'takeSequence': 1, 'urgency': 3, 'versionCreated': '2019-08-14T08:53:50.910Z'}

Popular Topics