登陆

大数据开发Spark的Transformation与Action开发实战(第三十一篇)

admin 2022-11-24 12人围观 ,发现0个评论

携手创作,共同成长!这是我参与「掘金日新计划 · 12 月更文挑战」的第8天,点击查看活动详情

一、创建RDD的三种方式

  1. RDD是Spark编程的核心,在进行Spark编程时,首要任务是创建一个初识的RDD

  2. Spark提供了三种创建RDD的方式:集合、本地文件、HDFS文件

    • 集合:用于测试
    • 本机文件:临时去处理一些存储了大量数据的文件
    • HDFS文件:常用的生产环境处理方式
1.1、使用集合创建RDD
  1. 通过SparkContext的parallelize()方法将集合转化为RDD
  2. 可以通过parallelize()方法的第二个参数来设置RDD的partition数量,Spark会为每一个partition运行一个task来进行处理
package com.strivelearn.scala
​
import org.apache.spark.{SparkConf, SparkContext}
​
/**
 * @author strivelearn
 * @version CreateRddByArray.java, 2022年11月19日
 */
object CreateRddByArrayScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByArrayScala")
      .setMaster("local")
    val context = new SparkContext(conf)
​
    //创建集合。这个也是在driver进程中执行。只有调用spark的api的时候才是在集群中执行
    val array = Array(1, 2, 3, 4, 5)
    //parallelize、reduce是在集群中执行的
    val rdd = context.parallelize(array)
    //对集合中的数据求和
    val sum = rdd.reduce(_ + _)
    //注意:这行代码是在driver进程中执行的
    println(sum)
  }
}
复制代码

java代码执行

package com.strivelearn.java;
​
import java.util.Arrays;
import java.util.List;
​
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.ReduceFunction;
​
/**
 * @author xys
 * @version CreateRddByArrayJava.java, 2022年11月19日
 */
public class CreateRddByArrayJava {
    public static void main(String[] args) {
        //1.创建sparkContext
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("CreateRddByArrayJava");
        sparkConf.setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        //创建集合
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> rdd = javaSparkContext.parallelize(integers);
        Integer sum = rdd.reduce(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        System.out.println("sum = " + sum);
    }
}
复制代码
1.2、使用本地文件和HDFS文件创建RDD
  1. 通过SparkContext的textFile方法,可以针对本地文件或者HDFS文件创建RDD
  2. textFile方法支持针对目录、压缩文件以及通配符创建RDD
  3. Spark默认会为HDFS文件的每一个Block创建一个partition也可以通过textFile()的第二个参数手动设置分区数量,只能比Block数量多,不能比Block数量少
scala代码
package com.strivelearn.scala
​
import org.apache.spark.{SparkConf, SparkContext}
​
/**
 * @author strivelearn
 * @version CreateRddByFileScala.java, 2022年11月19日
 */
object CreateRddByFileScala {
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala")
      .setMaster("local")
    val context = new SparkContext(conf)
​
    var path = "/Users/strivelearn/Desktop/rdd.txt"
    //path = "hdfs://bigdata01:9000/test/hello.txt"
    //读取文件数据,可以在textFile中指定生成的RDD的分区数量
    val rdd = context.textFile(path, 2)
    //获取每一行数据的长度,计算文件内数据的总长度
    val length = rdd.map(_.length).reduce(_ + _)
    println(length)
    context.stop()
  }
}
复制代码
java
package com.strivelearn.java;
​
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
​
/**
 * @author strivelearn
 * @version CreateRddByFileJava.java, 2022年11月20日
 */
public class CreateRddByFileJava {
    public static void main(String[] args) {
        //1.创建sparkContext
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("CreateRddByFileJava");
        sparkConf.setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        String path = "/Users/strivelearn/Desktop/rdd.txt";
        JavaRDD<String> stringJavaRDD = javaSparkContext.textFile(path, 2);
        JavaRDD<Integer> map = stringJavaRDD.map(line -> line.length());
        Integer sumLength = map.reduce((x, y) -> x + y);
        System.out.println(sumLength);
        javaSparkContext.stop();
    }
}
复制代码

二、Transformation和Action

transformation相当于java的stream流的中间方法

action相当于java的stream流的终结方法

2.1、介绍
  1. Spark支持两种RDD操作:Transformation和Action

  2. Transformation会针对已有的RDD创建一个新的RDD。

    Transformation是针对RDD中数据的一个转换操作,常见的有map、flatmap、filter等

  3. Action主要对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并且还可以把结果返回给Driver程序

  4. Transformation的特性就是lazy

  5. Action的特性:执行Action操作才会触发一个Spark Job的运行,从而触发这个Action之前所有的Transformation的执行

2.2、常用的Transformation介绍
Transformation介绍
map将rdd中的每个元素进行处理,一进一出
filter对rdd中每个元素进行判断,返回true则保留
flatMap与map类似,但是每个元素都可以返回一个或者多个新元素
groupByKey根据key进行分组,每个key对应一个Iterable
reduceByKey对每个相同key对应的value进行reduce操作
sortByKey对每个相同key对应的value进行排序操作(全局排序)
join对两个包含<key,value>对的RDD进行join操作
distinct对rdd中的元素进行全局去重
Scala代码
package com.strivelearn.scala
​
import org.apache.spark.{SparkConf, SparkContext}
​
/**
 * @author strivelearn
 * @version TransformationOpScala.java, 2022年11月20日
 */
object TransformationOpScala {
  def main(args: Array[String]): Unit = {
    val context = createSparkContext()
    //    mapOp(context)
    //    filterOp(context)
    //    flatMapOp(context)
    //    groupByKeyOp(context)
    //    reduceByKeyOp(context)
    joinOp(context)
  }
​
  /**
   * 对集合的每个元素乘以2
   *
   * @param sparkContext
   */
  def mapOp(sparkContext: SparkContext): Unit = {
    val dataRDD = sparkContext.parallelize(Array(1, 2, 3, 4, 5))
    dataRDD.map(d => d * 2).foreach(d => println(d))
  }
​
  /**
   * 将行拆分为单词
   *
   * @param sparkContext
   */
  def flatMapOp(sparkContext: SparkContext): Unit = {
    val dataRDD = sparkContext.parallelize(Array("good good study", "day day up"))
    dataRDD.flatMap(d => d.split(" ")).foreach(d => println(d))
  }
​
  /**
   * 过滤出集合的偶数。满足条件则保留
   *
   * @param sparkContext
   */
  def filterOp(sparkContext: SparkContext): Unit = {
    val dataRDD = sparkContext.parallelize(Array(1, 2, 3, 4, 5))
    dataRDD.filter(d => d % 2 == 0).foreach(println(_));
  }
​
  /**
   * 分组
   * 输出结果:
   * Area: JP 5,
   * Area: CN 2,3,
   * Area: US 1,4,
   *
   * @param sparkContext
   */
  def groupByKeyOp(sparkContext: SparkContext): Unit = {
    val dataRDD = sparkContext.parallelize(Array((1, "US"), (2, "CN"), (3, "CN"), (4, "US"), (5, "JP")))
    //将dataRDD的Tup的key、value互调位置
    dataRDD.map(tup => ((tup._2, tup._1))).groupByKey().foreach(tup => {
      val area = tup._1
      print("Area: " + area + " ")
      //获取同一个大区的所有的id
      for (elem <- tup._2) {
        print(elem + ",")
      }
      println()
    })
  }
​
  /**
   * 根据key进行reduce相加
   * 输出结果
   * (JP,1)
   * (CN,2)
   * (US,2)
   *
   * @param sparkContext
   */
  def reduceByKeyOp(sparkContext: SparkContext): Unit = {
    val dataRDD = sparkContext.parallelize(Array((1, "US"), (2, "CN"), (3, "CN"), (4, "US"), (5, "JP")))
    dataRDD.map(tup => (tup._2, 1)).reduceByKey((x, y) => x + y).foreach(println(_))
  }
​
  /**
   * join操作
   * 输出结果
   * (1,(US,400))
   * (3,(CN,200))
   * (2,(CN,300))
   *
   * @param sparkContext
   */
  def joinOp(sparkContext: SparkContext): Unit = {
    val dataRDD = sparkContext.parallelize(Array((1, "US"), (2, "CN"), (3, "CN"), (4, "US"), (5, "JP")))
    val dataRDD2 = sparkContext.parallelize(Array((1, 400), (2, 300), (3, 200)))
    val rdd = dataRDD.join(dataRDD2)
    rdd.foreach(println)
  }
​
  /**
   * 创建SparkContext
   *
   * @return
   */
  def createSparkContext(): SparkContext = {
    val conf = new SparkConf()
    conf.setAppName("CreateRddByFileScala")
      .setMaster("local")
    new SparkContext(conf)
  }
}
复制代码
Java代码
package com.strivelearn.java;
​
import java.util.Arrays;
​
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
​
import scala.Tuple2;
import scala.Tuple3;
​
/**
 * The type Transformation op java.
 *
 * @author strivelearn
 * @version TransformationOpJava.java, 2022年11月20日
 */
public class TransformationOpJava {
    /**
     * The entry point of application.
     *
     * @param args the input arguments
     */
    public static void main(String[] args) {
        JavaSparkContext sparkContext = getSparkContext();
        //        mapOp(sparkContext);
        //        filterOp(sparkContext);
        //        flatMapOp(sparkContext);
        //        groupByKeyOp(sparkContext);
        //        groupByKeyOp2(sparkContext);
        //        reduceOp(sparkContext);
        //        sortByKeyOp(sparkContext);
        //        joinOp(sparkContext);
        distinctOp(sparkContext);
    }
​
    /**
     * Distinct op.
     * 输出
     * Japan
     * CN
     * US
     *
     * @param sparkContext the spark context
     */
    private static void distinctOp(JavaSparkContext sparkContext) {
        Tuple2<Integer, String> t1 = new Tuple2<>(1, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(2, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(3, "US");
        Tuple2<Integer, String> t4 = new Tuple2<>(4, "Japan");
        JavaRDD<Tuple2<Integer, String>> dataRDD = sparkContext.parallelize(Arrays.asList(t1, t2, t3, t4));
        dataRDD.map(tuple -> tuple._2).distinct().foreach(res -> System.out.println(res));
​
    }
​
    /**
     * Join op.
     * 输出
     * (4,(250,Japan))
     * (1,(300,US))
     * (3,(500,US))
     * (2,(200,CN))
     *
     * @param sparkContext the spark context
     */
    private static void joinOp(JavaSparkContext sparkContext) {
        Tuple2<Integer, Integer> t1 = new Tuple2<>(1, 300);
        Tuple2<Integer, Integer> t2 = new Tuple2<>(2, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<>(3, 500);
        Tuple2<Integer, Integer> t4 = new Tuple2<>(4, 250);
​
        Tuple2<Integer, String> t5 = new Tuple2<>(1, "US");
        Tuple2<Integer, String> t6 = new Tuple2<>(2, "CN");
        Tuple2<Integer, String> t7 = new Tuple2<>(3, "US");
        Tuple2<Integer, String> t8 = new Tuple2<>(4, "Japan");
        JavaRDD<Tuple2<Integer, Integer>> dataRDD = sparkContext.parallelize(Arrays.asList(t1, t2, t3, t4));
        JavaRDD<Tuple2<Integer, String>> dataRDD2 = sparkContext.parallelize(Arrays.asList(t5, t6, t7, t8));
        JavaPairRDD<Integer, Integer> dataPairRDD = dataRDD.mapToPair(tuple -> new Tuple2<>(tuple._1, tuple._2));
        JavaPairRDD<Integer, String> dataPairRDD2 = dataRDD2.mapToPair(tuple -> new Tuple2<>(tuple._1, tuple._2));
        dataPairRDD.join(dataPairRDD2).foreach(res -> System.out.println(res));
    }
​
    /**
     * Sort by key op.
     * 输出结果
     * (500,3)
     * (300,1)
     * (250,4)
     * (200,2)
     *
     * @param sparkContext the spark context
     */
    private static void sortByKeyOp(JavaSparkContext sparkContext) {
        Tuple2<Integer, Integer> t1 = new Tuple2<>(1, 300);
        Tuple2<Integer, Integer> t2 = new Tuple2<>(2, 200);
        Tuple2<Integer, Integer> t3 = new Tuple2<>(3, 500);
        Tuple2<Integer, Integer> t4 = new Tuple2<>(4, 250);
        JavaRDD<Tuple2<Integer, Integer>> dataRDD = sparkContext.parallelize(Arrays.asList(t1, t2, t3, t4));
        dataRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1)).sortByKey(false).foreach(res -> System.out.println(res));
    }
​
    /**
     * Reduce op.
     * 输出
     * (Japan,1)
     * (CN,1)
     * (US,2)
     *
     * @param sparkContext the spark context
     */
    private static void reduceOp(JavaSparkContext sparkContext) {
        Tuple2<Integer, String> t1 = new Tuple2<>(1, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(2, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(3, "US");
        Tuple2<Integer, String> t4 = new Tuple2<>(4, "Japan");
        JavaRDD<Tuple2<Integer, String>> dataRDD = sparkContext.parallelize(Arrays.asList(t1, t2, t3, t4));
        dataRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, 1)).reduceByKey((x, y) -> x + y).foreach(res -> System.out.println(res));
    }
​
    /**
     * Group by key op 2.
     * 输出
     * Area=Japan:<4,female>
     * Area=CN:<2,female><3,male>
     * Area=US:<1,male>
     *
     * @param sparkContext the spark context
     */
    private static void groupByKeyOp2(JavaSparkContext sparkContext) {
        Tuple3<Integer, String, String> t1 = new Tuple3<>(1, "US", "male");
        Tuple3<Integer, String, String> t2 = new Tuple3<>(2, "CN", "female");
        Tuple3<Integer, String, String> t3 = new Tuple3<>(3, "CN", "male");
        Tuple3<Integer, String, String> t4 = new Tuple3<>(4, "Japan", "female");
        JavaRDD<Tuple3<Integer, String, String>> dataRDD = sparkContext.parallelize(Arrays.asList(t1, t2, t3, t4));
        dataRDD.mapToPair(tuple -> new Tuple2<>(tuple._2(), new Tuple2(tuple._1(), tuple._3()))).groupByKey().foreach(res -> {
            String area = res._1;
​
            System.out.print("Area=" + area + ":");
            //获取同一个大区所有的对应用户id
            Iterable<Tuple2> tuple2s = res._2;
            for (Tuple2 tuple2 : tuple2s) {
                System.out.print("<" + tuple2._1 + "," + tuple2._2 + ">");
            }
            System.out.println();
        });
    }
​
    /**
     * Group by key op.
     * 输出
     * Area=Japan:4,
     * Area=CN:2,
     * Area=US:1,3,
     *
     * @param sparkContext the spark context
     */
    private static void groupByKeyOp(JavaSparkContext sparkContext) {
        Tuple2<Integer, String> t1 = new Tuple2<>(1, "US");
        Tuple2<Integer, String> t2 = new Tuple2<>(2, "CN");
        Tuple2<Integer, String> t3 = new Tuple2<>(3, "US");
        Tuple2<Integer, String> t4 = new Tuple2<>(4, "Japan");
        JavaRDD<Tuple2<Integer, String>> dataRDD = sparkContext.parallelize(Arrays.asList(t1, t2, t3, t4));
        dataRDD.mapToPair(tup -> new Tuple2<>(tup._2, tup._1)).groupByKey().foreach(res -> {
            String area = res._1;
            System.out.print("Area=" + area + ":");
            //获取同一个大区所有的对应用户id
            Iterable<Integer> ids = res._2;
            for (Integer id : ids) {
                System.out.print(id + ",");
            }
            System.out.println();
        });
    }
​
    /**
     * Flat map op.
     *
     * @param sparkContext the spark context
     */
    private static void flatMapOp(JavaSparkContext sparkContext) {
        JavaRDD<String> dataRDD = sparkContext.parallelize(Arrays.asList("good good study", "day day up"));
        dataRDD.flatMap(line -> Arrays.stream(line.split(" ")).iterator()).foreach(res -> System.out.println(res));
    }
​
    /**
     * Filter op.
     *
     * @param sparkContext the spark context
     */
    private static void filterOp(JavaSparkContext sparkContext) {
        JavaRDD<Integer> dataRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
        dataRDD.filter(integer -> integer % 2 == 0).foreach(res -> System.out.println(res));
    }
​
    /**
     * Map op.
     *
     * @param sparkContext the spark context
     */
    private static void mapOp(JavaSparkContext sparkContext) {
        JavaRDD<Integer> dataRDD = sparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6));
        dataRDD.map(integer -> integer * 2).foreach(res -> System.out.println(res));
    }
​
    /**
     * Gets spark context.
     *
     * @return the spark context
     */
    private static JavaSparkContext getSparkContext() {
        //1.创建sparkContext
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("TransformationOpJava");
        sparkConf.setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        return javaSparkContext;
    }
}
复制代码
2.3、常用的Action介绍
算子(Action)介绍
reduce将RDD中所有元素进行聚合操作
collect将RDD中所有元素获取到本地客户端(Driver)
take(n)获取RDD中前n个元素
count获取RDD中元素总数
saveAsTextFile将RDD中元素保存到文件中,对每个元素调用toString
countByKey对每个key对应的值进行count计数
foreach遍历RDD中的每个元素
请发表您的评论
请关注微信公众号
微信二维码
不容错过
Powered By Z-BlogPHP