实训笔记——Spark计算框架

2023-09-18 23:12:51

实训笔记——Spark计算框架

Spark计算框架

一、Spark的概述

Spark是一个分布式的计算框架,是Hadoop的MapReduce的优化解决方案。Hadoop的MR存在两大核心问题:1、无法进行迭代式计算 2、MR程序是基于磁盘运算,运算效率不高

Spark主要解决了Hadoop的MR存在的问题,Spark是基于内存运算的一种迭代式计算框架

Spark还有一个思想 one stack to rule them all(一栈式解决方案),Spark内置了很多子组件,子组件可以应用于不同的计算场景下,Spark SQL(结构化数据查询)、Spark Streaming(准实时计算)、Spark MLlib(算法)、Spark GraphX(图计算)、Spark R,以上这些子组计都是基于Spark Core开发的。

Spark之所以可以实现基于内存的迭代式计算,主要也是因为Spark Core中的一个核心数据抽象RDD

二、Spark的特点

  1. 计算快速
  2. 易用
  3. 通用
  4. 兼容

三、Spark的安装部署(安装部署Spark的Cluster Manager-资源调度管理器的)

3.1 本地安装–无资源管理器

3.2 Spark的自带独立调度器Standalone

3.2.1 主从架构的软件
3.2.2 Master/worker
3.2.3 伪分布、完全分布、HA高可用

3.3 Hadoop的YARN

3.4 Apache的Mesos

3.5 K8S容器技术

【注意】:我们在安装部署Spark的资源管理器的同时,也可以安装一个Spark的job history

四、Spark程序的部署运行

Spark部署运行和MR程序的部署运行方式一致的,需要将我们编写的Spark程序打包成为一个jar包,放到我们的Spark集群中,然后通过Spark相关命令启动运行Spark程序即可

spark-submit         
	--class   全限定类名        
	--master  运行的资源管理器       
	--deploy-mode  部署运行的模式          
	--num-executors   只在yarn模式下使用  指定executor的数量      
	--executor-cores   指定每一个executor具备多少个CPU内核,一个内核可以运行一个TASK      
	--executor-memory   每一个executor占用的内存      
	jar包路径      
	main函数的args参数列表

五、Spark集群运行中三个核心角色和一些名词

5.1 Driver驱动程序

编写的Spark程序,Spark程序中SparkContext,负责任务调度

5.2 Cluster Manager(资源管理器)

负责分配资源给Spark程序运行的,Spark支持很多资源管理器,YARN、Spark的Standalone、Apache的Mesos

5.3 Executor(执行器)

Spark申请资源的最小单位,每一个executor包含了内存、CPU Core

5.4 Task(任务)

每一个executor内部可以同时启动多个任务,Task就是Spark程序运行的最小单位,一个executor可以运行多少个task取决于cpu core

假如Spark程序总共有100个任务,一般分配30个左右task。

5.5 DAGScheduler

记录RDD之间的依赖关系的,也是用来划分stage阶段

5.6 Stage阶段

核心就是用来划分shuffle阶段的,一个stage阶段可能包含多分RDD的计算的,因此一个stage中包含多个Task的。Spark程序在运行的时候,一个stage的任务调度运行的

5.7 TaskScheduler

任务调度器,Driver驱动程序分配任务给task运行的

5.8 application

Spark应用程序,一个Spark程序可以包含多个job

5.9 job

遇到一个action算子,算子之前的依赖链上的RDD组成一个job

5.10 RDD

rdd就是Spark程序运行的核心,在Spark程序中,无外乎就三种操作:创建RDD、转化RDD、从RDD中获取结果/将结果输出保存

六、Spark的核心基础Spark Core

Spark Core是Spark计算框架的核心基础,Spark中子组件都是基于Spark Core封装而来的。

Spark Core中包含了Spark的运行调度机制、Spark的迭代式计算、基于内存的运算机制

6.1 Spark Core中最核心的有两个概念

6.1.1 SparkContext

SparkContext:Spark的上下文对象,Spark程序的提交运行,任务分配等等都是由SparkContext来完成的。

6.1.2 RDD

RDD:也是Spark最核心最重要的概念,也是Spark中最基础的数据抽象(spark处理的所有数据都会封装称为RDD然后进行处理)

6.2 RDD的属性(RDD具备的一些特征)

6.2.1 一组分区(一组切片)

RDD可分区的数据集,RDD内部的数据是以分区的形式存在,每一个分区的数据可以存储在不同的节点上

6.2.2 一个计算每一个分区(切片)数据的compute函数

RDD计算的时候每一个分区的数据是并行计算的,通过一个函数将计算逻辑封装在分区数据上运行计算

6.2.3 一个用来记录RDD依赖关系的列表

记录RDD的依赖关系,容错机制

6.2.4 一个分区机制(RDD必须得是键值对类型的RDD)

分区器只对键值对类型的RDD生效

6.2.5 一个用来记录分区位置的列表

如果计算程序和数据不在同一个节点上,会把数据移动到计算节点

6.3 RDD的弹性的体现

6.3.1 存储的弹性

RDD数据可以在内存和磁盘之间自由切换

6.3.2 计算的弹性

RDD在计算的时候,stage、task都有可能计算失败,如果失败了stage和task都会进行特定次数的重试,默认重试4次

6.3.3 容错的弹性

RDD计算中如果数据丢失,可以根据依赖链重新计算

6.3.4 分片的弹性

RDD计算中,我们可以根据实际情况,在代码中动态的调整分片

6.4 RDD的特点

6.4.1 分区
6.4.2 只读

RDD是只读的,不可变的,RDD一旦创建,内部不能改变了,只能根据RDD计算返回一个新的RDD,而原有的RDD不受任务的干扰

6.4.3 依赖
  1. 宽依赖:父RDD的一个分区数据被子RDD的多个分区同时使用,一般在shuffle算子中才会出现

  2. 窄依赖:父RDD的分区数据只能给子RDD的一个分区

    依赖是Spark程序划分stage的核心依据,stage划分规则是从上一个宽依赖算子到下一个宽依赖算子之间的操作都属于同一个stage.

6.4.4 缓存
6.4.5 检查点

6.5 RDD的分类

RDD数据集,内部可以存放各种各样的数据类型,根据存储的数据类型不同,将RDD分为两类:数值类型的RDD(RDD)、键值对类型的RDD(PairRDD)

数值类型的RDD存放的数据类型可以是任何类型,包括键值对类型 RDD[String]、RDD[People]

键值对类型的RDD指的是数据集中存放的数据类型是一个二元组 是一种比较特殊的数值类型的RDD RDD[(String,Int)]、RDD[(Int,(String,Int))],

键值对类型的RDD有它自己独特的一些算子操作,同时键值对类型的RDD可以使用数值类型RDD的所有操作

6.6 RDD的编程

在Spark中,对数据操作其实就是对RDD的操作,对RDD的操作无外乎三种:1、创建RDD 2、从已有的RDD转换得到一个新的RDD 3、从已有的RDD得到相应的结果

RDD的编程方式主要分为两种:命令行编程方式(spark-shell–数据科学、算法研究)、API编程方法(数据处理 java scala python R)

6.6.1 RDD的创建操作

将数据源的数据转换称为Spark中的RDD,RDD的创建主要分为三种:1、从外部存储设备创建RDD(HDFS、Hive、HBase、Kafka、本地文件系统…)2、Scala|Java集合中创建RDD 3、从已有的RDD转换成为一个新的RDD(RDD的转换算子)

  1. 从集合中创建RDD

    函数名说明
    parallelize(Seq[T],num)
    makeRDD(Seq[T],num)底层就是parallelize函数的实现了
    makeRDD(Seq[(T, Seq[String])])这种方式创建的RDD是带有分区编号的 ,集合创建的RDD的分区数就是指定的分区数

    parallelize(Seq[T],num)makeRDD(Seq[T],num):都可以传递一个第二个参数,第二个参数代表的是RDD的并行度(RDD的分区数),默认分区数就是master中设置的cpu核数

  2. 从外部存储创建RDD

textFile()

wholeTextFile()

6.6.2 RDD的转换操作(转换算子)

RDD之所以可以实现迭代式操作,就是因为RDD中提供了很多算子,算子之间进行操作时,会记录算子之间的依赖关系

RDD中具备一个转换操作的算子,转换算子是用来从一个已有的RDD经过某种操作得到一个新的RDD的,转换算子是惰性计算规则,只有当RDD遇到行动算子,转换算子才会去执行。

6.6.3 算子

算子:就是Spark已经给我们封装好的一些计算规则,只不过这些计算规则内部还需要传入计算逻辑,代码层面上,算子就是需要传入函数的函数。Spark提供了80+个算子。

6.6.4 数值型RDD的转换算子(通用算子)
函数名说明
map(f:T=>U)算子–一对一算子
mapPartitions(f:Iterator[T]=>Iterator[U])算子—一对一算子,一个分区的数据统一执行一次map操作
mapPartitionsWithIndex(f:(Index,Iterator[T])=>Iterator[U])一对一算子,和mapPartitions算子的逻辑一模一样的,只不过就是多了一个分区编号。
filter(f:T=>Boolean)算子—过滤算子,对原有RDD的每一个算子应用一个f函数,如果函数返回true,那么数据保留,如果返回false,那么数据舍弃
flatmap(f: T => TraversableOnce[U]):RDD[U]一对多的算子,一条输入数据可以被映射成为0个或多个数据,最后函数的返回值必须是一个集合类型,最好得到的RDD的类型就是集合元素的类型
sample(boolean是否为有放回的抽样,抽取比例,种子-底层抽样算法使用默认值)数据量越大,抽取的数据越精准,数据量越小,抽取的数据偏差越大
union(RDD[T]):RDD[T]将两个RDD中所有数据组合成为一个新的RDD然后返回
intersection(RDD[T]):RDD[T]将两个RDD取交集返回
distinct([numPartitions]))(implicat ordering = null)对RDD元素去重,借助元素的equals方法去重的,第二个隐式参数的目的是为了去重之后对数据分区进行排序,如果没有排序规则,不排序了
cartesian(RDD[U])生成笛卡尔乘积,在T和U类型的RDD上,列出T和U的所有组合情况,返回一个新的RDD[(T,U)]
sortBy(T=>U,asc:Boolean=true)(implicit ordering[U])将RDD中T类型转换成为U类型然后对RDD进行排序,返回的还是RDD[T] 【注意】U必须能排序的,两种方式:实现Ordered接口,定义一个隐式类是Ordering[U]的子类 当然我们也可以手动在sortBy函数的第二个括号中传递一个Ordering的匿名内部类
6.6.5 键值对类型RDD的转换算子
函数名说明
groupByKey([numPartitions])根据RDD的键值对数据的key值把Value数据聚合到一起,然后返回一个新的RDD,新的RDD也是kv类型,v变成集合类型
join(RDD[(K,W)])和另外一个键值对RDD做inner join操作,返回RDD[(K,(V,W))]
leftOutJoinrightOutJoinfullOutJoin(RDD(K,W))和另外一个RDD做外连接操作左连接:返回RDD[(K,(V,Option[W]))]
右连接:返回RDD[(K,(Option[V],W))]
全外连接:返回RDD[(K,(Option[V],Option[W]))]
Option是为了防止空指针异常的,Option的取值有两种:None、Some,如果Option包含的数据不为Null,那么使用Some将数据封装,然后我们可以使用get方法获取里面的值,如果数据为Null,那么使用None将数据封装,不能使用get获取数据
cogroup(RDD[(K,W)])返回一个 RDD[(K, (Iterable, Iterable)) ] 将两个RDD中所有key值相同的数据全部聚合到一块,RDD1中相同的Value组成Iterable[V] RDD2中相同的value组成Iterable[W]
mapValues(f: V => U) :RDD[(K,U)]针对KV类型的RDD只对v操作返回一个新的类型,由新的类型和原有的key组成一个新的RDD
reduceByKey(func: (V, V) => V)reduceByKey=groupBykey+reduce操作,函数输入数据有两个,输出有一个,输出类型和输出类型是同一个类型 输入的两个v:第一个v是上一次聚合的结果 第二v是本次要聚合的value 输出的v就是本次聚合的结果
``combineByKey( createCombiner: V => C,mergeValue (C, V) => C,mergeCombiners: (C, C) => C)`combiner也是根据key值聚合value,只不过value如何聚合,是什么样的聚合逻辑,我们要通过三个函数说明(比reduceByKey的功能要强大):
createCombiner:V=>C 将key值对应得value数据先进行初始化操作,返回一个新的类型
mergeValue:(C,V)=>C 每一个分区都会单独执行一个mergeValue函数,通过mergeValue函数将当前分区的key的value值和刚刚创建的初始值做计算 得到当前分区下的唯一的计算结果,结算结果的类型必须和初始化之后的类型保持一致
mergeCombiners:(C,C)=>C 将所有分区当前key值计算出来的结果C 再进行一次全局的聚合,得到唯一的结果,结果就是我们这个combineByKey的计算结果 返回RDD[(K,C)]
aggregateByKey(zeroValue:U)(mergerValue(U,V)=>U,mergerCombiner:(U,U)=>U)aggreGateByKey算子和CombineByKey算子实现的效率是一样的,区别在于初始值不一样的,combineBykey的初始值是根据函数计算来的,是根据每一个分区的一个真实的value数据计算得来的,而aggregateByKey的初始值是我们随意给的。
foldByKey(zeroValue:V)(f:(V,V)=>V))相当于是aggregateByKey的简化版,当aggregateByKey的mergeValue和mergeCombiner函数的计算逻辑一致,并且zerovalue初始化类型的值和原先RDD的value的类型一致的时候,就可以使用foldByKey简化。
sortByKey(asc:Boolean=true)根据键值对kv的key进行排序,默认升序排序, 【注意】key值必须实现了Ordered比较器接口,如果想让排序规则准确,那么你的Ordered接口中排序逻辑必须得是升序逻辑
partitionBy(分区器)

七、【补充】Scala的比较器问题

Java中存在两个比较器用于比较Java类的大小关系,Java的比较器有两个Comparable,Comparator,区别在于Comparable是让Java类必须实现的,Comparator是在使用比较器的时候使用匿名内部类的形式传递比较规则的。

Scala也是面向对象的,Scala中也存在类的概念,类在有些情况下也是必须能比较大小的。Scala也给我们提供了两个比较器,两个比较器是Java两个比较器的子接口。 Ordered 是Comparable的子接口 Ordering 是Comparator的子接口

八、相关代码

package com.sxuek.create

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 1、创建RDD
 * 【注意】如果我们想要编写Spark程序,我们必须先创建一个SparkContext,因为Spark程序的提交运行、RDD的创建操作都是由SparkContext完成的。
 */
object ScalaDemo01 {
  def main(args: Array[String]): Unit = {
    //1、创建SparkContext
    val sparkConf:SparkConf = new SparkConf().setAppName("create-rdd").setMaster("local[5]")
    val sc:SparkContext = new SparkContext(sparkConf)

    //2、集合中创建RDD
//    var rdd:RDD[Int] = sc.parallelize(1 until 100)
//    println(rdd.getNumPartitions)
//    var rdd:RDD[Int] = sc.makeRDD(1 to 100)
//    val rdd:RDD[List[Int]] = sc.makeRDD(Array((List(1,2,3),List("node1","node2")),(List(4,5,6),List("node2","node3"))))
//    rdd.collect().foreach(println)

    //3、从外部存储创建RDD  外部文件必须得是text file  只能读取一个文件
//    val rdd1:RDD[String] = sc.textFile("hdfs://single:9000/wc.txt")
//    println(rdd1.getNumPartitions)
//    rdd1.collect().foreach(println)

    val rdd2:RDD[(String,String)] = sc.wholeTextFiles("hdfs://single:9000/user/hive/warehouse/data_analy.db/ods_user_behavior")

    rdd2.collect().foreach(println)
    //4、关闭SparkContext
    sc.stop()

  }
}
package com.sxuek.transformation

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 数值型的RDD转换算子
 *
 */
object Demo01 {
  def main(args: Array[String]): Unit = {
     val conf:SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")
     val sc:SparkContext = new SparkContext(conf)
     //创建RDD
     val rdd:RDD[String] = sc.makeRDD(List("spark","scala","spark","hadoop"))

    /**
     * 1、map算子,一对一算子
     *    对原有RDD的每一个数据应用一个函数,经过函数计算得到一个新的返回值,新的返回值组成一个新的RDD
     *    原有的RDD一个数据通过这个算子返回一个数据
     */
//    val rdd1:RDD[(String,Int)] = rdd.map((_,1))
//    val rdd1:RDD[(String,Int)] = rdd.mapPartitions(list=>{
//      import scala.collection.mutable.ArrayBuffer
//      var ab:ArrayBuffer[(String,Int)] = ArrayBuffer()
//      for (elem <- list) {
//        ab.+=((elem,1))
//      }
//      ab.iterator
//    })
//    val rdd1:RDD[(String,Int)] = rdd.mapPartitionsWithIndex((index:Int,list:Iterator[String])=>{
//      println(s"现在是第$index 分区的数据,分区数据为${list.mkString(",")}")
//      import scala.collection.mutable.ArrayBuffer
//      var ab:ArrayBuffer[(String,Int)] = ArrayBuffer()
//      for (elem <- list) {
//        ab.+=((elem,1))
//      }
//      ab.iterator
//    })
//    rdd1.collect().foreach(println)
    /**
     * 2、filter过滤算子
     */
    var rdd1:RDD[String] = rdd.filter((word:String)=>{!word.startsWith("h")})
    rdd1.collect().foreach(println)


    sc.stop()
  }
}
package com.sxuek.transformation

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo02 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("transformation").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    val rdd:RDD[String] = sc.makeRDD(Array("spark flink scala","java zookeeper azkaban"))
    /**
     * flatmap算子
     */
    val rdd1:RDD[String] = rdd.flatMap(_.split(" "))

    /**
     * 从flatmap算子计算结果中,抽取50%的数据
     */
    val rdd2:RDD[String] = rdd1.sample(false,0.5)
    rdd2.collect().foreach(println)

    /**
     * union算子
     */
    val rdd3:RDD[String] = sc.makeRDD(Array("spark","hadoop","storm","spark"))
    val rdd4:RDD[String] = rdd1.union(rdd3)
    rdd4.collect().foreach(println)

    /**
     * intersection
     */
    val rdd5:RDD[String] = rdd1.intersection(rdd3)
    rdd5.collect().foreach(println)


    /**
     * distinct([numPartitions]))
     */
    val rdd6:RDD[String] = rdd3.distinct()
    rdd6.collect().foreach(println)

    //scalaBean的去重

    val rdd7:RDD[People] = sc.makeRDD(Array(People("zs",20),People("ls",30),People("zs",20)))
    val rdd8:RDD[People] = rdd7.distinct(12)((a:People,b:People)=>{
      if(a.age == b.age && a.name == b.name){
        return 0
      }else{
        return 1
      }
    })
    rdd8.collect().foreach(println)



    //    Thread.sleep(1000000000)
    sc.stop()
  }
}
class People(var name:String,var age:Int) extends Serializable{
  override def toString: String = s"People(name=$name,age=$age)"
}
object People{
  def apply(name:String,age:Int):People={
    new People(name,age)
  }
}
package com.sxuek.transformation

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo03 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("kv-transformation").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //创建一个kv键值对rdd
    val rdd:RDD[(String,Int)] = sc.makeRDD(Array(("spark",1),("flink",1),("spark",1)))
    val rdd1:RDD[(String,Iterable[Int])] = rdd.groupByKey()
    rdd1.collect().foreach(println)

    /**
     * reduceBykey
     */
    var rdd2:RDD[(String,Int)] = rdd.reduceByKey((a:Int,b:Int)=>{a+b})
    rdd2.collect().foreach(println)
    val rdd3:RDD[(String,Int)] = sc.makeRDD(Array(("zs",80),("zs",90),("ls",70),("ls",85)))
    val rdd4:RDD[(String,Int)] = rdd3.reduceByKey((a:Int,b:Int)=>{
      if(a>b){
         a
      }else{
         b
      }
    })
    rdd4.collect().foreach(println)
    sc.stop()
  }
}

package com.sxuek.transformation

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo04 {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("kv-transformation").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)
    //创建一个KV RDD
//    val rdd: RDD[(String, Int)] = sc.makeRDD(Array(("zs", 80), ("ls", 80), ("ls", 92), ("zs", 90),("zs",75),("ls",72)))

    /**
     * combineByKey实现类似于ReduceByKey的效果
     */
    //    val rdd1:RDD[(String,Int)] = rdd.combineByKey((a:Int)=>a,(a:Int,b:Int)=>{
//      if(a>b){
//        a
//      }else{
//        b
//      }
//    },(a:Int,b:Int)=>{
//      if (a > b) {
//        a
//      } else {
//        b
//      }
//    })
//    rdd1.collect().foreach(println)
    /**
     * combineByKey计算科目总成绩以及科目的数量
     */
//    val rdd2:RDD[(String,(Int,Int))] = rdd.combineByKey((a:Int)=>(a,1),(a:(Int,Int),b:Int)=>{
//      (a._1+b,a._2+1)
//    },(a:(Int,Int),b:(Int,Int))=>{
//      (a._1+b._1,a._2+b._2)
//    })
//    val rdd3:RDD[(String,Double)] = rdd2.mapValues((a:(Int,Int))=>{a._1.toDouble/a._2.toDouble})
//    rdd3.collect().foreach(println)

    /**
     *aggregateByKey算子
     */
//    val rdd4:RDD[(String,(Int,Int))] = rdd.aggregateByKey((0,0))((a:(Int,Int),b:Int)=>{(a._1+b,a._2+1)},(a:(Int,Int),b:(Int,Int))=>{(a._1+b._1,a._2+b._2)})
//    val rdd5:RDD[(String,Double)] = rdd4.map((data:(String,(Int,Int)))=>{
//      (data._1,data._2._1.toDouble/data._2._2.toDouble)
//    })
//    rdd5.collect().foreach(println)
//    val rdd6:RDD[(String,Int)] = rdd.aggregateByKey(Int.MaxValue)((a:Int,b:Int)=>{a.min(b)},(a:Int,b:Int)=>a.min(b))
//    rdd6.collect().foreach(println)

    /**
     * foldByKey
     */
//    val rdd7:RDD[(String,Int)] = rdd.foldByKey(Int.MaxValue)(_ min _)
//    rdd7.collect().foreach(println)
//    case class Student(name:String,age:Int)
    case class Student(name:String,age:Int) extends Ordered[Student]{
      override def compare(that: Student): Int = {
        if(age>that.age){
          return 1
        }else{
          return -1;
        }
      }
    }
//    val rdd: RDD[(Student, Int)] = sc.makeRDD(Array((Student("zs",20),1),(Student("ls",30),2)))
//    val rdd8:RDD[(Student,Int)] = rdd.sortByKey(true)
//    rdd8.collect().foreach(println)
    val rdd: RDD[(Int, Student)] = sc.makeRDD(Array((1,Student("zs",20)),(2,Student("ls",30))))
//    implicit class StudentOrdering(stu:Student) extends Ordering[Student] {
//      override def compare(x: Student, y: Student): Int = {
//        if(x.age > y.age){
//          return 1
//        }else{
//          -1
//        }
//      }
//    }
    val rdd2:RDD[(Int,Student)] = rdd.sortBy((data:(Int,Student))=>{data._2})
    rdd2.collect().foreach(println)
    sc.stop()
  }

}



package com.sxuek.create;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

public class JavaDemo01 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("java-rdd").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5));
        List<Integer> collect = javaRDD.collect();
        for (Integer integer : collect) {
            System.out.println(integer);
        }

        jsc.stop();

    }
}
package com.sxuek.create;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.Arrays;
import java.util.List;

public class JavaDemo02 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("java-rdd").setMaster("local[*]");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        JavaRDD<Integer> javaRDD = jsc.parallelize(Arrays.asList(1,2,3,4,5));
        JavaRDD<Integer> javaRDD1 = javaRDD.map(a->{return a*3;});

        jsc.stop();

    }
}
更多推荐

MVCC:多版本并发控制案例分析(二)

(笔记总结自b站马士兵教育课程)本文主要分析readview的案例。一、简介readview:表示事务进行快照读操作的时候产生的读视图,在该事务进行快照读的那一刻会生成一个系统当前的快照,但是此时的快照不是数据的快照,而是事务相关信息的快照。trx_listreadview生成时刻当前系统活跃的事务idup_limit

华为认证HCIP知识点

文章目录前言考试内容数据通信领域各场景通用核心知识OSPF知识点IS-IS知识点BGP知识点IGMP知识点ICMP知识点数据通信领域路由交换高阶知识总结前言本博客仅做学习笔记,如有侵权,联系后即刻更改科普:考试内容参考网址HCIP认证主要定位于中小型网络的规划、设计、配置与维护,包含网络基础、常见接口与电缆、以太网交换

数据结构学习笔记—— 排序算法总结【ヾ(≧▽≦*)o所有的排序算法考点看这一篇你就懂啦!!!】

目录一、排序算法总结(一)排序算法分类(二)表格比较二、详细分析(最重要考点!!!)(一)稳定性(二)时间复杂度(三)空间复杂度(四)比较次数(五)平均比较次数(六)排序趟数(七)根据规模选择排序算法(八)每趟确定的元素最终位置(九)存储方式的选择一、排序算法总结常用排序算法如下:#mermaid-svg-nyHNG1

驱动开发概念详解

1、什么是驱动能够驱使硬件实现特定功能的软件代码,可以根据驱动程序是否依赖于系统内核将其分为裸机驱动和系统驱动1.1裸机驱动编写的驱动代码中没有进行任何内核相关的API调用,开发者查询资料配置寄存器完成硬件相关控制,不依赖于系统内核,由开发者独立完成,相对而言比较简单。1.2系统驱动系统驱动指的是编写的驱动代码中需要调

了解JVM

一.了解JVM1.1什么是JVMJVM是JavaVirtualMachine(Java虚拟机)的缩写,是一个虚构出来的计算机,是通过在实际的计算机上仿真模拟计算机功能来实现的,JVM屏蔽了与具体操作系统平台相关的信息,Java程序只需生成在Java虚拟机上运行的字节码,就可以在多种平台上不加修改的运行。JVM在执行字节

PyTorch之张量的相关操作大全 ->(个人学习记录笔记)

文章目录Torch1.张量的创建1.1直接创建1.1.1`torch.tensor`1.1.2`torch.from_numpy(ndarray)`1.2依据数值创建1.2.1`torch.zeros`1.2.2`torch.zeros_like`1.2.3`torch.ones`1.2.4`torch.ones_li

人工智能的未来:从 Jetson 到 GPT,沙龙见闻与洞察

前言在当今数字化时代,人工智能正以惊人的速度改变着我们的生活和工作方式。从智能语音助手到自动驾驶汽车,从智能家居到医疗诊断,人工智能技术已经广泛渗透到各个行业,并为其带来了巨大的变革和创新。越来越多的行业专家、学者和从业者积极参与到人工智能与行业应用实践中,为了进一步推动人工智能的发展和应用。活动介绍本次活动是由Mic

GPT引领前沿与应用突破之GPT4科研实践技术与AI绘图

GPT对于每个科研人员已经成为不可或缺的辅助工具,不同的研究领域和项目具有不同的需求。如在科研编程、绘图领域:1、编程建议和示例代码:无论你使用的编程语言是Python、R、MATLAB还是其他语言,都可以为你提供相关的代码示例。2、数据可视化:生成各种类型的数据可视化图表,如折线图、柱状图、散点图、饼图、热力图等。提

Leetcode.2522 将字符串分割成值不超过 K 的子字符串

题目链接Leetcode.2522将字符串分割成值不超过K的子字符串rating:1605题目描述给你一个字符串sss,它每一位都是111到999之间的数字组成,同时给你一个整数kkk。如果一个字符串sss的分割满足以下条件,我们称它是一个好分割:sss中每个数位恰好属于一个子字符串。每个子字符串的值都小于等于kkk。

Mysql003:基础查询

目录:1.基本查询2.条件查询(where)3.聚合函数(count、max、min、avg、sum)4.分组查询(groupby)5.分组后查询(having)6.排序查询(orderby)7.分页查询(limit)1.基本语法SELECT字段FROM表名WHERE条件GROUPBY分组HAVING分组后条件ORDE

用动态ip登录账号的风险高不高?

使用动态ip登录账号在一定程度上提供了额外的安全保障和匿名性,但与此同时也存在一些风险和风控挑战。本文将解密使用动态ip登录账号的真相,明确安全与风险的并存之道。1、增强隐私保护:使用动态ip登录账号可以隐藏您的真实IP地址,增强个人隐私保护。这使得恶意ddos者难以追踪您的真实身份和位置,为账号安全提供一定的保障。对

热文推荐