In a real development task I ran into a scenario that called for selective aggregation: among the records sharing a given key, only the ones that satisfy a condition should be aggregated together, while the rest must be left unaggregated.
To handle this in Spark, I reached for the combineByKey operator. First, each input value is wrapped in a single-element ArrayBuffer (Scala's rough equivalent of Java's ArrayList). Then, during the merge step, when an incoming record satisfies the aggregation condition against an existing element, the two are aggregated and the result overwrites that element; when no element satisfies the condition, the incoming record is appended to the ArrayBuffer as a separate entry. Finally, flatMap splits each ArrayBuffer back out into individual records.
For example, the following code aggregates on a key field, but only merges pairs of records that satisfy a time condition:
    val rdd1 = sc.textFile(dayDayDir).union(sc.textFile(thisDayDir))
      .map(line => line.split("\\|"))
      .filter(arr => arr.length == 14 && arr(3).substring(0, 8).equals(lastDay))
      .map(arr => (arr(0), arr))
      .reduceByKey((pure, after) => reduceSession(pure, after))
      .map(tup => (tup._2(13), tup._2))
      .combineByKey(
        x => ArrayBuffer(x),
        (x: ArrayBuffer[Array[String]], y: Array[String]) => combineMergeValue(x, y),
        (x: ArrayBuffer[Array[String]], y: ArrayBuffer[Array[String]]) => combineMergeCombiners(x, y))
      .flatMap(tup => arrToStr(tup._2))

    // Merge one new record into an existing combiner (buffer of partial aggregates).
    def combineMergeValue(x: ArrayBuffer[Array[String]], y: Array[String]): ArrayBuffer[Array[String]] = {
      val outList = x.clone()
      var flag = true
      for (i <- 0 until outList.length) {
        if (checkTime(outList(i)(3), outList(i)(4), y(3), y(4))) {
          outList(i) = reduceSession(outList(i), y)  // condition met: aggregate and overwrite
          flag = false
        }
      }
      if (flag) {
        outList += y  // condition never met: keep the record as a separate entry
      }
      outList
    }

    // Merge two combiners by folding every element of y into x.
    def combineMergeCombiners(x: ArrayBuffer[Array[String]], y: ArrayBuffer[Array[String]]): ArrayBuffer[Array[String]] = {
      val outList = x.clone()
      for (i <- 0 until y.length) {
        var flag = true
        for (j <- 0 until outList.length) {
          if (checkTime(outList(j)(3), outList(j)(4), y(i)(3), y(i)(4))) {
            outList(j) = reduceSession(outList(j), y(i))
            flag = false
          }
        }
        if (flag) {
          outList += y(i)
        }
      }
      outList
    }
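The listing above depends on external helpers (reduceSession, checkTime) and input directories, so it cannot be run on its own. To make the pattern concrete, here is a minimal local sketch with no Spark dependency: the (timestamp, count) records and the canMerge predicate are hypothetical stand-ins for checkTime/reduceSession, but mergeValue and mergeCombiners mirror the structure of the second and third combineByKey arguments.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical merge rule: two records may aggregate only if their
// timestamps are within 60 units of each other.
def canMerge(a: (Long, Int), b: (Long, Int)): Boolean =
  math.abs(a._1 - b._1) <= 60

// mergeValue: fold one new record into a buffer of partial aggregates.
def mergeValue(buf: ArrayBuffer[(Long, Int)], rec: (Long, Int)): ArrayBuffer[(Long, Int)] = {
  val out = buf.clone()
  val i = out.indexWhere(canMerge(_, rec))
  if (i >= 0) out(i) = (out(i)._1, out(i)._2 + rec._2) // condition met: aggregate
  else out += rec                                      // condition not met: keep separate
  out
}

// mergeCombiners: fold every element of one buffer into the other.
def mergeCombiners(a: ArrayBuffer[(Long, Int)],
                   b: ArrayBuffer[(Long, Int)]): ArrayBuffer[(Long, Int)] =
  b.foldLeft(a.clone())(mergeValue)

// Simulate combineByKey on a local collection: group by key, then fold
// each key's values through mergeValue, starting from an empty buffer.
val records = Seq(("k1", (100L, 1)), ("k1", (130L, 2)), ("k1", (500L, 3)))
val combined = records.groupBy(_._1).map { case (k, vs) =>
  k -> vs.map(_._2).foldLeft(ArrayBuffer.empty[(Long, Int)])(mergeValue)
}
// combined("k1") == ArrayBuffer((100,3), (500,3)):
// the first two records merged; the third stayed separate.
```

In a real job the same mergeValue/mergeCombiners pair would be passed to combineByKey, and a final flatMap would split each buffer back into individual output records, just as in the article's code.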
Reposted from: https://blog.51cto.com/11091005/2120619