博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark通过combineByKey算子实现条件性聚合的方法
阅读量:6077 次
发布时间:2019-06-20

本文共 1890 字,大约阅读时间需要 6 分钟。

实际开发过程中遇到了需要实现选择性聚合的场景,即对于某一个key对应的数据,将满足条件的记录进行聚合,不满足条件的则不进行聚合。

使用spark处理这种计算场景时,想到了使用combineByKey算子,先将输入数据中的value映射成含一个元素的ArrayBuffer(scala中相当于java中的ArrayList),然后在聚合时对满足聚合条件的记录聚合后覆盖这一个ArrayBuffer,不满足条件的待聚合的两条记录都填入ArrayBuffer。最后调用flatMap将ArrayBuffer中的元素分拆。

比如下面的代码实现了对某个字段聚合时,按照时间条件进行选择性的聚合:

val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir))    .map(line => line.split("\\|"))    .filter(arr => if(arr.length != 14 || !arr(3).substring(0, 8).equals(lastDay)) false else true)    .map(arr => (arr(0), arr))    .reduceByKey( (pure, after) => reduceSession(pure, after))    .map(tup => (tup._2(13), tup._2))    .combineByKey( x => ArrayBuffer(x),    (x:ArrayBuffer[Array[String]],y) => combineMergeValue(x, y),    (x:ArrayBuffer[Array[String]],y:ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y))    .flatMap(tup => arrToStr(tup._2))def combineMergeValue(x:ArrayBuffer[Array[String]], y:Array[String])                    : ArrayBuffer[Array[String]] = {    var outList = x.clone()    var outarr = y.clone()    var flag = true    for(i <- 0 until outList.length){        if(checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) {            outarr = reduceSession(outList(i), y)            outList(i) = outarr            flag = false        }    }    if(flag) {        outList += y    }    outList}def combineMergeCombiners(x:ArrayBuffer[Array[String]], y:ArrayBuffer[Array[String]])                : ArrayBuffer[Array[String]] = {    var outList = x.clone();    for(i <- 0 until y.length){    var outarr = y(i).clone()    var flag = true    for(j <- 0 until outList.length){        if(checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) {            outarr = reduceSession(outList(j), y(i))            outList(j) = outarr            flag = false        }    }    if(flag) {        outList += y(i)    }    }    outList}

转载于:https://blog.51cto.com/11091005/2120619

你可能感兴趣的文章
Python-MacOSX下SIP引起的pip权限问题解决方案(非取消SIP机制)
查看>>
从MFQ方法到需求分析
查看>>
android.view.WindowManager$BadTokenException: Unable to add window
查看>>
HDU5012:Dice(bfs模板)
查看>>
iphone openssh
查看>>
Linux下MEncoder的编译
查看>>
Javascript中闭包(Closure)的探索(一)-基本概念
查看>>
spark高级排序彻底解秘
查看>>
ylbtech-LanguageSamples-PartialTypes(部分类型)
查看>>
福建省促进大数据发展:变分散式管理为统筹集中式管理
查看>>
开发环境、生产环境、测试环境的基本理解和区别
查看>>
tomcat多应用之间如何共享jar
查看>>
Flex前后台交互,service层调用后台服务的简单封装
查看>>
MySQL入门12-数据类型
查看>>
Windows Azure 保留已存在的虚拟网络外网IP(云服务)
查看>>
修改字符集
查看>>
HackTheGame 攻略 - 第四关
查看>>
js删除数组元素
查看>>
带空格文件名的处理(find xargs grep ..etc)
查看>>
华为Access、Hybrid和Trunk的区别和设置
查看>>