东华大学网络教育学院《大数据》课程大作业代做案例
大作业
添加微信好友, 获取更多信息
复制微信号
一、提交材料
一份电子版报告。
要求:正文篇幅不超过10页,不少于5页;文件格式为doc、docx或者pdf;文件的大小不超过3MB。
封面模板见附件一(下一页)。
二、报告内容
任务一:
1. 以自己的姓名拼音创建新用户(zyf),安装Hadoop环境(参考实验指导书)。实验中所有截图的命令提示符都应为该用户,以此作为个人独立完成大作业的标志之一。
2. 在HDFS中上传5个(每个不小于100KB)文本文件。
3. 基于Hadoop的MapReduce或者Spark统计以上5个文件中的单词词频(wordcount),结果保存到HDFS中,截图频率最高的50个单词的统计情况。
任务二:
4. 阐述生活或者工作中一个大数据的应用场景,并且提出技术上的详细的解决方案。
三、评分标准
满分100,具体分配如下。
序号 | 评价指标 | 分值分布 | 比例 |
1 | 报告内容完整 | 共4个内容。缺一个扣25分,不完整15分,比较完整20分,完整25分,超额完成附加10分 | 70% |
2 | 报告规范 | 共4个内容。不规范15,比较规范20,规范25 | 30% |
附件一:“大数据”课程大作业封面模板
“大数据”课程大作业
课程名称:大数据
学生姓名:
学生学号:
提交时间:2022-12-10
大数据实验指导书
实验要求
1)每位同学需要准备一台具有至少12G运行内存(16G为佳)的个人电脑才可以进行本课程设置的平时实验以及期末实验;
2)每次作业需自己独立完成,并撰写报告,实验报告并不设置模板,可自行组织,报告内容主要包括:实验目的、实验步骤,实验结果。
1. 实验1:HDP sandbox安装实验
1.1 实验背景
Hortonworks数据平台是一款基于Apache Hadoop的是开源数据平台,提供大数据云存储,大数据处理和分析等服务。该平台是专门用来应对多来源和多格式的数据,并使其处理起来能变成简单、更有成本效益。HDP还提供了一个开放,稳定和高度可扩展的平台,使得更容易地集成Apache Hadoop的数据流业务与现有的数据架构。该平台包括各种的Apache Hadoop项目以及Hadoop分布式文件系统(HDFS)、MapReduce、Pig、Hive、HBase、Zookeeper和其他各种组件,使Hadoop的平台更易于管理,更加具有开放性以及可扩展性。
1.2 实验目的
1. 学习HDP sandbox在虚拟机上的安装流程;
2. 为未来所有大数据课程实验部署大数据环境。
1.3 实验步骤
1. 从老师机器上拷贝hdp sandbox的2.5.0 virtual box版本:HDP_2.5_virtualbox.ova。
2. 从网上下载virtual box(vbox)虚拟机软件,并安装,下载网址:https://www.virtualbox.org/wiki/Downloads。
3. 打开vbox,并选择管理->导入虚拟电脑。
4. 路径选择HDP_2.5_virtualbox.ova所在路径,并配置虚拟机属性,其中CPU至少为4核,内存至少为8192MB,并选择一个大容量的磁盘中的某一个文件夹作为虚拟机的主文件夹。导入即可,导入时间和启动服务时间很长,慢慢等待。
5. 右键点击已经导入的虚拟机,并进行设置,确保网卡设置为NAT,并保存。
6. 双击启动虚拟机,等待虚拟机配置完毕并显示界面:
7. 打开浏览器,打开地址http://127.0.0.1:8888/,出现以下界面。关闭浏览器的弹窗拦截功能,并打开右侧的QUICK LINKS,可以在打开的页面看到关键路径的链接地址以及用户名密码信息。本课程实验主要用到的有三个:
Ambari:hadoop组件资源管理与控制可视化工具,连接地址是http://127.0.0.1:8080,用户名和密码为raj_ops
Zeppelin:网页编程IDE,http://127.0.0.1:9995
SSH:远程终端连接,使用xshell或者secureCRT远程登录虚拟机的linux系统,登录地址为127.0.0.1:2222,初始用户名(密码)为root(hadoop),或者可以直接网页登录终端http://127.0.0.1:4200。第一次登录需要修改密码。
8. 点击Ambari的地址或者在浏览器进入http://127.0.0.1:8080可以看到Ambari登录界面,输入用户名密码登录看到hadoop组件安装与开启情况,以及hadoop集群资源使用
9. 使用xshell/secureCRT或者网页SSH连接虚拟机主机,并修改初始密码,新的密码要稍微复杂一些的,有英文有数字,大于8个字符,可以连接到hdp sandbox虚拟机主机中
10. 输入命令 hdfs dfs -ls /展示hdfs的根目录,出现列出部分文件与文件夹,说明hdp安装成功
2. 实验2:HDFS基础操作实验
2.1 实验背景
HDFS 是 Hadoop Distribute File System 的简称,意为:Hadoop 分布式文件系统。是 Hadoop 核心组件之一,作为最底层的分布式存储服务而存在。HDFS使用Master和Slave结构对集群进行管理。一般一个 HDFS 集群只有一个 Namenode 和一定数目的Datanode组成。Namenode 是 HDFS 集群主节点,Datanode 是 HDFS 集群从节点,两种角色各司其职,共同协调完成分布式的文件存储服务。
1)HDFS集群包括,NameNode和DataNode以及Secondary Namenode。
2)NameNode负责管理整个文件系统的元数据,以及每一个路径(文件)所对应的数据块信息。
3)DataNode 负责管理用户的文件数据块,每一个数据块都可以在多个datanode上存储多个副本。
4)Secondary NameNode用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。最主要作用是辅助namenode管理元数据信息。
HDFS的基本命令格式为:
命令:hdfs dfs -cmd <args>
注:cmd就是具体的命令,cmd前面的“-”千万不能省略。
Hdp sandbox已经设置了环境变量,则可以在任意的路径下可直接使用hdfs,否则进入hadoop安装路径HADOOP_HOME后,使用如下指令:./bin/hdfs dfs -cmd <args>
2.2 实验目的
1. 学习HDFS基本操作。
2. 学会如何上传与下载HDFS文件。
2.3 实验步骤
1. 列出文件目录
命令格式为hdfs dfs -ls 路径
2. 在HDFS中创建文件夹
命令:hdfs dfs -mkdir 文件夹名称
命令:级联创建一个文件夹,即类似这样一个目录:/mybook/input,则 hdfs fs -mkdir -p 文件夹名称
3. 上传文件至HDFS
命令:hdfs dfs -put 源路径 目标存放路径
4. 从HDFS上下载文件
命令:hdfs dfs -get HDFS文件路径 本地存放路径
5. 查看HDFS上某个文件的内容
命令:hdfs dfs -text(或cat) HDFS上的文件存放路径
6. 统计目录下各文件的大小
命令:hdfs dfs -du (-h) 目录路径
7. 删除HDFS上某个文件或文件夹
命令:hdfs dfs -rm 文件存放文件
hdfs dfs -rm -r 文件存放文件
8. 使用help命令寻求帮助
命令:hdfs dfs -help 命令
9. 查看hdfs容量使用情况
命令:hdfs dfs -df -h
10. 查看hdfs中某个文件大小
命令:hdfs dfs -du -h -s 路径
3. 实验3:Spark基础实验
3.1 实验背景
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。
Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。
Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。
3.2 实验目的
1. 以wordcount为例,学习spark程序开发基础;
2. 了解spark-shell使用;
3. 了解zeppelin使用。
3.3 实验步骤
1. 在ssh中输入spark-shell,进入spark-shell交互命令行开发spark程序,出现scala>说明已经载入完成。
2. 首先学习如何使用spark加载本地文件并计算wordcount。输入val textFile = sc.textFile("file:///root/input_wordcount.txt")读取本地文本文件。
3. 上面代码中,val后面的是变量textFile,而sc.textFile()中的这个textFile是sc的一个方法名称,这个方法用来加载文件数据。这两个textFile不是一个东西,不要混淆。实际上,val后面的是变量textFile,你完全可以换个变量名称,比如,val lines = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)。这里使用相同名称,就是有意强调二者的区别。注意,要加载本地文件,必须采用“file:///”开头的这种格式。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。所以,下面我们执行一条“行动”类型的语句,就可以看到结果:输入textFile.first(),查看文件第一行信息
4. first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量textFile中,并取出第一行文本。屏幕上会显示很多反馈信息,这里不再给出,你可以从这些结果信息中,找到word.txt文件中的第一行的内容。正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,spark-shell也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来,比如:val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
5. 上面我们使用了一个根本就不存在的word123.txt,执行上面语句时,spark-shell根本不会报错,因为,没有遇到“行动”类型的first()操作之前,这个加载操作时不会真正执行的。然后,我们执行一个“行动”类型的操作first(),如下:textFile.first()
执行上面语句后,你会发现,会返回错误信息,因为,这个word123.txt文件根本就不存在。
6. 好了,现在我们可以练习一下如何把textFile变量中的内容再次写回到另外一个文本文件wordback.txt中:textFile.saveAsTextFile("file:///root/writeback")
7. 上面的saveAsTextFile()括号里面的参数是保存文件的路径,不是文件名。saveAsTextFile()是一个“行动”(Action)类型的操作,所以,马上会执行真正的计算过程,从word.txt中加载数据到变量textFile中,然后,又把textFile中的数据写回到本地文件目录“/root/writeback/”下面,现在让我们切换到Linux Shell命令提示符窗口中,执行下面命令:ls /root/writeback
8. 可以看到一半的内容被写入了part-00000中。读取hdfs文件类似,我们尝试读取实验3中上传的/input_wordcount.txt,输入命令:
val textFile = sc.textFile("hdfs://sandbox.hortonworks.com:8020/input_wordcount.txt")
textFile.first()
9. 可以看到读取结果与本地文件一致。需要注意的是,sc.textFile(“hdfs://sandbox.hortonworks.com:8020/input_wordcount.txt”)中,“hdfs://sandbox.hortonworks.com:8020/”是前面介绍Hadoop安装内容时确定下来的端口地址8020以及域名。实际上,也可以省略不写,如下两条语句都是等价的:
val textFile = sc.textFile("hdfs://sandbox.hortonworks.com:8020/input_wordcount.txt")
val textFile = sc.textFile("/input_wordcount.txt")
10. 下面,我们再把textFile的内容写回到HDFS文件系统中:textFile.saveAsTextFile("/writeback")
11. 执行上面命令后,文本内容会被写入到HDFS文件系统的“/writeback”目录下,我们可以切换到Linux Shell命令提示符窗口查看一下:
hdfs dfs -ls /writeback
hdfs dfs -cat /writeback/part-00000
12. 结果与本地文件写回结果一致。下面我们在spark-shell写一个wordcount的程序
val textFile = sc.textFile("hdfs://sandbox.hortonworks.com:8020/input_wordcount.txt")
val wordCount = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCount.collect()
可以看到最后结果中输出了每个词的词频。上面只给出了代码,省略了执行过程中返回的结果信息,因为返回信息很多。
下面简单解释一下上面的语句。
textFile包含了多行文本内容,textFile.flatMap(line => line.split(“ ”))会遍历textFile中的每行文本内容,当遍历到其中一行文本内容时,会把文本内容赋值给变量line,并执行Lamda表达式line => line.split(“ ”)。line => line.split(“ ”)是一个Lamda表达式,左边表示输入参数,右边表示函数里面执行的处理逻辑,这里执行line.split(“ ”),也就是针对line中的一行文本内容,采用空格作为分隔符进行单词切分,从一行文本切分得到很多个单词构成的单词集合。这样,对于textFile中的每行文本,都会使用Lamda表达式得到一个单词集合,最终,多行文本,就得到多个单词集合。textFile.flatMap()操作就把这多个单词集合“拍扁”得到一个大的单词集合。
然后,针对这个大的单词集合,执行map()操作,也就是map(word => (word, 1)),这个map操作会遍历这个集合中的每个单词,当遍历到其中一个单词时,就把当前这个单词赋值给变量word,并执行Lamda表达式word => (word, 1),这个Lamda表达式的含义是,word作为函数的输入参数,然后,执行函数处理逻辑,这里会执行(word, 1),也就是针对输入的word,构建得到一个tuple,形式为(word, 1),key是word,value是1(表示该单词出现1次)。
程序执行到这里,已经得到一个RDD,这个RDD的每个元素是(key, value)形式的tuple。最后,针对这个RDD,执行reduceByKey((a, b) => a + b)操作,这个操作会把所有RDD元素按照key进行分组,然后使用给定的函数(这里就是Lamda表达式:(a, b) => a + b),对具有相同的key的多个value进行reduce操作,返回reduce后的(key, value),比如(“hadoop”, 1)和(“hadoop”, 1),具有相同的key,进行reduce以后就得到(“hadoop”,2),这样就计算得到了这个单词的词频。
13. 我们也可以使用zeppelin进行spark命令的交互,使用浏览器打开http://127.0.0.1:9995/
14. 这里有zeppelin的各种已写入的样例notebook可以学习。我们点击Create new note,notename输入wordcount,点击create note,可以看到新的notebook
15. 由于zeppelin可以写入多种语言,所以需要写spark的时候我们必须在一个cell的第一行写上%spark,在第二行开始写相关代码。那么我们将wordcount的代码写入zeppelin
16. 点击右侧的三角形,或者按住shift+enter执行当前cell,点击后会进入运行状态
17. 当状态变为时,当前cell运行完成,可以看到wordcount结果
18. zeppelin可以进行一些代码的尝试,也可以运行大规模代码,要增加cell的话可以把鼠标放到cell下面,看到+并点击
19. 到此,spark基础开发实验已经完成。如果同学们想要学习如何编译一个spark应用程序,可以前往http://dblab.xmu.edu.cn/blog/931-2/,学习四、Spark独立应用程序编程。
实验4:Spark进阶实验
4.1 实验背景
RDD(Resilient Distributed Datasets,弹性分布式数据集)是一个分区的只读记录的集合。RDD只能通过在稳定的存储器或其他RDD的数据上的确定性操作来创建。我们把这些操作称作变换以区别其他类型的操作。例如 map、filter和join。
RDD在任何时候都不需要被“物化”(进行实际的变换并最终写入稳定的存储器上)。实际上,一个RDD有足够的信息描述着其如何从其他稳定的存储器上的数据生成。它有一个强大的特性:从本质上说,若RDD失效且不能重建,程序将不能引用该RDD。而用户可以控制RDD的其他两个方面:持久化和分区。用户可以选择重用哪个RDD,并为其制定存储策略(比如:内存存储)。也可以让RDD中的数据根据记录的key分布到集群的多个机器。 这对位置优化来说是有用的,比如可用来保证两个要join的数据集都使用了相同的哈希分区方式。
Spark 编程接口,对编程人员通过对稳定存储上的数据进行变换操作(如map和filter)。而得到一个或多个RDD。然后可以调用这些RDD的actions(动作)类的操作。这类操作的目是返回一个值或是将数据导入到存储系统中。动作类的操作如count(返回数据集的元素数),collect(返回元素本身的集合)和save(输出数据集到存储系统)。Spark直到RDD第一次调用一个动作时才真正计算RDD。
还可以调用RDD的persist(持久化)方法来表明该RDD在后续操作中还会用到。默认情况下,Spark会将调用过persist的RDD存在内存中。但若内存不足,也可以将其写入到硬盘上。通过指定persist函数中的参数,用户也可以请求其他持久化策略(如Tachyon)并通过标记来进行persist,比如仅存储到硬盘上或是在各机器之间复制一份。最后,用户可以在每个RDD上设定一个持久化的优先级来指定内存中的哪些数据应该被优先写入到磁盘。 缓存有个缓存管理器,Spark里被称作block manager。注意,这里还有一个误区是,很多人认为调用了cache或者persist的那一刻就是在缓存了,这是完全不对的,真正的缓存执行指挥在action被触发。
4.2 实验目的
1. 进一步学习spark-shell使用;
2. 学习Spark RDD基础api使用。
4.3 实验步骤
1. RDD创建
RDD可以通过两种方式创建:
* 第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统、HBase、Cassandra、Amazon S3等外部数据源中加载数据集。Spark可以支持文本文件、SequenceFile文件(Hadoop提供的 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件)和其他符合Hadoop InputFormat格式的文件。
* 第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
2. 从文件系统中加载数据创建RDD
Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是本地文件系统的地址,或者是分布式文件系统HDFS的地址,或者是Amazon S3的地址等等。
下面请切换回spark-shell窗口,看一下如何从文件系统中加载数据:
1)本地文件系统创建RDD:
scala> val lines = sc.textFile("file:///input_wordcount.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///input_wordcount.txt MapPartitionsRDD[12] at textFile at <console>:27
2)Hdfs上创建RDD:
scala> val lines = sc.textFile("hdfs:///input_wordcount.txt")
从执行结果反馈信息可以看出,lines是一个String类型的RDD,或者我们以后可以简单称为RDD[String],也就是说,这个RDD[String]里面的元素都是String类型。
3.通过并行集合(数组)创建RDD
可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。
下面请在spark-shell中操作:
scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala>val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29
从执行结果信息可以看出,rdd是一个Int类型的RDD。
上面使用数组来创建,或者,也可以从列表中创建:
scala>val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)
scala>val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
从执行结果信息可以看出,rdd是一个Int类型的RDD。
4.RDD操作
RDD被创建好以后,在后续使用过程中一般会发生两种操作:
* 转换(Transformation):基于现有的数据集创建一个新的数据集。
* 行动(Action):在数据集上进行运算,返回计算值。
1)转换操作
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
下面列出一些常见的转换操作(Transformation API):
* filter(func):筛选出满足函数func的元素,并返回一个新的数据集
* map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
* flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
* groupByKey():应用于(K, V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
* reduceByKey(func):应用于(K, V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
2)行动操作
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
下面列出一些常见的行动操作(Action API):
* count() 返回数据集中的元素个数
* collect() 以数组的形式返回数据集中的所有元素
* first() 返回数据集中的第一个元素
* take(n) 以数组的形式返回数据集中的前n个元素
* reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
* foreach(func) 将数据集中的每个元素传递到函数func中运行*
4.惰性机制
这里给出一段简单的代码来解释Spark的惰性机制。
scala> val lines = sc.textFile("/input_wordcount.txt")
scala> val lineLengths = lines.map(s => s.length)
scala> val totalLength = lineLengths.reduce((a, b) => a + b)
上面第一行首先从外部文件data.txt中构建得到一个RDD,名称为lines,但是,由于textFile()方法只是一个转换操作,因此,这行代码执行后,不会立即把data.txt文件加载到内存中,这时的lines只是一个指向这个文件的指针。
第二行代码用来计算每行的长度(即每行包含多少个单词),同样,由于map()方法只是一个转换操作,这行代码执行后,不会立即计算每行的长度。
第三行代码的reduce()方法是一个“动作”类型的操作,这时,就会触发真正的计算。这时,Spark会把计算分解成多个任务在不同的机器上执行,每台机器运行位于属于它自己的map和reduce,最后把结果返回给Driver Program。
实例
下面我们举几个实例加深了解。
请在spark-shell下执行下面操作。
下面是一个关于filter()操作的实例。
scala> val lines = sc.textFile("/input_wordcount.txt")
lines: org.apache.spark.rdd.RDD[String] = /input_wordcount.txt MapPartitionsRDD[16] at textFile at <console>:27
scala> lines.filter(line => line.contains("hadoop")).count()
res1: Long = 3 //这是执行返回的结果
上面的代码中,lines就是一个RDD。lines.filter()会遍历lines中的每行文本,并对每行文本执行括号中的匿名函数,也就是执行Lamda表达式:line => line.contains(“Spark”),在执行Lamda表达式时,会把当前遍历到的这行文本内容赋值给参数line,然后,执行处理逻辑line.contains(“Spark”),也就是只有当改行文本包含“Spark”才满足条件,才会被放入到结果集中。最后,等到lines集合遍历结束后,就会得到一个结果集,这个结果集中包含了所有包含“Spark”的行。最后,对这个结果集调用count(),这是一个行动操作,会计算出结果集中的元素个数。
这里再给出另外一个实例,我们要找出文本文件中单行文本所包含的单词数量的最大值,代码如下:
scala> val lines = sc.textFile("/input_wordcount.txt")
scala> lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)
上面代码中,lines是一个RDD,是String类型的RDD,因为这个RDD里面包含了很多行文本。lines.map(),是一个转换操作,之前说过,map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集,所以,lines.map(line => line.split(“ ”).size)会把每行文本都传递给匿名函数,也就是传递给Lamda表达式line => line.split(” “).size中的line,然后执行处理逻辑line.split(” “).size。line.split(” “).size这个处理逻辑的功能是,对line文本内容进行单词切分,得到很多个单词构成的集合,然后,计算出这个集合中的单词的个数。因此,最终lines.map(line => line.split(” “).size)转换操作得到的RDD,是一个整型RDD,里面每个元素都是整数值(也就是单词的个数)。最后,针对这个RDD[Int],调用reduce()行动操作,完成计算。reduce()操作每次接收两个参数,取出较大者留下,然后再继续比较,例如,RDD[Int]中包含了1,2,3,4,5,那么,执行reduce操作时,首先取出1和2,把a赋值为1,把b赋值为2,然后,执行大小判断,保留2。下一次,让保留下来的2赋值给a,再从RDD[Int]中取出下一个元素3,把3赋值给b,然后,对a和b执行大小判断,保留较大者3.依此类推。最终,reduce()操作会得到最大值是4。
实际上,如果我们把上面的 lines.map(line => line.split(” “).size).reduce((a,b) => if (a>b) a else b)分开逐步执行,你就可以更加清晰地发现每个步骤生成的RDD的类型。
scala> lines.map(line => line.split(" "))
res8: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[19] at map at <console>:30
//从上面执行结果可以发现,lines.map(line => line.split(" "))返回的结果是一个Array[String]类型的RDD,也就是说,这个RDD中的每个元素都是一个Array[String](一行文本被切分成多个单词后就是保存在这个数组中)
scala> lines.map(line => line.split(" ").size)
res9: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[20] at map at <console>:30
//从上面执行结果信息可以发现,lines.map(line => line.split(" ").size)得到的RDD是Int类型的RDD,这个RDD中的每个元素都是一个整数值(也就是一行文本包含的单词数)
scala> lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)
res10: Int = 5
5.持久化
前面我们已经说过,在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
比如,下面就是多次计算同一个DD的例子:
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> println(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3
scala> println(rdd.collect().mkString(",")) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
上面代码执行过程中,前后共触发了两次从头到尾的计算。
实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。
persist()的圆括号中包含的是持久化级别参数,比如,persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。
例子如下:
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
scala> rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
scala> println(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
3
scala> println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
Hadoop,Spark,Hive
最后,可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。
5.分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心(core)数目。
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:
*本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N;
*Apache Mesos:默认的分区数为8;
*Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值。
因此,对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism,比如:
scala>val array = Array(1,2,3,4,5)
array: Array[Int] = Array(1, 2, 3, 4, 5)
scala>val rdd = sc.parallelize(array,2) #设置两个分区
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29
对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。
如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)。
6.打印元素
在实际编程中,我们经常需要把RDD中的元素打印输出到屏幕上(标准输出stdout),一般会采用语句rdd.foreach(println)或者rdd.map(println)。当采用本地模式(local)在单机上执行时,这些语句会打印出一个RDD中的所有元素。但是,当采用集群模式执行时,在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中,因此,任务控制节点Driver Program中的stdout是不会显示打印语句的这些输出内容的。为了能够把所有worker节点上的打印输出信息也显示到Driver Program中,可以使用collect()方法,比如,rdd.collect().foreach(println),但是,由于collect()方法会把各个worker节点上的所有RDD元素都抓取到Driver Program中,因此,这可能会导致内存溢出。因此,当你只需要打印RDD的部分元素时,可以采用语句rdd.take(100).foreach(println)。
7.其他命令
distinct 去除RDD内的重复数据
scala > var a = sc.parallelize(List("Gnu","Cat","Rat","Dog","Gnu","Rat"),2);
scala > a.distinct.collect
foreach遍历RDD内的数据
scala > var b = sc.parallelize(List("cat","dog","tiger","lion","gnu","crocodile","ant","whale","dolphin","spider"),3)
scala > b.foreach(x=>println(x+"s are yummy"))
first 取的RDD中的第一个数据
scala > var c=sc.parallelize(List("dog","Cat","Rat","Dog"),2)
scala > c.first
max 取得RDD中的最大的数据
scala > var d=sc.parallelize(10 to 30)
scala > d.max
scala > var e = sc.parallelize(List((10,"dog"),(20,"cat"),(30,"tiger"),(18,"lion")))
scala > e.max
intersection 返回两个RDD重叠的数据
scala > var f = sc.parallelize(1 to 20)
scala > var g = sc.parallelize(10 to 30)
scala > var h = f.intersection(g)
scala > h.collect
本文链接:https://daizuozuoye8.com/?id=656
转载声明:本站发布文章及版权归原作者所有,转载本站文章请注明文章来源!
请发表您的评论