A Treatise on Building a Comprehensive and Scalable Real Time Platform

Several months back I was asked to answer some questions from an organization trying to understand the principles behind building a scalable real time platform. While responding, I found that some of the questions and answers were suitable for general sharing.

Here they are:

Key Trends and Approaches
What are other emerging trends and approaches in processing and analyzing high-volume, low-latency data?

Real Time Streaming Data is processed in two fundamental ways.
1.    Micro Batching: In this method the real time data is first batched into small chunks (for example, chunks containing the data points received every minute) and subsequently processed. This technique is mainly used for high performance integration with external, slower batch processing systems such as those based on Apache Hadoop. Open Source technologies like Apache Spark Streaming and the Apache Storm extension called Storm-Trident may be used to implement such platforms.
2.    Tuple Processing: In Tuple Processing each data point is processed individually, as soon as it arrives. This methodology typically requires distributed in-memory computing engines which can perform the relevant computations on the incoming data points in a stateful or stateless manner. The output may range from simple KPIs calculated on the incoming data (such as aggregations) to temporal/stateful correlations among the data points that recognize patterns and trigger predictions. Apache Storm based platforms are seen to be leading the way in developing such solutions.
Real Time Stream Processing generally requires defining a ‘Topology’, which is basically a Data Flow Graph (DFG) that is executed continuously (unlike Batch Processing) on the input data stream(s).
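To make the idea of a Topology more concrete, here is a minimal single-process Python sketch of a three-stage Data Flow Graph that handles one tuple at a time. The stage names (source, drop_invalid, running_count) and the tuple fields are hypothetical and are not taken from Storm or Spark; a real platform would distribute these stages across workers.

```python
from typing import Dict, Iterable, Iterator, Tuple


def source(readings: Iterable[dict]) -> Iterator[dict]:
    """Entry point of the graph: emits tuples one by one."""
    for reading in readings:
        yield reading


def drop_invalid(stream: Iterable[dict]) -> Iterator[dict]:
    """Stateless stage: discards tuples that carry no value."""
    for tup in stream:
        if tup.get("value") is not None:
            yield tup


def running_count(stream: Iterable[dict]) -> Iterator[Tuple[str, int]]:
    """Stateful stage: emits an updated per-key count after every tuple."""
    counts: Dict[str, int] = {}
    for tup in stream:
        counts[tup["key"]] = counts.get(tup["key"], 0) + 1
        yield tup["key"], counts[tup["key"]]


def build_topology(readings: Iterable[dict]) -> Iterator[Tuple[str, int]]:
    """Wires the stages together: source -> drop_invalid -> running_count."""
    return running_count(drop_invalid(source(readings)))
```

Because every stage is a generator, a tuple flows through the whole graph before the next one is read, which mirrors the continuous, per-tuple execution described above.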
Key applications of Real Time Stream processing can be seen emerging in the following areas:
1.    Stream ETL: Streaming ETL enables real time Extract-Transform-Load capabilities on incoming data. Data can be pipelined through various stages of cleansing, structuring, filtration and transformation before being sent to the target applications and storage systems (archives, databases etc.). In-memory partitioning and batching are among the common ways to achieve such use cases.
2.    Real Time Pattern Recognition or Pattern Matching: The key objective of Real Time Pattern Matching is to execute a distributed topology of operations on the Stream Processing Platform to identify complex temporal correlations among the events/data points.

   A general purpose Stream Processing Platform (such as Apache Spark Streaming or Apache Storm) is not adequate for stateful computations, especially those that analyse temporal relationships occurring in the streaming data. Such platforms also do not provide a DSL (Domain Specific Language) which can be used to define, query and analyse the streaming data as complex events. A Complex Event Processing (CEP) Engine generally addresses these issues and exposes capabilities like CQL/PQL (Continuous Query Language/Pattern Query Language), which are SQL-like languages that can be used to identify complex events.
The schematic diagram below explains how simple Data Streams can be exploited to identify Complex Patterns, which may help in taking better decisions based on Real Time Contextual Insights.

3.   Real Time Decision Making: Real Time Decision Making allows contextual insights generated from a Stream Processing/Pattern Matching engine to be correlated with historical data and decision rules to take automated decisions. Such systems are key to implementing Real Time Decision Support Systems (RTDSS), which can find wide application in care, optimization, security and other enterprise use cases.
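The temporal correlation described under Real Time Pattern Matching can be illustrated with a small Python sketch. It looks for a "first event followed by a second event for the same key within a time limit" pattern. The function name, the event layout (timestamp, key, kind) and the event kinds used in the example are all hypothetical, and the sketch assumes events arrive in timestamp order; a CEP engine would express the same idea declaratively in its query language.

```python
from typing import Dict, Iterable, List, Tuple

Event = Tuple[int, str, str]  # (timestamp in seconds, key, kind)


def match_followed_by(
    events: Iterable[Event], first: str, then: str, within: int
) -> List[Tuple[str, int, int]]:
    """Returns (key, first_ts, then_ts) for every `first` event that is
    followed by a `then` event of the same key within `within` seconds."""
    last_first: Dict[str, int] = {}  # key -> timestamp of latest `first` event
    matches: List[Tuple[str, int, int]] = []
    for ts, key, kind in events:
        if kind == first:
            last_first[key] = ts
        elif kind == then and key in last_first:
            if ts - last_first[key] <= within:
                matches.append((key, last_first[key], ts))
            # The pending `first` event is consumed whether or not it matched.
            del last_first[key]
    return matches
```

The only state kept is one timestamp per key, which is the kind of stateful, temporal bookkeeping that a plain stateless tuple processor does not provide on its own.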

Pros and Cons of Varying Approaches
What are their respective pros and cons, and what are the key considerations in selecting an appropriate approach to process streaming data?
As we observed, Real Time Stream Processing is categorized into Micro Batching and Tuple Processing techniques. A summary of the pros and cons of each approach is presented below.
Micro Batching (Apache Spark Streaming)
Pros:
1.     High Throughput
2.     Exactly Once Processing
Cons:
1.     Latency may be high (data is not ready until batching is complete)
2.     Scalability Issues
3.     Data loss issues
4.     Limited industry adoption
Tuple Processing (Apache Storm)
Pros:
1.     Low Latency
2.     Highly Scalable
3.     No Data Loss (replays possible)
4.     Many cases of industrial adoption
Cons:
1.     At Least Once Processing
2.     Mainly for Stateless Operations
From a Stream Computing perspective, Micro Batching is an appropriate choice when the requirement is High Throughput (the ability to ingest a large number of data points in a short time frame). Since Data Ingestion and Processing (such as Online Analytics, Machine Learning or ETL) are performed at different stages, it is possible to get higher throughput by such methods.
If, however, the key requirement is low latency processing of the data points (such as Real Time Decisions based on event queries, pattern matching or complex event analysis), the Tuple based approach should be followed. Since Tuple Processing approaches require ingestion and computation at the same time (often in a stateful manner with temporal correlations), the resource requirements on each worker are a little higher, which in general affects the cluster throughput.

Management of Imperfections
What are some of the typical techniques used in managing stream imperfections, including delayed, missing and out-of-order data? What are the pros and cons of these techniques?
Traditional data processing platforms depend on data being already available and handle data imperfections in an elegant way. In Real Time systems, since data is usually never stored, the Stream Processing Platform must have special mechanisms in place to handle such cases. A brief discussion is presented below:
Delayed Data: If a Processor waits on a Stream whose data points do not arrive at a regular rate, there is a risk that the engine will wait indefinitely for the data to arrive, thereby blocking resources and resulting in sub-optimal usage of the infrastructure. Such cases are more prominent in Micro Batching platforms because batch creation completes more slowly when events arrive slowly. The usual method is to use time bound processing instead of size bound processing. In time bound processing the engine stops waiting for the data and proceeds to process the data that has already arrived or accumulated within a maximum predictable time interval.
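As a rough illustration of combining size bound and time bound processing, the Python sketch below closes a batch when either limit is reached. The class name MicroBatcher and its methods are hypothetical, and time is passed in explicitly as `now` (in seconds) so that the behaviour is easy to follow; a real engine would drive tick() from its own timer.

```python
from typing import Any, List, Optional


class MicroBatcher:
    """Collects items and releases a batch when it is full or too old."""

    def __init__(self, max_size: int, max_wait: float) -> None:
        self.max_size = max_size      # size bound: items per batch
        self.max_wait = max_wait      # time bound: seconds a batch may stay open
        self._items: List[Any] = []
        self._opened_at: Optional[float] = None

    def add(self, item: Any, now: float) -> Optional[List[Any]]:
        """Adds an item; returns the batch if the size bound is reached."""
        if not self._items:
            self._opened_at = now     # the first item opens the batch
        self._items.append(item)
        if len(self._items) >= self.max_size:
            return self._flush()
        return None

    def tick(self, now: float) -> Optional[List[Any]]:
        """Called periodically; returns the batch if the time bound expired."""
        if self._items and now - self._opened_at >= self.max_wait:
            return self._flush()
        return None

    def _flush(self) -> List[Any]:
        batch, self._items = self._items, []
        self._opened_at = None
        return batch
```

With max_size=3 and max_wait=60, three quick arrivals are released immediately, while a single slow arrival is released by tick() after at most 60 seconds instead of waiting for the batch to fill.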
Missing Data: It is quite possible in a Streaming Environment to lose data points as they arrive. The reason could be anything from an unreliable network between the workers (which form the data processing pipeline) to application errors. Cases of lost or missing data are generally handled by acknowledgement methods between the origin and the consumer of the data points. Each data point, once received by the processing unit, can be acknowledged explicitly, and if an acknowledgement is not received by the origin of the data point (within a specified interval), it is ‘replayed’.
This method is widely used for handling missing data in Tuple Processing platforms such as Apache Storm. One key disadvantage of replaying is that a data point may be emitted to the processor multiple times during a race condition (the acknowledgement is received just after the wait period is over). Such cases are handled by building additional stateful capabilities on top of the Tuple Processing platform (e.g. Storm extensions such as Storm-Trident or even a full-fledged Complex Event Processor).
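The acknowledgement and replay mechanism, together with the duplicate problem it creates, can be sketched in Python as follows. The classes ReplayingSource and IdempotentConsumer are hypothetical names and do not correspond to Storm's API; the consumer remembers message ids it has already seen, which is one simple form of the additional stateful capability mentioned above.

```python
from typing import Any, Dict, List, Set, Tuple


class ReplayingSource:
    """Origin of data points: re-emits anything not acknowledged in time."""

    def __init__(self, ack_timeout: float) -> None:
        self.ack_timeout = ack_timeout
        self._pending: Dict[int, Tuple[Any, float]] = {}  # id -> (payload, last emit time)

    def emit(self, msg_id: int, payload: Any, now: float) -> Tuple[int, Any]:
        self._pending[msg_id] = (payload, now)
        return msg_id, payload

    def ack(self, msg_id: int) -> None:
        self._pending.pop(msg_id, None)

    def replay_due(self, now: float) -> List[Tuple[int, Any]]:
        """Returns (and re-arms) every message whose ack is overdue."""
        replayed = []
        for msg_id, (payload, sent_at) in list(self._pending.items()):
            if now - sent_at >= self.ack_timeout:
                self._pending[msg_id] = (payload, now)
                replayed.append((msg_id, payload))
        return replayed


class IdempotentConsumer:
    """Processes each message id once, ignoring replayed duplicates."""

    def __init__(self) -> None:
        self._seen: Set[int] = set()
        self.processed: List[Any] = []

    def handle(self, msg_id: int, payload: Any) -> bool:
        if msg_id in self._seen:
            return False              # duplicate caused by a replay
        self._seen.add(msg_id)
        self.processed.append(payload)
        return True
```

Without the `_seen` set the consumer would process a replayed data point twice, which is the at-least-once behaviour; the set turns it into effectively-once processing at the cost of keeping state.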
Out-Of-Order Data: In a distributed environment data points may not arrive in the order in which they were generated (e.g. data points with an associated time stamp). This requires additional intelligence on the part of the Stream Processing Platform to ensure that the data points received can be ordered before processing. The usual method is to use overlapping windowed batches which can remain open beyond their closing interval. For example, if a window is open to filter events generated within a one minute interval, it might be kept open for a little more than a minute (allowing a ‘slack’). The expectation is that data points which were generated in the same interval but arrive later than others can be acquired during this slack period.
Such techniques can be elegantly implemented using Micro Batching platforms, where the acquired events can be re-ordered before processing. Since Tuple Processing technologies are essentially meant for order-less, stateless data, additional stateful and windowing capabilities need to be added. Extensions such as Apache Storm-Trident or a CEP can address these issues.
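The ‘slack’ principle can be sketched in Python as below. This is only an illustration under stated assumptions: windows are fixed-width and aligned to multiples of `width`, the progress of time is estimated from the largest time stamp seen so far, and data points that arrive after their window has closed are simply dropped. The function name and the event layout are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple

Event = Tuple[int, Any]  # (event time stamp, value), in arrival order


def windows_with_slack(
    events: Iterable[Event], width: int, slack: int
) -> Iterator[Tuple[int, List[Event]]]:
    """Yields (window_start, events sorted by time stamp) once a window closes.

    A window [start, start + width) stays open until the largest time stamp
    seen reaches start + width + slack, so late arrivals can still join it."""
    open_windows: Dict[int, List[Event]] = {}
    max_seen = float("-inf")
    for event_time, value in events:
        max_seen = max(max_seen, event_time)
        start = (event_time // width) * width
        if start + width + slack <= max_seen:
            continue                  # arrived after the slack expired: dropped
        open_windows.setdefault(start, []).append((event_time, value))
        for s in sorted(open_windows):
            if s + width + slack <= max_seen:
                yield s, sorted(open_windows.pop(s))
    for s in sorted(open_windows):    # end of input: flush what is still open
        yield s, sorted(open_windows[s])
```

With width=60 and slack=10, a data point stamped 58 that arrives after one stamped 62 still lands in the first window, whereas a data point stamped 50 that arrives after one stamped 75 is too late and is discarded.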
In the following table we examine the characteristics of each Stream Computing approach in relation to these problems. This summary may help in understanding the pros and cons of each approach.

Delayed Data
Micro Batching: Supports both time based and size based batches. To address this issue a combination of both is applied. The platform waits for data points to arrive until the size limit of the micro batch is reached or a specified time window expires, whichever comes first. This way the platform continues to process the accumulated data in an optimal manner, rather than waiting indefinitely for a batch to complete.
Tuple Processing: By design, Tuple Processing platforms process each data point individually as it arrives. If the data points/tuples don’t arrive, the engine will just sleep. The method does not delay the processing of data that has already arrived.
Missing Data
Micro Batching: In general, not capable of handling missing or lost data.
Tuple Processing: Can handle lost data using acknowledgements and replays. However, more-than-once processing of a data point may be an undesired side effect.
Out-Of-Order Data
Micro Batching: Data points can be batched and re-ordered before processing; ‘slack’ principles can be applied in designing the system so that all relevant data points are acquired efficiently.
Tuple Processing: In general, can be used only for order-less data processing. For in-order processing of data, additional extensions with windowing and stateful processing capabilities can be designed.

Fusion of Stored and Streaming Data
What are some of the typical techniques used in the fusion of stored and streaming data? What are the pros and cons?
To develop a holistic approach towards data processing, both Real Time and Historical (or stored) data need to be considered in the same architecture. In modern data processing terminology this is often referred to as the Lambda Architecture, which proposes that complete insight is a function of the ‘whole data’.
The Lambda Architecture proposes three layers in the data processing architecture. These are:
1.    Batch Layer: For Storage and Batch Computing. This layer focuses on processing the stored data and generates ‘Batch Views’ which give insights into the historical data (such as summary data, machine learning predictions/classifications etc.). The layer is augmented with streaming data as it periodically re-computes the batch views (often from scratch). Although the Master Data Set in the Batch Layer is enhanced with the incremental data from the real time data sources, this does not happen instantaneously but eventually. Typically the Batch Layer would be implemented using technologies like Apache Hadoop (e.g. HDFS and MapReduce) and Apache Spark.
2.    Speed Layer: For Stream/Real Time Computing. This layer focuses on processing the streaming data in real time to generate ‘Real Time Views’ (such as Real Time KPIs, Complex Event Analysis results etc.). These views are usually an estimate of the current situation over a period of time (window) and get corrected or modified as the situation changes. Because no ‘from-scratch’ processing is involved, the views can be updated almost instantaneously. Platforms based on Apache Storm or Apache Spark (Streaming) can be used in this layer.
3.    Serving Layer: Exposing Query-able Views to the Applications. This layer is the interface between the data platform and the applications and helps to store the views generated by the Batch Layer and the Speed Layer. It is usually composed of several database options such as traditional RDBMS, MPP DBs, Enterprise Data Warehouse Systems, Columnar DBs, Key-Value stores and so on.
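The interplay of the three layers can be illustrated with a small Python sketch that counts events per key. The batch view is recomputed from scratch over the stored data, the speed view is updated incrementally for events the batch has not yet absorbed, and a query merges the two. All names here (compute_batch_view, SpeedView, query) are hypothetical, and plain in-memory counters stand in for the real storage and database products.

```python
from collections import Counter
from typing import Iterable, Tuple

Event = Tuple[str, int]  # (key, time stamp)


def compute_batch_view(master_dataset: Iterable[Event]) -> Counter:
    """Batch Layer: recomputes the view from scratch over all stored events."""
    return Counter(key for key, _timestamp in master_dataset)


class SpeedView:
    """Speed Layer: incrementally counts events not yet in a batch view."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()

    def update(self, event: Event) -> None:
        key, _timestamp = event
        self.counts[key] += 1

    def reset(self) -> None:
        """Called once a fresh batch view has absorbed these events."""
        self.counts.clear()


def query(key: str, batch_view: Counter, speed_view: SpeedView) -> int:
    """Serving Layer: merges the historical and the real time view."""
    return batch_view[key] + speed_view.counts[key]
```

The answer to a query stays the same before and after a batch recomputation: the events first counted by the speed view are later counted by the batch view, at which point the speed view is reset.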
In the following diagram we try to categorize the key techniques and technologies vis-à-vis the Lambda Architecture principles.
A general comparison highlighting the key pros and cons of each component is shown in the following table.
Batch Layer (Stored Data)

Apache Hadoop (MapReduce)
Common Use Case:
1.     Chiefly out-of-core, I/O and storage intensive processing
2.     ETL and all kinds of Offline Batch Processes involving One-Pass processing
Pros:
1.     Highly Fault Tolerant and Scalable
2.     Several language options for programming
3.     Programming model allows fine grained control if required
4.     Mature ecosystem with multiple tools, security, and management options
Cons:
1.     Workloads in general have to be modelled as Map and Reduce tasks
2.     Very slow for multi-pass workloads (such as Iterative Algorithms, Machine Learning, Graph Processing etc.)
Apache Spark
Common Use Case:
1.     Chiefly in-core (memory based) processing
2.     Suitable for computation intensive workloads where the data-in-process is not larger than the total cluster memory
3.     Machine Learning, Graph Processing, Near Real Time Interactive Analysis Workloads
Pros:
1.     Low Latency, High Throughput processing
2.     Supports several operators for transformation (unlike just Map and Reduce in Hadoop)
3.     Programming model allows more focus on business logic
Cons:
1.     Scalability is generally limited by the total memory available in the cluster
2.     Less mature

Speed Layer (Streaming Data)

Spark Streaming
Common Use Case:
1.     Suitable for Stream Processing workloads which can be handled by Micro Batching
Pros:
1.     Being part of the Spark ecosystem, integrates well with Spark and Hadoop deployments (such as cluster sharing through a common resource manager)
2.     High Throughput
Cons:
1.     Can support only exactly-once event processing (does not guarantee processing of all events)
2.     Higher Latency
3.     Less mature and lower industry adoption
Apache Storm
Common Use Case:
1.     Suitable for Tuple Processing Workloads
Pros:
1.     Low Latency
2.     Several language options for programming
3.     Can support at-least-once, at-most-once and exactly-once paradigms for event processing
4.     More mature and better industry adoption
Cons:
1.     Not well integrated with the Hadoop ecosystem (such as cluster sharing through a common resource manager)
2.     Generally Lower Throughput

As we have seen, the Serving (View) Layer is predominantly composed of database and quasi-database products. The appropriate choice depends on the scalability, performance, user interface and CAP (Consistency-Availability-Partition Tolerance) requirements. A brief comparison is shown below:

View Layer (Application Interface)

Each entry gives Scalability and Performance, then the Application Interface where one is listed, then the CAP characteristics.
Memory DBs: Up to ~100 GB; Latency: <0.1 ms; usually Consistency, Partition Tolerance
Up to ~1 TB; Latency: 1 ms; Consistency, Availability
Up to ~10 TB; Latency: 10 ms; Availability, Partition Tolerance
Up to ~1 PB; Latency: Low 10s ms; Consistency, Availability
Up to ~10 PB; Latency: Low 10s ms; Consistency, Partition Tolerance
Up to ~100 PB; Latency: Seconds to Minutes; Quasi SQL (Hive QL); Availability, Partition Tolerance

High Availability
What are some of the typical approaches and solutions used in real-time stream analysis to address high-availability requirements? What are the pros and cons of these approaches?
Real Time Stream Processing Platforms typically use the following High Availability mechanisms. A Distributed Stream Processing platform usually has one instance of a ‘Master’ or ‘Coordinator’ and multiple instances of the ‘Worker’. The Master manages the Stream Computing cluster (resource allocation, task allocation and monitoring) while the actual computation is performed by the Workers.

1 + 1 Active Standby (Cold Standby)
This High Availability mechanism is followed for the Master or Coordinator Nodes of the Stream Platform. There is one Active Instance and one Standby Instance. The Standby is ‘Cold’ in nature, meaning it is started only in case of a failover.
Pros:
1.     Simple Solution
Cons:
1.     Needs additional HA Manager/Watchdog software (such as Veritas Cluster Server)
2.     Failover is slow (needs restarting of the Master components from scratch)
3.     Service may be lost during failover
1 + 1 Active Standby (Hot Standby)
This High Availability mechanism is also followed for the Master or Coordinator Nodes of the Stream Platform. There is one Active Instance and one Standby Instance. The Standby is ‘Hot’ in nature, meaning it is already started but assumes a passive role before failover.
Pros:
1.     Fast Failovers
2.     Service is not lost during failover (one Master instance is always available)
3.     Lower Cost (no additional watchdog requirement)
Cons:
1.     Relatively complex to implement
N-Active (N > 1)
This High Availability mechanism is followed for the Worker Nodes of the Stream Platform. There is no real Standby Node; instead all nodes participate in data processing simultaneously in a scale-out parallel manner. In case a particular Worker fails (shutdown/hardware failure etc.), the workload is transferred to the other running instances.
Pros:
1.     Fast Failovers
2.     Service is not lost during failover (one Worker instance is always available)
3.     Lower Cost (no additional watchdog requirement, no additional hardware requirement)
4.     Optimal Resource Usage
Cons:
1.     Relatively complex to implement
2.     Lack of a dedicated Standby Instance increases load on the failover instances with which the workload gets shared in case of failure
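For the N-Active mechanism, the transfer of workload from a failed Worker to the remaining ones can be sketched in Python as below. The function name reassign and the idea of numbered partitions assigned to named workers are assumptions made for illustration; each partition of the failed Worker is handed to whichever surviving Worker currently has the least load.

```python
from typing import Dict, List


def reassign(assignment: Dict[str, List[int]], failed: str) -> Dict[str, List[int]]:
    """Returns a new worker -> partitions mapping without the failed worker.

    Partitions of the failed worker go, one at a time, to the surviving
    worker with the fewest partitions (ties broken by worker name)."""
    survivors = sorted(w for w in assignment if w != failed)
    if not survivors:
        raise RuntimeError("no running worker left to take over the workload")
    new_assignment = {w: list(assignment[w]) for w in survivors}
    for partition in assignment.get(failed, []):
        target = min(survivors, key=lambda w: (len(new_assignment[w]), w))
        new_assignment[target].append(partition)
    return new_assignment
```

The example also shows the disadvantage noted above: after a failure every surviving Worker carries more partitions than before, since there is no idle Standby to absorb them.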

Scalability
What are some of the typical approaches and solutions used in real-time stream analysis to address high-scalability requirements? What are the pros and cons of these approaches?
The High Scalability requirements of Real Time Stream Processing Platforms can be met using a Scale-Out architecture. In this solution, the Stream Processing Platform is deployed on a cluster of machines and additional processing power can be added by adding more machines. Scale-Out Architectures are of three types; their characteristics and key relative advantages/disadvantages are described below.

Shared Storage

In this architecture the Storage is shared across the Processing nodes while each node has its own computing and memory resources. Typically the Storage is served from a SAN/NAS to all the nodes. If costs permit, such architectures are suitable for storage intensive parallel computing (e.g. MPP DB).
Pros:
1.     High throughput, Highly Available, Reliable and Durable Storage
Cons:
1.     Not well suited to Stream Platforms, which are not storage intensive
2.     High Cost of SAN/NAS; needs specialized hardware
3.     Scalability may be limited by the amount of data stored in the cluster

Shared Memory

In this architecture the Memory Modules are also shared across the Processing nodes using a high speed interconnect (e.g. dedicated Fibre Channel, InfiniBand fabric etc.) while each node has its own computing resources. The Shared Memory is used to store application specific data, caches and so on. If costs permit, such architectures are suitable for memory intensive parallel computing (e.g. Grid Computing).
Pros:
1.     High throughput, Highly Available, Reliable and Durable Storage for both Disk and Memory
2.     Shared memory architectures support very fast failovers without data loss in case of node failures
Cons:
1.     High Cost of network interconnect
2.     Software implementation is complex because making use of non-local memory is tricky and needs separate distributed and highly available memory manager modules for coherence, synchronization etc.

Shared Nothing

In this architecture each node has its own separate Computing, Memory and Storage resources. Individual nodes are connected using a standard network interconnect (e.g. Gigabit Ethernet). Generally suitable for Stream Platforms and all Data Parallel/Task Parallel solutions.
Pros:
1.     Highly Scalable (almost linear scalability)
2.     Low Cost; additional processing power can be added by adding more nodes (commodity hardware). No need for specialized hardware
Cons:
1.     Software implementation should ensure that processing is generally completed locally with optimal network communication between nodes, otherwise scalability is compromised
2.     Failovers will cause data loss unless mechanisms such as ‘replays’ or ‘check-pointing’ are implemented
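One common way to keep processing local in a Shared Nothing cluster is to route every data point to a node chosen from its key, so that all state for a key lives on a single node. The Python sketch below illustrates this with hypothetical function names (worker_for, route); it uses zlib.crc32 rather than Python's built-in hash() because the built-in string hash differs between processes by default, which would break routing across nodes.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def worker_for(key: str, num_workers: int) -> int:
    """Maps a key to a worker index using a hash that is stable across processes."""
    return zlib.crc32(key.encode("utf-8")) % num_workers


def route(
    tuples: Iterable[Tuple[str, int]], num_workers: int
) -> Dict[int, List[Tuple[str, int]]]:
    """Groups (key, value) tuples by the worker that owns their key."""
    routed: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for key, value in tuples:
        routed[worker_for(key, num_workers)].append((key, value))
    return dict(routed)
```

Since a given key always maps to the same worker, per-key aggregations need no communication between nodes. Note that this simple modulo scheme remaps most keys when the number of workers changes.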

In the above I have tried to include a few questions and answers that elucidate the best practices involved in building a comprehensive real time data processing platform. None of this is new information, but presenting it together, in a comparative form across the various technologies, can help in making a more informed judgement when building such solutions. Hope this was helpful.

