Spark是当前流行的大数据内存计算框架。它是建立在一个统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streaming,SQL,Machine Learning以及Graph等,所以说spark是一种通用的大数据分析引擎。
Spark将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。其底层采用Scala这种函数式语言书写而成。之前我们学习了hadoop,hadoop的计算框架是mapreduce,而spark其中一个数据处理模型也是mapreduce。这两个mapreduce从功能上来讲差别不大,大的区别是spark和hadoop处理数据的出发点不同。Spark是内存级数据处理框架,在数据处理过程中包括产生的中间数据都是在内存中进行。相比于hadoop处理数据过程中,将中间数据暂时写进磁盘来说性能有了很大的改观。
表格 1 1 Spark vs mapreduce
Mapreduce Spark
数据存储结构:磁盘hdfs文件系统的split 使用内存构建弹性分布式数据集RDD,对数据进行运算和cache
编程范式:map+reduce DAG(有向无环图):Transformation+action
计算中间数据落磁盘,io及序列化、反序列话代价大 计算中间数据在内存中维护,存取速度是磁盘的多个数量级
Task以进程的方式维护,任务启动就有数秒 Task以线程的方式进行维护,对小数据集的读取能达到亚秒级的延迟
Spark使用内存构建RDD弹性分布式数据集,它是将数据放入内存之中进行维护的,RDD弹性分布式数据集是一个容错性、并行的数据结构,可以让用户显式的将数据存储到磁盘或者内存中,并能控制数据的分区。RDD提供了丰富的操作来操作这些数据集。spark的mapreduce算法模型就是RDD提供的算子map函数,reduce函数的计算框架来实现的。除此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便的操作,使得spark能够非常容易的支持常见的数据运算。在这些操作中,诸如map、flatMap、filter等转换操作实现了monad模式(函数式编程的概念,表示各个函数间数据迭代处理的过程),并很好的契合了scala的集合操作,所以scala语言也就成为了spark编程相对比较好用的的一种语言。
理解RDD是spark入门的非常关键的一点。RDD可以看成能封装任何数据类型的一个对象,有transform和action两种方法。我们将需要处理的数据通过transform操作之后可以得到RDD,之后通过action操作得到计算结果。比如wordcount的例子,RDD用一个文本初始化,经过map处理转换成另一个RDD,这个map属于spark里的transform操作,将RDD封装成key,value这样的键值对,后经过reduce后就得到了结果,reduce属于action操作。
Spark生态
spark除了自己的核心库之外还提供了其他很多库,比如Spark Sql,Spark Streaming,Spark MLlib ,Spark Graphx等。
Spark Core:包含Spark的基本功能;尤其是定义RDD的API、操作以及这两者上的动作。其他Spark的库都是构建在RDD和Spark Core之上的。
Spark Sql:可以通过JDBC Api将spark数据暴露出去,而且还可以用传统的BI和可视化工具在spark数据上执行类似SQL的查询。用户还可以用Spark Sql对不同格式的数据(JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。
Spark Streaming:它是基于微批量方式的计算和处理,可以用于处理实时的流式数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。
Spark Graphx:是用于图形计算和并行计算的新的(alpha)Spark Api。通过一如弹性分布式属性图,一个顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,Graphx暴露了一个基础操作符集合和一个经过优化的Pregel Api变体。此外,Graphx还包括了一个持续增长的用于简化图分析任务的图算法和构建器集合。
Spark MLlib:是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。
Spark单机模式搭建与操作
Spark的本地模式主要用于spark程序的测试和开发,所有的进程都是在虚拟机里模拟的。安装之前,确保本机上已经装好了jdk1.7。
(1) 下载安装scala -2.11.7.tgz并解压到当前目录创建的scala里
Shell Code
清单 1 1
1 $ mkdir ~/scala
$ tar zxvf scala-2.11.7.tgz –C ~/scala
(2) 下载解压程序包spark-1.5.0-bin-hadoop2.6.tgz到当前目录创建的spark里
Shell Code
清单 1 2
1 $ mkdir ~/spark
$ tar zxvf spark-1.5.0-bin-hadoop2.6.tgz –C ~/spark
(3) 添加环境变量
Shell Code
清单 1 3
$ vim ~/.bash_profile
export SCALA_HOME=/home/hadoop/scala/scala-2.11.7
export SPARK_HOME=/home/hadoop/spark/spark-1.5.0-bin-hadoop2.6
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/sbin:$HADOOP_HOME/bin:$HBASE_HOME/bin:$SPARK_HOME/bin:$SCALA_HOME/bin
$ source ~/.bash_profile
(4) 直接进入~/spark/spark-1.5.0-bin-hadoop2.6/bin下
1> 测试spark-shell :
Spark启动并运行后,可以用Spark shell连接到Spark引擎进行交互式数据分析。Spark shell支持Scala和Python两种语言。可以用spark-shell.cmd和pyspark.cmd命令分别运行Scala版本和Python版本的Spark Shell。前提是需要设置好scala或python的环境。本教程以scala语言为主。
Shell Code
清单 1 4
1 $ ./spark-shell
图 1
查看spark版本
Shell Code
清单 1 5
1 scala> sc.version
图 2
使用本地文件创建RDD
Shell Code
清单 1 6
1 scala> val testRDD=sc.textFile("/home/hadoop/spark/spark-1.5.0-bin-hadoop2.6/README.md")
图 3
testRDD调用count函数统计数据行数
Shell Code
清单 1 7
1 testRDD.count()
图 4
退出spark-shell
Shell Code
清单 1 8
1 scala> exit
图 5
2> run-example 可以直接运行spark自带的范例
Shell Code
清单 1 9
1 $ ./run-example SparkPi
图 6
3 > Spark-submit:用来提交spark任务,spark-shell其实就是调用了spark-submit,之后调用spark-class
函数来完成相关命令的。
Shell Code
清单 1 10
1
./spark-submit --class org.apache.spark.examples.SparkPi --master local --executor-memory 20m ../lib/spark-examples-1.5.0-hadoop2.6.0.jar
--class 类名 ;--master local 选择本地模式(其实在本地模式下的spark可以不加这个参数,默认为本地模式);--executor-memory 20m 设置执行器申请的内存;../lib/spark-examples-1.5.0-hadoop2.6.0.jar是你spark安装目录下的lib目录下的examples的jar包(通常写好的spark程序需要打成包放在这里调用);如果spark-submit 报错 Initial job has not accepted any resources,可以将--executor-memory设置小一点。
图 7
热点新闻
前端开发技术库