Source: http://the-paper-trail.org/blog/columnar-storage/


Columnar Storage

You’re going to hear a lot about columnar storage formats in the next few months, as a variety of distributed execution engines are beginning to consider them for their IO efficiency, and the optimisations that they open up for query execution. In this post, I’ll explain why we care so much about IO efficiency and show how columnar storage – which is a simple idea – can drastically improve performance for certain workloads.

Caveat: This is a personal, general research summary post, and as usual doesn’t necessarily reflect our thinking at Cloudera about columnar storage.

Disks are still the major bottleneck in query execution over large datasets. Even a machine with twelve disks running in parallel (for an aggregate bandwidth of north of 1GB/s) can’t keep all the cores busy; running a query against memory-cached data can get tens of GB/s of throughput. IO bandwidth matters. Therefore, the best thing an engineer can do to improve the performance of disk-based query engines (like RDBMSs and Impala) is usually to improve the performance of reading bytes from disk. This can mean decreasing the latency (for small queries where the time to find the data to read might dominate), but most usually it means improving the effective throughput of reads from disk.

The traditional way to improve disk bandwidth has been to wait, and allow disks to get faster. However, disks are not getting faster very quickly (having settled at roughly 100 MB/s, with ~12 disks per server), and SSDs can’t yet achieve the storage density to be directly competitive with HDDs on a per-server basis.

The other way to improve disk performance is to maximise the ratio of ‘useful’ bytes read to total bytes read. The idea is not to read more data than is absolutely necessary to serve a query, so the useful bandwidth realised is increased without actually improving the performance of the IO subsystem. Enter columnar storage, a principle for file format design that aims to do exactly that for query engines that deal with record-based data.

Columns vs. Rows

Traditional database file formats store data in rows, where each row comprises a contiguous collection of column values. On disk, that looks roughly like the following:

Row-major On-disk Layout

This row-major layout usually has a header for each row that describes, for example, which columns in the row are NULL. Each column value is then stored contiguously after the header, followed by another row with its own header, and so on.

Both HDDs and SSDs are at their most efficient when reading data sequentially from disk (for HDDs the benefits are particularly pronounced). In fact, even a read of a few bytes usually brings in an entire 4096-byte block from disk, because reading the whole block costs effectively the same (and the operating system usually deals with data in 4k page-sized chunks). For row-major formats it’s therefore most efficient to read entire rows at a time.

Queries that do full table-scans – i.e. those that don’t take advantage of any kind of indexing and need to visit every row – are common in analytical workloads; with row-major formats a full scan of a table will read every single byte of the table from disk. For certain queries, this is appropriate. Trivially, SELECT * FROM table requires returning every single column of every single row in the table, and so the IO costs for executing that query on a row-major format are a single seek and a single large contiguous read (although that is likely to be broken up for pipelining purposes). The read is unavoidable, as is the single seek; therefore row-major formats allow for optimal IO usage. More generally, SELECT <col_set> FROM table WHERE <predicate_set> will be relatively efficient for row-major formats if either a) evaluating the predicate_set requires reading a large subset of the set of columns or b) col_set is a large subset of the set of columns (i.e. the projectivity is high) and the set of rows returned by the evaluation of the predicates over the table is a large proportion of the total set of rows (i.e. the selectivity is high). More simply, a query is going to be efficient if it requires reading most of the columns of most of the rows. In these cases, row-major formats allow the query execution engine to achieve good IO efficiency.

However, there is a general consensus that these SELECT * kinds of queries are not representative of typical analytical workloads; instead either a large number of columns are not projected, or they are projected only for a small subset of rows where only a few columns are required to decide which rows to return. Coupled with a general trend towards very wide tables with high column counts, the total number of bytes that are required to satisfy a query are often a relatively small fraction of the size on disk of the target table. In these cases, row-major formats often are quite wasteful in the amount of IO they require to execute a query.

Instead of a format that makes it efficient to read entire rows, it’s advantageous for analytical workloads to make it efficient to read entire columns at once. Based on our understanding of what makes disks efficient, we can see that the obvious approach is to store column values densely and contiguously on disk. This is the basic idea behind columnar file formats. The following diagram shows what this looks like on disk:

Column-Major On-disk Layout

A row is split across several column blocks, which may even be separate files on disk. Reading an entire column now requires a single seek plus a large contiguous read, but the read length is much less than for extracting a single column from a row-major format. In this figure we have organised the columns so that they are all ordered in the same way; later we’ll see how we can relax that restriction and use different orderings to make different queries more efficient.
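To make the layout difference concrete, here is a tiny Python sketch (not any real file format) that packs the same three-row, three-column table both ways; the table contents are invented for illustration.

import struct

# A toy three-column table; values are packed as 32-bit little-endian integers.
rows = [(1, 10, 100), (2, 20, 200), (3, 30, 300)]

# Row-major: each row's values are adjacent on disk, so reading one whole row
# is a single small contiguous read.
row_major = b"".join(struct.pack("<iii", *row) for row in rows)

# Column-major: all values of one column are adjacent, so scanning a single
# column is one contiguous read with no bytes from the other columns.
columns = list(zip(*rows))  # [(1, 2, 3), (10, 20, 30), (100, 200, 300)]
column_major = b"".join(struct.pack("<" + "i" * len(col), *col) for col in columns)

print(len(row_major), len(column_major))  # same total bytes (36), different order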

Query Execution

The diagram below shows what a simple query plan for SELECT col_b FROM table WHERE col_a > 5 might look like for a query engine reading from a traditional row-major file format. A scan node reads every row in turn from disk, and streams the rows to a predicate evaluation node, which looks at the value of col_a in each row. Those rows that pass the predicate are sent to a projection node which constructs result tuples containing col_b.

Row Query Plan

Compare that to the query plan below, for a query engine reading from columnar storage. Each column referenced in the query is read independently. The predicate is evaluated over col_a to produce a list of matching row IDs. col_b is then scanned with respect to that list of IDs, and each matching value is returned as a query result. This query plan performs two IO seeks (to find the beginning of both column files), instead of one, and issues two consecutive reads rather than one large read. The pattern of using IDs for each column value is very common to make reconstructing rows easier; usually columns are all sorted on the same key so the Nth value of col_a belongs to the same row as the Nth value of col_b.

Columnar Query Plan
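As a rough sketch of that plan in Python (assuming, as above, that both columns share the same ordering so positions line up; the values are made up):

col_a = [3, 7, 1, 9, 5, 8]
col_b = ["x", "y", "z", "p", "q", "r"]

# Scan col_a and evaluate the predicate, producing the IDs of matching rows.
matching_ids = [i for i, value in enumerate(col_a) if value > 5]

# Scan col_b with respect to that ID list to build the projected result.
result = [col_b[i] for i in matching_ids]
print(result)  # ['y', 'p', 'r']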

The extra IO cost for the row-format query is therefore the time it takes to read all those extra columns. Let’s assume the table is 10 columns wide, ten million rows long and each value is 4 bytes, which are all conservative estimates. Then there is an extra 8 * 10M * 4 bytes, or 320MB, of data read, which is ~3.2s at the ~100MB/s per-disk rate mentioned earlier, on a query that would otherwise only need to read the two referenced columns (80MB, roughly 800ms); an overhead of 400%. When disks are less performant, or column widths wider, the effect becomes exaggerated.
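A quick back-of-the-envelope check of those figures, assuming the roughly 100MB/s per-disk bandwidth quoted earlier:

rows, value_bytes, bandwidth = 10 * 1000 * 1000, 4, 100 * 1000 * 1000  # ~100MB/s
extra_cols, needed_cols = 8, 2

extra_seconds = extra_cols * rows * value_bytes / bandwidth   # ~3.2s of wasted reads
base_seconds = needed_cols * rows * value_bytes / bandwidth   # ~0.8s of useful reads
print(extra_seconds, base_seconds, extra_seconds / base_seconds)  # 3.2 0.8 4.0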

This, then, is the basic idea of columnar storage: we recognise that analytical workloads rarely require full scans of all table data, but do often require full scans of a small subset of the columns, and so we arrange to make column scans cheap at the expense of extra cost reading individual rows.

The Cost of Columnar

Is this a free lunch? Should every analytical database go out and change every file format to be column-major? Obviously the story is more complicated than that. There are some query archetypes that suffer when data is stored in a columnar format.

The obvious drawback is that it is expensive to reassemble a row, since the separate values that comprise it are spread far across the disk. Every column included in a projection implies an extra disk seek, and this can add up when the projectivity of a query is high. Therefore, for highly projective queries, row-major formats can be more efficient (and therefore columnar formats are not strictly better than row-major storage even from a pure IO perspective).

There are more subtle repercussions of each row being scattered across the disk. When a row-major format is read into memory, and ultimately into CPU cache, it is in a format that permits cheap reference to multiple columns at a time. Row-major formats have good in-memory spatial locality, and there are common operations that benefit enormously from this.

For example, a query that selects the sum of two columns can sometimes be executed (once the data is in memory) faster on row-major formats, since the columns are almost always in the same cache line for each row. Columnar representations are less well suited: either both columns must be brought into memory at the same time and moved through in lockstep (which is still not cache efficient if each column is ordered differently), or the first column must be scanned, each value buffered, and then the second column scanned separately to complete the half-finished output tuples.

The same general problem arises when preparing each tuple to write out as the result of a (non-aggregating) query. Selecting several columns at once requires ‘row reconstruction’ at some point in the query lifecycle. Deciding when to do this is a complicated process, and (as we shall see) the literature has not yet developed a good rule of thumb. Many databases are row-major internally, and therefore a columnar format is transposed into a row-major one relatively early in the scanning process. As described above, this can require buffering half-constructed tuples in memory. For this reason, columnar formats are often partitioned into ‘row-groups’: for a row-group size of N, column chunk K contains rows K*N to (K+1)*N. This reduces the amount of buffering required, at the cost of a few more disk seeks.
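As a minimal illustration of row-groups (the group size and columns below are arbitrary), each group holds the same row range for every column, so reconstruction only ever needs to buffer one group at a time:

def row_groups(columns, n):
    """Yield, per group, one chunk of n rows from each column."""
    total_rows = len(columns[0])
    for start in range(0, total_rows, n):
        yield [col[start:start + n] for col in columns]

cols = [list(range(10)), [v * 10 for v in range(10)]]
for group in row_groups(cols, n=4):
    print(group)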

Further Aspects of Columnar Storage

Fully column-oriented execution engines

Relevant papers:
C-Store: A Column-oriented DBMS
The Vertica Analytic Database: C-Store 7 Years Later
Materialization Strategies in a Column-Oriented DBMS
Performance Tradeoffs in Read-Optimized Databases
Column-Stores vs. Row-Stores: How Different Are They Really?

In this post, I’ve talked mostly about the benefits of columnar storage for scans – query operators that read data from disk, but whose ultimate output is a batch of rows for the rest of the query plan to operate on. In fact, columnar data can be integrated into pretty much every operator in a query execution engine. C-Store, the research project precursor to Vertica, explored a lot of the consequences of keeping data in columns until later on in the query plan. Eventually, of course, the columns have to be converted to rows, since the user expects a result in row-major format. The choice of when to perform this conversion is called late or early materialisation; viewed this way column-stores and row-stores can be considered two points on a spectrum of early to late materialisation strategies. Materialisation is studied in detail in the materialisation strategies paper above. Their conclusions are that the correct time to construct a tuple depends on the query plan (two broad patterns are considered: pipelining and parallel scans) and the query selectivity. Unfortunately, supporting both strategies would involve significant implementation cost – each operator would have to support two interfaces, and two parallel execution engines would effectively be frankensteined together. In general, late materialisation can lead to significant advantages: for example, by delaying the cost of reconstructing a tuple, it can be avoided if the tuple is ultimately filtered out by a predicate.

The difference between row-based and columnar execution engines is studied in the Performance Tradeoffs… and Column-Stores vs. Row-Stores… papers. The former takes a detailed look at when each strategy is superior – coming out in favour mostly of column-stores, but only with simple queries and basic query plans. The latter tries to implement common column-store optimisations in a traditional row-store, without changing the code. This means a number of increasingly brittle hacks to emulate columnar storage.

Compression

Relevant papers:
Integrating Compression and Execution on Column-Oriented Database Systems

A column of values drawn from the same set (like item price, say) is likely to be highly amenable to compression, since the values contained are similar, and often identical. Compressing a column has at least two significant advantages for IO cost: less space is required on disk, and less IO is required to bring a column into memory (at the cost of some CPU to decompress, which is usually going spare). Some compression formats – for example run-length encoding – allow execution engines to operate on the compressed data directly, filtering large chunks at a time without first decompressing them. This is another advantage of late materialisation – by keeping the data compressed until late in the query plan, these optimisations become available to many operators, not just the scan.
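For instance, here is a small sketch of run-length encoding a column and evaluating a predicate directly on the runs, touching each run once rather than each row; the values are invented for illustration.

from itertools import groupby

def rle_encode(values):
    # Collapse consecutive repeats into (value, run_length) pairs.
    return [(value, sum(1 for _ in run)) for value, run in groupby(values)]

encoded = rle_encode(["a", "a", "a", "b", "b", "a", "c", "c", "c", "c"])
print(encoded)  # [('a', 3), ('b', 2), ('a', 1), ('c', 4)]

# Count rows where value == 'c' without expanding the runs back into rows.
matches = sum(length for value, length in encoded if value == "c")
print(matches)  # 4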

Hybrid approaches

Relevant papers:
Weaving Relations for Cache Performance

Since neither row-major nor column-major is strictly superior on every workload, it’s natural that some research has been done into hybrid approaches that can achieve the best of both worlds. The most commonly known approach is PAX – Partition Attributes Across – which splits the table into page-sized groups of rows, and inside those groups formats the rows in column-major order. This is the same approach as the row-groups used to prevent excessive buffering described earlier, but this is not the aim of PAX; with PAX the original intention was to make CPU processing more efficient by having individual columns available contiguously to perform filtering, but also to have all the columns for a particular row nearby inside a group to make tuple reconstruction cheaper. The result of this approach is that IO costs don’t go down (because each row-group is only a page long, and is therefore read in its entirety), but reconstruction and filtering is cheaper than for true columnar formats.


Source: http://www.smartdatacollective.com/kingmesal/386160/apache-drill-vs-apache-spark-what-s-right-tool-job



If you’re looking to implement a big data project, you’re probably deciding whether to go with Apache Spark SQL or Apache Drill. This article can help you decide which query tool you should use for the kinds of projects you’re working on.

Spark SQL

Spark SQL is simply a module that lets you work with structured data using Apache Spark. It allows you to mix SQL within your existing Spark projects. Not only do you get access to a familiar SQL query language, you also get access to powerful tools such as Spark Streaming and the MLlib machine learning library.

Spark uses a special data structure called a DataFrame that represents data as named columns, similar to relational tables. You can query the data from Scala, Python, Java, and R. This enables you to perform powerful analysis of your data rather than just retrieving it. But it’s even more powerful when extracting data for use with the machine learning library. With MLlib, you can perform sophisticated analyses, detect credit card fraud, and process data coming from servers.

As with Drill, Spark SQL is compatible with a number of data formats, including some of the same ones that Drill supports: Parquet, JSON, and Hive. Spark SQL can handle multiple data sources similar to the way Drill can, but you can funnel the data into your machine learning systems mentioned earlier. This gives you a lot of power to analyze multiple data points, especially when combined with Spark Streaming. Spark SQL serves as a way to glue together different data sources and libraries into a powerful application.
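As a rough illustration of that pattern, here is a minimal PySpark sketch; the input path, column names and query are placeholders, and the exact session setup varies a little between Spark versions.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-sql-demo").getOrCreate()

# Hypothetical JSON input; Parquet files and Hive tables are read the same way.
events = spark.read.json("hdfs:///data/events.json")
events.createOrReplaceTempView("events")

# Mix SQL with the DataFrame API; the result could then feed MLlib or Spark Streaming.
frequent_users = spark.sql(
    "SELECT user_id, COUNT(*) AS n FROM events GROUP BY user_id HAVING COUNT(*) > 100"
)
frequent_users.show()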

Apache Drill

Apache Drill is a powerful database engine that also lets you use SQL for queries. You can use a number of data formats, including Parquet, MongoDB, MapR-DB, HDFS, MapR-FS, Amazon S3, Azure Blob Storage, Google Cloud Storage, Swift, NAS, and more.

You can use data from multiple data sources and join them without having to pull the data out, making Drill especially useful for business intelligence.

The ability to view multiple types of data, some of which have both strict and loose schema, as well as being able to allow for complex data models, might seem like a drag on performance. However, Drill uses schema discovery and a hierarchical columnar data model to treat data like a set of tables, independently of how the data is actually modeled. 

Almost all existing BI tools, including Tableau, Qlik, MicroStrategy, Spotfire, SAS, and even Excel, can use Drill’s JDBC and ODBC drivers to connect to it. This makes Drill very useful for people already using BI and SQL databases to move up to big data workloads using tools they’re already familiar with.

Drill’s JDBC driver lets BI tools access Drill. JDBC lets developers query large datasets using Java. This has an advantage similar to that of using ANSI SQL: lots of developers are already familiar with Java and can transfer their skills to Drill.
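For example, a connection from Python over Drill's ODBC driver might look roughly like the sketch below (the JDBC route from Java is analogous); the DSN name and file path are assumptions, not taken from the article.

import pyodbc

# Assumes the Drill ODBC driver is installed and a DSN named "Drill" is configured.
conn = pyodbc.connect("DSN=Drill", autocommit=True)
cursor = conn.cursor()

# Drill can query raw files in place; this path is purely hypothetical.
cursor.execute("SELECT name, age FROM dfs.`/data/people.json` LIMIT 10")
for row in cursor.fetchall():
    print(row)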

Easy Data Access in Drill

One of Drill’s biggest strengths is its ability to secure databases at the file level using views and impersonation.

Views within Drill are the same as those within relational databases. They allow a simplified query to hide the complexities of the underlying tables. Impersonation allows a user to access data as another user. This enables fine-grained access to the raw data when other members of your team should not be able to view sensitive or secure data.

Views and impersonation are beyond the scope of Apache Spark.

Conclusion

So which query engine should you choose? As always, it depends. If you’re mainly looking to query data quickly, even across multiple data sources, then you should look into Drill. If you want to go beyond querying data and work with it in more algorithmic ways, then Spark SQL might be for you. You can always test both by trying them out in your own sandbox environment, which lets you explore these powerful systems on your own machine.



Authored by:

Jim Scott

James A. Scott (prefers to go by Jim) is Director, Enterprise Strategy & Architecture at MapR Technologies and is very active in the Hadoop community. Jim helped build the Hadoop community in Chicago as cofounder of the Chicago Hadoop Users Group. He has implemented Hadoop at three different companies, supporting a variety of enterprise use cases from managing Points of Interest for mapping ...




Source: https://www.oreilly.com/learning/hello-tensorflow


Hello, TensorFlow!

Building and training your first TensorFlow graph from the ground up.

The TensorFlow project is bigger than you might realize. The fact that it's a library for deep learning, and its connection to Google, has helped TensorFlow attract a lot of attention. But beyond the hype, there are unique elements to the project that are worthy of closer inspection:

  • The core library is suited to a broad family of machine learning techniques, not “just” deep learning.
  • Linear algebra and other internals are prominently exposed.
  • In addition to the core machine learning functionality, TensorFlow also includes its own logging system, its own interactive log visualizer, and even its own heavily engineered serving architecture.
  • The execution model for TensorFlow differs from Python's scikit-learn, or most tools in R.

Cool stuff, but—especially for someone hoping to explore machine learning for the first time—TensorFlow can be a lot to take in.


How does TensorFlow work? Let's break it down so we can see and understand every moving part. We'll explore the data flow graph that defines the computations your data will undergo, how to train models with gradient descent using TensorFlow, and how TensorBoard can visualize your TensorFlow work. The examples here won't solve industrial machine learning problems, but they'll help you understand the components underlying everything built with TensorFlow, including whatever you build next!

Names and execution in Python and TensorFlow

The way TensorFlow manages computation is not totally different from the way Python usually does. With both, it's important to remember, to paraphrase Hadley Wickham, that an object has no name (see Figure 1). In order to see the similarities (and differences) between how Python and TensorFlow work, let’s look at how they refer to objects and handle evaluation.

Names “have” objects, rather than the reverse
Figure 1. Names “have” objects, rather than the reverse. Image courtesy of Hadley Wickham, used with permission.

The variable names in Python code aren't what they represent; they're just pointing at objects. So, when you say in Python that foo = [] and bar = foo, it isn't just that foo equals bar; foo is bar, in the sense that they both point at the same list object.

>>> foo = []
>>> bar = foo
>>> foo == bar
## True
>>> foo is bar
## True

You can also see that id(foo) and id(bar) are the same. This identity, especially with mutable data structures like lists, can lead to surprising bugs when it's misunderstood.

Internally, Python manages all your objects and keeps track of your variable names and which objects they refer to. The TensorFlow graph represents another layer of this kind of management; as we’ll see, Python names will refer to objects that connect to more granular and managed TensorFlow graph operations.

When you enter a Python expression, for example at an interactive interpreter or Read Evaluate Print Loop (REPL), whatever is read is almost always evaluated right away. Python is eager to do what you tell it. So, if I tell Python to foo.append(bar), it appends right away, even if I never use foo again.

A lazier alternative would be to just remember that I said foo.append(bar), and if I ever evaluate foo at some point in the future, Python could do the append then. This would be closer to how TensorFlow behaves, where defining relationships is entirely separate from evaluating what the results are.

TensorFlow separates the definition of computations from their execution even further by having them happen in separate places: a graph defines the operations, but the operations only happen within a session. Graphs and sessions are created independently. A graph is like a blueprint, and a session is like a construction site.

Back to our plain Python example, recall that foo and bar refer to the same list. By appending bar into foo, we've put a list inside itself. You could think of this structure as a graph with one node, pointing to itself. Nesting lists is one way to represent a graph structure like a TensorFlow computation graph.

>>> foo.append(bar)
>>> foo
## [[...]]

Real TensorFlow graphs will be more interesting than this!

The simplest TensorFlow graph

To start getting our hands dirty, let’s create the simplest TensorFlow graph we can, from the ground up. TensorFlow is admirably easier to install than some other frameworks. The examples here work with either Python 2.7 or 3.3+, and the TensorFlow version used is 0.8.

>>> import tensorflow as tf

At this point TensorFlow has already started managing a lot of state for us. There's already an implicit default graph, for example. Internally, the default graph lives in the _default_graph_stack, but we don't have access to that directly. We use tf.get_default_graph().

>>> graph = tf.get_default_graph()

The nodes of the TensorFlow graph are called “operations,” or “ops.” We can see what operations are in the graph with graph.get_operations().

>>> graph.get_operations()
## []

Currently, there isn't anything in the graph. We’ll need to put everything we want TensorFlow to compute into that graph. Let's start with a simple constant input value of one.

>>> input_value = tf.constant(1.0)

That constant now lives as a node, an operation, in the graph. The Python variable name input_value refers indirectly to that operation, but we can also find the operation in the default graph.

>>> operations = graph.get_operations()
>>> operations
## [<tensorflow.python.framework.ops.Operation at 0x1185005d0>]
>>> operations[0].node_def
## name: "Const"
## op: "Const"
## attr {
##   key: "dtype"
##   value {
##     type: DT_FLOAT
##   }
## }
## attr {
##   key: "value"
##   value {
##     tensor {
##       dtype: DT_FLOAT
##       tensor_shape {
##       }
##       float_val: 1.0
##     }
##   }
## }

TensorFlow uses protocol buffers internally. (Protocol buffers are sort of like a Google-strength JSON.) Printing the node_def for the constant operation above shows what's in TensorFlow's protocol buffer representation for the number one.

People new to TensorFlow sometimes wonder why there's all this fuss about making “TensorFlow versions” of things. Why can't we just use a normal Python variable without also defining a TensorFlow object? One of the TensorFlow tutorials has an explanation:

To do efficient numerical computing in Python, we typically use libraries like NumPy that do expensive operations such as matrix multiplication outside Python, using highly efficient code implemented in another language. Unfortunately, there can still be a lot of overhead from switching back to Python every operation. This overhead is especially bad if you want to run computations on GPUs or in a distributed manner, where there can be a high cost to transferring data.

TensorFlow also does its heavy lifting outside Python, but it takes things a step further to avoid this overhead. Instead of running a single expensive operation independently from Python, TensorFlow lets us describe a graph of interacting operations that run entirely outside Python. This approach is similar to that used in Theano or Torch.

TensorFlow can do a lot of great things, but it can only work with what's been explicitly given to it. This is true even for a single constant.

If we inspect our input_value, we see it is a constant 32-bit float tensor of no dimension: just one number.

>>> input_value
## <tf.Tensor 'Const:0' shape=() dtype=float32>

Note that this doesn't tell us what that number is. To evaluate input_value and get a numerical value out, we need to create a “session” where graph operations can be evaluated and then explicitly ask to evaluate or “run” input_value. (The session picks up the default graph by default.)

>>> sess = tf.Session()
>>> sess.run(input_value)
## 1.0

It may feel a little strange to “run” a constant. But it isn't so different from evaluating an expression as usual in Python; it's just that TensorFlow is managing its own space of things—the computational graph—and it has its own method of evaluation.

The simplest TensorFlow neuron

Now that we have a session with a simple graph, let's build a neuron with just one parameter, or weight. Often, even simple neurons also have a bias term and a non-identity activation function, but we'll leave these out.

The neuron's weight isn't going to be constant; we expect it to change in order to learn based on the “true” input and output we use for training. The weight will be a TensorFlow variable. We'll give that variable a starting value of 0.8.

>>> weight = tf.Variable(0.8)

You might expect that adding a variable would add one operation to the graph, but in fact that one line adds four operations. We can check all the operation names:

>>> for op in graph.get_operations(): print(op.name)
## Const
## Variable/initial_value
## Variable
## Variable/Assign
## Variable/read

We won't want to follow every operation individually for long, but it will be nice to see at least one that feels like a real computation.

>>> output_value = weight * input_value

Now there are six operations in the graph, and the last one is that multiplication.

>>> op = graph.get_operations()[-1]
>>> op.name
## 'mul'
>>> for op_input in op.inputs: print(op_input)
## Tensor("Variable/read:0", shape=(), dtype=float32)
## Tensor("Const:0", shape=(), dtype=float32)

This shows how the multiplication operation tracks where its inputs come from: they come from other operations in the graph. To understand a whole graph, following references this way quickly becomes tedious for humans. TensorBoard graph visualization is designed to help.

How do we find out what the product is? We have to “run” the output_value operation. But that operation depends on a variable: weight. We told TensorFlow that the initial value of weight should be 0.8, but the value hasn't yet been set in the current session. The tf.initialize_all_variables() function generates an operation which will initialize all our variables (in this case just one) and then we can run that operation.

>>> init = tf.initialize_all_variables()
>>> sess.run(init)

The result of tf.initialize_all_variables() will include initializers for all the variables currently in the graph, so if you add more variables you'll want to use tf.initialize_all_variables() again; a stale init wouldn't include the new variables.

Now we're ready to run the output_value operation.

>>> sess.run(output_value)
## 0.80000001

Recall that's 0.8 * 1.0 with 32-bit floats, and 32-bit floats have a hard time with 0.8; 0.80000001 is as close as they can get.

See your graph in TensorBoard

Up to this point, the graph has been simple, but it would already be nice to see it represented in a diagram. We'll use TensorBoard to generate that diagram. TensorBoard reads the name field that is stored inside each operation (quite distinct from Python variable names). We can use these TensorFlow names and switch to more conventional Python variable names. Using tf.mul here is equivalent to our earlier use of just * for multiplication, but it lets us set the name for the operation.

>>> x = tf.constant(1.0, name='input')
>>> w = tf.Variable(0.8, name='weight')
>>> y = tf.mul(w, x, name='output')

TensorBoard works by looking at a directory of output created from TensorFlow sessions. We can write this output with a SummaryWriter, and if we do nothing aside from creating one with a graph, it will just write out that graph.

The first argument when creating the SummaryWriter is an output directory name, which will be created if it doesn't exist.

>>> summary_writer = tf.train.SummaryWriter('log_simple_graph', sess.graph)

Now, at the command line, we can start up TensorBoard.

$ tensorboard --logdir=log_simple_graph

TensorBoard runs as a local web app, on port 6006. (“6006” is “goog” upside-down.) If you go in a browser to localhost:6006/#graphs you should see a diagram of the graph you created in TensorFlow, which looks something like Figure 2.

A TensorBoard visualization of the simplest TensorFlow neuron
Figure 2. A TensorBoard visualization of the simplest TensorFlow neuron.

Making the neuron learn

Now that we’ve built our neuron, how does it learn? We set up an input value of 1.0. Let's say the correct output value is zero. That is, we have a very simple “training set” of just one example with one feature, which has the value one, and one label, which is zero. We want the neuron to learn the function taking one to zero.

Currently, the system takes the input one and returns 0.8, which is not correct. We need a way to measure how wrong the system is. We'll call that measure of wrongness the “loss” and give our system the goal of minimizing the loss. If the loss can be negative, then minimizing it could be silly, so let's make the loss the square of the difference between the current output and the desired output.

>>> y_ = tf.constant(0.0)
>>> loss = (y - y_)**2

So far, nothing in the graph does any learning. For that, we need an optimizer. We'll use a gradient descent optimizer so that we can update the weight based on the derivative of the loss. The optimizer takes a learning rate to moderate the size of the updates, which we'll set at 0.025.

>>> optim = tf.train.GradientDescentOptimizer(learning_rate=0.025)

The optimizer is remarkably clever. It can automatically work out and apply the appropriate gradients through a whole network, carrying out the backward step for learning.

Let's see what the gradient looks like for our simple example.

>>> grads_and_vars = optim.compute_gradients(loss)
>>> sess.run(tf.initialize_all_variables())
>>> sess.run(grads_and_vars[1][0])
## 1.6

Why is the value of the gradient 1.6? Our loss is error squared, and the derivative of that is two times the error. Currently the system says 0.8 instead of 0, so the error is 0.8, and two times 0.8 is 1.6. It's working!
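You can check that number by hand with ordinary Python (this is just arithmetic, not TensorFlow code): the loss is (w*x - y_)**2, so its derivative with respect to w is 2 * (w*x - y_) * x.

w, x, y_true = 0.8, 1.0, 0.0
error = w * x - y_true
grad = 2 * error * x          # 1.6, matching what TensorFlow reported
print(grad)

# With the 0.025 learning rate, one gradient-descent step moves the weight by
# grad * 0.025 = 0.04, from 0.8 down to about 0.76, as the next steps show.
print(w - 0.025 * grad)       # 0.76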

For more complex systems, it will be very nice indeed that TensorFlow calculates and then applies these gradients for us automatically.

Let's apply the gradient, finishing the backpropagation.

>>> sess.run(optim.apply_gradients(grads_and_vars))
>>> sess.run(w)
## 0.75999999  # about 0.76

The weight decreased by 0.04 because the optimizer subtracted the gradient times the learning rate, 1.6 * 0.025, pushing the weight in the right direction.

Instead of hand-holding the optimizer like this, we can make one operation that calculates and applies the gradients: the train_step.

>>> train_step = tf.train.GradientDescentOptimizer(0.025).minimize(loss)
>>> for i in range(100):
>>>     sess.run(train_step)
>>> 
>>> sess.run(y)
## 0.0044996012

Running the training step many times, the weight and the output value are now very close to zero. The neuron has learned!

Training diagnostics in TensorBoard

We may be interested in what's happening during training. Say we want to follow what our system is predicting at every training step. We could print from inside the training loop.

>>> sess.run(tf.initialize_all_variables())
>>> for i in range(100):
>>>     print('before step {}, y is {}'.format(i, sess.run(y)))
>>>     sess.run(train_step)
>>> 
## before step 0, y is 0.800000011921
## before step 1, y is 0.759999990463
## ...
## before step 98, y is 0.00524811353534
## before step 99, y is 0.00498570781201

This works, but there are some problems. It's hard to understand a list of numbers. A plot would be better. And even with only one value to monitor, there's too much output to read. We're likely to want to monitor many things. It would be nice to record everything in some organized way.

Luckily, the same system that we used earlier to visualize the graph also has just the mechanisms we need.

We instrument the computation graph by adding operations that summarize its state. Here, we'll create an operation that reports the current value of y, the neuron's current output.

>>> summary_y = tf.scalar_summary('output', y)

When you run a summary operation, it returns a string of protocol buffer text that can be written to a log directory with a SummaryWriter.

>>> summary_writer = tf.train.SummaryWriter('log_simple_stats')
>>> sess.run(tf.initialize_all_variables())
>>> for i in range(100):
>>>     summary_str = sess.run(summary_y)
>>>     summary_writer.add_summary(summary_str, i)
>>>     sess.run(train_step)
>>> 

Now after running tensorboard --logdir=log_simple_stats, you get an interactive plot at localhost:6006/#events (Figure 3).

A TensorBoard visualization of a neuron’s output against training iteration number
Figure 3. A TensorBoard visualization of a neuron’s output against training iteration number.

Flowing onward

Here's a final version of the code. It's fairly minimal, with every part showing useful (and understandable) TensorFlow functionality.

import tensorflow as tf

x = tf.constant(1.0, name='input')
w = tf.Variable(0.8, name='weight')
y = tf.mul(w, x, name='output')
y_ = tf.constant(0.0, name='correct_value')
loss = tf.pow(y - y_, 2, name='loss')
train_step = tf.train.GradientDescentOptimizer(0.025).minimize(loss)

for value in [x, w, y, y_, loss]:
    tf.scalar_summary(value.op.name, value)

summaries = tf.merge_all_summaries()

sess = tf.Session()
summary_writer = tf.train.SummaryWriter('log_simple_stats', sess.graph)

sess.run(tf.initialize_all_variables())
for i in range(100):
    summary_writer.add_summary(sess.run(summaries), i)
    sess.run(train_step)

The example we just ran through is even simpler than the ones that inspired it in Michael Nielsen's Neural Networks and Deep Learning. For myself, seeing details like these helps with understanding and building more complex systems that use and extend from simple building blocks. Part of the beauty of TensorFlow is how flexibly you can build complex systems from simpler components.

If you want to continue experimenting with TensorFlow, it might be fun to start making more interesting neurons, perhaps with different activation functions. You could train with more interesting data. You could add more neurons. You could add more layers. You could dive into more complex pre-built models, or spend more time with TensorFlow's own tutorials and how-to guides. Go for it!


Article image: Braided river. (source: National Park Service, Alaska Region on Flickr).

Aaron Schumacher

Aaron Schumacher is a data scientist and software engineer for Deep Learning Analytics. He has taught with Python and R for General Assembly and the Metis data science bootcamp. Aaron has also worked with data at Booz Allen Hamilton, New York University, and the New York City Department of Education. Aaron’s career-best breakdancing result was advancing to the semi-finals of the R16 Korea 2009 individual footwork battle. He is honored to now be the least significant contributor to TensorFlow 0.9. 



http://bcho.tistory.com/1115

Introduction to Fluentd, a Distributed Log Collector


조대협 (http://bcho.tistory.com)



Looking at big data analytics technologies lately, the data collection part keeps standing out: a collection component has to cover many different data sources and gather data from many distributed servers, so its importance keeps growing.

So I have been looking at data (and log) collection platforms for big data. Compared to the days when there were many log collection solutions such as Flume, the field feels a bit more consolidated: there are Scribe, Fluentd, and Logstash (used in the ELK stack of Elastic Search + Logstash + Kibana), and most deployments seem to converge on Fluentd or Logstash. Both are open source and also have separate enterprise licensing policies.

Logstash emphasizes architectural flexibility and compatibility with other solutions, so it integrates well with a wide range of systems, whereas Fluentd focuses on architectural simplicity and the stability that comes with it, which makes its architecture and configuration simpler.


This post gives a brief overview of Fluentd's concepts and how to use it.

Log collection architecture with Fluentd

A log collection architecture based on Fluentd looks like the following.

As shown in the figure below, Fluentd is installed on each server; it collects logs from the services (or applications) running on that server and forwards them to a central log store.




The figure above is the most basic setup, where Fluentd acts only as a log collection agent. On top of this, each server's Fluentd can also forward its collected logs to another Fluentd instance, which then writes them to the log store.


The reason for putting an intermediate fluentd in the middle is that it can aggregate the incoming logs and throttle the log traffic before writing to the log store, adjusting the traffic to the capacity of the log store.

Alternatively, as in the next figure, logs can be replicated to multiple stores, or routed to different log stores depending on the log type.


Fluentd internals

We have seen roughly how to build a log collection architecture with Fluentd; so how is Fluentd itself structured?

As shown in the figure below, Fluentd consists of seven components: Input, Parser, Engine, Filter, Buffer, Output and Formatter. Except for the Engine, the other six are provided as plugins and can be configured by the user.

The typical data flow is Input → Engine → Output; Parser, Buffer, Filter and Formatter can be optionally added or removed depending on the configuration.



Input

Input plugins collect logs and support a wide variety of log sources. Besides the built-in plugins such as HTTP, tail and TCP, extension plugins let you collect data in many formats from many kinds of servers and applications.

Parser (Optional)

Even after reading data through an Input plugin, the data may be in a format Fluentd does not understand, so a Parser plugin can optionally be used to parse it. In addition to plugins that parse strings with regular expressions, parsers are provided for many formats such as apache, nginx and syslog.

Filter (Optional)

Filter plugins act on the ingested data before it is sent to the output, and provide the following three functions:

  • Filtering

  • Adding data fields

  • Removing data fields, or masking specific fields

Filtering forwards only selected records to the output and discards the rest. For example, you can forward a record to the log server only when the log data contains the string "seoul", or only when it matches a pattern such as "error" or "warning".


Adding data fields means enriching incoming log records with information such as the name of the host that sent them before they are forwarded to the log store.

Finally, removing data fields lets you drop unnecessary fields, or delete or hash sensitive information such as personal data, before the record is sent to the data store.

Output

Output plugins are the counterpart of Input plugins: they write the filtered data to a data store solution (such as mongodb, AWS S3 or Google BigQuery).

Formatter (Optional)

When data is written to the store through an Output plugin, a Formatter can define the format of the data being written. (If the Parser is the plugin that reads data according to a format on the Input side, the Formatter is the plugin that specifies the format for the Output side.)

Buffer (Optional)

Instead of writing data straight from Input to Output, an optional Buffer can be placed in between to throttle the flow. The buffer can be backed by either a file or memory.

Briefly, the structure and the way it works are as follows.



<Figure. fluentd's log writing flow>

Original: http://docs.fluentd.org/articles/buffer-plugin-overview


Inside the buffer, a chunk is created per tag, the unit used to separate log data.

A chunk can be thought of as a per-tag queue. For example, if you split tags into error, info, warning and user, error logs are stored in the error chunk and info logs in the info chunk.

When a chunk fills up to buffer_chunk_limit, or the flush_interval defined in the configuration elapses, the chunk is handed over to the queue so its logs can be written to the log store.


<Figure. Memory buffer configuration example>

Reference: http://docs.fluentd.org/articles/buffer-plugin-overview


Next, the queue reads the data and writes it to the log store. If the log store is healthy, the logs are written immediately, but if a write fails because of a network error or a log store error, fluentd waits for retry_wait seconds and then tries again. If the retry fails it waits twice as long as before, and twice as long again on the next failure (1 second, 2 seconds, 4 seconds, 8 seconds, and so on). Retries continue up to the retry_limit specified in the configuration.
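To make the retry policy concrete, here is a small Python sketch of the doubling back-off described above; this is not fluentd's code, and retry_wait / retry_limit are just parameter names borrowed from the configuration keys.

import time

def write_with_retry(write_chunk, retry_wait=1.0, retry_limit=5):
    # Try to write a chunk, doubling the wait after each failure: 1s, 2s, 4s, ...
    wait = retry_wait
    for attempt in range(retry_limit + 1):
        try:
            return write_chunk()
        except IOError:
            if attempt == retry_limit:
                raise        # give up; a secondary output could take over here
            time.sleep(wait)
            wait *= 2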

You can also configure what happens when the queue itself is full. There are two modes, "exception" and "block": in exception mode a BufferQueueLimitError is raised, and in block mode the input plugin is paused until the BufferQueueLimitError condition is resolved (in other words, no more logs are collected).

Another way to handle a full queue is to specify a secondary output so logs can be stored in a different log store. For example, if logs are supposed to be written to mongodb but cannot be because of a mongodb or network failure, you can set the secondary output to AWS S3, store the logs there for the time being, and re-ingest them from S3 into mongodb once mongodb has recovered.



<Figure. Secondary output configuration example>

Source: http://docs.fluentd.org/articles/buffer-plugin-overview


For more details on the Buffer plugin and error handling, see http://docs.fluentd.org/articles/buffer-plugin-overview.


Data structure

Next, let's look at the data structure Fluentd uses internally to handle log data.



Source: http://pt.slideshare.net/frsyuki/fluentd-set-up-once-collect-more


Each event consists of three parts: time, tag and record.

  • Time: the time the log data was generated.

  • Record: the content of the log data, defined as JSON.

  • Tag: the most important part, the classification of the data. Each log record's type is determined by its tag, and plugins such as filtering and routing are applied based on this tag.

A quick test

The test environment here is a MacBook.

You can follow http://docs.fluentd.org/articles/install-by-dmg for this test: first download and install fluentd.

Once installation is complete, td-agent, the fluentd process, is installed under /opt/td-agent/usr/sbin/.

The default configuration file is stored at /etc/td-agent/td-agent.conf.

The contents of td-agent.conf look like the following:



<ROOT>
  <match td.*.*>
    type tdlog
    apikey xxxxxx
    auto_create_table
    buffer_type file
    buffer_path /var/log/td-agent/buffer/td
    <secondary>
      type file
      path /var/log/td-agent/failed_records
      buffer_path /var/log/td-agent/failed_records.*
    </secondary>
  </match>
  <match debug.**>
    type stdout
  </match>
  <source>
    type forward
  </source>
  <source>
    type http
    port 8888
  </source>
  <source>
    type debug_agent
    bind 127.0.0.1
    port 24230
  </source>
</ROOT>



Looking at the <source> sections, one of them has type http and port 8888, which means fluentd will collect logs sent over HTTP to http://localhost:8888.

Next, the <match debug.**> section says that logs whose tag matches debug.** are handled with type stdout, i.e. printed directly to stdout (the screen).

So requests arriving at http://localhost:8888/{debug.**} will have their logs printed to stdout.


Now that we've looked at the configuration file, let's start the agent.

From the /opt/td-agent/usr/sbin directory, run td-agent as follows, passing the configuration file with the -c option:

% ./td-agent -c /etc/td-agent/td-agent.conf


Once the agent is running, use curl to send the log string {"json":"message"} to http://localhost:8888/debug.test:

% curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test


The output of the run looks like the following.





After td-agent starts, you can see on the last line that a log {"json":"message"} was collected and printed under the tag name debug.test.

Running it as a daemon

So far we ran it in the foreground from the CLI, but on a Mac it can also be run in the background as a service:

%sudo launchctl load /Library/LaunchDaemons/td-agent.plist

This starts it in the background. The td-agent.plist script used for background execution is created automatically at /Library/LaunchDaemons/td-agent.plist when fluentd is installed.

Since it is a background job there is no stdout; logs that would have gone to stdout can be found in /var/log/td-agent/td-agent.log.





To stop the running process, run:

%sudo launchctl unload /Library/LaunchDaemons/td-agent.plist


In the next post, I will look at how to actually configure fluentd to deliver logs to Google BigQuery or to a queue.


Lambda Architecture

I stumbled upon the Lambda Architecture by chance.
I'm writing it down here so that I remember it properly.

So what exactly is lambda? There is the lambda calculus, and the symbol λ has many other uses; it is sometimes explained simply as a shorthand for an anonymous function.
The lambdas introduced in Java 8 can likewise be described as a concise way of defining worker functions for parallel processing.
The Lambda Architecture, too, can be seen as describing a concise approach to parallel processing.

First, let's look at a high-level conceptual diagram of the architecture.



Looking at the diagram, the batch layer produces data through batch jobs and the serving layer makes that data available for queries. Data not yet covered by a batch job can be queried from the speed layer. Merge the two resulting chunks of data and you get the data you want; that is the whole idea of the Lambda Architecture.

A lot has been written about the Lambda Architecture since big data arrived, but the pattern itself has been in use for a long time. Daily statistics combined with real-time statistics are a good example: data up to the previous day is prepared by an overnight batch job, today's data is made queryable in real time, and a UNION in a SQL statement combines the two. Thought of this way, the Lambda Architecture is easy to understand.
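A tiny Python sketch of that merge idea (the dates and counts are made up): the serving query simply combines the precomputed batch view with the speed layer's running view.

batch_view = {"2016-05-16": 10450, "2016-05-17": 9873}  # filled in by the nightly batch job
speed_view = {"2016-05-18": 412}                        # today's running count, updated in real time

def daily_counts():
    # Equivalent in spirit to a SQL UNION over the daily-stats table and the realtime table.
    merged = dict(batch_view)
    merged.update(speed_view)
    return merged

print(daily_counts())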

Let's look at each layer in a bit more detail.

  • Batch Layer

For data that no longer changes, batch jobs compute results and store them.
Ideally the input really should be immutable, but in practice frighteningly ad-hoc manual fixes do happen, so where possible build the batch jobs so they can be re-run by hand.


  • Serving Layer

Provides query access to the batch views produced by the batch jobs. Updates to the data must always go through the Batch Layer's batch jobs. As mentioned for the Batch Layer, unexpected manual work does happen in practice, but never touch a batch view directly; the moment you do, it becomes garbage. If you really must, fix the source data and re-run the batch job manually, and avoid even that whenever possible.

  • Speed Layer
Provides ordinary query access to recent data. What matters most is ensuring it returns no data that overlaps with the batch views produced by the batch jobs. Since it serves real-time queries, pay attention to performance: push as much work as possible into the Batch Layer, or tune the queries carefully.

Below is an example of the Lambda Architecture applied to big data processing; it shows in a bit more concrete detail how it is used with actual big data systems.


This is a rough explanation, but I'm recording it for now.
If I later get a chance to build one myself, I'll use this as a starting point and write up the construction experience in more detail.



reference
* http://lambda-architecture.net/
* http://www.databasetube.com/database/big-data-lambda-architecture/
* http://www.datasciencecentral.com/profiles/blogs/lambda-architecture-for-big-data-systems



Lambda Architecture

A repository dedicated to the Lambda Architecture (LA). We collect and publish examples and good practices around the LA.

Updates

What is the Lambda Architecture?

Nathan Marz came up with the term Lambda Architecture (LA) for a generic, scalable and fault-tolerant data processing architecture, based on his experience working on distributed data processing systems at Backtype and Twitter.

The LA aims to satisfy the needs for a robust system that is fault-tolerant, both against hardware failures and human mistakes, being able to serve a wide range of workloads and use cases, and in which low-latency reads and updates are required. The resulting system should be linearly scalable, and it should scale out rather than up.

Here’s how it looks, from a high-level perspective:

LA overview

  1. All data entering the system is dispatched to both the batch layer and the speed layer for processing.
  2. The batch layer has two functions: (i) managing the master dataset (an immutable, append-only set of raw data), and (ii) pre-computing the batch views.
  3. The serving layer indexes the batch views so that they can be queried in a low-latency, ad-hoc way.
  4. The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.
  5. Any incoming query can be answered by merging results from batch views and real-time views.

Resources

Who is behind this?

See the about us section for details.


Can Spark Streaming survive Chaos Monkey?


Netflix is a data-driven organization that places emphasis on the quality of data collected and processed. In our previous blog post, we highlighted our use cases for real-time stream processing in the context of online recommendations and data monitoring. With Spark Streaming as our choice of stream processor, we set out to evaluate and share the resiliency story for Spark Streaming in the AWS cloud environment.  A Chaos Monkey based approach, which randomly terminated instances or processes, was employed to simulate failures.

Spark on Amazon Web Services (AWS) is relevant to us as Netflix delivers its service primarily out of the AWS cloud. Stream processing systems need to be operational 24/7 and be tolerant to failures. Instances on AWS are ephemeral, which makes it imperative to ensure Spark’s resiliency.

Spark Components


Apache Spark is a fast and general-purpose cluster computing system. Spark can be deployed on top of Mesos, YARN or Spark's own cluster manager, which allocates worker node resources to an application. The Spark Driver connects to the cluster manager and is responsible for converting an application into a directed acyclic graph (DAG) of individual tasks that get executed within an executor process on the worker nodes.

Creating Chaos


Netflix streaming devices periodically send events that capture member activities, which play a significant role in personalization. These events flow to our server side applications and are routed to Kafka. Our Spark Streaming application consumes these events from Kafka and computes metrics. The deployment architecture is shown below:


Fig 2: Deployment Architecture

Our goal is to validate that there is no interruption in computing metrics when the different Spark components fail. To simulate such failures, we employed a whack-a-mole approach and killed the various Spark components.

We ran our Spark Streaming application on Spark Standalone. The resiliency exercise was run with Spark v1.2.0, Kafka v0.8.0 and Zookeeper v3.4.5.

Spark Streaming Resiliency


Driver Resiliency: Spark Standalone supports two modes for launching the driver application. In client mode, the driver is launched in the same process as the one where the client submits the application. When this process dies, the application is aborted. In cluster mode, the driver is launched from one of the worker processes in the cluster. Additionally, standalone cluster mode supports a supervise option that allows restarting the application automatically on non-zero exit codes.

Master Resiliency: The Spark scheduler uses the Master to make scheduling decisions. To avoid a single point of failure, it is best to set up a multi-master standalone cluster. Spark uses Zookeeper for leader election. One of the master nodes becomes the ACTIVE node and all Worker nodes get registered to it. When this master node dies, one of the STANDBY master nodes becomes the ACTIVE node and all the Worker nodes get automatically registered to it. If any applications are running on the cluster during the master failover, they still continue to run without a glitch.

Worker Process Resiliency: Worker process launches and monitors the Executor and Driver as child processes. When the Worker process is killed, all its child processes are also killed.  The Worker process gets automatically relaunched, which in turn restarts the Driver and/or the Executor process.

Executor Resiliency: When the Executor process is killed, they are automatically relaunched by the Worker process and any tasks that were in flight are rescheduled.

Receiver Resiliency: Receiver runs as a long running task within an Executor and follows the same resiliency characteristics of an executor.

The effect on the computed metrics due to the termination of various Spark components is shown below.


Fig 3: Behavior on Receive/Driver/Master failure

Driver Failure: The main impact is back-pressure built up due to a node failure, which results in a sudden drop in message processing rate, followed by a catch up spike, before the graph settles into steady state.

Receiver Failure: The dip in computed metrics was due to the fact that default Kafka receiver is an unreliable receiver.  Spark streaming 1.2 introduced an experimental feature called write ahead logs that would make the kafka receiver reliable.  When this is enabled, applications would incur a hit to Kafka receiver throughput.  However, this could be addressed by increasing the number of receivers.

Summary


The following summarizes the resiliency characteristics of the different Spark components (component, type, and behaviour on component failure):

  • Driver (process): in client mode the entire application is killed; in cluster mode with supervise the Driver is restarted on a different Worker node.
  • Master (process): with a single master the entire application is killed; with multiple masters a STANDBY master is elected ACTIVE.
  • Worker Process (process): all child processes (executor or driver) are also terminated and a new worker process is launched.
  • Executor (process): a new executor is launched by the Worker process.
  • Receiver (thread(s)): same as the Executor, since receivers are long-running tasks inside the Executor.
  • Worker Node (node): Worker, Executor and Driver processes run on Worker nodes, and the behaviour is the same as killing them individually.

We uncovered a few issues (SPARK-5967, SPARK-3495, SPARK-3496, etc.) during this exercise, but the Spark Streaming team was helpful in fixing them in a timely fashion. We are also in the midst of performance testing Spark and will follow up with a blog post.

Overall, we are happy with the resiliency of Spark Standalone for our use cases, and excited to take it to the next level, where we are working towards building a unified Lambda Architecture that involves a combination of batch and real-time streaming processing. We are in the early stages of this effort, so if you are interested in contributing in this area, please reach out to us.


Comparison of Apache Stream Processing Frameworks: Part 2

Posted by Petr Zapletal on Wed, Mar 2, 2016

In the previous post we went through the necessary theory and also introduced popular streaming frameworks from the Apache landscape: Storm, Trident, Spark Streaming, Samza and Flink. Today, we’re going to dig a little deeper and go through topics like fault tolerance, state management and performance. In addition, we’re going to discuss guidelines for building distributed streaming applications, and I’ll also give you recommendations for particular frameworks.

Fault Tolerance

Fault tolerance in streaming systems is inherently harder than in batch. When facing an error in a batch processing system, we can just restart the failed part of the computation and we’re good. But this is much harder in streaming scenarios, because data keeps arriving and a lot of jobs run 24/7. Another challenge we have to face is state consistency, because at the end of the day we have to start replaying events, and of course not all state operations are idempotent. As you’ll see, fault tolerance can be pretty hard, so let’s have a look at how our systems deal with it.

Storm uses a mechanism of upstream backup and record acknowledgements to guarantee that messages are re-processed after a failure. Acknowledgements work as follows: an operator sends an acknowledgement back to the previous operator for every record that has been processed. The source of the topology keeps a backup of all the records it generates. Once acknowledgements have been received for all the records it generated, all the way down to the sinks, the backup can be discarded safely. On failure, if not all acknowledgements have been received, the records are replayed by the source. This guarantees no data loss, but does result in duplicate records passing through the system. That’s at-least-once delivery (for more information about delivery guarantees you may check out the previous part).

Storm implements this with a clever mechanism that only requires a few bytes of storage per source record to track the acknowledgements.
Pure record acknowledgement architectures, regardless of their performance, fail to offer exactly-once guarantees, thus burdening the application developer with deduplication. Storm’s mechanism is also low throughput and has problems with flow control, as the acknowledgement mechanism often falsely classifies failures under back-pressure.

[Figure: Storm's record acknowledgement mechanism]
Spark Streaming, with its micro-batching semantics, follows a different approach. The idea is terribly simple: Spark processes micro-batches on various worker nodes, and each micro-batch may either succeed or fail. On failure, the micro-batch can simply be recomputed, as the underlying data is persistent and immutable. Exactly-once delivery made easy.

[Figure: Spark Streaming's micro-batch recomputation]
Samza’s approach is completely different: it takes advantage of a durable, offset-based messaging system, usually Kafka of course. Samza monitors the offsets of its tasks and advances them as messages are processed. Offsets can be checkpointed to persistent storage and restored in case of failure. The problem is that when the offset is restored from the last checkpoint, Samza doesn’t know which of the subsequent messages were already processed, so it might process them twice. That’s at-least-once delivery for us.

[Figure: Samza's offset checkpointing]

Flink’s approach is based on distributed snapshots which capture the state of the streaming job. Flink sends checkpoint barriers, basically a kind of marker, through the stream; when a barrier reaches an operator, the operator checkpoints the corresponding part of the stream. Compared to Storm, this is far more efficient, as it doesn’t have to acknowledge every record but checkpoints in small batches instead. But don’t be confused: it is still native streaming and conceptually very different from Spark. Flink also provides exactly-once delivery.

[Figure: Flink's checkpoint barriers flowing through the stream]
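As a side note, enabling this barrier-based checkpointing in Flink takes only a couple of lines of configuration. The following is a minimal sketch, not taken from the article, with a made-up interval and a trivial pipeline just so the job has something to run:

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

object CheckpointedJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Inject a checkpoint barrier into the stream every 5 seconds (made-up interval).
    env.enableCheckpointing(5000)
    // Request exactly-once state semantics (this is also the default mode).
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    // A trivial pipeline, just so there is something to checkpoint.
    env.fromElements("a", "b", "a").map(w => (w, 1)).print()

    env.execute("checkpointing sketch")
  }
}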

Managing State

Most non-trivial streaming applications have some kind of state. In contrast to stateless operations, where we have just an input, processing and an output, stateful operations take an input and a state, do the processing, and produce an output along with a modified state. We have to manage the state, persist it and, in case of failure, we expect it to be recreated. Recreating the state can be a bit of a problem, since we do not always have an exactly-once guarantee and some records may be replayed multiple times, which is usually not what we want.

As we know, Storm provides at-least-once delivery guarantees. So how can we get the exactly-once semantics provided by Trident? Conceptually it’s quite simple: you just start committing records transactionally. Obviously that’s not very efficient, so you do it in small batches, add a few optimizations, and here we are. Trident defines a couple of abstractions that determine when exactly-once guarantees can be achieved and, as you can see in the picture below, there are some limitations and it takes some time to dig into them.

[Figure: Trident state management and its exactly-once limitations]
When thinking about stateful operations in stream processing, we usually picture a long-running operator with a state and a stream of records passing through it. As we know, Spark Streaming is a micro-batching system and addresses this differently: it manages state as another micro-batched stream. During the processing of each micro-batch, Spark takes the current state and a function representing the operation, and the result is the processed micro-batch together with an updated state.

[Figure: Spark Streaming state as a micro-batched stream]
Samza’s solution to everything is to push it out to Kafka, and that works for state management too. Samza has real stateful operators: any task can hold state, and the state’s changelog is pushed to Kafka. If needed, the state can easily be recreated from the Kafka topic. To make it a bit faster, Samza allows us to plug in key-value stores as local storage so it doesn’t have to go to Kafka all the time. The concept is illustrated in the picture below. Unfortunately, Samza provides at-least-once semantics only, which hurts a lot, but an implementation of exactly-once delivery is planned.
[Figure: Samza's local key-value stores with a changelog in Kafka]
Flink provides stateful operators conceptually similar to Samza’s. When working with Flink, we can use two different kinds of state. The first is local, or task, state: it is the current state of a particular operator instance only, and these states do not interact with each other. Then we have partitioned, or key, state, which maintains the state of whole partitions. And of course, Flink provides exactly-once semantics. In the picture below you can see an outline of Flink’s long-running operator with three local states.

[Figure: Flink's long-running operator with three local states]

Counting Words with State

So let’s have a look at how to count words, focusing on state management. For a naive word count implementation, check out the previous post.

Let’s start with Trident.

public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = ...

    TridentTopology topology = new TridentTopology();

    TridentState wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

    ...

}

We create the state by calling persistentAggregate. An important argument is Count, a built-in component for aggregating numbers. If we wanted to process the data held in the state, we would have to create a separate stream over it; as you can see in the snippet, it is not very convenient.

Spark’s declarative approach is a little bit better.

// Initial RDD input to updateStateByKey
val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)])

val lines = ...
val words = lines.flatMap(_.split(" "))
val wordDstream = words.map(x => (x, 1))

val trackStateFunc = (batchTime: Time, word: String, one: Option[Int],
                      state: State[Int]) => {
  val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
  val output = (word, sum)
  state.update(sum)
  Some(output)
}

val stateDstream = wordDstream.trackStateByKey(
  StateSpec.function(trackStateFunc).initialState(initialRDD))

First we create an RDD used as the initial state, then we do a couple of transformations to get a stream of (word, 1) pairs. Next we define the transition function trackStateFunc, which takes a word, its count in the current batch and the current state; it does the computation, updates the state and returns the result. Finally, we put all the bits together with trackStateByKey and get a state stream containing the word counts.


Let’s have a look at Samza.

class WordCountTask extends StreamTask with InitableTask {

  private var store: KeyValueStore[String, Integer] = _

  def init(config: Config, context: TaskContext) {
    this.store = context.getStore("wordcount-store")
      .asInstanceOf[KeyValueStore[String, Integer]]
  }

  override def process(envelope: IncomingMessageEnvelope,
                       collector: MessageCollector, coordinator: TaskCoordinator) {

    val words = envelope.getMessage.asInstanceOf[String].split(" ")

    words.foreach { key =>
      val count: Integer = Option(store.get(key)).getOrElse(0)
      store.put(key, count + 1)
      collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"),
        (key, count)))
    }
  }
}

First we declare our state, in this case a key-value store, and define how it should be initialized in init. We can then use it during the computation in process. As you can see above, it is pretty straightforward.

And finally, let’s have a look at Flink with its neat API.

val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val words = text.flatMap ( _.split(" ") )

words.keyBy(x => x).mapWithState {
  (word, count: Option[Int]) => {
    val newCount = count.getOrElse(0) + 1
    val output = (word, newCount)
    (output, Some(newCount))
  }
}

We just call mapWithState, which takes as its argument a function with two parameters: the word to process and the current state. The function returns the processed output and the new state.

Performance

A reasonable performance comparison is definitely a topic for a whole article of its own, so here is just a quick insight for now.

Different systems approach these problems fairly differently, and it is therefore very hard to design unbiased tests. When we talk about performance in streaming, we talk about latency and throughput. Both depend on many variables, but in general, for a simple task: if you’re at 500k records per second per node you’re OK, reaching 1 million is nice, and anything over a million is great. By a node I mean a fairly standard machine, say 24 cores and a reasonable amount of memory, like 24 or 48 GB.

For latency, micro-batching systems are usually measured in seconds. With native streaming, we can expect latencies in the lower hundreds of milliseconds for most systems, but a tuned Storm can easily operate in tens of milliseconds.

It is also important to keep in mind the cost of delivery guarantees, fault tolerance and state management. For example, turning on fault tolerance may cost you something like 10 to 15% of throughput, but in Storm’s case it can be as much as 70%. As always, there is no free lunch. In this and the previous post I’ve shown stateful and stateless word count examples; the stateless versions are of course faster. If you’re wondering how much, in Flink the difference was around 25%, while in Spark it was around 50%. I’m sure this could be tuned, but it gives an idea of something we should keep in mind. Speaking of tuning, these systems have very rich tuning options that can lead to significant performance gains, so you should always find some time to look into them.

It is also important to remember that all operations are distributed and that sending data over the network is pretty expensive, so try to take advantage of data locality and tune your application’s serialization.

Project Maturity

When picking a framework for your application, you should always consider its maturity, so let’s have a quick look at how our candidates compare. Storm was the first mainstream streaming system, was the de-facto industry standard for a long time, and is used by many companies such as Twitter, Yahoo and Spotify. Spark is the most trending Scala repository these days and one of the engines behind Scala’s popularity; its adoption grows every day, with users like Netflix, Cisco, DataStax, Intel and IBM. Samza is used by LinkedIn and by tens of other companies, Netflix and Uber among them. Flink is still an emerging project, but we can already see its first production deployments, and I’m sure more will follow very soon.

You may also find the number of project contributors interesting. Storm and Trident have around 180 of them, while the whole of Spark has more than 720. According to GitHub, Samza has around 40 contributors and Flink already has more than 130.

Summary

Before we jump to framework recommendations, it may be helpful to check out the summary table below.

[Figure: framework comparison summary table]

Framework Recommendations

The answer to the typical question, which one should I use, is, as always, it depends. In general, always evaluate the requirements of your application carefully and make sure you fully understand the consequences of choosing a particular framework. I’d also recommend picking a framework with a high-level API, as it’s more elegant and, more importantly, much more productive. Keep in mind that most streaming applications are stateful, so a framework’s state management should be high on your evaluation list. I’d also recommend going for a framework with exactly-once delivery semantics, as it makes things easier, though of course this really depends on your requirements; there are definitely use cases where at-least-once or at-most-once guarantees are all you need. But also keep in mind that a system supporting exactly-once does not necessarily also support the weaker guarantees. Lastly, make sure your system is able to recover quickly; you can use Chaos Monkey or a similar tool for testing, because, as we discussed, fast recovery is crucial in stream processing.

Storm is still a great fit for small, fast tasks. If you care mainly about latency, Storm might be a good way to go, but keep in mind that fault tolerance and Trident’s state management hurt its performance a lot. An interesting option might be a future switch to Twitter’s Heron, which is designed as Storm’s replacement, should be better at every single task, and keeps the same API. The problem is that there is no guarantee Twitter is going to open-source it, so who knows whether counting on it is a good idea.

As for Spark Streaming, you should definitely at least try it if Spark is already part of your infrastructure, because in that case Streaming comes basically for free and you can also take advantage of Spark’s various libraries. It is also a pretty decent choice if you really want to use the Lambda architecture. But you should always keep the limitations of micro-batching in mind and be sure latency is not critical for you.

When thinking about adopting Samza, Kafka should be a cornerstone of your architecture. I know the messaging layer is pluggable, but nearly everyone uses Kafka, so I would stick with that. As mentioned before, Samza ships with powerful local storage, which is great for managing large states: it can easily handle states of tens of gigabytes, which is pretty nice. But keep in mind Samza’s at-least-once delivery limitation.

Flink is a conceptually great streaming system that fits most streaming use cases, and it often provides progressive functionality, like advanced windowing or time handling, that its competitors have not yet implemented. So you should always consider Flink when you need functionality that is hard to implement in Spark or, more generally, in any micro-batching system. On top of that, Flink also has an API for classic batch processing, which may be pretty useful. But you need enough courage to adopt an emerging project, and don’t forget to check out its roadmap.

Dataflow and Open Source

The last thing I want to mention is Dataflow and its open-source initiative. Dataflow is part of the Google Cloud Platform, which offers all sorts of things: large-scale data storage, BigQuery, Cloud Pub/Sub, various tools for data analysis, and of course the aforementioned Cloud Dataflow.

Dataflow is Google’s managed service for batch and stream data processing with a unified API. It is built upon well-known Google technologies such as MapReduce for batch processing, FlumeJava for the programming model definition and MillWheel for stream processing. And all of them are really good.

You may be asking why I’m writing about this when I said we would focus on Apache stream processing platforms and Dataflow is clearly Google’s proprietary solution. The reason is that Google recently decided to open-source the Dataflow SDK, and the teams behind both Spark and Flink have implemented runners for it. So we can now run jobs defined with the Dataflow API on Google Cloud Platform, on Flink or on Spark, and it’s quite possible more engines will follow very soon.

Dataflow provides Java and Python APIs implemented by Google itself, and I’ve also found two Scala DSLs implemented by the community. Apart from that, Google and a number of partners have submitted it as a new Apache proposal named Apache Beam. So it seems like a pretty interesting option and I would definitely at least think about it, but it is important to emphasize that all of this is very recent and implementations of particular features might still be missing.

[Figure: Dataflow and the ASF]

Conclusion

In this short blog post series, we went through the popular streaming frameworks from the Apache landscape and discussed their similarities, differences, the trade-offs they have made, and the use cases they fit. I hope it was interesting for you, and I believe it will be helpful when designing your own streaming solution. There are definitely a couple of interesting frameworks that were not discussed here, but I plan to address them in separate posts. Also, if you have any questions, don’t hesitate to contact me, as I’m always happy to discuss the topic.

 


Comparison of Apache Stream Processing Frameworks: Part 1

Posted by Petr Zapletal on Tue, Feb 2, 2016

 

A couple of months ago we discussed the reasons behind the increasing demand for distributed stream processing. I also stated that there were a number of available frameworks to address it. Now it’s time to have a look at them and discuss their similarities, their differences and, in my opinion, their recommended use cases.

As you probably suspect, distributed stream processing is the continuous processing, aggregation and analysis of unbounded data. It is a general computation model, like MapReduce, but we expect latencies in milliseconds or seconds. These systems are usually modeled as directed acyclic graphs (DAGs).

A DAG is a graph representation of a chain of tasks, and we use it to describe the topology of a streaming job. I’ll borrow a little terminology from Akka Streams here. As you can see in the picture below, data flows through a chain of processors from sources to sinks, which together represent the streaming job. Speaking of Akka Streams, I think it’s very important to emphasize the word distributed: even local solutions can create and run a DAG, but we’re going to focus only on solutions running on multiple machines.

[Figure: a streaming topology as a DAG of processors from sources to sinks]

Points of Interest

When choosing between different systems, there are a couple of points we should pay attention to. Let’s start with the Runtime and Programming Model. The programming model provided by a platform determines a lot of its features, and it should be sufficient to handle all possible use cases for the application. This is a really crucial topic and I’ll come back to it very soon.

The Functional Primitives exposed by a processing platform should provide rich functionality at the individual-message level, like map or filter, which is pretty easy to implement even at large scale. But it should also provide cross-message functionality, like aggregations, and cross-stream operations, like joins, which are much harder to scale.

State Management - Most applications have stateful processing logic that requires maintaining state. The platform should allow us to maintain, access and update the state information.

For Message Delivery Guarantees, we have a couple of classes like at most once, at least once and exactly once. At most once delivery means that for each message handed to the mechanism, that message is delivered zero or one times, so messages may be lost. At least once delivery means that for each message handed to the mechanism potentially multiple attempts are made at delivering it, such that at least one succeeds. Messages may be duplicated but not lost. And finally exactly once delivery means that for each message handed to the mechanism exactly one delivery is made to the recipient, the message can neither be lost nor duplicated. So another important thing to consider.

Failures can and will happen at various levels - for example network partitions, disk failures or nodes going down. The platform should be able to recover from all such failures and resume from its last successful state without corrupting the result.

And then, we have more performance related requirements like Latency, Throughput and Scalability which are extremely important in streaming applications.

We should also consider Maturity and Adoption Level; this can give us a clue about potential support, available libraries or even Stack Overflow answers, and it may help a lot when choosing the right platform.

And last but not least, the Ease of Development and Ease of Operability. It is great to have a super fancy system that covers all our use cases, but if we cannot write a program for it or cannot deploy it, we are done anyway.

Runtime and Programming Model

The Runtime and Programming Model is probably the most important trait of a system, because it defines the system’s expressiveness, its possible operations and its future limitations; it therefore determines the system’s capabilities and use cases.

There are two distinct approaches to implementing a streaming system. The first is called native streaming: all incoming records, or events if you like, are processed as they arrive, one by one.


[Figure: native streaming - records processed one by one]

The second approach is called micro-batching: short batches are created from the incoming records and sent through the system. The batches are created according to a pre-defined time constant, typically every couple of seconds.

[Figure: micro-batching - records grouped into short batches]


Both approaches have inherent advantages and disadvantages. Let’s start with native streaming. Its great advantage is expressiveness: because it takes the stream as it is, it is not limited by any unnatural abstraction over it. Also, as records are processed immediately upon arrival, the achievable latencies of these systems are always better than those of their micro-batching companions. Apart from that, stateful operations are much easier to implement, as you’ll see later in this post. On the other hand, native streaming systems usually have lower throughput, and fault tolerance is much more expensive, as the system has to take care of (persist and replay) every single record. Load balancing is also something of an issue. For example, say we have data partitioned by a key and we want to process it; if processing some key of the partition is more resource-intensive for any reason, that partition quickly becomes the job’s bottleneck.

Second, micro-batching. Splitting the stream into micro-batches inevitably reduces the system’s expressiveness. Some operations, especially state management, joins and splits, are much harder to implement, because the system has to manipulate a whole batch at a time. Moreover, the batch interval connects two things that should never be connected: an infrastructure property and business logic. On the other hand, fault tolerance and load balancing are much simpler, as the system just sends every batch to a worker node, and if something goes wrong it simply uses a different one. Lastly, it is worth noting that a micro-batching system can be built atop native streaming quite easily.

Programming models can be classified as compositional or declarative. The compositional approach provides basic building blocks, like sources or operators, which must be tied together to create the expected topology; new components are usually defined by implementing some kind of interface. In a declarative API, by contrast, operators are defined as higher-order functions. This lets us write functional code with abstract types and all the fancy stuff that comes with it, and the system creates and optimizes the topology itself. Declarative APIs also usually provide more advanced operations, like windowing or state management, out of the box. We are going to look at some code samples very soon.

Apache Streaming Landscape

There is a number of diverse frameworks available, and it is literally impossible to cover all of them, so I have to limit the selection somehow. I want to go for popular streaming solutions from the Apache landscape that also provide a Scala API. Therefore we’re going to focus on Apache Storm and its sibling Trident, on the streaming module of the very popular Spark, on Samza, the streaming system behind LinkedIn, and finally on the promising Apache project Flink. I believe this is a great selection, because even though all of them are streaming systems, they approach various challenges very differently. Unfortunately I won’t talk about proprietary systems like Google MillWheel or Amazon Kinesis, and we’re also going to miss interesting but less widely adopted systems like Intel GearPump or Apache Apex. That may be for next time.

[Figure: the Apache streaming landscape]
Apache Storm was originally created by Nathan Marz and his team at BackType in 2010. It was later acquired and open-sourced by Twitter, and it became an Apache top-level project in 2014. Without any doubt, Storm was a pioneer of large-scale stream processing and became the de-facto industry standard. Storm is a native streaming system and provides a low-level API. Storm uses Thrift for topology definition and implements the Storm multi-language protocol, which basically allows us to implement our solutions in a large number of languages, which is pretty unique, and Scala is of course one of them.
Apache Storm was originally created by Nathan Marz and his team at BackType in 2010. Later it was acquired and open-sourced by Twitter and it became apache top-level project in 2014. Without any doubts, Storm was a pioneer in large scale stream processing and became de-facto industrial standard. Storm is a native streaming system and provides low-level API. Also, storm uses Thrift for topology definition and it also implements Storm multi-language protocol this basically allows to implement our solutions in large number of languages, which is pretty unique and Scala is of course of them.

Trident is a higher-level micro-batching system built atop Storm. It simplifies the topology-building process and adds higher-level operations like windowing, aggregations and state management, which are not natively supported in Storm. In contrast to Storm’s at-least-once guarantee, Trident provides exactly-once delivery. Trident has Java, Clojure and Scala APIs.

As we all know, Spark is a very popular batch processing framework these days, with a couple of built-in libraries like Spark SQL or MLlib, and of course Spark Streaming. Spark’s runtime is built for batch processing, and therefore Spark Streaming, which was added a little later, does micro-batching. The stream of input data is ingested by receivers which create micro-batches, and these micro-batches are processed in a similar way to other Spark jobs. Spark Streaming provides a high-level declarative API in Scala, Java and Python.

Samza was originally developed at LinkedIn as a proprietary streaming solution and, together with Kafka, another great LinkedIn contribution to our community, became a key part of their infrastructure. As you’ll see a little later, Samza builds heavily on Kafka’s log-based philosophy, and the two integrate very well. Samza provides a compositional API, and of course Scala is supported.

And last but not least, Flink. Flink is a pretty old project, with its origins back in 2008, but right now it is getting quite a lot of attention. Flink is a native streaming system and provides a high-level API. Like Spark, Flink also provides an API for batch processing, but there is a fundamental distinction between the two: Flink handles batch as a special case of streaming. Everything is a stream, and that is definitely the better abstraction, because that is how the world really works.

That was a quick introduction to the systems and, as you can see in the table below, they have pretty different traits.

[Figure: framework comparison summary table]

Counting Words

Word count is something like the hello world of stream processing; it nicely shows the main differences between our frameworks. Let’s start with Storm, and please note that the example has been simplified significantly.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new Split(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

...

Map<String, Integer> counts = new HashMap<String, Integer>();

public void execute(Tuple tuple, BasicOutputCollector collector) {
  String word = tuple.getString(0);
  Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
  counts.put(word, count);
  collector.emit(new Values(word, count));
}

First, let’s have a look at the topology definition. We have to define a spout, or if you prefer, a source. Then there is a bolt, a processing component, which splits the text into words, and another bolt for the actual word count calculation. Also have a look at the magic numbers 5, 8 and 12: these are parallelism hints, and they define how many independent threads across the cluster will be used to execute each component. As you can see, everything is very manual and low-level. Now let’s see how the actual WordCount bolt is implemented. Since Storm does not have built-in support for managed state, I have defined a local state, which is far from ideal but fine as a sample. Apart from that it is not very interesting, so let’s move on and have a look at Trident.

As I mentioned before, Trident is Storm’s micro-batching extension and, apart from many other goodies, provides state management, which is pretty useful when implementing word count.

public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = ...

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(),
            new Count(), new Fields("count"));

    ...

}

As you can see, I could use higher-level operations like each and groupBy, so it’s a little better, and I was also able to use Trident’s managed state, via persistentAggregate, to store the counts.


Now it is time for the pretty declarative API provided by Apache Spark. Keep in mind that, in contrast to the previous examples, which were significantly simplified, this is nearly all the code you need to run this simple streaming word count.

val conf = new SparkConf().setAppName("wordcount")
val ssc = new StreamingContext(conf, Seconds(1))

val text = ...

val counts = text.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

counts.print()

ssc.start()
ssc.awaitTermination()

Every Spark Streaming job requires a StreamingContext, which is basically the entry point to the streaming functionality. The StreamingContext takes a configuration, which in our case is very limited, and, more importantly, defines the batch interval, here set to 1 second. The whole word count computation is just the three chained transformations on text; quite a difference, isn’t it? That’s the reason why Spark is sometimes called distributed Scala. As you can see, it’s quite standard functional code, and Spark takes care of the topology definition and its distributed execution. The last part of every Spark Streaming job is starting the computation; just keep in mind that once started, a job cannot be modified.

Now let’s have a look at Apache Samza, another representative of the compositional API approach.

class WordCountTask extends StreamTask {

  override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector,
                       coordinator: TaskCoordinator) {

    val text = envelope.getMessage.asInstanceOf[String]

    val counts = text.split(" ").foldLeft(Map.empty[String, Int]) {
      (count, word) => count + (word -> (count.getOrElse(word, 0) + 1))
    }

    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts))
  }
}

The topology is defined in Samza’s properties file, but for the sake of brevity you won’t find it here. What matters for us is that the task has defined input and output channels and that the communication goes through Kafka topics. In our case the whole topology is the WordCountTask, which does all the work. In Samza, components are defined by implementing particular interfaces; in this case it’s a StreamTask, and I’ve just overridden its process method. The parameter list contains everything needed to connect with the rest of the system. The computation itself, the foldLeft over the words, is just plain Scala.

Now let’s have a look at Flink. As you can see, the API is pretty similar to Spark Streaming’s, but notice that we are not setting any batch interval.

val env = ExecutionEnvironment.getExecutionEnvironment

val text = env.fromElements(...)
val counts = text.flatMap ( _.split(" ") )
  .map ( (_, 1) )
  .groupBy(0)
  .sum(1)

counts.print()

env.execute("wordcount")

The computation itself is pretty straightforward: just a couple of functional calls, and Flink takes care of their distributed execution.

Conclusion

That’s all folks for today. We went through the necessary theory and introduced the popular streaming frameworks from the Apache landscape. Next time, we’re going to dig a little deeper and go through the rest of the points of interest. I hope you find it interesting; stay tuned. Also, if you have any questions, don’t hesitate to contact me, as I’m always happy to discuss the topic.


IDINCU Engineering

Building an In-House Data Distribution System with Open Source (feat. Thrift, Zookeeper)

1. Motivation

These days it’s hard to find a service that isn’t like this, but ours too accumulates hundreds of thousands to millions of log entries a day in various places, and hundreds of thousands to millions of survey responses and other records are stored in a central RDBMS. As we have recently been migrating step by step to a service-oriented architecture (SOA), the individual distributed servers no longer access the central RDBMS directly, so we needed a system that could efficiently manage and distribute the hugely varied logs and data each of them produces. The best-known option was Suro, an open-source project released by Netflix, but we wanted to shape everything, from the transport layer that carries the data down to the sinks that store it, to fit our service exactly (non-Java client support, sinks integrated with systems that actively read metadata, and so on), and the effort of building it ourselves did not look all that big, so we decided to write our own. That’s the official story; read it as "I was told to build it" /ㅁ/

2. Open Source Used

1) Thrift
Since this system has to handle more traffic than anything else we run, we needed a transport-layer library that had been sufficiently proven in terms of speed and stability. We also considered Protocol Buffers and Avro, but in the end, swayed by Facebook’s name, we went with Thrift, which is comparatively the most widely used.

2) Zookeeper
We use Zookeeper to manage things like the rules for how data is distributed and the state of the currently running servers. A plain RDBMS would also work, but looking up the rules on every request would be too much load, and with a cache, rule changes would not take effect immediately and each server’s cache would expire at a different time, which can cause problems. With Zookeeper, on the other hand, rule changes can be pushed out immediately, and there is an option that makes data disappear when the connection is lost, which also makes it very easy to tell whether a server is alive.

3) Curator
Zookeeper is already quite simple to use, but Curator is a library that makes it even easier.

3. Project Structure

The project consists of four sub-projects: thrift-lib, server, coordinator and client. I’ll introduce the role of each sub-project along with how we use Thrift and Zookeeper.

1) thrift-lib
As the name suggests, this sub-project defines the data and call specifications for Thrift-based communication across the whole service. Every other sub-project imports it.

After creating a [filename].thrift file in the format shown above, running a command such as

thrift --gen java [filename].thrift

automatically generates a communication library usable from Java in the gen-java directory. With that code, communication becomes very simple. And since Thrift supports many languages besides Java, it is also very easy to build servers or clients in other languages later on.

2) server
The server receives large volumes of data from clients and puts it on a queue; a separate thread reads the queue and distributes the data to various sinks according to the configured rules. All kinds of sinks can be built and attached: a sink that writes to local files, a sink that forwards to Graylog, a sink that sends email, a sink that sends data to the storage service we run ourselves, and so on. The Zookeeper mentioned earlier is used as follows.

As shown above, we watch a particular node and, when it changes, apply the change immediately through a listener. We can also create nodes in Zookeeper as shown below, and with CreateMode.EPHEMERAL the node is deleted automatically when its connection is closed. For reference, while the node watch worked with almost no latency, it took tens of seconds for a node to disappear after its connection was closed, so we had to add retry handling to avoid exceptions caused by the old node still being alive when a server restarts.
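The original snippets are embedded as images in the source post and are not reproduced here, so below is a rough sketch, assuming Curator, of what the node watch and the ephemeral registration described above might look like; the connection string, paths and payload are made up:

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.{NodeCache, NodeCacheListener}
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.zookeeper.CreateMode

object ZookeeperUsageSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical connection string; a real setup would read this from configuration.
    val client = CuratorFrameworkFactory.newClient("zk1:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    // Watch the node holding the distribution rules and react to changes immediately.
    val ruleCache = new NodeCache(client, "/dist/rules")
    ruleCache.getListenable.addListener(new NodeCacheListener {
      override def nodeChanged(): Unit = {
        val rules = new String(ruleCache.getCurrentData.getData, "UTF-8")
        println(s"rules updated: $rules")
      }
    })
    ruleCache.start()

    // Register this server as an ephemeral node; it disappears when the connection is closed.
    client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("/dist/servers/server-1", "host:port".getBytes("UTF-8"))
  }
}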

3) coordinator
The coordinator has a very simple role: based on the nodes the servers have created in Zookeeper as described above, it tells clients the current state of the servers. We could have used a load balancer such as HAProxy, but in a system where huge amounts of data like logs pour in in real time, the load balancer itself can become a single point of failure, so we chose a structure in which a client fetches the server information from the coordinator only on first connection or on failure, and afterwards connects to each server directly. The following is the code that starts a server using the thrift-lib described in 1). As you can see, it is very easy to apply, and besides the Nonblocking server, Thrift also provides ThreadPool and Simple servers, among others.
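That startup code was also an image in the original, so here is a minimal sketch of how a Thrift nonblocking server is typically started; DataService and DataServiceHandler are hypothetical stand-ins for whatever service thrift-lib actually defines:

import org.apache.thrift.server.TNonblockingServer
import org.apache.thrift.transport.TNonblockingServerSocket

object ServerBootstrapSketch {
  def main(args: Array[String]): Unit = {
    // DataService.Processor is the class Thrift would generate from the .thrift file,
    // and DataServiceHandler would implement the generated DataService.Iface.
    val processor = new DataService.Processor(new DataServiceHandler)

    val transport = new TNonblockingServerSocket(9090) // made-up port
    val server = new TNonblockingServer(
      new TNonblockingServer.Args(transport).processor(processor))

    server.serve() // blocks and serves requests
  }
}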

 

4) client
On startup, the client connects to the coordinator once to fetch the server list, then connects to one of the servers and can start sending data to it.
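As a rough sketch, again with the hypothetical DataService from above and a made-up send method, the sending side might look like this; a framed transport is used because the server is nonblocking:

import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.{TFramedTransport, TSocket}

object ClientSketch {
  def main(args: Array[String]): Unit = {
    // The host and port would normally come from the server list returned by the coordinator.
    val transport = new TFramedTransport(new TSocket("server-1", 9090))
    transport.open()

    // DataService.Client is the Thrift-generated client; send is a made-up method name.
    val client = new DataService.Client(new TBinaryProtocol(transport))
    client.send("app-log", "something happened")

    transport.close()
  }
}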

The client-side code can also be written very simply using Thrift, and we built a new appender so that logs can be shipped conveniently through logback as well.
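The appender code is not included in this copy either; a minimal sketch of such a logback appender might look like the following, where ThriftLogClient is a hypothetical wrapper around the client shown above:

import ch.qos.logback.classic.spi.ILoggingEvent
import ch.qos.logback.core.AppenderBase

// A minimal appender that ships every log event to the data distribution system.
class ThriftAppender extends AppenderBase[ILoggingEvent] {

  private var client: ThriftLogClient = _

  override def start(): Unit = {
    client = new ThriftLogClient("server-1", 9090) // hypothetical wrapper and endpoint
    super.start()
  }

  override protected def append(event: ILoggingEvent): Unit = {
    // Forward the formatted message; a real appender would also handle errors and batching.
    client.send(event.getLoggerName, event.getFormattedMessage)
  }

  override def stop(): Unit = {
    client.close()
    super.stop()
  }
}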

Once the appender is created and added to logback.xml, logs can be sent conveniently via the @Slf4j annotation.

 

4. Wrapping Up

[Figure: data distribution system architecture diagram]

In the end, this is the system we ended up with! We are currently using it to distribute our data, and our in-house cloud system is likewise built and operated on top of open source. Some of us even contribute fixes upstream when we spot problems while using open-source projects. Most of you probably do this already, but if you don’t yet, let’s all enjoy a convenient and rewarding open-source life : D
