美文网首页
Accumulator

Accumulator

作者: 一个人一匹马 | 来源:发表于2019-02-21 11:25 被阅读0次

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能。但是确给我们提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

val sumAccumulator = sc.accumulator(0)
val arr = Array(1, 2, 3, 4, 5)
val rdd = sc.parallelize(arr)
rdd.foreach(num => sumAccumulator += num)
println(sumAccumulator.value)

Java版本

/**
 * 累加变量
 * @author Administrator
 *
 */
public class AccumulatorVariable {

public static void main(String[] args) {

SparkConf conf = new SparkConf()​​​​.setAppName("Accumulator").setMaster("local");

​​JavaSparkContext sc = new JavaSparkContext(conf);

​​// 创建Accumulator变量
​​// 需要调用SparkContext的accumulator()方法
​​final Accumulator<Integer> sum = sc.accumulator(0);

​​List<Integer> numberList = Arrays.asList(1, 2, 3, 4, 5);

​​JavaRDD<Integer> numbers = sc.parallelize(numberList);

​​numbers.foreach(new VoidFunction<Integer>() {

​​​private static final long serialVersionUID = 1L;

@Override
public void call(Integer t) throws Exception {
​​​​// 然后在函数内部,就可以对Accumulator变量,调用add()方法,累加值
​​​​sum.add(t);  
​​​}
​​});

// 在driver程序中,可以调用Accumulator的value()方法,获取其值
​​System.out.println(sum.value());  

​​sc.close();
​}
}

Scala版本

object AccumulatorVariable {

def main(args: Array[String]) {

val conf = new SparkConf().setAppName("AccumulatorVariable").setMaster("local")

val sc = new SparkContext(conf)

val sum = sc.accumulator(0)

val numberArray = Array(1,2,3,4,5)

val numbers = sc.parallelize(numberArray, 1)

numbers.foreach { num => sum += num }

println(sum)

  }

}

相关文章

  • Custom Accumulator in Spark 2.1

    Custom Accumulator in Spark 2.1 Accumulator can sum or co...

  • accumulator

    这是,gdb调试在SHA512下个断点 在这里就可以发现SHA512传入是的Input和input的长度,接下来c...

  • Accumulator

    Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了...

  • Spark Accumulator 使用及陷阱

    Accumulator简介 spark累加器。 只有driver能获取到Accumulator的值(使用value...

  • LongAccumulator 和 DoubleAccumula

    关键字:Accumulator 没能获得预期值,BinaryOperator 表达式如何写,Accumulator...

  • 09-flink-Accumulator(累加器)

    09-flink-Accumulator(累加器) 概念 Accumulator(累加器):累加器主要作用在用户操...

  • scan运算

    /**Applies an accumulator function over anobservable sequ...

  • reduce

    arr.reduce(callback(accumulator, currentValue[, index[, a...

  • reduce

    arr.reduce(callback[accumulator, currentValue, currentInd...

  • The field 'age_during_name' must

    The field 'age_during_name' must be an accumulator object...

网友评论

      本文标题:Accumulator

      本文链接:https://www.haomeiwen.com/subject/adcryqtx.html