The Way of Spark (Spark修炼之道), Advanced Series — From Beginner to Master, Section 6: The Spark Programming Model (III)



* Return the number of elements in the RDD.


def count(): Long



* Return a fixed-size sampled subset of this RDD in an array


* @param withReplacement whether sampling is done with replacement

* @param num size of the returned sample

* @param seed seed for the random number generator

* @return sample of specified size in an array


// TODO: rewrite this without return statements so we can wrap it in a scope

def takeSample(

withReplacement: Boolean,

num: Int,

seed: Long = Utils.random.nextLong): Array[T]
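To make the signature concrete, here is a minimal pure-Scala sketch of takeSample's semantics on a local collection. The object and method names are mine; this illustrates the contract (fixed-size sample, with or without replacement, seeded), not Spark's actual distributed sampling implementation.

```scala
import scala.util.Random

object TakeSampleSketch {
  // Local stand-in for RDD.takeSample: a fixed-size sample driven by a seed.
  def takeSample[T](elems: Seq[T], withReplacement: Boolean,
                    num: Int, seed: Long): Seq[T] = {
    val rnd = new Random(seed)
    if (withReplacement)
      Seq.fill(num)(elems(rnd.nextInt(elems.length))) // elements may repeat
    else
      rnd.shuffle(elems).take(num)                    // each element at most once
  }

  def main(args: Array[String]): Unit = {
    val data = 1 to 10
    println(takeSample(data, withReplacement = false, num = 3, seed = 42L))
    println(takeSample(data, withReplacement = true,  num = 3, seed = 42L))
  }
}
```

With `withReplacement = false`, the sample never contains duplicates; with replacement, the same element can be drawn several times.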


The foreach method applies the given function to every element of the RDD; its definition is:

// Actions (launch a job to return a value to the user program)





* Applies a function f to all elements of this RDD.


def foreach(f: T => Unit): Unit



* Return the first element in this RDD.


def first(): T




saveAsTextFile saves the RDD to text files: in local mode the files are written to the local filesystem, while in cluster mode (running on top of Hadoop) they are written to HDFS.


* Save this RDD as a text file, using string representations of elements.


def saveAsTextFile(path: String): Unit

(2) aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])





* Repartition the RDD according to the given partitioner and, within each resulting partition,

* sort records by their keys.


* This is more efficient than calling repartition and then sorting within each partition

* because it can push the sorting down into the shuffle machinery.


def repartitionAndSortWithinPartitions(partitioner: Partitioner): RDD[(K, V)]
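The result contract can be sketched locally with plain Scala (names are mine, not Spark's): assign each pair to a partition with a simple hash partitioner, then sort each partition by key. In real Spark the sorting is pushed into the shuffle rather than done as a separate pass, and only the order *within* each partition is defined.

```scala
object RepartitionSortSketch {
  // Local sketch of repartitionAndSortWithinPartitions' output contract:
  // hash-partition the pairs, then sort each partition by key.
  def repartitionAndSort[K, V](pairs: Seq[(K, V)], numPartitions: Int)
                              (implicit ord: Ordering[K]): Map[Int, Seq[(K, V)]] =
    pairs
      .groupBy { case (k, _) => math.abs(k.hashCode) % numPartitions }
      .map { case (p, part) => p -> part.sortBy(_._1) }

  def main(args: Array[String]): Unit = {
    val data = Seq(3 -> "c", 1 -> "a", 4 -> "d", 2 -> "b")
    repartitionAndSort(data, numPartitions = 2).foreach(println)
  }
}
```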


* Returns the first k (smallest) elements from this RDD as defined by the specified

* implicit Ordering[T] and maintains the ordering. This does the opposite of [[top]].

* For example:

* {{{

* sc.parallelize(Seq(10, 4, 2, 12, 3)).takeOrdered(1)

* // returns Array(2)


* sc.parallelize(Seq(2, 3, 4, 5, 6)).takeOrdered(2)

* // returns Array(2, 3)

* }}}


* @param num k, the number of elements to return

* @param ord the implicit ordering for T

* @return an array of top elements


def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
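The scaladoc examples above can be reproduced on a local collection: takeOrdered is equivalent to sorting ascending and keeping the first num elements (top is the descending counterpart). The helper below is a pure-Scala stand-in, not Spark's implementation.

```scala
object TakeOrderedSketch {
  // Local equivalent of takeOrdered: the num smallest elements, in ascending order.
  def takeOrdered[T](elems: Seq[T], num: Int)(implicit ord: Ordering[T]): Seq[T] =
    elems.sorted.take(num)

  def main(args: Array[String]): Unit = {
    println(takeOrdered(Seq(10, 4, 2, 12, 3), 1)) // List(2)
    println(takeOrdered(Seq(2, 3, 4, 5, 6), 2))   // List(2, 3)
  }
}
```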


(7) countByKey()



* Count the number of elements for each key, collecting the results to a local Map.


* Note that this method should only be used if the resulting map is expected to be small, as

* the whole thing is loaded into the driver’s memory.

* To handle very large results, consider using rdd.mapValues(_ => 1L).reduceByKey(_ + _), which

* returns an RDD[(K, Long)] instead of a map.


def countByKey(): Map[K, Long]
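The semantics can be sketched locally (the object name is mine): group the pairs by key and count each group. For large results the scaladoc's `rdd.mapValues(_ => 1L).reduceByKey(_ + _)` alternative keeps the counts distributed instead of collecting them to the driver.

```scala
object CountByKeySketch {
  // Local equivalent of countByKey: number of pairs per key, as a Map.
  def countByKey[K, V](pairs: Seq[(K, V)]): Map[K, Long] =
    pairs.groupBy(_._1).map { case (k, vs) => k -> vs.size.toLong }

  def main(args: Array[String]): Unit = {
    val pairs = Seq("a" -> 1, "b" -> 2, "a" -> 3, "a" -> 4)
    println(countByKey(pairs)) // "a" occurs 3 times, "b" once
  }
}
```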

(6) takeOrdered(n, [ordering]) — its definition appears above. The related take method is defined as:


* Take the first num elements of the RDD. It works by first scanning one partition, and use the

* results from that partition to estimate the number of additional partitions needed to satisfy

* the limit.


* @note due to complications in the internal implementation, this method will raise

* an exception if called on an RDD of Nothing or Null.


def take(num: Int): Array[T]

Judging from the output, the seqOp function took effect, but the combOp function did not appear to. Testing on three versions — Spark 1.5, 1.4, and 1.3 — gave the same result. An earlier article demonstrating aggregateByKey on Spark 1.1 reported results matching expectations, so the discrepancy is presumably version-related; a detailed analysis is left for later.






* Return an array that contains all of the elements in this RDD.


def collect(): Array[T]

(6) saveAsTextFile(path)




(5) takeSample(withReplacement, num, [seed])

(1) reduce(func)

reduce collapses the elements of an RDD into a single value by repeatedly applying an accumulating, associative operation. Its definition is:


* Reduces the elements of this RDD using the specified commutative and

* associative binary operator.


def reduce(f: (T, T) => T): T
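The call shape is the same as on a local Scala collection; on an RDD the equivalent would be `sc.parallelize(1 to 5).reduce(_ + _)` (assuming an existing SparkContext `sc`). A local illustration:

```scala
object ReduceSketch {
  def main(args: Array[String]): Unit = {
    // reduce folds all elements with a commutative, associative operator.
    val sum = (1 to 5).reduce(_ + _)
    println(sum) // 15
  }
}
```

Because partitions are combined in no guaranteed order, the operator must be commutative and associative for the result to be deterministic.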


This section introduces the commonly used action operations. The collect method used earlier is one such action: it returns all of the data elements in the RDD, and is defined as follows:

The aggregateByKey function aggregates the values of identical keys in a pair RDD, using a neutral initial value (zero value) during aggregation. Its definition is:


* Aggregate the values of each key, using given combine functions and a neutral “zero value”.

* This function can return a different result type, U, than the type of the values in this RDD,

* V. Thus, we need one operation for merging a V into a U and one operation for merging two U’s,

* as in scala.TraversableOnce. The former operation is used for merging values within a

* partition, and the latter is used for merging values between partitions. To avoid memory

* allocation, both of these functions are allowed to modify and return their first argument

* instead of creating a new U.


def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,

combOp: (U, U) => U): RDD[(K, U)]
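To make the seqOp/combOp split concrete, here is a pure-Scala simulation over explicit "partitions" (the object name and the two-partition layout are my assumptions; Spark runs this distributed): seqOp folds each partition's values into a U starting from the zero value, and combOp then merges the per-partition results for each key.

```scala
object AggregateByKeySketch {
  // Simulate aggregateByKey: within each partition, fold values into U with
  // seqOp (starting from zero); then merge per-partition results with combOp.
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zero: U)
                             (seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    val perPartition: Seq[Map[K, U]] = partitions.map { part =>
      part.groupBy(_._1).map { case (k, kvs) =>
        k -> kvs.map(_._2).foldLeft(zero)(seqOp)
      }
    }
    perPartition.flatten.groupBy(_._1).map { case (k, kus) =>
      k -> kus.map(_._2).reduce(combOp)
    }
  }

  def main(args: Array[String]): Unit = {
    // Two partitions of (key, value) pairs; per-partition max, summed across.
    val parts = Seq(Seq(1 -> 3, 1 -> 2, 2 -> 4), Seq(1 -> 5, 2 -> 1))
    val result = aggregateByKey(parts, 0)(math.max(_, _), _ + _)
    // key 1: max(0,3,2)=3 in partition 0, max(0,5)=5 in partition 1, combined 3+5=8
    println(result)
  }
}
```

Note how the same key can contribute one intermediate U per partition, which is why both a seqOp and a combOp are required.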

Sample output from the aggregateByKey test (each line was printed from inside the seqOp function):

seq: 1 3
seq: 3 2
seq: 3 4
seq: 1 3
seq: 3 4