20 March 2017 spark scala

Apache Spark hides the underlying dataset partitions to achieve massive data parallelism. This makes it tricky to compute statistics over sliding windows that cross partition boundaries. Although Spark MLlib provides an efficient sliding window API, PySpark cannot access that functionality yet. One way to implement sliding windows is to duplicate each data item at every position covered by a window, then call groupBy() on the RDD to group the items of the window starting at each position. Unfortunately, the performance is painfully slow. In contrast, the sliding window provided by RDDFunctions in Scala is fast and handles partitions correctly. The idea is to collect the leading items of the following partition(s) as a tail for the current partition to iterate over. The following Python script counts the increasing runs of length K=15 over sliding windows of step size 1. The generated test dataset is the sequence 0 to 79, so the number of increasing runs across partitions should be 80 - (K-1) = 66. If windows spanning partitions were not handled, the result would be zero, because each partition holds only 80/20 = 4 items, fewer than the window size.
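The cross-partition idea can be sketched in plain Python first, with no Spark at all: a list of chunks stands in for the partitions, and each chunk borrows up to K - 1 leading items from the chunks after it, so every window is produced exactly once. All names here are illustrative only.

```python
# Plain-Python sketch of cross-partition sliding windows (no Spark).
# Chunks stand in for partitions.

def sliding_over_chunks(chunks, k):
    windows = []
    for i, chunk in enumerate(chunks):
        # Borrow up to k - 1 leading items from the following chunks
        tail = []
        j = i + 1
        while j < len(chunks) and len(tail) < k - 1:
            tail.extend(chunks[j][:k - 1 - len(tail)])
            j += 1
        seq = chunk + tail
        # Emit every full window that starts inside this chunk
        for s in range(len(chunk)):
            if s + k <= len(seq):
                windows.append(seq[s:s + k])
    return windows

chunks = [list(range(c * 4, c * 4 + 4)) for c in range(20)]  # 0..79 in 20 chunks
wins = sliding_over_chunks(chunks, 15)
up_runs = [w for w in wins if all(x < y for x, y in zip(w, w[1:]))]
print(len(up_runs))  # 80 - (15 - 1) = 66
```

Because each window's starting position lies inside exactly one chunk, no window is counted twice; the Spark script below applies the same stitching per partition.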

from pyspark import SparkConf, SparkContext
import subprocess, sys, random
conf = SparkConf()
sc = SparkContext(conf = conf)
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

P = 20   # partitions
N = 80   # generated numbers
K = 15   # sliding window size
manual = False
testing = False
for i in range(len(sys.argv)):
    if(sys.argv[i] == '-p' and i + 1 < len(sys.argv)): P = int(sys.argv[i+1])
    if(sys.argv[i] == '-n' and i + 1 < len(sys.argv)): N = int(sys.argv[i+1])
    if(sys.argv[i] == '-k' and i + 1 < len(sys.argv)): K = int(sys.argv[i+1])
    if(sys.argv[i] == '-m'): manual = True
    if(sys.argv[i] == '-t'): testing = True
print '(P, N, K, manual, testing) = ({}, {}, {}, {}, {})'.format(P, N, K, manual, testing)

def toFloat(n):
    try: return float(n)
    except ValueError: return 0.0

def isUp(window):
    return all(x<y for x, y in zip(window, window[1:]))

def toTail(itr):
    # Collect the leading items (up to the sliding window size - 1) of this
    # partition; they serve as the tail for the preceding partitions
    tail = []
    for x in itr:
        if(len(tail) == K - 1): break
        tail.append(x)
    yield tail

def sliding(i, itr):
    # Aggregate the tail for the current partition from the collected tails
    j = i + 1
    tail = []
    while(j < len(tails) and len(tail) < K - 1):
        tail.extend(tails[j][:K - 1 - len(tail)])
        j = j + 1

    # Compose sliding windows, including those extending into the full tail
    window = []
    for x in itr:
        window.append(x)
        if(len(window) == K):
            yield list(window)
            window.pop(0)
    for x in tail:
        window.append(x)
        if(len(window) == K):
            yield list(window)
            window.pop(0)

# Generate a test dataset of floats, or load the EEG dataset
rdd = (sc.parallelize(range(N), P).map(toFloat) if testing
       else sc.textFile("/user/hadoop/eegsample", P).flatMap(lambda l: l.split()).map(toFloat))
if manual:
    # Hand-made sliding windows using groupBy()
    def window(xi, k):
        x, i = xi
        return [(i - offset, i, x) for offset in xrange(k)]
    runs = (rdd.zipWithIndex().
            flatMap(lambda xi: window(xi, K)).
            groupBy(lambda ix: ix[0]).
            mapValues(lambda vals: [x for (pos, i, x) in sorted(vals)]).
            values().
            filter(lambda w: len(w) == K).
            filter(isUp))
else:
    # Partitions with tails to yield full sliding windows
    tails = rdd.mapPartitions(toTail).collect()
    runs = rdd.mapPartitionsWithIndex(sliding).filter(isUp)

print '######### Up runs[{}] = {} in {} partitions ##########'.format(K, runs.count(), rdd.getNumPartitions())

Performance Comparison with Scala

Apache Spark is mainly implemented in Scala; Python support goes through the Py4J binding. Even though running a Python script on Spark with spark-submit is straightforward, the performance gap with Scala can be significant when calling groupBy() or join() on an RDD. Here are the benchmark results of counting the increasing runs of length 3 in an EEG dataset of 7450336 floats on an 8-core machine with 8GB RAM. Running a Python script through spark-submit can be faster than running a Scala script through spark-shell by saving some JVM/Spark startup overhead, but it falls behind on cross-partition operations. A packaged Scala app run through spark-submit, on the other hand, is about twice as fast as its Python counterpart, making it desirable for production use. Nonetheless, it takes some prep work to package a Scala app.

Running uprun.py through spark-submit in 4 partitions
  • Sliding windows with full tails: 18.508s

  • Sliding windows using groupBy(): 10m44.039s

Running uprun.scala in a packaged jar through spark-submit in 4 partitions
  • Sliding windows by RDDFunctions: 9.184s

  • Sliding windows using groupBy(): 3m21.722s

Running uprun.scala through spark-shell in 4 partitions
  • Sliding windows by RDDFunctions: 21.759s

  • Sliding windows using groupBy(): 3m34.009s
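The groupBy() numbers follow directly from the data expansion of the hand-made approach: window() emits one (pos, i, x) record per window position each element covers, so every element is replicated K times before the shuffle and the shuffle volume grows linearly with the window size. A quick back-of-the-envelope check (plain Python, illustrative only):

```python
# Rough shuffle-volume estimate for the groupBy() approach: each of the
# n input elements is emitted once per window position it covers, i.e. K copies.
n = 7450336   # EEG floats in the benchmark
for k in (3, 15):
    print('K = {}: {} records shuffled ({}x the input)'.format(k, n * k, k))
```

The RDDFunctions and mapPartitions approaches move only the partition tails instead, which is why they stay in the tens of seconds.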

The Scala app counterpart is listed below for completeness.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.rdd.RDDFunctions

object UpRun {
  def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("Up Run Counting")
      val sc = new SparkContext(conf)
      var manual = false
      var testing = false
      var P =  4
      var N = 80
      var K =  3
      for(i <- 0 until args.length) {
          if(args(i) == "-p" && i+1 < args.length) P = args(i+1).toInt
          if(args(i) == "-n" && i+1 < args.length) N = args(i+1).toInt
          if(args(i) == "-k" && i+1 < args.length) K = args(i+1).toInt
          if(args(i) == "-m") manual = true
          if(args(i) == "-t") testing = true
      }
      printf("(P, N, K, manual, testing) = (%d, %d, %d, %s, %s)\n", P, N, K, manual, testing)
      val rdd = if(testing) sc.parallelize(1 to N, P).map(_.toFloat)
                else sc.textFile("/user/hadoop/eegsample", P).flatMap(_.split("\\s+")).map(_.toFloat)
      var count:Long = 0
      if(manual) {
          /* Hand-made sliding windows in Scala */
          count = rdd.zipWithIndex.
                  flatMap { case (x, i) => (0 until K).map(offset => (i - offset, i, x)) }.
                  groupBy { case (pos, _, _) => pos }.
                  mapValues(_.toList.sorted.map { case (_, _, x) => x }).
                  values.
                  filter(_.size == K).
                  filter(w => (w zip w.drop(1)).forall { case (v1, v2) => v1 < v2 }).
                  count
      } else {
          /* Sliding windows by MLlib RDDFunctions */
          count = RDDFunctions.fromRDD(rdd).sliding(K).
                  filter(w => (w zip w.drop(1)).forall { case (v1, v2) => v1 < v2 }).
                  count
      }
      printf("######### Up runs[%d] = %d in %d partitions ##########\n", K, count, rdd.getNumPartitions)
  }
}
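The prep work mentioned above is mostly the sbt build definition. A minimal sketch is shown below; the Spark and Scala versions are assumptions for this timeframe, so match them to your cluster. The spark-mllib dependency is needed for RDDFunctions.

// build.sbt (minimal sketch; versions are assumptions)
name := "uprun"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.1.0" % "provided"
)

With that in place, `sbt package` produces a jar under target/scala-2.11/ that can be passed to spark-submit with `--class UpRun`.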
