实训笔记——Spark计算框架
Spark计算框架
一、Spark的概述
Spark是一个分布式的计算框架,是Hadoop的MapReduce的优化解决方案。Hadoop的MR存在两大核心问题:1、无法进行迭代式计算 2、MR程序是基于磁盘运算,运算效率不高
Spark主要解决了Hadoop的MR存在的问题,Spark是基于内存运算的一种迭代式计算框架
Spark还有一个思想 one stack to rule them all(一栈式解决方案),Spark内置了很多子组件,子组件可以应用于不同的计算场景下,Spark SQL(结构化数据查询)、Spark Streaming(准实时计算)、Spark MLlib(算法)、Spark GraphX(图计算)、Spark R,以上这些子组计都是基于Spark Core开发的。
Spark之所以可以实现基于内存的迭代式计算,主要也是因为Spark Core中的一个核心数据抽象RDD
二、Spark的特点
- 计算快速
- 易用
- 通用
- 兼容
三、Spark的安装部署(安装部署Spark的Cluster Manager-资源调度管理器的)
3.1 本地安装–无资源管理器
3.2 Spark的自带独立调度器Standalone
3.2.1 主从架构的软件
3.2.2 Master/worker
3.2.3 伪分布、完全分布、HA高可用
3.3 Hadoop的YARN
3.4 Apache的Mesos
3.5 K8S容器技术
【注意】:我们在安装部署Spark的资源管理器的同时,也可以安装一个Spark的job history
四、Spark程序的部署运行
Spark部署运行和MR程序的部署运行方式一致的,需要将我们编写的Spark程序打包成为一个jar包,放到我们的Spark集群中,然后通过Spark相关命令启动运行Spark程序即可
spark-submit
--class 全限定类名
--master 运行的资源管理器
--deploy-mode 部署运行的模式
--num-executors 只在yarn模式下使用 指定executor的数量
--executor-cores 指定每一个executor具备多少个CPU内核,一个内核可以运行一个TASK
--executor-memory 每一个executor占用的内存
jar包路径
main函数的args参数列表
五、Spark集群运行中三个核心角色和一些名词
5.1 Driver驱动程序
编写的Spark程序,Spark程序中SparkContext,负责任务调度
5.2 Cluster Manager(资源管理器)
负责分配资源给Spark程序运行的,Spark支持很多资源管理器,YARN、Spark的Standalone、Apache的Mesos
5.3 Executor(执行器)
Spark申请资源的最小单位,每一个executor包含了内存、CPU Core
5.4 Task(任务)
每一个executor内部可以同时启动多个任务,Task就是Spark程序运行的最小单位,一个executor可以运行多少个task取决于cpu core
假如Spark程序总共有100个任务,一般分配30个左右task。
5.5 DAGScheduler
记录RDD之间的依赖关系的,也是用来划分stage阶段
5.6 Stage阶段
核心就是用来划分shuffle阶段的,一个stage阶段可能包含多分RDD的计算的,因此一个stage中包含多个Task的。Spark程序在运行的时候,一个stage的任务调度运行的
5.7 TaskScheduler
任务调度器,Driver驱动程序分配任务给task运行的
5.8 application
Spark应用程序,一个Spark程序可以包含多个job
5.9 job
遇到一个action算子,算子之前的依赖链上的RDD组成一个job
5.10 RDD
rdd就是Spark程序运行的核心,在Spark程序中,无外乎就三种操作:创建RDD、转化RDD、从RDD中获取结果/将结果输出保存
六、Spark的核心基础Spark Core
Spark Core是Spark计算框架的核心基础,Spark中子组件都是基于Spark Core封装而来的。
Spark Core中包含了Spark的运行调度机制、Spark的迭代式计算、基于内存的运算机制
6.1 Spark Core中最核心的有两个概念
6.1.1 SparkContext
SparkContext:Spark的上下文对象,Spark程序的提交运行,任务分配等等都是由SparkContext来完成的。
6.1.2 RDD
RDD:也是Spark最核心最重要的概念,也是Spark中最基础的数据抽象(spark处理的所有数据都会封装称为RDD然后进行处理)
6.2 RDD的属性(RDD具备的一些特征)
6.2.1 一组分区(一组切片)
RDD可分区的数据集,RDD内部的数据是以分区的形式存在,每一个分区的数据可以存储在不同的节点上
6.2.2 一个计算每一个分区(切片)数据的compute函数
RDD计算的时候每一个分区的数据是并行计算的,通过一个函数将计算逻辑封装在分区数据上运行计算
6.2.3 一个用来记录RDD依赖关系的列表
记录RDD的依赖关系,容错机制
6.2.4 一个分区机制(RDD必须得是键值对类型的RDD)
分区器只对键值对类型的RDD生效
6.2.5 一个用来记录分区位置的列表
如果计算程序和数据不在同一个节点上,会把数据移动到计算节点
6.3 RDD的弹性的体现
6.3.1 存储的弹性
RDD数据可以在内存和磁盘之间自由切换
6.3.2 计算的弹性
RDD在计算的时候,stage、task都有可能计算失败,如果失败了stage和task都会进行特定次数的重试,默认重试4次
6.3.3 容错的弹性
RDD计算中如果数据丢失,可以根据依赖链重新计算
6.3.4 分片的弹性
RDD计算中,我们可以根据实际情况,在代码中动态的调整分片
6.4 RDD的特点
6.4.1 分区
6.4.2 只读
RDD是只读的,不可变的,RDD一旦创建,内部不能改变了,只能根据RDD计算返回一个新的RDD,而原有的RDD不受任务的干扰
6.4.3 依赖
-
宽依赖:父RDD的一个分区数据被子RDD的多个分区同时使用,一般在shuffle算子中才会出现
-
窄依赖:父RDD的分区数据只能给子RDD的一个分区
依赖是Spark程序划分stage的核心依据,stage划分规则是从上一个宽依赖算子到下一个宽依赖算子之间的操作都属于同一个stage.
6.4.4 缓存
6.4.5 检查点
6.5 RDD的分类
RDD数据集,内部可以存放各种各样的数据类型,根据存储的数据类型不同,将RDD分为两类:数值类型的RDD(RDD)、键值对类型的RDD(PairRDD)
数值类型的RDD存放的数据类型可以是任何类型,包括键值对类型 RDD[String]、RDD[People]
键值对类型的RDD指的是数据集中存放的数据类型是一个二元组 是一种比较特殊的数值类型的RDD RDD[(String,Int)]、RDD[(Int,(String,Int))],
键值对类型的RDD有它自己独特的一些算子操作,同时键值对类型的RDD可以使用数值类型RDD的所有操作
6.6 RDD的编程
在Spark中,对数据操作其实就是对RDD的操作,对RDD的操作无外乎三种:1、创建RDD 2、从已有的RDD转换得到一个新的RDD 3、从已有的RDD得到相应的结果
RDD的编程方式主要分为两种:命令行编程方式(spark-shell–数据科学、算法研究)、API编程方法(数据处理 java scala python R)
6.6.1 RDD的创建操作
将数据源的数据转换称为Spark中的RDD,RDD的创建主要分为三种:1、从外部存储设备创建RDD(HDFS、Hive、HBase、Kafka、本地文件系统…)2、Scala|Java集合中创建RDD 3、从已有的RDD转换成为一个新的RDD(RDD的转换算子)
-
从集合中创建RDD
函数名 说明 parallelize(Seq[T],num)
makeRDD(Seq[T],num)
底层就是parallelize函数的实现了 makeRDD(Seq[(T, Seq[String])])
这种方式创建的RDD是带有分区编号的 ,集合创建的RDD的分区数就是指定的分区数 parallelize(Seq[T],num)
和makeRDD(Seq[T],num)
:都可以传递一个第二个参数,第二个参数代表的是RDD的并行度(RDD的分区数),默认分区数就是master中设置的cpu核数 -
从外部存储创建RDD
textFile()
wholeTextFile()
6.6.2 RDD的转换操作(转换算子)
RDD之所以可以实现迭代式操作,就是因为RDD中提供了很多算子,算子之间进行操作时,会记录算子之间的依赖关系
RDD中具备一个转换操作的算子,转换算子是用来从一个已有的RDD经过某种操作得到一个新的RDD的,转换算子是惰性计算规则,只有当RDD遇到行动算子,转换算子才会去执行。
6.6.3 算子
算子:就是Spark已经给我们封装好的一些计算规则,只不过这些计算规则内部还需要传入计算逻辑,代码层面上,算子就是需要传入函数的函数。Spark提供了80+个算子。
6.6.4 数值型RDD的转换算子(通用算子)
函数名 | 说明 |
---|---|
map(f:T=>U) | 算子–一对一算子 |
mapPartitions(f:Iterator[T]=>Iterator[U]) | 算子—一对一算子,一个分区的数据统一执行一次map操作 |
mapPartitionsWithIndex(f:(Index,Iterator[T])=>Iterator[U]) | 一对一算子,和mapPartitions算子的逻辑一模一样的,只不过就是多了一个分区编号。 |
filter(f:T=>Boolean) | 算子—过滤算子,对原有RDD的每一个算子应用一个f函数,如果函数返回true,那么数据保留,如果返回false,那么数据舍弃 |
flatmap(f: T => TraversableOnce[U]):RDD[U] | 一对多的算子,一条输入数据可以被映射成为0个或多个数据,最后函数的返回值必须是一个集合类型,最好得到的RDD的类型就是集合元素的类型 |
sample (boolean是否为有放回的抽样,抽取比例,种子-底层抽样算法使用默认值) | 数据量越大,抽取的数据越精准,数据量越小,抽取的数据偏差越大 |
union(RDD[T]):RDD[T] | 将两个RDD中所有数据组合成为一个新的RDD然后返回 |
intersection(RDD[T]):RDD[T] | 将两个RDD取交集返回 |
distinct([numPartitions]))(implicat ordering = null) | 对RDD元素去重,借助元素的equals方法去重的,第二个隐式参数的目的是为了去重之后对数据分区进行排序,如果没有排序规则,不排序了 |
cartesian(RDD[U]) | 生成笛卡尔乘积,在T和U类型的RDD上,列出T和U的所有组合情况,返回一个新的RDD[(T,U)] |
sortBy(T=>U,asc:Boolean=true)(implicit ordering[U]) | 将RDD中T类型转换成为U类型然后对RDD进行排序,返回的还是RDD[T] 【注意】U必须能排序的,两种方式:实现Ordered接口,定义一个隐式类是Ordering[U]的子类 当然我们也可以手动在sortBy函数的第二个括号中传递一个Ordering的匿名内部类 |
6.6.5 键值对类型RDD的转换算子
函数名 | 说明 |
---|---|
groupByKey([numPartitions]) | 根据RDD的键值对数据的key值把Value数据聚合到一起,然后返回一个新的RDD,新的RDD也是kv类型,v变成集合类型 |
join(RDD[(K,W)]) | 和另外一个键值对RDD做inner join操作,返回RDD[(K,(V,W))] |
leftOutJoinrightOutJoinfullOutJoin(RDD(K,W)) 和另外一个RDD做外连接操作 | 左连接:返回RDD[(K,(V,Option[W]))] 右连接:返回RDD[(K,(Option[V],W))] 全外连接:返回RDD[(K,(Option[V],Option[W]))] Option是为了防止空指针异常的,Option的取值有两种:None、Some,如果Option包含的数据不为Null,那么使用Some将数据封装,然后我们可以使用get方法获取里面的值,如果数据为Null,那么使用None将数据封装,不能使用get获取数据 |
cogroup(RDD[(K,W)]) | 返回一个 RDD[(K, (Iterable, Iterable)) ] 将两个RDD中所有key值相同的数据全部聚合到一块,RDD1中相同的Value组成Iterable[V] RDD2中相同的value组成Iterable[W] |
mapValues(f: V => U) :RDD[(K,U)] | 针对KV类型的RDD只对v操作返回一个新的类型,由新的类型和原有的key组成一个新的RDD |
reduceByKey(func: (V, V) => V) | reduceByKey=groupBykey+reduce操作,函数输入数据有两个,输出有一个,输出类型和输出类型是同一个类型 输入的两个v:第一个v是上一次聚合的结果 第二v是本次要聚合的value 输出的v就是本次聚合的结果 |
``combineByKey( createCombiner: V => C,mergeValue (C, V) => C,mergeCombiners: (C, C) => C)` | combiner也是根据key值聚合value,只不过value如何聚合,是什么样的聚合逻辑,我们要通过三个函数说明(比reduceByKey的功能要强大): createCombiner:V=>C 将key值对应得value数据先进行初始化操作,返回一个新的类型 mergeValue:(C,V)=>C 每一个分区都会单独执行一个mergeValue函数,通过mergeValue函数将当前分区的key的value值和刚刚创建的初始值做计算 得到当前分区下的唯一的计算结果,结算结果的类型必须和初始化之后的类型保持一致 mergeCombiners:(C,C)=>C 将所有分区当前key值计算出来的结果C 再进行一次全局的聚合,得到唯一的结果,结果就是我们这个combineByKey的计算结果 返回RDD[(K,C)] |
aggregateByKey(zeroValue:U)(mergerValue(U,V)=>U,mergerCombiner :(U,U)=>U) | aggreGateByKey算子和CombineByKey算子实现的效率是一样的,区别在于初始值不一样的,combineBykey的初始值是根据函数计算来的,是根据每一个分区的一个真实的value数据计算得来的,而aggregateByKey的初始值是我们随意给的。 |
foldByKey(zeroValue:V)(f:(V,V)=>V)) | 相当于是aggregateByKey的简化版,当aggregateByKey的mergeValue和mergeCombiner函数的计算逻辑一致,并且zerovalue初始化类型的值和原先RDD的value的类型一致的时候,就可以使用foldByKey简化。 |
sortByKey(asc:Boolean=true) | 根据键值对kv的key进行排序,默认升序排序, 【注意】key值必须实现了Ordered比较器接口,如果想让排序规则准确,那么你的Ordered接口中排序逻辑必须得是升序逻辑 |
partitionBy(分区器) |
七、【补充】Scala的比较器问题
Java中存在两个比较器用于比较Java类的大小关系,Java的比较器有两个Comparable,Comparator,区别在于Comparable是让Java类必须实现的,Comparator是在使用比较器的时候使用匿名内部类的形式传递比较规则的。
Scala也是面向对象的,Scala中也存在类的概念,类在有些情况下也是必须能比较大小的。Scala也给我们提供了两个比较器,两个比较器是Java两个比较器的子接口。 Ordered 是Comparable的子接口 Ordering 是Comparator的子接口
八、相关代码
package com.sxuek.create
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 1、创建RDD
* 【注意】如果我们想要编写Spark程序,我们必须先创建一个SparkContext,因为Spark程序的提交运行、RDD的创建操作都是由SparkContext完成的。
*/
object ScalaDemo01 {
def main(args: Array[String]): Unit = {
//1、创建SparkContext
val sparkConf:SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[5]")
val sc:SparkContext = new SparkContext(sparkConf)
//2、集合中创建RDD
// var rdd:RDD[Int] = sc.parallelize(1 until 100)
// println(rdd.getNumPartitions)
// var rdd:RDD[Int] = sc.makeRDD(1 to 100)
// val rdd:RDD[List[Int]] = sc.makeRDD(Array((List(1,2,3),List("node1","node2")),(List(4,5,6),List("node2","node3"))))
// rdd.collect().foreach(println)
//3、从外部存储创建RDD 外部文件必须得是text file 只能读取一个文件
// val rdd1:RDD[String] = sc.textFile("hdfs://single:9000/wc.txt")
// println(rdd1.getNumPartitions)
// rdd1.collect().foreach(println)
val rdd2:RDD[(String,String)] = sc.wholeTextFiles("hdfs://single:9000/user/hive/warehouse/data_analy.db/ods_user_behavior")
rdd2.collect().foreach(println)
//4、关闭SparkContext
sc.stop()
}
}
package com.sxuek.transformation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 数值型的RDD转换算子
*
*/
object Demo01 {
def main(args: Array[String]): Unit = {
val conf:SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")
val sc:SparkContext = new SparkContext(conf)
//创建RDD
val rdd:RDD[String] = sc.makeRDD(List("spark","scala","spark","hadoop"))
/**
* 1、map算子,一对一算子
* 对原有RDD的每一个数据应用一个函数,经过函数计算得到一个新的返回值,新的返回值组成一个新的RDD
* 原有的RDD一个数据通过这个算子返回一个数据
*/
// val rdd1:RDD[(String,Int)] = rdd.map((_,1))
// val rdd1:RDD[(String,Int)] = rdd.mapPartitions(list=>{
// import scala.collection.mutable.ArrayBuffer
// var ab:ArrayBuffer[(String,Int)] = ArrayBuffer()
// for (elem <- list) {
// ab.+=((elem,1))
// }
// ab.iterator
// })
// val rdd1:RDD[(String,Int)] = rdd.mapPartitionsWithIndex((index:Int,list:Iterator[String])=>{
// println(s"现在是第$index 分区的数据,分区数据为${list.mkString(",")}")
// import scala.collection.mutable.ArrayBuffer
// var ab:ArrayBuffer[(String,Int)] = ArrayBuffer()
// for (elem <- list) {
// ab.+=((elem,1))
// }
// ab.iterator
// })
// rdd1.collect().foreach(println)
/**
* 2、filter过滤算子
*/
var rdd1:RDD[String] = rdd.filter((word:String)=>{!word.startsWith("h")})
rdd1.collect().foreach(println)
sc.stop()
}
}
package com.sxuek.transformation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo02 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val rdd:RDD[String] = sc.makeRDD(Array("spark flink scala","java zookeeper azkaban"))
/**
* flatmap算子
*/
val rdd1:RDD[String] = rdd.flatMap(_.split(" "))
/**
* 从flatmap算子计算结果中,抽取50%的数据
*/
val rdd2:RDD[String] = rdd1.sample(false,0.5)
rdd2.collect().foreach(println)
/**
* union算子
*/
val rdd3:RDD[String] = sc.makeRDD(Array("spark","hadoop","storm","spark"))
val rdd4:RDD[String] = rdd1.union(rdd3)
rdd4.collect().foreach(println)
/**
* intersection
*/
val rdd5:RDD[String] = rdd1.intersection(rdd3)
rdd5.collect().foreach(println)
/**
* distinct([numPartitions]))
*/
val rdd6:RDD[String] = rdd3.distinct()
rdd6.collect().foreach(println)
//scalaBean的去重
val rdd7:RDD[People] = sc.makeRDD(Array(People("zs",20),People("ls",30),People("zs",20)))
val rdd8:RDD[People] = rdd7.distinct(12)((a:People,b:People)=>{
if(a.age == b.age && a.name == b.name){
return 0
}else{
return 1
}
})
rdd8.collect().foreach(println)
// Thread.sleep(1000000000)
sc.stop()
}
}
class People(var name:String,var age:Int) extends Serializable{
override def toString: String = s"People(name=$name,age=$age)"
}
object People{
def apply(name:String,age:Int):People={
new People(name,age)
}
}
package com.sxuek.transformation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo03 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("kv-transformation").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//创建一个kv键值对rdd
val rdd:RDD[(String,Int)] = sc.makeRDD(Array(("spark",1),("flink",1),("spark",1)))
val rdd1:RDD[(String,Iterable[Int])] = rdd.groupByKey()
rdd1.collect().foreach(println)
/**
* reduceBykey
*/
var rdd2:RDD[(String,Int)] = rdd.reduceByKey((a:Int,b:Int)=>{a+b})
rdd2.collect().foreach(println)
val rdd3:RDD[(String,Int)] = sc.makeRDD(Array(("zs",80),("zs",90),("ls",70),("ls",85)))
val rdd4:RDD[(String,Int)] = rdd3.reduceByKey((a:Int,b:Int)=>{
if(a>b){
a
}else{
b
}
})
rdd4.collect().foreach(println)
sc.stop()
}
}
package com.sxuek.transformation
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object Demo04 {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("kv-transformation").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
//创建一个KV RDD
// val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("zs", 80), ("ls", 80), ("ls", 92), ("zs", 90),("zs",75),("ls",72)))
/**
* combineByKey实现类似于ReduceByKey的效果
*/
// val rdd1:RDD[(String,Int)] = rdd.combineByKey((a:Int)=>a,(a:Int,b:Int)=>{
// if(a>b){
// a
// }else{
// b
// }
// },(a:Int,b:Int)=>{
// if (a > b) {
// a
// } else {
// b
// }
// })
// rdd1.collect().foreach(println)
/**
* combineByKey计算科目总成绩以及科目的数量
*/
// val rdd2:RDD[(String,(Int,Int))] = rdd.combineByKey((a:Int)=>(a,1),(a:(Int,Int),b:Int)=>{
// (a._1+b,a._2+1)
// },(a:(Int,Int),b:(Int,Int))=>{
// (a._1+b._1,a._2+b._2)
// })
// val rdd3:RDD[(String,Double)] = rdd2.mapValues((a:(Int,Int))=>{a._1.toDouble/a._2.toDouble})
// rdd3.collect().foreach(println)
/**
*aggregateByKey算子
*/
// val rdd4:RDD[(String,(Int,Int))] = rdd.aggregateByKey((0,0))((a:(Int,Int),b:Int)=>{(a._1+b,a._2+1)},(a:(Int,Int),b:(Int,Int))=>{(a._1+b._1,a._2+b._2)})
// val rdd5:RDD[(String,Double)] = rdd4.map((data:(String,(Int,Int)))=>{
// (data._1,data._2._1.toDouble/data._2._2.toDouble)
// })
// rdd5.collect().foreach(println)
// val rdd6:RDD[(String,Int)] = rdd.aggregateByKey(Int.MaxValue)((a:Int,b:Int)=>{a.min(b)},(a:Int,b:Int)=>a.min(b))
// rdd6.collect().foreach(println)
/**
* foldByKey
*/
// val rdd7:RDD[(String,Int)] = rdd.foldByKey(Int.MaxValue)(_ min _)
// rdd7.collect().foreach(println)
// case class Student(name:String,age:Int)
case class Student(name:String,age:Int) extends Ordered[Student]{
override def compare(that: Student): Int = {
if(age>that.age){
return 1
}else{
return -1;
}
}
}
// val rdd: RDD[(Student, Int)] = sc.makeRDD(Array((Student("zs",20),1),(Student("ls",30),2)))
// val rdd8:RDD[(Student,Int)] = rdd.sortByKey(true)
// rdd8.collect().foreach(println)
val rdd: RDD[(Int, Student)] = sc.makeRDD(Array((1,Student("zs",20)),(2,Student("ls",30))))
// implicit class StudentOrdering(stu:Student) extends Ordering[Student] {
// override def compare(x: Student, y: Student): Int = {
// if(x.age > y.age){
// return 1
// }else{
// -1
// }
// }
// }
val rdd2:RDD[(Int,Student)] = rdd.sortBy((data:(Int,Student))=>{data._2})
rdd2.collect().foreach(println)
sc.stop()
}
}
package com.sxuek.create;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.util.Arrays;
import java.util.List;
public class JavaDemo01 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("java-rdd").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5));
List<Integer> collect = javaRDD.collect();
for (Integer integer : collect) {
System.out.println(integer);
}
jsc.stop();
}
}
package com.sxuek.create;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.Arrays;
import java.util.List;
public class JavaDemo02 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("java-rdd").setMaster("local[*]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5));
JavaRDD<Integer> javaRDD1 = javaRDD.map(a->{return a*3;});
jsc.stop();
}
}