Big Data Architect

Lambda vs Kappa

Over the past few years, there have been many discussions about how to design a good real-time data processing architecture. A good real-time data processing architecture needs to be fault-tolerant and scalable; it needs to support batch and incremental updates, and must be extensible.

One important milestone in these discussions was Nathan Marz, creator of Apache Storm, describing what we have come to know as the Lambda architecture. The Lambda architecture has proven to be relevant to many use-cases and is indeed used by a lot of companies, for example Yahoo and Netflix. But of course, Lambda is not a silver bullet and has received some fair criticism for the coding overhead it can create.

In the summer of 2014, Jay Kreps from LinkedIn posted an article describing what he called the Kappa architecture, which addresses some of the pitfalls associated with Lambda. Kappa is not a replacement for Lambda, though, as some use-cases deployed using the Lambda architecture cannot be migrated.

It can be challenging to accurately evaluate which architecture is best for a given use-case and making a wrong design decision can have serious consequences for the implementation of a data analytics project.

Now let’s get into greater detail about the two data processing architectures.

Figure 1 Lambda architecture

The Lambda Architecture, shown in Figure 1, is composed of three layers: batch, speed, and serving.

The batch layer has two major tasks: (a) managing historical data; and (b) recomputing results such as machine learning models. Specifically, the batch layer receives arriving data, combines it with historical data and recomputes results by iterating over the entire combined data set. The batch layer operates on the full data and thus allows the system to produce the most accurate results. However, the results come at the cost of high latency due to high computation time.

The speed layer is used to provide results in a low-latency, near real-time fashion. The speed layer receives the arriving data and performs incremental updates to the batch layer results. Thanks to the incremental algorithms implemented at the speed layer, computation cost is significantly reduced.

Finally, the serving layer enables various queries of the results sent from the batch and speed layers.
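To make the split between the layers concrete, here is a minimal Python sketch of how a serving-layer query might merge batch and speed layer results. The view names and the page-count example are illustrative only, not a prescribed implementation.

```python
# Minimal sketch (illustrative names only): a serving-layer query that merges
# precomputed batch results with incremental speed-layer results.
# Here both "views" are plain dicts mapping a key (e.g., a page URL) to a count.

def query(batch_view: dict, realtime_view: dict, key: str) -> int:
    """Combine the accurate-but-stale batch count with the recent delta."""
    return batch_view.get(key, 0) + realtime_view.get(key, 0)

batch_view = {"/home": 10_000}     # recomputed periodically over all data
realtime_view = {"/home": 42}      # incremental updates since the last batch run
print(query(batch_view, realtime_view, "/home"))  # 10042
```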

Figure 2 Kappa architecture

The Kappa architecture is shown in Figure 2. One of the important motivations for inventing the Kappa architecture was to avoid maintaining two separate code bases for the batch and speed layers. The key idea is to handle both real-time data processing and continuous data reprocessing using a single stream processing engine. Data reprocessing is an important requirement for making visible the effects of code changes on the results. As a consequence, the Kappa architecture is composed of only two layers: stream processing and serving. The stream processing layer runs the stream processing jobs. Normally, a single stream processing job is run to enable real-time data processing. Data reprocessing is only done when some code of the stream processing job needs to be modified. This is achieved by running another, modified stream processing job and replaying all previous data. Finally, similarly to the Lambda architecture, the serving layer is used to query the results.
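As a rough illustration of the "replay" idea, here is a hedged sketch using the kafka-python client: a new consumer group with its own offsets starts from the earliest retained offset, so the modified job reprocesses the full log. The topic name, bootstrap server, and process_event function are assumptions.

```python
# Sketch (assumed topic/server names) of Kappa-style reprocessing with the
# kafka-python client: a *new* consumer group starts from the earliest offset,
# so the modified job replays the full retained log into a new output.
from kafka import KafkaConsumer  # pip install kafka-python

def process_event(event: bytes) -> None:
    # Hypothetical: apply the *new* version of the stream-processing logic
    # and write the result to a new serving table or topic.
    pass

consumer = KafkaConsumer(
    "events",                          # assumed topic name
    bootstrap_servers="localhost:9092",
    group_id="job-v2",                 # new group id => independent offsets
    auto_offset_reset="earliest",      # replay from the beginning of retention
)

for message in consumer:
    process_event(message.value)
```

Once the new job's output has caught up, queries are switched over to it and the old job and its output can be retired.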

The two architectures can be implemented by combining various open-source technologies, such as Apache Kafka, Apache HBase, Apache Hadoop (HDFS, MapReduce), Apache Spark, Apache Drill, Spark Streaming, Apache Storm, and Apache Samza.

For example, data can be ingested into the Lambda and Kappa architectures using a publish-subscribe messaging system, for example Apache Kafka. The data and model storage can be implemented using persistent storage, like HDFS. A high-latency batch system such as Hadoop MapReduce can be used in the batch layer of the Lambda architecture to train models from scratch. Low-latency systems, for instance Apache Storm, Apache Samza, and Spark Streaming can be used to implement incremental model updates in the speed layer. The same technologies can be used to implement the stream processing layer in the Kappa architecture.

Alternatively, Apache Spark can be used as a common platform to develop the batch and speed layers in the Lambda architecture. This way, much of the code can be shared between the batch and speed layers. The serving layer can be implemented using a NoSQL database, such as Apache HBase, and an SQL query engine like Apache Drill.
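A hedged PySpark sketch of that code-sharing idea follows. The paths, Kafka topic, and column names are assumptions, and the streaming part requires the Spark Kafka connector package; the point is simply that one transformation function can feed both the batch and the speed layer.

```python
# Sketch (assumed paths, topic, and columns): with Spark, the same transformation
# function can serve as both the batch layer and the speed layer.
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("lambda-shared-code").getOrCreate()

def page_view_counts(events: DataFrame) -> DataFrame:
    """Shared logic: count page views per URL."""
    return events.groupBy("url").agg(F.count("*").alias("views"))

# Batch layer: recompute over the full historical data set.
batch_events = spark.read.parquet("/data/events")          # assumed location
page_view_counts(batch_events).write.mode("overwrite").parquet("/views/batch")

# Speed layer: incrementally update from the stream (Structured Streaming).
stream_events = (spark.readStream.format("kafka")           # needs spark-sql-kafka package
                 .option("kafka.bootstrap.servers", "localhost:9092")
                 .option("subscribe", "events")              # assumed topic
                 .load()
                 .selectExpr("CAST(value AS STRING) AS url"))  # simplified parsing
(page_view_counts(stream_events)
 .writeStream.outputMode("complete")
 .format("memory").queryName("views_realtime")               # in-memory sink for the sketch
 .start())
```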

So when should we use one architecture or the other? As is often the case, it depends on some characteristics of the application that is to be implemented. Let’s go through a few common examples:

A very simple case to consider is when the algorithms applied to the real-time data and to the historical data are identical. Then it is clearly very beneficial to use the same code base to process historical and real-time data, and therefore to implement the use-case using the Kappa architecture.

Now, the algorithms used to process historical data and real-time data are not always identical. In some cases, the batch algorithm can be optimized because it has access to the complete historical dataset, and it can then outperform the real-time algorithm's implementation. Here, choosing between Lambda and Kappa becomes a trade-off between batch execution performance and code base simplicity.

Finally, there are even more complex use-cases in which even the outputs of the real-time and batch algorithms differ. For example, consider a machine learning application where generating the batch model requires so much time and so many resources that the best result achievable in real time is computing approximate updates of that model. In such cases, the batch and real-time layers cannot be merged, and the Lambda architecture must be used.

From <https://www.ericsson.com/research-blog/data-knowledge/data-processing-architectures-lambda-and-kappa/>

Kappa Architecture

Kappa Architecture is a software architecture pattern. Rather than using a relational database or a key-value store like Cassandra, the canonical data store in a Kappa Architecture system is an append-only immutable log. From the log, data is streamed through a computational system and fed into auxiliary stores for serving.

Kappa Architecture is a simplification of Lambda Architecture. A Kappa Architecture system is like a Lambda Architecture system with the batch processing system removed. To replace batch processing, data is simply fed through the streaming system quickly.

Pasted from <http://milinda.pathirage.org/kappa-architecture.com/>

Lambda Architecture

Lambda architecture is a data-processing architecture designed to handle massive quantities of data by taking advantage of both batch- and stream-processing methods. This approach attempts to balance latency, throughput, and fault-tolerance by using batch processing to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing to provide views of online data. The two view outputs may be joined before presentation. The rise of lambda architecture is correlated with the growth of big data, real-time analytics, and the drive to mitigate the latencies of MapReduce.

batch view = function(all data)

realtime view = function(realtime view, new data)

query = function(batch view, realtime view)

The speed layer uses databases that support random reads and random writes. Because these databases support random writes, they’re orders of magnitude more complex than the databases you use in the serving layer, both in terms of implementation and operation.

The beauty of the Lambda Architecture is that once data makes it through the batch layer into the serving layer, the corresponding results in the realtime views are no longer needed.

The master dataset in the Lambda Architecture serves as the source of truth for your Big Data system. Errors at the serving and speed layers can be corrected, but corruption of the master dataset is irreparable.

In the fact-based model, you deconstruct the data into fundamental units called (unsurprisingly) facts. The fact-based model (a small sketch follows the list):

  • Stores your raw data as atomic facts
  • Keeps the facts immutable and eternally true by using timestamps
  • Ensures each fact is identifiable so that query processing can identify duplicates
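The sketch below is an illustrative Python rendering of those three properties; the field names and the "location" example are made up, not taken from any particular schema.

```python
# Illustrative sketch of "facts" in the fact-based model: small, immutable,
# timestamped records with an identifier so duplicates can be detected.
import time
import uuid
from dataclasses import dataclass, field

@dataclass(frozen=True)                      # frozen => immutable once created
class Fact:
    entity_id: str                           # who/what the fact is about
    attribute: str                           # e.g., "location"
    value: str
    timestamp_ms: int = field(default_factory=lambda: int(time.time() * 1000))
    fact_id: str = field(default_factory=lambda: str(uuid.uuid4()))  # dedup key

# Instead of updating a "current location" field in place, append a new fact;
# the old fact remains true for the time at which it was recorded.
f1 = Fact("user-123", "location", "Stockholm")
f2 = Fact("user-123", "location", "Kista")
```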

Data Storage Options

Major considerations for Hadoop data storage include:

  • File format

– plain text (and structured text such as XML or JSON)

– SequenceFile

– Avro

– Parquet

  • Compression (codecs)

– compression ratio

– compression speed

– CPU cost

– splittability

  • Data storage system

– whether you should use HBase or HDFS directly to store the data.

– Additionally, tools such as Hive and Impala allow you to define additional structure around your data in Hadoop.

SequenceFiles store data as binary key-value pairs. There are three formats available for records stored within SequenceFiles:

  • Uncompressed

For the most part, uncompressed SequenceFiles don’t provide any advantages over their compressed alternatives, since they’re less efficient for input/output (I/O) and take up more space on disk than the same data in compressed form.

  • Record-compressed

This format compresses each record as it’s added to the file.

  • Block-compressed

This format waits until data reaches block size to compress, rather than as each record is added. Block compression provides better compression ratios compared to record-compressed SequenceFiles, and is generally the preferred compression option for SequenceFiles. Also, the reference to block here is unrelated to the HDFS or filesystem block. A block in block compression refers to a group of records that are compressed together within a single HDFS block.
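As a rough sketch of writing a compressed SequenceFile, the PySpark RDD API can be used as below. The path and records are made up, and whether BLOCK or RECORD compression is applied is governed by the Hadoop output compression settings in your cluster configuration (e.g., the mapreduce.output.fileoutputformat.compress.type property), so verify that in your environment.

```python
# Sketch (assumed path and records): writing key-value pairs as a compressed
# SequenceFile with the PySpark RDD API. Block vs. record compression is
# controlled by the Hadoop output compression configuration of the cluster.
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("sequencefile-compression")
sc = SparkContext(conf=conf)

pairs = sc.parallelize([("user-1", "click"), ("user-2", "view")])
pairs.saveAsSequenceFile(
    "/data/events_seq",                                        # assumed output path
    compressionCodecClass="org.apache.hadoop.io.compress.SnappyCodec",
)
```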

Serialization Formats

Serialization refers to the process of turning data structures into byte streams either for storage or transmission over a network. Conversely, deserialization is the process of converting a byte stream back into data structures. Serialization is core to a distributed processing system such as Hadoop, since it allows data to be converted into a format that can be efficiently stored as well as transferred across a network connection

The main serialization format used by Hadoop is Writables. Writables are compact and fast, but not easy to extend or use from languages other than Java. Other serialization frameworks seeing increased use within the Hadoop ecosystem include Thrift, Protocol Buffers, and Avro.

  • Thrift

Thrift uses an Interface Definition Language (IDL) to define interfaces, and uses an IDL file to generate stub code to be used in implementing RPC clients and servers that can be used across languages.

Thrift has several drawbacks: it does not support internal compression of records, it’s not splittable, and it lacks native MapReduce support.

  • Protocol Buffers

The Protocol Buffers (protobuf) format was developed at Google to facilitate data exchange between services written in different languages. Like Thrift, protobuf structures are defined via an IDL, which is used to generate stub code for multiple languages.

Like Thrift, Protocol Buffers do not support internal compression of records, are not splittable, and have no native MapReduce support. However, also like Thrift, the Elephant Bird project can be used to encode protobuf records, providing support for MapReduce, compression, and splittability.

  • Avro

Avro is a language-neutral data serialization system designed to address the major downside of Hadoop Writables: lack of language portability. Like Thrift and Protocol Buffers, Avro data is described through a language-independent schema.

Because Avro stores the schema in the header of each file, it is self-describing, and Avro files can easily be read later.

Avro also provides better native support for MapReduce since Avro data files are compressible and splittable. Another important feature of Avro that makes it superior to SequenceFiles for Hadoop applications is support for schema evolution; that is, the schema used to read a file does not need to match the schema used to write the file. This makes it possible to add new fields to a schema as requirements change.

Avro supports Snappy and Deflate compression.
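The sketch below shows schema evolution using the fastavro Python library (one of several Avro implementations); the record, field, and file names are illustrative. A file written with schema v1 is read back with a v2 reader schema that adds a field carrying a default value.

```python
# Sketch of Avro schema evolution with fastavro: write with schema v1, read
# back with a v2 reader schema that adds a field with a default value.
import fastavro

schema_v1 = fastavro.parse_schema({
    "type": "record", "name": "Movie",
    "fields": [{"name": "title", "type": "string"}],
})
schema_v2 = fastavro.parse_schema({
    "type": "record", "name": "Movie",
    "fields": [{"name": "title", "type": "string"},
               {"name": "year", "type": "int", "default": 0}],  # new field
})

with open("movies.avro", "wb") as out:
    # Deflate block compression inside the Avro container file.
    fastavro.writer(out, schema_v1, [{"title": "Heat"}], codec="deflate")

with open("movies.avro", "rb") as f:
    for record in fastavro.reader(f, reader_schema=schema_v2):
        print(record)   # {'title': 'Heat', 'year': 0}
```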

Columnar Formats

If your analysis relies heavily on fetching all fields for records that belong to a particular time range, row-oriented storage makes sense.

More recently, a number of databases have introduced columnar storage, which provides several benefits over earlier row-oriented systems:

  • Skips I/O and decompression (if applicable) on columns that are not part of the query.

  • Works well for queries that only access a small subset of columns. If many columns are being accessed, then row-oriented storage is generally preferable.

  • Is generally very efficient in terms of compression on columns, because entropy within a column is lower than entropy within a block of rows. In other words, data is more similar within the same column than it is in a block of rows. This can make a huge difference, especially when the column has few distinct values.

  • Is often well suited for data-warehousing-type applications where users want to aggregate certain columns over a large collection of records.

  • RCFile

The RCFile format was developed to provide fast data loading, fast query processing, and highly efficient storage space utilization. The RCFile format breaks files into row splits, then within each split uses column-oriented storage.

However, it also has some deficiencies that prevent optimal performance for query times and compression.

  • ORC

The ORC format was created to address some of the shortcomings of the RCFile format, specifically around query performance and storage efficiency. ORC:

  • Provides lightweight, always-on compression provided by type-specific readers and writers. ORC also supports the use of zlib, LZO, or Snappy to provide further compression.
  • Allows predicates to be pushed down to the storage layer so that only required data is brought back in queries.
  • Supports the Hive type model, including new primitives such as decimal and complex types.
  • Is a splittable storage format.

A drawback of ORC as of this writing is that it was designed specifically for Hive, and so is not a general-purpose storage format that can be used with non-Hive MapReduce interfaces such as Pig or Java, or other query engines such as Impala.

  • Parquet

Parquet shares many of the same design goals as ORC, but is intended to be a general-purpose storage format for Hadoop.

Parquet provides the following benefits, many of which it shares with ORC (a short column-pruning sketch follows the list):

  • Similar to ORC files, Parquet allows for returning only required data fields, thereby reducing I/O and increasing performance.
  • Provides efficient compression; compression can be specified on a per-column level.
  • Is designed to support complex nested data structures.
  • Stores full metadata at the end of files, so Parquet files are self-documenting.
  • Fully supports reading and writing with the Avro and Thrift APIs.
  • Uses efficient and extensible encoding schemes—for example, bit-packing/run-length encoding (RLE).
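Here is the column-pruning sketch referenced above, using the pyarrow library; the file name, column names, and values are made up. Reading only the needed columns avoids I/O and decompression for the rest, which is the key benefit of columnar formats such as Parquet.

```python
# Sketch of column pruning with pyarrow (file name and columns are made up):
# reading only the needed columns avoids I/O and decompression for the rest.
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "url":   ["/home", "/about", "/home"],
    "user":  ["u1", "u2", "u3"],
    "bytes": [1200, 340, 990],
})
pq.write_table(table, "events.parquet", compression="snappy")  # per-file codec

# Only the "url" and "bytes" columns are read; "user" is skipped entirely.
subset = pq.read_table("events.parquet", columns=["url", "bytes"])
print(subset.num_rows, subset.column_names)
```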

Compression

Common compression formats:

| Compression format | Tool  | Algorithm | File extension | Splittable      |
|--------------------|-------|-----------|----------------|-----------------|
| gzip               | gzip  | DEFLATE   | .gz            | No              |
| bzip2              | bzip2 | bzip2     | .bz2           | Yes             |
| LZO                | lzop  | LZO       | .lzo           | Yes, if indexed |
| Snappy             | N/A   | Snappy    | .snappy        | No              |

According to the Cloudera blog (http://blog.cloudera.com/blog/2011/09/snappy-and-hadoop/):

One thing to note is that Snappy is intended to be used with a container format, like SequenceFiles or Avro data files, rather than being used directly on plain text, for example, since the latter is not splittable and can't be processed in parallel using MapReduce. This differs from LZO, where it is possible to index LZO-compressed files to determine split points, so that LZO files can be processed efficiently in subsequent processing.

This means that if a whole text file is compressed with Snappy then the file is NOT splittable. But if each record inside the file is compressed with Snappy then the file could be splittable, for example in Sequence files with block compression.

Compression recommendations

In general, any compression format can be made splittable when used with container file formats (Avro, SequenceFiles, etc.) that compress blocks of records or each record individually. If you are doing compression on the entire file without using a container file format, then you have to use a compression format that inherently supports splitting (e.g., bzip2, which inserts synchronization markers between blocks).
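A hedged PySpark sketch of this recommendation follows; the paths are assumptions and the data is synthetic. The same DataFrame is written once as Snappy-compressed Parquet (compression inside a splittable container) and once as gzipped text files (whole-file compression, not splittable).

```python
# Sketch (assumed paths): the same data written two ways. Snappy inside a
# container format (Parquet) keeps the output splittable; gzipping whole text
# files does not, so each .gz file becomes a single non-parallel split.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compression-recommendations").getOrCreate()
df = spark.range(1_000_000).withColumnRenamed("id", "event_id")

# Recommended: block compression inside a splittable container format.
df.write.option("compression", "snappy").parquet("/data/events_parquet")

# Works, but each resulting .gz file can only be read by a single task.
df.write.option("compression", "gzip").csv("/data/events_csv_gz")
```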

http://www.slideshare.net/StampedeCon/choosing-an-hdfs-data-storage-format-avro-vs-parquet-and-more-stampedecon-2015

Schema evolution is the term for how the store behaves when the Avro schema is changed after data has been written to the store using an older version of that schema.

More references:

https://avro.apache.org/docs/1.7.7/spec.html#Schema+Resolution

http://doc.akka.io/docs/akka/snapshot/scala/persistence-schema-evolution.html

HDFS Schema Design

  1. Location of HDFS Files

Standard locations make it easier to find and share data between teams. The following is an example HDFS directory structure that we recommend.

  • /user/

Data, JARs, and configuration files that belong only to a specific user.

  • /etl

Data in various stages of being processed by an ETL workflow. The /etl directory will be readable and writable by ETL processes (they typically run under their own user) and by members of the ETL team.

e.g.

/etl////{input,processing,output,bad}

  • /tmp

Temporary data generated by tools or shared between users

  • /data

Data sets that have been processed and are shared across the organization

Very often user access is read-only, and data is written by automated (and audited) ETL processes. Since data in /data is typically business-critical, only automated ETL processes are typically allowed to write them—so changes are controlled and audited.

  • /app

Includes everything required for Hadoop applications to run, except data. This includes JAR files, Oozie workflow definitions, Hive HQL files, and more.

/app/////.

/app/BI/clickstream/latest/aggregate_preferences/uber-aggregate-preferences.jar

  • /metadata

Stores metadata. While most table metadata is stored in the Hive metastore, as described later in the "Managing Metadata" section, some extra metadata (for example, Avro schema files) may need to be stored in HDFS. This directory would be the best location for storing such metadata. This directory is typically readable by ETL jobs but writable by the user used for ingesting data into Hadoop (e.g., the Sqoop user). For example, the Avro schema file for a data set called movie may exist at a location like this: /metadata/movielens/movie/movie.avsc.

  2. Advanced HDFS Schema Design

Once the broad directory structure has been decided, the next important decision is how data will be organized into files.

  • Small files problem

As detailed in “File-based data structures”, storing a large number of small files in Hadoop can lead to excessive memory use for the NameNode, since metadata for each file stored in HDFS is held in memory. Also, many small files can lead to many processing tasks, causing excessive overhead in processing.

  • Partitioning

Partitioning a data set is a very common technique used to reduce the amount of I/O required to process the data set.

When placing the data in the filesystem, you should use the following directory format for partitions: /<partition_column_name=partition_column_value>/{files}. In our example, this translates to: medication_orders/date=20131101/{order1.csv, order2.csv}

This directory structure is understood by various tools, like HCatalog, Hive, Impala, and Pig, which can leverage partitioning to reduce the amount of I/O required during processing.
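A hedged Spark sketch of writing data in that partitioned layout follows; the input path, output path, and column name are assumptions based on the medication_orders example above.

```python
# Sketch (assumed column and path names) of directory-style partitioning with
# Spark; the output layout matches the <column=value> convention above and is
# understood by Hive, Impala, and other tools.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioning").getOrCreate()
orders = spark.read.csv("/incoming/medication_orders", header=True)  # assumed input

# Produces /data/medication_orders/date=20131101/..., date=20131102/..., etc.
orders.write.partitionBy("date").parquet("/data/medication_orders")

# Queries that filter on the partition column only touch matching directories.
spark.read.parquet("/data/medication_orders") \
     .filter("date = '20131101'") \
     .count()
```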

  • Bucketing

Bucketing is another technique for decomposing large data sets into more manageable subsets.

A common solution is to bucket by a join key (for example, physician ID), which uses a hashing function to map records into a specified number of buckets. This way, you can control the size of the data subsets (i.e., buckets) and optimize for query speed.
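The sketch below shows bucketing with the Spark SQL writer; the table, column, and bucket count are illustrative assumptions, and bucketed writes in Spark require saving as a table rather than plain files.

```python
# Sketch of bucketing with Spark SQL (table/column names are illustrative):
# rows are hashed on physician_id into a fixed number of buckets, which bounds
# the size of each subset and can speed up joins and sampling on that key.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bucketing").enableHiveSupport().getOrCreate()
orders = spark.read.parquet("/data/medication_orders")   # assumed input

(orders.write
       .bucketBy(64, "physician_id")     # 64 buckets, chosen for the data volume
       .sortBy("physician_id")
       .saveAsTable("medication_orders_bucketed"))
```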

  • Denormalizing

Another method of trading disk space for query performance is denormalizing data sets so there is less need to join them.

In relational databases, this pattern is often known as Materialized Views. In Hadoop, you instead have to create a new data set that contains the same data in its aggregated form.
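A minimal sketch of that pattern in Spark, with made-up data set names: the join is performed once and the wide result is stored, so downstream queries read the pre-joined data instead of repeating the join.

```python
# Sketch of denormalizing (illustrative data sets): pre-join orders with the
# physician dimension once, and store the wide result so later queries avoid
# repeating the join.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("denormalize").getOrCreate()
orders = spark.read.parquet("/data/medication_orders")      # assumed inputs
physicians = spark.read.parquet("/data/physicians")

denormalized = orders.join(physicians, on="physician_id", how="left")
denormalized.write.mode("overwrite").parquet("/data/medication_orders_wide")
```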

Metadata

Metadata, in general, refers to data about the data. In the Hadoop context, metadata can refer to:

  • Metadata about logical data sets

This includes information like the location of a data set (e.g., directory in HDFS or the HBase table name), the schema associated with the data set, the partitioning and sorting properties of the data set, if any, and the format of the data set, if applicable (e.g., CSV, TSV, SequenceFile, etc.). Such metadata is usually stored in a separate metadata repository.

  • Metadata about files on HDFS

This includes information like permissions and ownership of such files and the location of the various blocks of each file on data nodes. Such information is usually stored and managed by the Hadoop NameNode.

  • Metadata about tables in HBase

This includes information like table names, associated namespace, associated attributes (e.g., MAX_FILESIZE, READONLY, etc.), and the names of column families. Such information is stored and managed by HBase itself.

  • Metadata about data ingest and transformations

This includes information like which user generated a given data set, where the data set came from, how long it took to generate it, and how many records there are or the size of the data loaded.

  • Metadata about data set statistics

This includes information like the number of rows in a data set, the number of unique values in each column, a histogram of the distribution of data, and maximum and minimum values. Such metadata is useful for various tools that can leverage it for optimizing their execution plans but also for data analysts, who can do quick analysis based on it.

Where to Store Metadata?

The first project in the Hadoop ecosystem that started storing, managing, and leveraging metadata was Apache Hive. Hive stores this metadata in a relational database called the Hive metastore.

Over time, more projects wanted to use the same metadata that was in the Hive metastore. To enable the usage of Hive metastore outside of Hive, a separate project called HCatalog was started.

Today, HCatalog is a part of Hive and serves the very important purpose of allowing other tools (like Pig and MapReduce) to integrate with the Hive metastore. It also opens up access to the Hive metastore to a broader ecosystem by exposing a REST API via the WebHCat server.

Limitations of the Hive Metastore and HCatalog

  • Problems with high availability
  • Fixed schema for metadata

Other Ways of Storing Metadata

  • Embedding metadata in file paths and names

For example, in the case of a partitioned data set, the partition column names and values are embedded in the directory structure itself (the /<partition_column_name=partition_column_value>/ layout shown earlier in the partitioning discussion).

  •  Storing the metadata in HDFS

One option to store such metadata is to create a hidden directory, say .metadata, inside the directory containing the data in HDFS. You may decide to store the schema of the data in an Avro schema file.

For example, this is what your directory structure in HDFS would look like:

/data/event_log

/data/event_log/file1.avro

/data/event_log/.metadata

You can use something like Kite SDK to store metadata.


http://kitesdk.org/docs/current/

Data Movement

Incremental Updates

A key limitation of HDFS is the inability to do appends or random writes to files after they're created. If the requirements include modifying existing data, files in HDFS are effectively read-only: you can't update records in place as you would with a relational database. In this case, you first write a "delta" file that includes the changes that need to be made to the existing file. A compaction job is then required to handle the modifications. In a compaction job, the data is sorted by a primary key; if a row is found twice, the data from the newer delta file is kept and the data from the older file is discarded. The results of the compaction process are written to disk, and when the process is complete the resulting compacted data replaces the older, uncompacted data.
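A hedged PySpark sketch of this compaction pattern follows; the paths, the customer_id key, and the updated_at timestamp column are assumptions used for illustration.

```python
# Sketch of the compaction pattern (column names are assumptions): merge the
# base data with a delta file and, for each primary key, keep only the newest
# version as determined by an update timestamp.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("compaction").getOrCreate()
base = spark.read.parquet("/data/customers/base")       # existing data
delta = spark.read.parquet("/data/customers/delta")     # changed/updated rows

merged = base.unionByName(delta)
latest_first = Window.partitionBy("customer_id").orderBy(F.col("updated_at").desc())
compacted = (merged
             .withColumn("rn", F.row_number().over(latest_first))
             .filter("rn = 1")
             .drop("rn"))

# Write to a new location, then swap it in for the old data once complete.
compacted.write.mode("overwrite").parquet("/data/customers/compacted")
```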

Transformations

  • Transformation

XML or JSON is converted to delimited data.

  • Partitioning

Incoming data is stock trade data and partitioning by ticker is required.

  • Splitting

The data needs to land in HDFS and HBase for different access patterns.

Network Security

Sometimes you need to ingest data from sources that you can access only by going outside the company's firewalls.

Flume includes support for encrypting data sent between Flume agents. Note that Kafka does not currently support encryption of data within a Kafka data flow, so you’ll have to do additional work to encrypt and decrypt the data outside of Kafka.

Failure Handling

Failure handling is a major consideration when you’re designing an ingestion pipeline; how to recover in case of failure is a critical question.

Kite SDK – A data API for Hadoop

The goal of the Kite SDK is to make it easier to code using Hadoop best practices.

Kite is a high-level data layer for Hadoop. It is an API and a set of tools that speed up development. You configure how Kite stores your data in Hadoop, instead of building and maintaining that infrastructure yourself.

From <http://kitesdk.org/docs/1.1.0/>

Morphlines is an open source framework that reduces the time and effort necessary to build and change Hadoop ETL stream processing applications that extract, transform, and load data into Apache Solr, HBase, HDFS, enterprise data warehouses, or analytic online dashboards.

From <http://kitesdk.org/docs/1.1.0/morphlines/>

Orchestration Frameworks in the Hadoop Ecosystem

Oozie was developed by Yahoo! in order to support its growing Hadoop clusters and the increasing number of jobs and workflows running on those clusters.

Azkaban was developed by LinkedIn with the goal of being a visual and easy way to manage workflows.

Luigi is an open source Python package from Spotify that allows you to orchestrate long-running batch jobs and has built-in support for Hadoop.

Chronos is an open source, distributed, and fault-tolerant scheduler from Airbnb that runs on top of Mesos. It’s essentially a distributed system that’s meant to serve as a replacement for cron.

Application architect vs Solution architect vs Enterprise architect

For people who have never worked in a very large organization (or have, but it was a dysfunctional one), “architect” may have left a bad taste in their mouth. However, it is not only a legitimate role, but a highly strategic one for smart companies.

When an application becomes so vast and complex that dealing with the overall technical vision and planning, and translating business needs into technical strategy, becomes a full-time job, that is an application architect. Application architects also often mentor and/or lead developers, and know the code of the application(s) they are responsible for well.

When an organization has so many applications and infrastructure inter-dependencies that it is a full-time job to ensure their alignment and strategy, without being involved in the code of any of them, that is a solution architect. A solution architect's role can sometimes resemble an application architect's, but over a suite of especially large applications that together comprise a logical solution for a business.

When an organization becomes so large that it becomes a full-time job to coordinate the high-level planning for the solution architects, and frame the terms of the business technology strategy, that role is an enterprise architect. Enterprise architects typically work at an executive level, advising the CxO office and its support functions as well as the business as a whole.

There are also infrastructure architects, information architects, and a few others, but in terms of total numbers these comprise a smaller percentage than the “big three”.

The two most common misconceptions about “architect” are:

  • An architect is simply a more senior/higher-earning developer with a fancy title.

  • An architect is someone who is technically useless, hasn't coded in years, but still throws their weight around in the business, making life difficult for developers.

These misconceptions come from a lot of architects doing a pretty bad job, and from organizations doing a terrible job of understanding what an architect is for. It is common to promote the top programmer into an architect role, but that is a mistake: the roles have overlapping but not identical skillsets, and the best programmer may often be, but is not always, an ideal architect. A good architect has a solid understanding of many technical aspects of the IT industry; a better understanding of business needs and strategies than a developer needs to have; excellent communication skills; and often some project management and business analysis skills. It is essential for architects to keep their hands dirty with code and to stay sharp technically. Good ones do.
