Can Spark Streaming survive Chaos Monkey?


Netflix is a data-driven organization that places emphasis on the quality of data collected and processed. In our previous blog post, we highlighted our use cases for real-time stream processing in the context of online recommendations and data monitoring. With Spark Streaming as our choice of stream processor, we set out to evaluate and share the resiliency story for Spark Streaming in the AWS cloud environment.  A Chaos Monkey based approach, which randomly terminated instances or processes, was employed to simulate failures.

Spark on Amazon Web Services (AWS) is relevant to us as Netflix delivers its service primarily out of the AWS cloud. Stream processing systems need to be operational 24/7 and be tolerant to failures. Instances on AWS are ephemeral, which makes it imperative to ensure Spark’s resiliency.

Spark Components


Apache Spark is a fast and general-purpose cluster computing system. Spark can be deployed on top of Mesos, YARN or Spark's own cluster manager, which allocates worker node resources to an application. The Spark Driver connects to the cluster manager and is responsible for converting an application into a directed acyclic graph (DAG) of individual tasks that get executed within executor processes on the worker nodes.
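As a rough illustration of that division of labor, here is a minimal, hypothetical Spark application (the host name and numbers are made up): the driver program builds the job, the standalone master assigns workers, and the executors on those workers run the individual tasks.

import org.apache.spark.{SparkConf, SparkContext}

object MinimalApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("minimal-app")
      .setMaster("spark://master-host:7077") // hypothetical standalone master URL
    val sc = new SparkContext(conf)

    // The driver turns this computation into a DAG of stages and tasks;
    // the tasks themselves run inside executor processes on the worker nodes.
    val doubled = sc.parallelize(1 to 1000).map(_ * 2).count()
    println(doubled)

    sc.stop()
  }
}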

Creating Chaos


Netflix streaming devices periodically send events that capture member activities, which play a significant role in personalization. These events flow to our server-side applications and are routed to Kafka. Our Spark Streaming application consumes these events from Kafka and computes metrics. The deployment architecture is shown below:


Fig 2: Deployment Architecture

Our goal is to validate that there is no interruption in computing metrics when the different Spark components fail. To simulate such failures, we employed a whack-a-mole approach and killed the various Spark components.

We ran our Spark Streaming application on Spark Standalone. The resiliency exercise was run with Spark v1.2.0, Kafka v0.8.0 and Zookeeper v3.4.5.

Spark Streaming Resiliency


Driver Resiliency: Spark Standalone supports two modes for launching the driver application. In client mode, the driver is launched in the same process as the one where the client submits the application. When this process dies, the application is aborted. In cluster mode, the driver is launched from one of the worker processes in the cluster. Additionally, standalone cluster mode supports a supervise option that restarts the application automatically on non-zero exit codes.
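For a streaming application, the supervised restart is only half the story: the relaunched driver also needs to pick up where the previous one left off. A minimal sketch of the usual pattern, with hypothetical application and checkpoint names, looks like this:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ResilientDriver {
  // Hypothetical checkpoint location shared by all driver incarnations
  val checkpointDir = "hdfs:///checkpoints/metrics-app"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("metrics-app")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    // ... define the Kafka input stream and the metric computation here ...
    ssc
  }

  def main(args: Array[String]): Unit = {
    // After a supervised restart, the new driver rebuilds its context from the
    // checkpoint instead of starting from scratch.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}

Such an application would be submitted through spark-submit with --deploy-mode cluster and the --supervise flag.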

Master Resiliency: The Spark scheduler uses the Master to make scheduling decisions. To avoid a single point of failure, it is best to set up a multi-master standalone cluster. Spark uses Zookeeper for leader election. One of the master nodes becomes the ACTIVE node and all Worker nodes get registered to it. When this master node dies, one of the STANDBY master nodes becomes the ACTIVE node and all the Worker nodes automatically re-register with it. If any applications are running on the cluster during the master failover, they continue to run without a glitch.
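On the application side, standalone HA mostly comes down to listing every master in the master URL, so the driver and workers can fail over to whichever node ZooKeeper elects (the master daemons themselves are configured with spark.deploy.recoveryMode=ZOOKEEPER and spark.deploy.zookeeper.url). A small sketch, with hypothetical host names:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("metrics-app")
  // Both masters are listed; the application registers with whichever is ACTIVE
  .setMaster("spark://master1:7077,master2:7077")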

Worker Process Resiliency: Worker process launches and monitors the Executor and Driver as child processes. When the Worker process is killed, all its child processes are also killed.  The Worker process gets automatically relaunched, which in turn restarts the Driver and/or the Executor process.

Executor Resiliency: When an Executor process is killed, it is automatically relaunched by the Worker process and any tasks that were in flight are rescheduled.

Receiver Resiliency: The Receiver runs as a long-running task within an Executor and has the same resiliency characteristics as the Executor.

The effect on the computed metrics due to the termination of various Spark components is shown below.


Fig 3: Behavior on Receiver/Driver/Master failure

Driver Failure: The main impact is the back-pressure that builds up due to the failure, which results in a sudden drop in the message processing rate, followed by a catch-up spike, before the graph settles into a steady state.

Receiver Failure: The dip in computed metrics was due to the fact that the default Kafka receiver is an unreliable receiver. Spark Streaming 1.2 introduced an experimental feature called write ahead logs that makes the Kafka receiver reliable. When this is enabled, applications incur a hit to Kafka receiver throughput. However, this can be addressed by increasing the number of receivers.
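As a rough sketch of what that looks like in code (topic, consumer group and paths are hypothetical), the write ahead log is a configuration flag, and the throughput hit can be offset by running several receivers and unioning their streams:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("metrics-app")
  // Persist received data to the write ahead log before acknowledging it
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(10))
ssc.checkpoint("hdfs:///checkpoints/metrics-app") // WAL segments live under the checkpoint directory

// Several receivers, each a long-running task, consume the same topic in one group
val numReceivers = 4
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181", "metrics-group", Map("member-events" -> 1))
}
val events = ssc.union(kafkaStreams)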

Summary


The following table summarizes the resiliency characteristics of different Spark components:

Component | Type | Behaviour on Component Failure
Driver | Process | Client mode: the entire application is killed. Cluster mode with supervise: the Driver is restarted on a different Worker node
Master | Process | Single master: the entire application is killed. Multi-master: a STANDBY master is elected ACTIVE
Worker Process | Process | All child processes (Executor or Driver) are also terminated and a new Worker process is launched
Executor | Process | A new Executor is launched by the Worker process
Receiver | Thread(s) | Same as the Executor, since Receivers are long-running tasks inside the Executor
Worker Node | Node | Worker, Executor and Driver processes run on Worker nodes and the behaviour is the same as killing them individually

We uncovered a few issues (SPARK-5967, SPARK-3495, SPARK-3496, etc.) during this exercise, but the Spark Streaming team was helpful in fixing them in a timely fashion. We are also in the midst of performance testing Spark and will follow up with a blog post.

Overall, we are happy with the resiliency of Spark Standalone for our use cases and are excited to take it to the next level, where we are working towards building a unified Lambda Architecture that involves a combination of batch and real-time streaming processing. We are in the early stages of this effort, so if you are interested in contributing in this area, please reach out to us.


Comparison of Apache Stream Processing Frameworks: Part 2

Posted by Petr Zapletal on Wed, Mar 2, 2016

In the previous post we went through the necessary theory and also introduced the popular streaming frameworks from the Apache landscape - Storm, Trident, Spark Streaming, Samza and Flink. Today, we're going to dig a little bit deeper and go through topics like fault tolerance, state management and performance. In addition, we're going to discuss guidelines for building distributed streaming applications, and I'll also give you recommendations for particular frameworks.

Fault Tolerance

Fault tolerance in streaming systems is inherently harder than in batch. When facing an error in a batch processing system, we can just restart the failed part of the computation and we're good. But this is much harder in streaming scenarios, because data keeps arriving and a lot of jobs run 24/7. Another challenge we have to face is state consistency, because at the end of the day we have to start replaying events and, of course, not all state operations are idempotent. As you'll see, fault tolerance can be pretty hard, so let's have a look at how our systems deal with it.

Storm uses a mechanism of upstream backup and record acknowledgements to guarantee that messages are re-processed after a failure. Acknowledgements work as follows: an operator sends back to the previous operator an acknowledgement for every record that has been processed. The source of the topology keeps a backup of all the records it generates. Once acknowledgements have been received for all the records it generated, all the way down to the sinks, the backup can be discarded safely. On failure, if not all acknowledgements have been received, the records are replayed by the source. This guarantees no data loss, but does result in duplicate records passing through the system. That's at-least-once delivery (for more information about delivery guarantees you may check out the previous part).

Storm implements this with a clever mechanism that only requires a few bytes of storage per source record to track the acknowledgements. Pure record acknowledgement architectures, regardless of their performance, fail to offer exactly-once guarantees, thus burdening the application developer with deduplication. Storm's mechanism also has low throughput and problems with flow control, as the acknowledgement mechanism often falsely classifies failures under back-pressure.
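To make the mechanism a bit more concrete, here is a small, hypothetical bolt written in Scala against Storm's Java API (package names follow the pre-1.0 backtype.storm layout): each emitted tuple is anchored to its input, and the explicit ack is what eventually lets the spout discard its backup of the source record.

import java.util.{Map => JMap}

import backtype.storm.task.{OutputCollector, TopologyContext}
import backtype.storm.topology.OutputFieldsDeclarer
import backtype.storm.topology.base.BaseRichBolt
import backtype.storm.tuple.{Fields, Tuple, Values}

class SplitBolt extends BaseRichBolt {
  private var collector: OutputCollector = _

  override def prepare(conf: JMap[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
    this.collector = collector
  }

  override def execute(input: Tuple): Unit = {
    input.getString(0).split(" ").foreach { word =>
      // Anchoring ties the new tuple to its input, so a downstream failure
      // causes the spout to replay the original record
      collector.emit(input, new Values(word))
    }
    // Acknowledge the input; once acks arrive for the whole tuple tree,
    // the spout can drop its backup of the record
    collector.ack(input)
  }

  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
    declarer.declare(new Fields("word"))
}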

Spark Streaming with its micro-batching semantics follows a different approach. The idea is terribly simple: Spark processes micro-batches on various worker nodes. Each micro-batch may either succeed or fail. On a failure, the micro-batch can simply be recomputed, as the underlying data is persistent and immutable. So exactly-once delivery made easy.

Samza's approach is completely different. It takes advantage of a durable, offset-based messaging system - usually Kafka, of course. Samza monitors the offsets of its tasks and moves them forward as messages are processed. The offsets can be checkpointed to persistent storage and restored in case of failure. The problem is that when it restores the offset from the last checkpoint, it doesn't know which of the subsequent messages were already processed and it might process some of them twice. That's at-least-once delivery for us.


Flink's approach is based on distributed snapshots which capture the state of a streaming job. Flink sends checkpoint barriers, basically some kind of markers, through the stream, and when a barrier reaches an operator, the operator checkpoints the corresponding part of the stream. Compared to Storm, it's far more efficient, as it doesn't have to acknowledge every record but does it in small batches. But don't be confused, it's still native streaming; conceptually it is very different from Spark. And Flink also provides exactly-once delivery.
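In the API, turning this on is a one-liner. A minimal sketch (the interval is just an assumption, not a recommendation):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Inject a checkpoint barrier into the stream roughly every 5 seconds;
// operators snapshot their state as the barrier passes through them
env.enableCheckpointing(5000)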


Managing State

Most non-trivial streaming applications have some kind of state. In contrast to stateless operations, where we have just an input, processing and an output, here we have an input and a state, then processing, and then an output together with a modified state. We have to manage our state, persist it, and in case of failure we expect our state to be recreated. Recreating the state can be a bit of a problem, as we do not always have an exactly-once guarantee and some of the records may be replayed multiple times. And that is usually not what we want.

As we know, Storm provides at-least-once delivery guarantees. So how can we get the exactly-once semantics provided by Trident? Conceptually it's quite simple: you just start committing the records. But obviously that's not very efficient, so you start doing it in small batches, do some optimizations, and here we are. Trident defines a couple of abstractions which determine when you can achieve an exactly-once guarantee, and as you can see in the picture below, there are some limitations and it takes some time to dig into it.

[Figure: Trident state abstractions and their exactly-once guarantees]
When thinking about stateful operations in stream processing, we usually imagine a long-running operator with a state and a stream of records passing through it. As we know, Spark Streaming is a micro-batching system, and it addresses state differently. Basically, Spark Streaming manages state as another micro-batched stream. During the processing of each micro-batch, Spark takes the current state and a function representing the operation, and the result is a processed micro-batch and an updated state.

Samza's solution for everything is to just push it out to Kafka, and that works for state management as well. Samza has real stateful operators, so any task can hold a state and the state's change log is pushed to Kafka. If needed, the state can easily be recreated from the Kafka topic. To make it a little bit faster, Samza allows us to plug in key-value stores as local storage, so it doesn't have to go to Kafka all the time. The concept is illustrated in the picture below. Unfortunately, Samza provides at-least-once semantics only, which hurts a lot, but an implementation of exactly-once delivery is planned.
[Figure: Samza state management with a local key-value store backed by a Kafka changelog]

Flink provides stateful operators conceptually similar to Samza's. When working with Flink, we can use two different types of state. The first one is local or task state: it is the current state of a particular operator instance only, and these states don't interact with each other. Then we have partitioned, or key, state, which maintains the state of whole partitions. And of course, Flink provides exactly-once semantics. In the picture below you can see an outline of Flink's long-running operator with three local states.

[Figure: Flink long-running operator with local states]

Counting Words with State

So let's have a look at how to count words, focusing on state management. For a naive wordcount implementation, check out the previous post.

Let’s start with Trident.

public static StormTopology buildTopology(LocalDRPC drpc) {
  FixedBatchSpout spout = ...

  TridentTopology topology = new TridentTopology();

  TridentState wordCounts = topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

  ...

}

We can create a state by calling persistentAggregate. An important argument is Count, which is a built-in component for storing numbers. If we wanted to process the data from the state, we would have to create a stream for that. As you can see in the snippet, it is not very convenient.

Spark’s declarative approach is a little bit better.

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])

val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
                      state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  Some(output)
}

val stateDstream = wordDstream.trackStateByKey(
  StateSpec.function(trackStateFunc).initialState(initialRDD))

First, we create an RDD to be used as the initial state, and then do some transformations on the input. Then we define the transition function, trackStateFunc, which takes a word, its count and the current state; the function does the computation, updates the state and returns the result. Finally, we put all the bits together with trackStateByKey and get a state stream which contains the word counts.


Let’s have a look at Samza.

class WordCountTask extends StreamTask with InitableTask {

  private var store: KeyValueStore[String, Integer] = _

  def init(config: Config, context: TaskContext) {
    this.store = context.getStore("wordcount-store")
      .asInstanceOf[KeyValueStore[String, Integer]]
  }

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector, coordinator: TaskCoordinator) {

    val words = envelope.getMessage.asInstanceOf[String].split(" ")

    words.foreach { key =>
      val count: Integer = Option(store.get(key)).getOrElse(0)
      store.put(key, count + 1)
      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"),
        (key, count + 1)))
    }
  }
}
First we need to define our state, in this case a key-value store, and specify how it should be initialized in init. Then we can simply use it during the computation in process. As you can see above, it is pretty straightforward.

And finally, let’s have a look at Flink with its neat API.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val words = text.flatMap ( _.split(" ") )

words.keyBy(x => x).mapWithState {
  (word, count: Option[Int]) =>
    {
      val newCount = count.getOrElse(0) + 1
      val output = (word, newCount)
      (output, Some(newCount))
    }
}

We just call the function mapWithState, which takes a function with two parameters as an argument. The first one is the word to process and the second is the state; the function then returns the processed output and the new state.

Performance

A reasonable performance comparison is definitely a topic for a whole article of its own, so just an insight for now.

The various systems approach problems fairly differently, and it is therefore very hard to design unbiased tests. When we talk about performance in streaming, we talk about latency and throughput. It depends on many variables, but in general, for a simple task: if you're at 500k records per second per node, that's OK; if you can reach 1 million, it's nice; over a million is great. By a node I mean a pretty standard one, something like 24 cores and a reasonable amount of memory, say 24 or 48 GB.

For latency, in the case of micro-batching we are usually thinking in seconds. In the case of native streaming, we can expect lower hundreds of millis for most of the systems, but a tuned Storm can easily operate in tens of millis.

It is also important to keep in mind the cost of delivery guarantees, fault tolerance and state management. For example, turning on fault tolerance may cost you something like 10 to 15% of throughput, but in the case of Storm it can be as much as 70%. As always, there is no free lunch. In this and the previous post I've shown you stateful and stateless word count examples, and of course the stateless one is faster. In case you're wondering how much: in the context of Apache Flink the difference was around 25%, but in the case of Spark it was around 50%. I'm pretty sure both could be tuned, but it gives us an idea that this is something to keep in mind. Speaking about tuning, these systems have very rich tuning options which may lead to significant performance gains, and you should always find some time to have a look at them.

It is also important to remember that all operations are distributed and sending data through the network is pretty expensive. So try to take advantage of data locality and also try to tune your application's serialization.

Project Maturity

When picking a framework for your application, you should always consider its maturity. So let's have a quick look at how that looks in our cases. Storm was the first mainstream streaming system and became the de-facto industrial standard for a long time; it is used in many companies like Twitter, Yahoo, Spotify and many more. Spark is the most trending Scala repository these days and one of the engines behind Scala's popularity. Spark's adoption grows every day; it is used by companies like Netflix, Cisco, DataStax, Intel, IBM and so on. Samza is used by LinkedIn and also by tens of other companies, for example Netflix or Uber. Flink is still an emerging project, but we can see its first production deployments and I'm sure more will follow very soon.

You may also find the number of project contributors interesting. Storm & Trident have around 180 of them, while the whole of Spark has more than 720. Samza has, according to GitHub, around 40 contributors, and Flink already has more than 130.

Summary

Before we jump to framework recommendations, it may be helpful to check out the summary table below.

[Table: framework comparison summary]

Framework Recommendations

The answer to the typical question, which one should I use, is as always: it depends. So, in general, always evaluate the requirements of your application carefully and be sure you fully understand the consequences of choosing a particular framework. I'd also recommend picking a framework with a high-level API, as it's more elegant and, more importantly, much more productive. Also keep in mind that most streaming applications are stateful, so the state management of a particular framework should be high on your evaluation list. I'd also recommend going for a framework with exactly-once delivery semantics, as it makes things easier, but of course this really depends on your requirements; there are definitely use cases where at-least-once or at-most-once delivery guarantees are all you need. But also keep in mind that a system supporting exactly once does not necessarily also support the weaker guarantees. Lastly, make sure your system is able to recover quickly; you can use Chaos Monkey or a similar tool for testing, because, as we discussed, fast recovery is crucial in stream processing.

Storm is still a great fit for small and fast tasks. If you care mainly about latency, Storm might be a good way to go. But keep in mind that fault tolerance or Trident's state management hurt its performance a lot. An interesting option might be a potential upgrade to Twitter's Heron, which is designed as Storm's replacement and should be better at every single task while keeping the API. The problem is that there is no guarantee Twitter is going to open-source it, so who knows if counting on it is a good idea.

For Spark Streaming, you should definitely at least try it if Spark is already part of your infrastructure, because in that case Streaming comes basically for free and you can also take advantage of Spark's various libraries. Also, if you really want to use a Lambda architecture, it is a pretty decent choice. But you should always keep the micro-batching limitations in mind and be sure latency is not critical for you.

When thinking about adopting Samza, Kafka should be a cornerstone of your architecture. I know it's pluggable, but nearly everyone is using Kafka, so I would stick with that. Also, as mentioned before, Samza ships with powerful local storage and is great for managing large states; it can handle states of tens of gigabytes easily, which is pretty nice. But keep in mind Samza's at-least-once delivery limitation.

Flink is a conceptually great streaming system which fits most streaming use cases, and it often provides progressive functionality, like advanced windowing or time handling, which may not be implemented by its competitors. So you should always consider Flink when you need functionality which might be hard to implement in Spark or, generally, in any micro-batching system. Apart from that, Flink also has an API for common batch processing, which may be pretty useful. But you need to have enough courage to adopt an emerging project, and don't forget to check out its roadmap.

Dataflow and Open Source

And the last thing I want to mention is Dataflow and its open-source initiative. Dataflow is part of the Google Cloud Platform, which has all sorts of things in it: huge data storage, BigQuery, Cloud Pub/Sub, tools for data analysis, and so on, including the aforementioned Cloud Dataflow.

Dataflow is Google's managed service for batch and stream data processing with a unified API. It is built upon well-known Google technologies such as MapReduce for batch processing, FlumeJava for programming model definition and MillWheel for stream processing. And all of them are really good.

You may be asking why I'm writing about this, since I said we would focus on Apache stream processing platforms and this is clearly Google's proprietary solution. But Google recently decided to open source the Dataflow SDK, and the guys behind both Spark and Flink have implemented its runners. So we now have the ability to run jobs defined by the Dataflow API on Google Cloud Platform, on Flink or on Spark, and it's quite possible more engines will follow very soon.

Dataflow provides APIs in Java and Python implemented by Google itself, and I've also found two Scala DSLs implemented by the community. Apart from that, Google and a number of partners have submitted this as a new Apache proposal named Apache Beam. So it seems like a pretty interesting option and I would definitely at least think about it. But it is very important to emphasize that all of this is very recent and the implementation of particular features might still be missing.


Conclusion

In this short blog post series, we went through the popular streaming frameworks from the Apache landscape and discussed their similarities, differences, the trade-offs they have made and their fitting use cases. I hope it was interesting for you and I believe it will be helpful when designing your own streaming solution. There are definitely a couple of interesting frameworks which were not discussed here, but I plan to address them in separate posts. Also, if you have any questions, don't hesitate to contact me, as I'm always happy to discuss the topic.

 


Comparison of Apache Stream Processing Frameworks: Part 1

Posted by Petr Zapletal on Tue, Feb 2, 2016

 

A couple of months ago we discussed the reasons behind the increasing demand for distributed stream processing. I also stated that there is a number of frameworks available to address it. Now it's time to have a look at them and discuss their similarities, differences and, in my opinion, their recommended use cases.

As you probably suspect, distributed stream processing is the continuous processing, aggregation and analysis of unbounded data. It's a general computation model, like MapReduce, but we expect latencies in milliseconds or seconds. These systems are usually modeled as directed acyclic graphs (DAGs).

A DAG is a graphical representation of a chain of tasks, and we use it to describe the topology of a streaming job. I'll borrow a bit of terminology from Akka Streams. As you can see in the picture below, data flows through a chain of processors from sources to sinks, which represents the streaming task. And speaking about Akka Streams, I think it's very important to emphasize the word distributed: even local solutions can create and run a DAG, but we're going to focus only on solutions running on multiple machines.

[Figure: a streaming DAG of processors from sources to sinks]

Points of Interest

When choosing between different systems, there are a couple of points we should take care of. Let's start with the Runtime and Programming Model. The programming model provided by a platform determines a lot of its features, and it should be sufficient to handle all possible use cases for the application. This is a really crucial topic and I'll come back to it very soon.

The Functional Primitives exposed by a processing platform should provide rich functionality at the individual message level, like map or filter, which is pretty easy to implement even if you want to scale a lot. But it should also provide functionality across messages, like aggregations, and operations across streams, like joins, which are much harder to scale.

State Management - Most of the applications have stateful processing logic that requires maintaining a state. The platform should allow us to maintain, access and update the state information.

For Message Delivery Guarantees, we have a couple of classes like at most once, at least once and exactly once. At most once delivery means that for each message handed to the mechanism, that message is delivered zero or one times, so messages may be lost. At least once delivery means that for each message handed to the mechanism potentially multiple attempts are made at delivering it, such that at least one succeeds. Messages may be duplicated but not lost. And finally exactly once delivery means that for each message handed to the mechanism exactly one delivery is made to the recipient, the message can neither be lost nor duplicated. So another important thing to consider.

Failures can and will happen at various levels - for example network partitions, disk failures or nodes going down. The platform should be able to recover from all such failures and resume from its last successful state without harming the result.

And then, we have more performance related requirements like Latency, Throughput and Scalability which are extremely important in streaming applications.

We should also take care of Maturity and Adoption Level; this information can give us a clue about potential support, available libraries or even Stack Overflow answers, and it may help us a lot when choosing the correct platform.

And last but not least, the Ease of Development and Ease of Operability. It is great when we have a super fancy system which covers all our use cases, but if we cannot write a program for it or we cannot deploy it, we are done anyway.

Runtime and Programming Model

The Runtime and Programming Model is probably the most important trait of a system, because it defines its expressiveness, possible operations and future limitations. Therefore it defines the system's capabilities and its use cases.

There are two distinct approaches to implementing a streaming system. The first one is called native streaming. It means all incoming records, or events if you want, are processed as they arrive, one by one.



The second approach is called micro-batching. Short batches are created from the incoming records and go through the system. These batches are created according to a pre-defined time constant, typically every couple of seconds.



Both approaches have inherent advantages and disadvantages. Let's start with native streaming. The great advantage of native streaming is its expressiveness: because it takes the stream as it is, it is not limited by any unnatural abstraction over it. Also, as records are processed immediately upon arrival, the achievable latencies of these systems are always better than those of their micro-batching companions. Apart from that, stateful operations are much easier to implement, as you'll see later in this post. On the other hand, native streaming systems usually have lower throughput, and fault tolerance is much more expensive, as the system has to take care of (persist and replay) every single record. Load balancing is also something of an issue. For example, let's say we have data partitioned by a key and we want to process it. If the processing of some key of the partition is more resource intensive for any reason, this partition quickly becomes the job's bottleneck.

Secondly, micro-batching. Splitting the stream into micro-batches inevitably reduces the system's expressiveness. Some operations, especially state management or joins and splits, are much harder to implement, as the system has to manipulate whole batches. Moreover, the batch interval connects two things which should never be connected: an infrastructure property and business logic. On the contrary, fault tolerance and load balancing are much simpler, as the system just sends every batch to a worker node and, if something goes wrong, it simply uses a different one. Lastly, it is good to note that we can build a micro-batching system atop native streaming quite easily.

Programming models can be classified as compositional and declarative. The compositional approach provides basic building blocks, like sources or operators, and they must be tied together in order to create the expected topology. New components can usually be defined by implementing some kind of interface. In contrast, operators in a declarative API are defined as higher-order functions. This allows us to write functional code with abstract types and all its fancy stuff, and the system creates and optimizes the topology itself. Declarative APIs also usually provide more advanced operations, like windowing or state management, out of the box. We are going to have a look at some code samples very soon.

Apache Streaming Landscape

There is a number of diverse frameworks available and it is literally impossible to cover all of them, so I'm forced to limit the scope somehow. I want to go for popular streaming solutions from the Apache landscape which also provide a Scala API. Therefore, we're going to focus on Apache Storm, its sibling Trident, and the streaming module of the very popular Spark. We're also going to talk about the streaming system behind LinkedIn, named Samza, and finally we're going to discuss the promising Apache project Flink. I believe this is a great selection, because even though all of them are streaming systems, they approach various challenges very differently. Unfortunately I won't talk about proprietary systems like Google MillWheel or Amazon Kinesis, and we're also going to miss interesting but still limitedly adopted systems like Intel GearPump or Apache Apex. That may be for next time.

Apache Storm was originally created by Nathan Marz and his team at BackType in 2010. Later it was acquired and open-sourced by Twitter, and it became an Apache top-level project in 2014. Without any doubt, Storm was a pioneer in large scale stream processing and became the de-facto industrial standard. Storm is a native streaming system and provides a low-level API. Storm uses Thrift for topology definition and implements the Storm multi-language protocol, which basically allows solutions to be implemented in a large number of languages, which is pretty unique, and Scala is of course one of them.

Trident is a higher-level micro-batching system built atop Storm. It simplifies the topology building process and also adds higher-level operations like windowing, aggregations or state management, which are not natively supported in Storm. In contrast to Storm's at-most-once guarantee, Trident provides exactly-once delivery. Trident has Java, Clojure and Scala APIs.

As we all know, Spark is a very popular batch processing framework these days, with a couple of built-in libraries like SparkSQL or MLlib, and of course Spark Streaming. Spark's runtime is built for batch processing, and therefore Spark Streaming, which was added a little bit later, does micro-batching. The stream of input data is ingested by receivers which create micro-batches, and these micro-batches are processed in a similar way to other Spark jobs. Spark Streaming provides a high-level declarative API in Scala, Java and Python.

Samza was originally developed at LinkedIn as a proprietary streaming solution and, together with Kafka, which is another great LinkedIn contribution to our community, it became a key part of their infrastructure. As you're going to see a little bit later, Samza builds heavily on Kafka's log-based philosophy and the two integrate very well. Samza provides a compositional API and of course Scala is supported.

And last but not least, Flink. Flink is a pretty old project; it has its origins in 2008, but right now it is getting quite a lot of attention. Flink is a native streaming system and provides a high-level API. Flink also provides an API for batch processing, like Spark, but there is a fundamental distinction between the two: Flink handles batch as a special case of streaming. Everything is a stream, and this is definitely a better abstraction, because this is how the world really looks.

That was a quick introduction to the systems and, as you can see in the table below, they have pretty different traits.

[Table: framework comparison summary]

Counting Words

Wordcount is something like the hello world of stream processing; it nicely shows the main differences between our frameworks. Let's start with Storm, and please note that the example has been simplified significantly.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new Split(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

...

Map<String, Integer> counts = new HashMap<String, Integer>();

public void execute(Tuple tuple, BasicOutputCollector collector) {
  String word = tuple.getString(0);
  Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
  counts.put(word, count);
  collector.emit(new Values(word, count));
}

First, let's have a look at the topology definition. As you can see, we have to define a spout, or if you want, a source. Then there is a bolt, a processing component, which splits the text into words, and then another bolt for the actual word count calculation. Also have a look at the magic numbers 5, 8 and 12: these are parallelism hints and they define how many independent threads across the cluster will be used to execute each component. As you can see, everything is very manual and low-level. Now, let's have a look at how the actual WordCount bolt is implemented. Since Storm does not have built-in support for managed state, I have defined a local state, which is far from ideal but good enough as a sample. Apart from that it is not very interesting, so let's move on and have a look at Trident.

As I mentioned before, Trident is Storm's micro-batching extension and, apart from many other goodies, it provides state management, which is pretty useful when implementing wordcount.

public static StormTopology buildTopology(LocalDRPC drpc) {
  FixedBatchSpout spout = ...

  TridentTopology topology = new TridentTopology();
  TridentState wordCounts = topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(),
      new Count(), new Fields("count"));

  ...

}

As you can see, I could use higher-level operations like each and groupBy, so it's a little bit better, and I was also able to use Trident's managed state for storing the counts via persistentAggregate.


Now it is time for the pretty declarative API provided by Apache Spark. Also keep in mind that, in contrast to the previous examples, which were simplified significantly, this is nearly all the code you need to run this simple streaming wordcount.

val conf = new SparkConf().setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(1))

val text = ...

val counts = text.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.print()

ssc.start()
ssc.awaitTermination()

Every Spark Streaming job requires a StreamingContext, which is basically the entry point to the streaming functionality. The StreamingContext takes a configuration, which in our case is very limited, and, more importantly, it defines the batch interval, here set to 1 second. The whole word count computation then fits into three chained calls: quite a difference, isn't it? That's the reason why Spark is sometimes called distributed Scala. As you can see, it's quite standard functional code, and Spark takes care of the topology definition and its distributed execution. Finally comes the last part of every Spark Streaming job: starting the computation. Just keep in mind that once started, a job cannot be modified.

Now let’s have a look at Apache Samza, another representative of compositional API.

class WordCountTask extends StreamTask {

  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
                       coordinator: TaskCoordinator) {

    val text = envelope.getMessage.asInstanceOf[String]

    val counts = text.split(" ").foldLeft(Map.empty[String, Int]) {
      (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
    }

    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts))
  }
}

The topology is defined in Samza's properties file, but for the sake of clarity you won't find it here. What's important for us is that the task has defined input and output channels and the communication goes through Kafka topics. In our case, the whole topology is the WordCountTask, which does all the work. In Samza, components are defined by implementing particular interfaces; here it's a StreamTask, and I've just overridden its process method. Its parameter list contains everything needed for connecting with the rest of the system. The computation itself is just simple Scala.

Now let's have a look at Flink. As you can see, the API is pretty similar to Spark Streaming's, but notice that we are not setting any batch interval.

val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val counts = text.flatMap ( _.split(" ") )
  .map ( (_, 1) )
  .groupBy(0)
  .sum(1)

counts.print()

env.execute("wordcount")

The computation itself is pretty straightforward. As you can see, there are just a couple of functional calls and Flink takes care of their distributed execution.

Conclusion

That's all folks for today. We went through the necessary theory and also introduced the popular streaming frameworks from the Apache landscape. Next time, we're going to dig a little bit deeper and go through the rest of the points of interest. I hope you find it interesting, and stay tuned. Also, if you have any questions, don't hesitate to contact me, as I'm always happy to discuss the topic.

