Friday, September 11, 2015

Spark: split rows and accumulate

I have this code:

val rdd = sc.textFile("sample.log")
val splitRDD = rdd.map(r => StringUtils.splitPreserveAllTokens(r, "\\|"))
val rdd2 = splitRDD.filter(...).map(row => createRow(row, fieldsMap))
sqlContext.createDataFrame(rdd2, structType).save("org.apache.phoenix.spark",
    SaveMode.Overwrite, Map("table" -> table, "zkUrl" -> zkUrl))

def createRow(row: Array[String], fieldsMap: ListMap[Int, FieldConfig]): Row = {
    //add additional index for invalidValues
    val arrSize = fieldsMap.size + 1
    val arr = new Array[Any](arrSize)
    var invalidValues = ""
    for ((k, v) <- fieldsMap) {
      val valid = ...
      var value : Any = null
      if (valid) {
        value = row(k)
        // if (v.code == "SOURCE_NAME") --> 5th column in the row
        // sourceNameCount = row(k).split(",").size
      } else {
        invalidValues += v.code + " : " + row(k) + " | "
      }
      arr(k) = value
    }
    arr(arrSize - 1) = invalidValues
    Row.fromSeq(arr.toSeq)
}

fieldsMap holds the mapping of the input columns as (index, FieldConfig) pairs, where the FieldConfig class carries the "code" and "dataType" values:

TOPIC -> (0, v.code = "TOPIC", v.dataType = "String")
GROUP -> (1, v.code = "GROUP")
SOURCE_NAME1,SOURCE_NAME2,SOURCE_NAME3 -> (4, v.code = "SOURCE_NAME")
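For reference, a minimal sketch of what FieldConfig and fieldsMap might look like. The case class here is hypothetical (the real class may carry more fields than code and dataType); ListMap is used so iteration follows the input column order:

```scala
import scala.collection.immutable.ListMap

// Hypothetical FieldConfig: only "code" and "dataType" are assumed here.
case class FieldConfig(code: String, dataType: String = "String")

// ListMap preserves insertion order, matching the input column order.
val fieldsMap: ListMap[Int, FieldConfig] = ListMap(
  0 -> FieldConfig("TOPIC"),
  1 -> FieldConfig("GROUP"),
  4 -> FieldConfig("SOURCE_NAME")
)
```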

This is the sample.log:

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME1,SOURCE_NAME2,SOURCE_NAME3|
SOURCE_TYPE1,SOURCE_TYPE2,SOURCE_TYPE3|SOURCE_COUNT1,SOURCE_COUNT2,SOURCE_COUNT3|
DEST_NAME1,DEST_NAME2,DEST_NAME3|DEST_TYPE1,DEST_TYPE2,DEST_TYPE3|
DEST_COUNT1,DEST_COUNT2,DEST_COUNT3|

The goal is to split each input record (sample.log) into several rows, based on the number of SOURCE_NAME values. In the example above, the output will have 3 rows:

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME1|SOURCE_TYPE1|SOURCE_COUNT1|
DEST_NAME1|DEST_TYPE1|DEST_COUNT1|

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME2|SOURCE_TYPE2|SOURCE_COUNT2|
DEST_NAME2|DEST_TYPE2|DEST_COUNT2|

TOPIC|GROUP|TIMESTAMP|STATUS|SOURCE_NAME3|SOURCE_TYPE3|SOURCE_COUNT3|
DEST_NAME3|DEST_TYPE3|DEST_COUNT3|
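The expansion above can be sketched in plain Scala (no Spark needed) by transposing the comma-separated columns. The column indices (fixed columns 0-3, multi-valued columns 4-9) follow the sample.log layout:

```scala
// One record from sample.log (shortened values for readability).
val record = "TOPIC|GROUP|TIMESTAMP|STATUS|S1,S2,S3|T1,T2,T3|C1,C2,C3|D1,D2,D3|DT1,DT2,DT3|DC1,DC2,DC3|"

val cols  = record.split("\\|", -1)                  // -1 keeps trailing empty tokens
val fixed = cols.take(4)                             // TOPIC, GROUP, TIMESTAMP, STATUS
val multi = cols.slice(4, 10).map(_.split(",", -1))  // the six comma-valued columns

// transpose: six arrays of three tokens -> three arrays of six tokens,
// i.e. one output row per SOURCE_NAME.
val rows = multi.transpose.map(group => (fixed ++ group).mkString("|"))
// rows(0) == "TOPIC|GROUP|TIMESTAMP|STATUS|S1|T1|C1|D1|DT1|DC1"
```

`transpose` throws if the comma-lists have different lengths, which doubles as a sanity check that each multi-valued column has one token per SOURCE_NAME.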

This is the new code I am working on (still using createRow defined above):

      val rdd2 = splitRDD.filter(...).flatMap(row => {

        val srcName = row(4).split(",")
        val srcType = row(5).split(",")
        val srcCount = row(6).split(",")

        val destName = row(7).split(",")
        val destType = row(8).split(",")
        val destCount = row(9).split(",")

        val newRows = new ArrayBuffer[Row]()

        for (i <- srcName.indices) {
          // missing column: destType can sometimes be null,
          // so an empty string stands in for it below
          val splitRow: Array[String] = Array(row(0), row(1), row(2), row(3),
            srcName(i), srcType(i), srcCount(i), destName(i), "", destCount(i))

          newRows += createRow(splitRow, fieldsMap)
        }

        // return the Rows themselves; flatMap flattens the per-record collections
        newRows
      })
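To check the expansion logic without a SparkContext, the same flatMap shape can be exercised on a plain Scala collection, with string arrays standing in for Row (values below are made up for the test):

```scala
// Plain-Scala stand-in for the Spark flatMap: one parsed record,
// expanded into one array per comma-separated SOURCE_NAME.
val parsed = Seq(Array("TOPIC", "GROUP", "TS", "OK",
  "S1,S2", "T1,T2", "C1,C2", "D1,D2", "DT1,DT2", "DC1,DC2"))

val expanded = parsed.flatMap { row =>
  val srcName = row(4).split(",")
  srcName.indices.map { i =>
    // fixed columns 0..3, then the i-th token of each multi-valued column
    row.take(4) ++ (4 to 9).map(c => row(c).split(",")(i))
  }
}
// expanded.size == 2; each element has 10 columns
```

The key point is that the function passed to flatMap returns the collection of rows itself, so Spark flattens one input record into several output rows.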



via Chebli Mohamed
