#onenote# spark

Components of Spark

The following illustration depicts the different components of Spark.

Resilient Distributed Datasets

A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. It is an immutable distributed collection of objects. Each RDD is divided into logical partitions, which may be computed on different nodes of the cluster. RDDs can contain any type of Python, Java, or Scala objects, including user-defined classes.

Formally, an RDD is a read-only, partitioned collection of records. RDDs can be created through deterministic operations on either data in stable storage or other RDDs. An RDD is a fault-tolerant collection of elements that can be operated on in parallel.

 

Iterative Operations on Spark RDD

The illustration given below shows iterative operations on a Spark RDD. Spark stores intermediate results in distributed memory instead of stable storage (disk), which makes the system faster.

Note − If the distributed memory (RAM) is not sufficient to store the intermediate results (the state of the job), Spark stores those results on disk.

Interactive Operations on Spark RDD

This illustration shows interactive operations on Spark RDD. If different queries are run on the same set of data repeatedly, this particular data can be kept in memory for better execution times.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory, in which case Spark will keep the elements around on the cluster for much faster access, the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
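
A minimal sketch of the persistence behavior described above, assuming a local master and made-up data. The names squares, total, and evens are illustrative and not from the source; MEMORY_AND_DISK is chosen only to mirror the note about spilling to disk when memory is insufficient.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("persist-sketch").setMaster("local[2]"))

// Transformation only: nothing is computed yet.
val squares = sc.parallelize(1 to 1000).map(n => n.toLong * n)

// Ask Spark to keep the partitions in memory, spilling to disk if they do not fit.
squares.persist(StorageLevel.MEMORY_AND_DISK)

// The first action computes the partitions and caches them...
val total = squares.reduce(_ + _)
// ...so later actions on the same RDD can reuse them instead of recomputing.
val evens = squares.filter(_ % 2 == 0).count()

// Release the cached partitions once they are no longer needed.
squares.unpersist()
```

Without the persist() call both actions would still return the same values; the difference is only that the map step would be recomputed for each action.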

 

Pasted from <http://www.tutorialspoint.com/apache_spark/apache_spark_rdd.htm>

 

Cluster Mode Overview

 

 

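      1. Application

An Application is a Spark user program that creates a SparkContext instance. It consists of one Driver program and Executors on multiple Workers in the cluster, where each Worker provides only one Executor per application.

Spark-shell is itself an application, because it creates a SparkContext object, named sc, when it starts.

      2. Job

A Job corresponds to a Spark action: each action, such as count or saveAsTextFile, produces one Job instance, which comprises many tasks computed in parallel.

      3. Driver Program

The program that runs the Application's main function and creates the SparkContext instance. The SparkContext is commonly used to stand for the Driver Program.

      4. Cluster Manager

An external service that manages cluster resources. Spark currently works with three main cluster managers: Standalone, YARN, and Mesos. Spark's built-in Standalone mode covers most resource-management needs of a pure Spark environment; YARN and Mesos are generally worth considering only when several computing frameworks share the cluster.

      5. Worker Node

A node in the cluster that can run Application code, comparable to a Hadoop slave node.

      6. Executor

A worker process launched on a Worker Node for an Application. It runs tasks and keeps data in memory or on disk. Note that each application has only one Executor per Worker Node, and the Executor processes the application's tasks concurrently using multiple threads.

Each Application has its own independent set of executors.

      7. Task

A unit of work sent by the Driver to an executor. A task usually processes one split of data, and a split is typically the size of one block.

      8. Stage

A Job is divided into groups of tasks (TaskSets), and each group is called a Stage. The tasks closely resemble the map and reduce tasks of MapReduce.

Stage boundaries are determined as follows: a Stage generally begins by reading external data or shuffle data, and generally ends when a shuffle occurs (for example, a reduceByKey operation) or when the whole Job finishes, for example when data is written to a storage system such as HDFS.

      9. RDD

The basic unit of computation in Spark and its core abstraction. An RDD is operated on through two kinds of operators: Transformations and Actions.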

 

Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
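
The shell session above only creates and reads a broadcast variable on the driver. A small sketch of how tasks might use one, assuming a local master; the lookup table, the country codes, and all names here are hypothetical and chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))

// Hypothetical read-only lookup table, shipped once per executor rather than once per task.
val countryNames = Map("DE" -> "Germany", "FR" -> "France", "JP" -> "Japan")
val lookup = sc.broadcast(countryNames)

val codes = sc.parallelize(Seq("FR", "JP", "XX", "DE"))

// Tasks read the cached copy through .value instead of capturing the map in their closure.
val resolved = codes.map(c => lookup.value.getOrElse(c, "unknown")).collect()
```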

 

Accumulators

Accumulators are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types. If accumulators are created with a name, they will be displayed in Spark’s UI. This can be useful for understanding the progress of running stages (NOTE: this is not yet supported in Python).

scala> val accum = sc.accumulator(0, "My Accumulator")
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10

 

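      1. Broadcast Variables
            1. A broadcast variable is cached in memory on each node, not once per Task;
            2. Once created, a broadcast variable can be used by any function running on the cluster;
            3. A broadcast variable is read-only and cannot be modified after it has been broadcast;
            4. When broadcasting large datasets, Spark tries to use efficient broadcast algorithms to reduce communication cost;
            5. Usage:
val broadcastVar = sc.broadcast(Array(1, 2, 3))   // create the broadcast variable

broadcastVar.value                                // read the broadcast variable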

 

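      2. Accumulators
            1. An accumulator supports only the add operation;
            2. Accumulators parallelize efficiently and are used to implement counters and sums;
            3. Spark natively supports accumulators of numeric types and standard mutable collections, and users can add support for new types;
            4. Only the driver program can read an accumulator's value;
            5. Usage:
val accum = sc.accumulator(0)                                 // create the accumulator

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)    // add to the accumulator from tasks

accum.value                                                   // read the accumulator's value in the Driver Program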

Command

Spark-shell

In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the variable called sc. Making your own SparkContext will not work.

 

 

 

import org.apache.spark.SparkContext

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("ForestSpark").setMaster("local")

 

  1. val inputfile = sc.textFile("input.txt")
  2. spark-submit --class "spark.SimpleApp" --master local[4] test-1.0-SNAPSHOT.jar

 

  1. Run on YARN

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --num-executors 3 \
  --driver-memory 4g \
  --executor-memory 2g \
  --executor-cores 1 \
  lib/spark-examples*.jar \
  10

  1. Persist

import org.apache.spark.storage.StorageLevel._

val d = sc.textFile("README.MD")

d.persist(MEMORY_ONLY)

 

  1. Spark sql

val sqlContext = new org.apache.spark.sql.SQLContext(sc) //no need if already initialized in spark-shell

val df = sqlContext.read.json("menu.json")

df.show()

 

val df = sqlContext.read.json("people.json")

// Show the content of the DataFrame
df.show()

// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

// Select people older than 21
df.filter(df("age") > 21).show()

// Count people by age
df.groupBy("age").count().show()

 

Spark master URL

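Where to specify the master URL in Spark:

1. Through the Spark shell, which opens an interactive session once started:

MASTER=spark://IP:PORT ./bin/spark-shell

 

2. Inside the program (the value can be passed in as an argument):

val conf = new SparkConf()
  .setMaster(…)

val sc = new SparkContext(conf)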

 

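The master URL passed to Spark can take the following forms:

 

local  Run locally with a single thread.

local[K]  Run locally with multiple threads (K cores).

local[*]  Run locally with multiple threads (all available cores).

spark://HOST:PORT  Connect to the given Spark standalone cluster master; the port must be specified.

mesos://HOST:PORT  Connect to the given Mesos cluster; the port must be specified.

yarn-client  Connect to a YARN cluster in client mode. HADOOP_CONF_DIR must be configured.

yarn-cluster  Connect to a YARN cluster in cluster mode. HADOOP_CONF_DIR must be configured.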

 

Note that from Spark 1.0 onward, submitting an application to a cluster works quite differently:

./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  … # other options
  <application-jar> \
  [application-arguments]

For example:

# Run application locally on 8 cores

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  /path/to/examples.jar \
  100

 

# Run on a Spark standalone cluster

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

 

# Run on a YARN cluster (--master can also be yarn-client for client mode)

export HADOOP_CONF_DIR=XXX

./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000

 

# Run a Python application on a cluster

./bin/spark-submit \
  --master spark://207.184.161.138:7077 \
  examples/src/main/python/pi.py \
  1000

 

RDD

http://ifeve.com/spark-sql-dataframes/

A resilient distributed dataset (RDD) is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. 

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Understanding closures 

One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we'll look at code that uses foreach() to increment a counter, but similar issues can occur for other operations as well.

Example

Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in local mode (--master local[n]) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):

var counter = 0
var rdd = sc.parallelize(data)

// Wrong: Don't do this!!
rdd.foreach(x => counter += x)

println("Counter value: " + counter)
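
One way to get the intended total without mutating a driver-side variable is to let Spark aggregate the values and return the result to the driver. This is a sketch assuming a local master and the same five-element array used earlier in these notes; an Accumulator would be the other option when a side-effecting update is really needed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("closure-sketch").setMaster("local[2]"))

val data = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(data)

// reduce() combines the elements on the executors and returns the total to the driver,
// so nothing depends on a variable captured in the closure.
val sum = rdd.reduce(_ + _)

println("Sum: " + sum)
```

Because the total is the return value of an action, it is the same in local mode and on a cluster.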

Shuffle operations

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's mechanism for re-distributing data so that it's grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Transformation

Meaning

map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.

filter(func)

Return a new dataset formed by selecting those elements of the source on which func returns true.

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

sample(withReplacement, fraction, seed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

distinct([numTasks])

Return a new dataset that contains the distinct elements of the source dataset.

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 

Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

pipe(command, [envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
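
To make the groupByKey / reduceByKey / aggregateByKey entries in the table above concrete, here is a small sketch on made-up key-value pairs, assuming a local master. The data and the names sales, viaGroup, viaReduce, and avgByKey are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("bykey-sketch").setMaster("local[2]"))

val sales = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 3), ("b", 6), ("c", 5)))

// groupByKey moves every value through the shuffle, then sums on the reduce side.
val viaGroup = sales.groupByKey().mapValues(_.sum).collectAsMap()

// reduceByKey pre-combines values inside each partition before the shuffle.
val viaReduce = sales.reduceByKey(_ + _).collectAsMap()

// aggregateByKey builds a (sum, count) pair per key, a result type that differs
// from the Int input values, and the pair is then turned into an average.
val avgByKey = sales
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),   // fold one value into a partition-local pair
    (x, y) => (x._1 + y._1, x._2 + y._2))   // merge pairs from different partitions
  .mapValues { case (sum, count) => sum.toDouble / count }
  .collectAsMap()
```

Both sum variants give the same result; the table's note about performance concerns how much data crosses the shuffle, not the output.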

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (Scala, Java, Python, R) and pair RDD functions doc (Scala, Java) for details.

Action

Meaning

reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

count()

Return the number of elements in the dataset.

first()

Return the first element of the dataset (similar to take(1)).

take(n)

Return an array with the first n elements of the dataset.

takeSample(withReplacement, num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

takeOrdered(n, [ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

saveAsSequenceFile(path)

(Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

saveAsObjectFile(path)

(Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Long) pairs with the count of each key.

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems.

Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.
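
A short sketch exercising a few of the actions listed above on made-up data, assuming a local master. All names and values are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("actions-sketch").setMaster("local[2]"))

val nums = sc.parallelize(Seq(5, 3, 8, 1, 9, 2))

val total = nums.reduce(_ + _)        // aggregate with a commutative, associative function
val howMany = nums.count()            // number of elements
val firstTwo = nums.take(2)           // first two elements in partition order
val smallest = nums.takeOrdered(3)    // three smallest by natural ordering

val pairs = sc.parallelize(Seq(("x", 1), ("y", 2), ("x", 3)))
val perKey = pairs.countByKey()       // number of records per key, returned to the driver
```

Each of these calls triggers a job, in contrast to the transformations in the previous table, which only extend the lineage.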

Pasted from <http://spark.apache.org/docs/latest/programming-guide.html#initializing-spark>

Spark SQL

http://ifeve.com/spark-sql-dataframes/

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("menu.json")

df.show()

Creating Datasets

import sqlContext.implicits._   // brings toDS() and the encoder for as[Person] into scope

case class Person(name: String, age: Long)

val ds = Seq(Person("Andy", 32)).toDS()

ds.show()

val people = sqlContext.read.json("people1.json").as[Person]

people.show()

Interoperating with RDDs

import sqlContext.implicits._

case class Person(name: String, age: Int)

// Parse "name,age" lines into Person objects and convert the RDD to a DataFrame
val newpeople = sc.textFile("people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

newpeople.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// Access a column by position
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// Access a column by field name
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// Retrieve several columns at once as a Map
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)

Generic Load/Save Functions

val df = sqlContext.read.load("users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

Or

df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Run SQL on files directly

val df = sqlContext.sql("SELECT * FROM parquet.`users.parquet`")

Hive Tables

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

Another sample

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

hiveContext.sql("create table yahoo_orc_table (date STRING, open_price FLOAT, high_price FLOAT, low_price FLOAT, close_price FLOAT, volume INT, adj_price FLOAT) stored as orc")

val yahoo_stocks = sc.textFile("hdfs://sandbox.hortonworks.com:8020/tmp/yahoo_stocks.csv")

yahoo_stocks.take(10)

// NOTE: results is not defined anywhere in these notes; it is presumably a DataFrame returned by a hiveContext.sql(...) query on yahoo_orc_table.

results.map(t => "Stock Entry: " + t.toString).collect().foreach(println)

results.write.format("orc").save("yahoo_stocks_orc")

Spark Streaming

Discretized Streams (DStreams)

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset (see Spark Programming Guide for more details). Each RDD in a DStream contains data from a certain interval, as shown in the following figure.

Points to remember

      • When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using an input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL, where n > number of receivers to run (see Spark Properties for information on how to set the master).
      • Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process it.

 

Window Operations

Spark Streaming also provides windowed computations, which allow you to apply transformations over a sliding window of data. The following figure illustrates this sliding window.

 

      • window length – The duration of the window (3 in the figure).
      • sliding interval – The interval at which the window operation is performed (2 in the figure).
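
The two parameters above can be illustrated without Spark at all. The sketch below uses a plain Scala collection as a stand-in for a stream, where each element represents the data of one batch interval; it is a model of the windowing arithmetic only, not the Spark Streaming API (on a real DStream the corresponding call is window(windowLength, slideInterval) with Duration arguments). The batch labels are made up.

```scala
// Each element stands for the data received in one batch interval (time 1, 2, 3, ...).
val batches = Vector("t1", "t2", "t3", "t4", "t5")

val windowLength = 3   // window covers the last 3 batch intervals
val slideInterval = 2  // a new window is produced every 2 batch intervals

// sliding(size, step) yields consecutive windows of `size` elements, advancing by `step`.
val windows = batches.sliding(windowLength, slideInterval).toVector

windows.foreach(w => println(w.mkString("window(", ", ", ")")))
```

With a length of 3 and a slide of 2, consecutive windows overlap by one batch, which matches the figure's description of the window length and sliding interval.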

Checkpointing

A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system such that it can recover from failures. Two types of data are checkpointed: metadata (the information that defines the streaming computation) and data (the generated RDDs).

DataFrame and SQL Operations

You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext, as shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

/** DataFrame operations inside your streaming program */

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // Get the singleton instance of SQLContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // Convert RDD[String] to DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // Register as table
  wordsDataFrame.registerTempTable("words")

  // Do word count on DataFrame using SQL and print it
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

 

Caching / Persistence

Similar to RDDs, DStreams also allow developers to persist the stream’s data in memory. That is, using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple operations on the same data). For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true. Hence, DStreams generated by window-based operations are automatically persisted in memory, without the developer calling persist().

 

Pasted from <http://spark.apache.org/docs/latest/streaming-programming-guide.html>

 

RDD  DataFrame DataSet

http://ifeve.com/spark-sql-dataframes/

 

RDD

The RDD (Resilient Distributed Dataset) API has been in Spark since the 1.0 release. This interface and its Java equivalent, JavaRDD, will be familiar to any developers who have worked through the standard Spark tutorials. From a developer’s perspective, an RDD is simply a set of Java or Scala objects representing data.

 

Scala:

 

rdd.filter(_.age > 21)              // transformation
   .map(_.last)                     // transformation
   .saveAsObjectFile("under21.bin") // action

 

From <http://www.kdnuggets.com/2016/02/apache-spark-rdd-dataframe-dataset.html>

 

 

DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

 

From <http://ifeve.com/spark-sql-dataframes/>

 

Spark 1.3 introduced a new DataFrame API as part of the Project Tungsten initiative which seeks to improve the performance and scalability of Spark. The DataFrame API introduces the concept of a schema to describe the data, allowing Spark to manage the schema and only pass data between nodes, in a much more efficient way than using Java serialization. There are also advantages when performing computations in a single process as Spark can serialize the data into off-heap storage in a binary format and then perform many transformations directly on this off-heap memory, avoiding the garbage-collection costs associated with constructing individual objects for each row in the data set.

 

Creating DataFrames

With a SQLContext, a Spark application can create DataFrames from an existing RDD, from a Hive table, or from other data sources.

 

 

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()

// Keeps only the rows whose age is greater than 21
df.filter("age > 21")

 

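Spark SQL supports two methods for converting an RDD into a DataFrame.

  1. Use reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach leads to more concise code and is recommended when you already know the schema;
  2. Construct a schema programmatically and apply it to an existing RDD. This method is more verbose, but you will likely need it when the fields are not known in advance or the schema is only read at runtime.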
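
The reflection-based method already appears in the "Interoperating with RDDs" snippet earlier in these notes. A sketch of the second, programmatic method follows, written against the SQLContext API used throughout these notes (later Spark releases favor SparkSession) and assuming a local master. The in-memory records and all names are hypothetical stand-ins for data whose fields are only known at runtime.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("schema-sketch").setMaster("local[2]"))
val sqlContext = SQLContext.getOrCreate(sc)

// Hypothetical records standing in for the lines of a text file.
val lines = sc.parallelize(Seq("Michael,29", "Andy,30", "Justin,19"))

// Step 1: convert each record into a generic Row.
val rows = lines.map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))

// Step 2: describe the columns with a StructType built at runtime.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = true)))

// Step 3: apply the schema to the RDD of Rows.
val people = sqlContext.createDataFrame(rows, schema)

val adults = people.filter("age > 21").count()
```

The trade-off is the one stated in the list: no case class is needed, but the Row values must line up with the declared field types by hand.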

 

 

 

 

Dataset API

Datasets are similar to RDDs, however, instead of using Java Serialization or Kryo they use a specialized Encoder to serialize the objects for processing or transmitting over the network. While both encoders and standard serialization are responsible for turning an object into bytes, encoders are code generated dynamically and use a format that allows Spark to perform many operations like filtering, sorting and hashing without deserializing the bytes back into an object.

 

 

The Dataset API, released as an API preview in Spark 1.6, aims to provide the best of both worlds; the familiar object-oriented programming style and compile-time type-safety of the RDD API but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.

 

Creating Datasets

 

 

 

Example: Creating Dataset from a list of objects

 

Scala

 

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

import sqlContext.implicits._

val sampleData: Seq[ScalaPerson] = ScalaData.sampleData()

val dataset = sqlContext.createDataset(sampleData)

 

From <http://www.kdnuggets.com/2016/02/apache-spark-rdd-dataframe-dataset.html/2>

Spark Internal

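After Spark receives a submitted job, it analyzes the RDD dependencies and divides the job into multiple stages, then generates a TaskSet for each stage and submits it for scheduling. The way stages are divided is a highlight of the design, and I plan to study it over the next couple of days.

Spark job scheduling

Operations on RDDs fall into two classes, transformations and actions. A job is actually submitted and run only when an action is called: at that point, all the transformations applied to the original input data are packaged into a job and submitted to the cluster. The process is roughly as follows:

      • The DAGScheduler analyzes the dependencies between RDDs, using a DAG to work out the transformation dependencies among them
      • Based on the RDD dependencies found by the DAGScheduler, the Job is divided into multiple stages
      • Each stage produces a TaskSet that is submitted to the TaskScheduler; scheduling control passes to the TaskScheduler, which is responsible for dispatching tasks to workers for execution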

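RDD dependencies

RDD operations in Spark are coarse-grained: every transformation produces a new RDD, which establishes a chain of dependencies between RDDs. Spark defines two types of dependency, narrow and wide.

      • Narrow dependency: each partition of the parent RDD is used by at most one partition of the child RDD.
      • Wide dependency: a partition of the parent RDD is used by multiple partitions of the child RDD.

In the figure, the dependencies on the left are all narrow, with partitions mapped one to one. Those on the right are wide, with partitions mapped one to many.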
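
The two dependency types can also be inspected on a live RDD through its dependencies member. A small sketch, assuming a local master and a Spark version in which ShuffleDependency takes three type parameters; the data and names are made up.

```scala
import org.apache.spark.{NarrowDependency, ShuffleDependency, SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("deps-sketch").setMaster("local[2]"))

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// mapValues works partition by partition, so the child depends narrowly on its parent.
val mapped = pairs.mapValues(_ * 10)

// reduceByKey must bring equal keys together, so the child depends on its parent via a shuffle.
val reduced = mapped.reduceByKey(_ + _)

val mappedIsNarrow = mapped.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])
val reducedIsWide = reduced.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

println(s"mapValues narrow: $mappedIsNarrow, reduceByKey wide: $reducedIsWide")
```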

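Dividing stages

Dividing stages is a key step in Spark job scheduling. Spark determines the dependencies from the DAG and uses them to divide stages by breaking the dependency chain; the tasks inside each stage can run in parallel, and the job as a whole executes stage by stage in order until the Job completes. The RDD dependencies in a real submitted Job can be very complex, which would make dividing stages difficult. Spark therefore relies on the two dependency types described above: the scheduler starts from the end of the DAG and walks the dependency chain backwards, cutting the chain whenever it meets a ShuffleDependency (another name for a wide dependency) and adding the RDD to the current stage whenever it meets a NarrowDependency. The number of tasks in a stage is determined by the number of partitions of the RDD at the end of the stage. RDD transformations are coarse-grained computations over partitions, and the result of executing a stage is the RDD made up of those partitions.

As the figure shows, the dependency chain is broken at wide dependencies to divide stages. Here stage1 does not need to be computed; computing only stage2 and stage3 completes the whole Job.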
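
The backward walk described above can be modelled in a few lines of plain Scala. This is a toy model, not Spark's DAGScheduler: Node, narrowParents, wideParents, and splitStages are hypothetical names, and the model ignores details such as shared parents and already-computed stages. The lineage used is the textFile → map → map → reduceByKey chain from the log example elsewhere in these notes.

```scala
// A toy lineage node: an RDD name plus its parents, split by dependency type.
final case class Node(
    name: String,
    narrowParents: List[Node] = Nil,
    wideParents: List[Node] = Nil)

// Walk backwards from the final RDD. RDDs reached through narrow dependencies
// join the current stage; each parent behind a wide (shuffle) dependency
// becomes the last RDD of an earlier stage.
def splitStages(last: Node): List[Set[String]] = {
  def collect(n: Node): (Set[String], List[Node]) = {
    val below = n.narrowParents.map(collect)
    (Set(n.name) ++ below.flatMap(_._1), n.wideParents ++ below.flatMap(_._2))
  }
  val (names, shuffleParents) = collect(last)
  // Parent stages come first, because they must run before this one.
  shuffleParents.flatMap(splitStages) :+ names
}

val file = Node("textFile")
val split = Node("map(split)", narrowParents = List(file))
val keyed = Node("map(pair)", narrowParents = List(split))
val counts = Node("reduceByKey", wideParents = List(keyed))

val stages = splitStages(counts)
stages.zipWithIndex.foreach { case (s, i) => println(s"stage $i: ${s.mkString(", ")}") }
```

The three narrow steps end up pipelined in one stage and the shuffle starts a second one, which is the two-stage plan the notes describe for this example.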

 

From <http://wongxingjun.github.io/2015/05/25/Spark%E4%BD%9C%E4%B8%9A%E8%B0%83%E5%BA%A6%E4%B8%ADstage%E7%9A%84%E5%88%92%E5%88%86/>

 

 

An RDD can depend on zero or more other RDDs. For example, when you say x = y.map(…), x will depend on y. These dependency relationships can be thought of as a graph.

You can call this graph a lineage graph, as it represents the derivation of each RDD. It is also necessarily a DAG, since it cannot contain a loop.

Narrow dependencies, where a shuffle is not required (think map and filter) can be collapsed into a single stage. Stages are a unit of execution, and they are generated by the DAGScheduler from the graph of RDD dependencies. Stages also depend on each other. The DAGScheduler builds and uses this dependency graph (which is also necessarily a DAG) to schedule the stages.

 

Each RDD maintains a pointer to one or more parents, along with metadata about the type of relationship it has with each parent. For example, when we call val b = a.map() on an RDD, the RDD b just keeps a reference to (and never copies) its parent a; that's a lineage.

When the driver submits the job, the RDD graph is serialized to the worker nodes so that each worker node applies the series of transformations (map, filter, etc.) to different partitions. This RDD lineage is also used to recompute the data if a failure occurs.

To display the lineage of an RDD, Spark provides the debug method toDebugString(). Consider the following example:

val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
  .map(words => (words(0), 1))
  .reduceByKey{(a,b) => a + b}

Executing toDebugString() on the splitedLines RDD outputs the following:

(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[5] at map at <console>:24 []
|  MapPartitionsRDD[4] at map at <console>:23 []
|  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
|  log.txt HadoopRDD[0] at textFile at <console>:21 []

 

From <http://stackoverflow.com/questions/30699530/in-apache-spark-how-does-lineage-get-passed-down-in-rdds>

 

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

Spark translates the RDD transformations into a DAG (Directed Acyclic Graph) and starts the execution.

At a high level, when any action is called on the RDD, Spark creates the DAG and submits it to the DAG scheduler.

      • The DAG scheduler divides operators into stages of tasks. A stage is comprised of tasks based on partitions of the input data. The DAG scheduler pipelines operators together; for example, many map operators can be scheduled in a single stage. The final result of the DAG scheduler is a set of stages.
      • The stages are passed on to the task scheduler. The task scheduler launches tasks via the cluster manager (Spark Standalone/YARN/Mesos). The task scheduler doesn’t know about dependencies between the stages.
      • The Worker executes the tasks on the Slave.

Let’s come to how Spark builds the DAG.

At a high level, there are two kinds of transformations that can be applied to RDDs: narrow transformations and wide transformations. Wide transformations result in stage boundaries.

Narrow transformation – doesn’t require the data to be shuffled across partitions, for example map and filter.

Wide transformation – requires the data to be shuffled, for example reduceByKey.

Let’s take an example of counting how many log messages appear at each level of severity.

The following log file has lines that start with the severity level:

INFO I’m Info message
WARN I’m a Warn message
INFO I’m another Info message

The following Scala code performs the count:

val input = sc.textFile("log.txt")
val splitedLines = input.map(line => line.split(" "))
  .map(words => (words(0), 1))
  .reduceByKey{(a,b) => a + b}

This sequence of commands implicitly defines a DAG of RDD objects (RDD lineage) that will be used later when an action is called. Each RDD maintains a pointer to one or more parent along with the metadata about what type of relationship it has with the parent. For example, when we call val b = a.map() on a RDD, the RDD b keeps a reference to its parent a, that’s a lineage.

To display the lineage of an RDD, Spark provides the debug method toDebugString(). For example, executing toDebugString() on the splitedLines RDD outputs the following:

(2) ShuffledRDD[6] at reduceByKey at <console>:25 []
+-(2) MapPartitionsRDD[5] at map at <console>:24 []
|  MapPartitionsRDD[4] at map at <console>:23 []
|  log.txt MapPartitionsRDD[1] at textFile at <console>:21 []
|  log.txt HadoopRDD[0] at textFile at <console>:21 []

The first line (from the bottom) shows the input RDD, which we created by calling sc.textFile(). Below is a more diagrammatic view of the DAG created from this RDD.

Once the DAG is built, the Spark scheduler creates a physical execution plan. As mentioned above, the DAG scheduler splits the graph into multiple stages, which are created based on the transformations. Narrow transformations are grouped (pipelined) together into a single stage. So for our example, Spark creates a two-stage execution as follows:

The DAG scheduler then submits the stages to the task scheduler. The number of tasks submitted depends on the number of partitions present in the textFile. For example, if we have 4 partitions in this example, then 4 sets of tasks will be created and submitted in parallel, provided there are enough slaves/cores. The diagram below illustrates this in a bit more detail.
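
The link between partitions and tasks can be checked on a small RDD. A sketch assuming a local master; the RDD here is created with parallelize and an explicit partition count of 4 rather than with textFile, and the names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses an existing context (e.g. the shell's sc) or starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitions-sketch").setMaster("local[2]"))

// Ask for 4 partitions explicitly; a stage over this RDD runs one task per partition.
val rdd = sc.parallelize(1 to 8, 4)
val numPartitions = rdd.partitions.length

// Tag each partition with its index and count the elements it holds.
val sizes = rdd
  .mapPartitionsWithIndex((idx, it) => Iterator((idx, it.size)))
  .collect()

sizes.foreach { case (idx, n) => println(s"partition $idx holds $n elements") }
```

How many of those tasks actually run at the same time depends on the cores available, which is the "provided there are enough slaves/cores" condition above.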

For more detailed information, I suggest going through the following YouTube videos, where the Spark creators give in-depth details about the DAG, the execution plan, and its lifetime.

 

From <http://stackoverflow.com/questions/30691385/internal-work-of-spark/30691654#30691654>

 

 

 

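The distinction between the two kinds of dependency is useful in two respects.

      1. First, narrow dependencies allow pipelined execution on a single cluster node, which can compute all the parent partitions.

Wide dependencies, in contrast, require the data from all parent RDDs to be available and to have been shuffled with a MapReduce-like operation.

      2. Second, recovery after a node failure is more efficient with narrow dependencies.

With narrow dependencies, only the lost parent partitions need to be recomputed, and they can be recomputed in parallel on different nodes. With wide dependencies, by contrast, a single failed node may cause the loss of some partitions from all the ancestors of an RDD, requiring the computation to be re-executed.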

 

 

 

 

 
