Java Spark Simple Examples
Views: 5808
Published: 2019-06-18

This article is about 7,775 words; reading it takes roughly 25 minutes.

import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by spark on 15-1-19.
 * Sort a K-V RDD by key and return a new RDD.
 */
object SortByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._
    val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"))
    val b = sc.parallelize(1 to a.count().toInt)
    val c = a.zip(b)
    // ascending
    c.sortByKey(true).collect().foreach(print)
    // descending
    c.sortByKey(false).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * RDD1.subtract(RDD2): returns a new RDD containing the elements
 * present in RDD1 but absent from RDD2.
 */
object Subtract {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._
    val a = sc.parallelize(1 to 10)
    val b = sc.parallelize(1 to 3)
    // 45678910
    // a.subtract(b).collect().foreach(print)
    val c = sc.parallelize(1 to 10)
    val d = sc.parallelize(List(1, 2, 3, 11))
    // 45678910
    c.subtract(d).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs
 * whose key exists in RDD1 but not in RDD2.
 */
object SubtractByKey {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._
    val a = sc.parallelize(List("dog", "he", "word", "hello"))
    val b = a.keyBy(_.length)
    val c = sc.parallelize(List("cat", "first", "everyone"))
    val d = c.keyBy(_.length)
    // (2,he)(4,word)
    b.subtractByKey(d).collect().foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * sumApprox did not produce the result I expected.
 */
object SumAndSumApprox {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    import org.apache.spark.SparkContext._
    val a = sc.parallelize(1 to 1000000)
    val b = a.sum()
    val c = a.sumApprox(0L, 0.9).getFinalValue()
    println(b + " *** " + c)
  }
}

/**
 * Created by spark on 15-1-19.
 * take(n): return the first n elements of the RDD as an array.
 */
object Take {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(1 to 1000000)
    // 12345678910
    a.take(10).foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * takeOrdered(n): sort the RDD ascending and return
 * the first n elements as an array.
 */
object TakeOrdered {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(List("ff", "aa", "dd", "cc"))
    // aacc
    a.takeOrdered(2).foreach(print)
  }
}

/**
 * Created by spark on 15-1-19.
 * takeSample: sample elements from the RDD.
 */
object TakeSample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(1 to 10000)
    /**
     * 9048 5358 5216 7301 6303 6179 6151 5304 8115 3869
     */
    a.takeSample(true, 10, 1).foreach(println)
  }
}

/**
 * Created by spark on 15-1-19.
 * toDebugString: show the RDD's debug/lineage information.
 */
object ToDebugString {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(1 to 9)
    val b = sc.parallelize(1 to 3)
    val c = a.subtract(b)
    // print the lineage (the original discarded the returned string)
    println(c.toDebugString)
  }
}

/**
 * Created by spark on 15-1-19.
 * top(n): return the n largest elements.
 */
object Top {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(1 to 1000)
    val c = a.top(10)
    /**
     * 1000 999 998 997 996 995 994 993 992 991
     */
    c.foreach(println)
  }
}

/**
 * union == ++ : merge two RDDs into a new RDD.
 */
object Union {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("spark-demo").setMaster("local")
    val sc = new SparkContext(conf)
    val a = sc.parallelize(1 to 3)
    val b = sc.parallelize(3 to 5)
    val c = a.union(b)
    val d = a ++ b
    // 123345
    c.collect().foreach(print)
    // 123345
    d.collect().foreach(print)
  }
}
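One note on the SumAndSumApprox example: sumApprox(0L, 0.9) passes a timeout of 0 ms, so Spark returns the estimate before any tasks have finished, which is the likely reason it "did not produce the result I expected". Below is a minimal sketch that passes a non-zero timeout, written in Java to match the second half of this post. The class name SumApproxDemo is mine, not from the original; the calls (parallelizeDoubles, sumApprox, getFinalValue) are Spark's standard Java API.

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.partial.BoundedDouble;
import org.apache.spark.partial.PartialResult;

public class SumApproxDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("spark-demo").setMaster("local");
        JavaSparkContext ctx = new JavaSparkContext(conf);
        List<Double> nums = new ArrayList<Double>();
        for (int i = 1; i <= 1000000; i++) {
            nums.add((double) i);
        }
        JavaDoubleRDD a = ctx.parallelizeDoubles(nums);
        // allow up to 1000 ms before settling for a partial estimate;
        // with 0 ms (as in the Scala example above) no tasks can finish in time
        PartialResult<BoundedDouble> result = a.sumApprox(1000L, 0.9);
        BoundedDouble value = result.getFinalValue();
        System.out.println(value.mean() + " [" + value.low() + ", " + value.high() + "]");
        ctx.stop();
    }
}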

-- Java

package com.demo.sparkWordCount;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

/*
 * Ming Z M LI
 */
public class FunctionDemo {

    /*
     * create the Spark context
     */
    public static JavaSparkContext createContext() {
        SparkConf sparkConf = new SparkConf().setAppName("FunctionDemo").setMaster("local[*]");
        return new JavaSparkContext(sparkConf);
    }

    public static void main(String[] args) {
        demo5();
    }

    /*
     * RDD1.subtract(RDD2): returns a new RDD containing the elements
     * present in RDD1 but absent from RDD2.
     */
    public static void demo2() {
        JavaSparkContext ctx = createContext();
        List<String> list1 = new ArrayList<String>();
        list1.add("hello1");
        list1.add("hello2");
        list1.add("hello3");
        list1.add("hello4");
        List<String> list2 = new ArrayList<String>();
        list2.add("hello3");
        list2.add("hello4");
        list2.add("world5");
        list2.add("world6");
        JavaRDD<String> a = ctx.parallelize(list1);
        JavaRDD<String> b = ctx.parallelize(list2);
        a.subtract(b).foreach(new VoidFunction<String>() {
            public void call(String t) throws Exception {
                System.out.println(t.toString());
            }
        });
    }

    /**
     * Created by spark on 15-1-19.
     * RDD1.subtractByKey(RDD2): returns a new RDD containing the pairs
     * whose key exists in RDD1 but not in RDD2.
     * Printing c with its keys gives (4, bird) (5, hello) (3, cat);
     * output: (4,bird) (4,bird)
     */
    public static void demo3() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "bird", "bird")));
        JavaRDD<String> b = ctx.parallelize(new ArrayList<String>(Arrays.asList("cat", "hello", "testing")));
        // key each word by its length
        JavaPairRDD<Integer, String> c = a.keyBy(new Function<String, Integer>() {
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
        });
        JavaPairRDD<Integer, String> d = b.keyBy(new Function<String, Integer>() {
            public Integer call(String v1) throws Exception {
                return v1.length();
            }
        });
        // subtractByKey matches the description above (the original called
        // subtract, which compares whole pairs; the output is the same here)
        c.subtractByKey(d).foreach(new VoidFunction<Tuple2<Integer, String>>() {
            public void call(Tuple2<Integer, String> t) throws Exception {
                System.out.println("(" + t._1 + ", " + t._2 + ")");
            }
        });
    }

    /**
     * take(n): return the first n elements of the RDD as a list.
     */
    public static void demo4() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "4", "2", "3")));
        List<String> b = a.take(3);
        for (String c : b) {
            System.out.println(c);
        }
    }

    /**
     * top(n): return the n largest elements.
     * output - hello 3
     */
    public static void demo5() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(new ArrayList<String>(Arrays.asList("1", "hello", "2", "3")));
        List<String> b = a.top(2);
        for (String c : b) {
            System.out.println(c);
        }
    }
}
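The anonymous Function / VoidFunction classes above reflect the Java 7 style the post was written in. On Java 8 or newer (and Spark 1.0+, where these are functional interfaces), the same demos shrink considerably with lambdas. Here is a sketch of demo3 rewritten that way; the method name demo3Lambda is mine, and the output should be unchanged.

    public static void demo3Lambda() {
        JavaSparkContext ctx = createContext();
        JavaRDD<String> a = ctx.parallelize(Arrays.asList("cat", "hello", "bird", "bird"));
        JavaRDD<String> b = ctx.parallelize(Arrays.asList("cat", "hello", "testing"));
        // key each word by its length; the lambdas replace the anonymous classes
        JavaPairRDD<Integer, String> c = a.keyBy(s -> s.length());
        JavaPairRDD<Integer, String> d = b.keyBy(s -> s.length());
        // keeps only keys present in c but not in d: prints (4, bird) twice
        c.subtractByKey(d).foreach(t -> System.out.println("(" + t._1 + ", " + t._2 + ")"));
    }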

 

Reposted from: https://www.cnblogs.com/MarchThree/p/5059649.html
