import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by spark on 15-1-19.
 * Sort a K-V RDD by key and return a new RDD.
 */
object SortByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._

    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"))
    val b = sc.parallelize(1 to a.count().toInt)
    val c = a.zip(b)
    // asc
    c.sortByKey(true).collect().foreach(print)
    // desc
    c.sortByKey(false).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * RDD1.subtract(RDD2): returns a new RDD containing the elements that exist in RDD1 but not in RDD2.
 */
object Subtract {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 10)
    val b = sc.parallelize(1 to 3)
    // 45678910
    //a.subtract(b).collect().foreach(print)

    val c = sc.parallelize(1 to 10)
    val d = sc.parallelize(List(1, 2, 3, 11))
    // 45678910
    c.subtract(d).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs whose keys exist in RDD1 but not in RDD2.
 */
object SubtractByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._

    val a = sc.parallelize(List("dog", "he", "word", "hello"))
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("cat", "first", "everyone"))
    val d = c.keyBy(_.length)
    // (2,he)(4,word)
    b.subtractByKey(d).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * sumApprox did not produce the result I expected (see the note after the Scala examples).
 */
object SumAndSumApprox {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 1000000)
    val b = a.sum()
    val c = a.sumApprox(0L, 0.9).getFinalValue()
    println(b + " *** " + c)
  }
}

/**
 * Created by spark on 15-1-19.
 * Take the first n elements of the RDD and return them as an array.
 */
object Take {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    //import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 1000000)
    // 12345678910
    a.take(10).foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * Sort the RDD elements in ascending order,
 * then return the first n elements as an array.
 */
object TakeOrdered {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    //import org.apache.spark.SparkContext._

    val a = sc.parallelize(List("ff", "aa", "dd", "cc"))
    // aacc
    a.takeOrdered(2).foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * Sample data from the RDD.
 */
object TakeSample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    //import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 10000)
    /**
     * 9048 5358 5216 7301 6303 6179 6151 5304 8115 3869
     */
    a.takeSample(true, 10, 1).foreach(println)
  }
}

/**
 * Created by spark on 15-1-19.
 * Show detailed debug information (the RDD lineage).
 */
object ToDebugString {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    //import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 9)
    val b = sc.parallelize(1 to 3)
    val c = a.subtract(b)
    // print the lineage description (the original call discarded the returned string)
    println(c.toDebugString)
  }
}
/**
 * Created by spark on 15-1-19.
 * Get the n largest elements.
 */
object Top {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    //import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 1000)
    val c = a.top(10)
    /**
     * 1000 999 998 997 996 995 994 993 992 991
     */
    c.foreach(println)
  }
}

/**
 * Union == ++ : merges two RDDs into a new RDD.
 */
object Union {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    //import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 3)
    val b = sc.parallelize(3 to 5)
    val c = a.union(b)
    val d = a ++ b
    /**
     * 123345
     */
    c.collect().foreach(print)
    /**
     * 123345
     */
    d.collect().foreach(print)
  }
}
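A note on the SumAndSumApprox example above: the first argument of sumApprox is a timeout in milliseconds, so passing 0L gives the approximate job essentially no time to run, which is likely why the result was not what I hoped for. The sketch below is not part of the original examples; the object name and the 1000 ms timeout are illustrative assumptions, chosen only to show that a non-zero timeout lets getFinalValue() return an estimate close to the exact sum.

import org.apache.spark.{SparkContext, SparkConf}

object SumApproxWithTimeout {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._

    val a = sc.parallelize(1 to 1000000)
    // exact sum for comparison
    val exact = a.sum()
    // the timeout is in milliseconds (1000L is an arbitrary choice here);
    // getFinalValue() blocks until the job finishes or the timeout expires
    val approx = a.sumApprox(1000L, 0.9).getFinalValue()
    println(exact + " *** " + approx)
  }
}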
--Java
package com.demo.sparkWordCount;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/*
 * Ming Z M LI
 */
public class FunctionDemo {

    /*
     * create Context
     */
    public static JavaSparkContext createContext() {
        SparkConf sparkConf = new SparkConf().setAppName("FunctionDemo").setMaster("local[*]");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        return ctx;
    }

    public static void main(String[] args) {
        demo5();
    }

    /*
     * RDD1.subtract(RDD2): returns a new RDD containing the elements that exist in RDD1 but not in RDD2
     */
    public static void demo2() {
        JavaSparkContext ctx = createContext();
        List<String> list1 = new ArrayList<String>();
        list1.add("hello1");
        list1.add("hello2");
        list1.add("hello3");
        list1.add("hello4");
        List<String> list2 = new ArrayList<String>();
        list2.add("hello3");
        list2.add("hello4");
        list2.add("world5");
        list2.add("world6");
        JavaRDD<String> a = ctx.parallelize(list1);
        JavaRDD<String> b = ctx.parallelize(list2);
        a.subtract(b).foreach(new VoidFunction<String>() {
            public void call(String t) throws Exception {
                System.out.println(t);
            }
        });
    }

    /**
     * Created by spark on 15-1-19.
     * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs whose keys exist in RDD1
     * but not in RDD2. foreach of the keyed RDD prints (4, bird) (5, hello) (3, cat);
     * output - (4,bird) (4,bird)
     */
    public static void demo3() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "bird", "bird")));
        JavaRDD<String> b = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "testing")));
        JavaPairRDD<Integer, String> c = a.keyBy(new Function<String, Integer>() {
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
        });
        // c.foreach(new VoidFunction<Tuple2<Integer, String>>() {
        //     public void call(Tuple2<Integer, String> t) throws Exception {
        //         System.out.println("(" + t._1 + ", " + t._2 + ")");
        //     }
        // });
        JavaPairRDD<Integer, String> d = b.keyBy(new Function<String, Integer>() {
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
        });
        // subtractByKey drops every pair whose key also appears in d
        c.subtractByKey(d).foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println("(" + t._1 + ", " + t._2 + ")");
            }
        });
    }

    /**
     * Take the first n elements of the RDD and return them as a list
     */
    public static void demo4() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "4", "2", "3")));
        List<String> b = a.take(3);
        for (String c : b) {
            System.out.println(c);
        }
    }

    /**
     * Get the n largest elements. output - hello 3
     */
    public static void demo5() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "hello", "2", "3")));
        List<String> b = a.top(2);
        for (String c : b) {
            System.out.println(c);
        }
    }
}