Access S3 Bucket with Apache Spark

30 March 2017

Assume hadoop-2.7.3 is installed via brew on macOS. For single-machine use, the configuration may be set statically; for multiple users, it may be set programmatically. The current URL scheme for accessing an S3 bucket is s3a://.

Static Configuration

Step 1

Tell Spark where the Hadoop configuration files are.

/usr/local/Cellar/apache-spark/2.1.0/libexec/conf/spark-env.sh
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.7.3/libexec
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
Step 2

Set S3 endpoint and access key for Hadoop

/usr/local/Cellar/hadoop/2.7.3/libexec/etc/hadoop/core-site.xml
<configuration>
    <property>
        <name>fs.s3a.endpoint</name>
        <description>AWS S3 endpoint to connect to. An up-to-date list is
            provided in the AWS Documentation: regions and endpoints. Without this
            property, the standard region (s3.amazonaws.com) is assumed.
        </description>
        <value>s3.us-east-2.amazonaws.com</value>
    </property>
    <property>
        <name>fs.s3a.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
        <description>The implementation class of the S3A Filesystem</description>
    </property>
    <property>
        <name>fs.s3a.access.key</name>
        <description>AWS access key ID.</description>
        <value>**********</value>
    </property>
    <property>
        <name>fs.s3a.secret.key</name>
        <description>AWS secret key.</description>
        <value>**********</value>
    </property>
</configuration>

Alternatively, set the access key in the user’s environment.

bash_profile
export AWS_ACCESS_KEY_ID=***************
export AWS_SECRET_ACCESS_KEY=************
Step 3

Set the CLASSPATH for the AWS client libraries and enable Signature Version 4 (V4) support for spark-shell.

V4 signature support is required for newer S3 regions such as Ohio (us-east-2).

bash_profile
export HADOOP_HOME=/usr/local/Cellar/hadoop/2.7.3/libexec
export AWS_CLASSPATH="$HADOOP_HOME/share/hadoop/tools/lib/hadoop-aws-2.7.3.jar"
export AWS_CLASSPATH="$AWS_CLASSPATH:$HADOOP_HOME/share/hadoop/tools/lib/aws-java-sdk-1.7.4.jar"
export AWS_CLASSPATH="$AWS_CLASSPATH:$HADOOP_HOME/share/hadoop/tools/lib/guava-11.0.2.jar"
alias spark-shell='spark-shell --driver-class-path "$AWS_CLASSPATH" --driver-java-options "-Dcom.amazonaws.services.s3.enableV4=true"'

Programmatic Configuration

The AWS_CLASSPATH above still needs to be passed to spark-shell. Run spark-shell, enable V4 support, and set the S3 access/secret keys and endpoint as follows.

System.setProperty("com.amazonaws.services.s3.enableV4", "true")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")
sc.hadoopConfiguration.set("fs.s3a.access.key", "**********")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "**********")

Read RDD from S3 Bucket

Finally, read an RDD from the S3 bucket and count its records.

val rdd = sc.textFile("s3a://cs5630s17/part00001.gz")
rdd.count
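
The same object can also be read through the Spark 2.x DataFrame API. Below is a small sketch, assuming the gzipped part file is line-oriented text; spark is the SparkSession that spark-shell predefines.

// DataFrame read over S3A; the same fs.s3a configuration applies.
val df = spark.read.text("s3a://cs5630s17/part00001.gz")
df.count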

Counting Increasing Runs in Sliding Windows on Spark

20 March 2017

Apache Spark hides the underlying dataset partitions to enable massive data parallelism. This makes it tricky to compute statistics in sliding windows that cross partitions. Although Spark MLlib provides efficient sliding window APIs, PySpark cannot access that functionality yet. One way to implement sliding windows is to duplicate data items at every position covered by a sliding window, then call groupBy() on the RDD to group the items for each sliding window starting at each position. Unfortunately, the performance is painfully slow. On the other hand, the sliding window provided by RDDFunctions in Scala is fast and handles partitions correctly. The idea is to collect the tail sequences from the following one or more partitions so that the current partition can iterate past its own boundary. The following Python script counts the increasing runs of length K=15 in sliding windows of step size 1. The generated test dataset is the sequence 0 to 79, so the number of increasing runs across partitions should be 80 - (K - 1) = 66. If sliding windows across partitions were not handled, the result would be zero, because the partition size 80/20 = 4 is smaller than the sliding window size.

from pyspark import SparkConf, SparkContext
import subprocess, sys, random
conf = SparkConf()
sc = SparkContext(conf = conf)
logger = sc._jvm.org.apache.log4j
logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )

P = 20   # partitions
N = 80   # generated numbers
K = 15   # sliding window size
manual = False
for i in range(len(sys.argv)):
    if(sys.argv[i] == '-p' and i + 1 < len(sys.argv)): P = int(sys.argv[i+1])
    if(sys.argv[i] == '-n' and i + 1 < len(sys.argv)): N = int(sys.argv[i+1])
    if(sys.argv[i] == '-k' and i + 1 < len(sys.argv)): K = int(sys.argv[i+1])
    if(sys.argv[i] == '-m'): manual = True
print '(P, N, K, manual) = ({}, {}, {}, {})'.format(P, N, K, manual)

def toFloat(n):
    try:
        return float(n)
    except ValueError:
        return 0.0

def isUp(window):
    return all(x<y for x, y in zip(window, window[1:]))

def toTail(itr):
    # Generate the tail sequence up to the sliding window size - 1
    tail = []
    for x in itr:
        tail.append(x)
        if(len(tail) == K - 1):
            break
    yield tail

def sliding(i, itr):
    # Aggregate the tail for the current partition from the collected tails
    j = i + 1
    tail = []
    while(j < len(tails) and len(tail) < K - 1):
        tail.extend(tails[j][:K - 1 - len(tail)])
        j = j + 1

    # Compose sliding windows including the full tail
    window = []
    for x in itr:
        window.append(x)
        if(len(window) == K):
            yield window
            window.pop(0)
    for x in tail:
        window.append(x)
        if(len(window) == K):
            yield window
            window.pop(0)

# Generate a test dataset of floats
rdd = sc.parallelize(range(N), P).map(lambda n: toFloat(n))
if(manual):
    # Hand-made sliding windows using groupBy()
    def window(xi, k):
        x, i = xi
        return [(i - offset, i, x) for offset in xrange(k)]
    runs = (rdd.
            zipWithIndex().
            flatMap(lambda xi: window(xi, K)).
            groupBy(lambda ix: ix[0]).
            mapValues(lambda vals: [x for (pos, i, x) in sorted(vals)]).
            sortByKey().
            values().
            filter(lambda x: len(x) == K).
            filter(isUp))
else:
    # Partitions with tails to yield full sliding windows
    tails = rdd.mapPartitions(toTail).collect()
    runs = rdd.mapPartitionsWithIndex(sliding).filter(isUp)

print '######### Up runs[{}] = {} in {} partitions ##########'.format(K, runs.count(), rdd.getNumPartitions())

Performance Comparison with Scala

Apache Spark is mainly implemented in Scala; Python support is provided through the Py4J binding. Even though running a Python script on Spark with spark-submit is straightforward, the performance difference from Scala can be significant when calling groupBy() or join() on an RDD. Here are the benchmark results of counting the number of increasing runs of length 3 in an EEG dataset of 7450336 float numbers on an 8-core machine with 8GB RAM. Running a Python script through spark-submit can be faster than running a Scala script through spark-shell because it avoids some JVM/Spark startup overhead, but it becomes slower for cross-partition operations. On the other hand, running a packaged Scala app through spark-submit is about twice as fast as its Python counterpart, making it desirable for production use. Nonetheless, it takes some preparation work to package a Scala app (a minimal sbt build sketch follows the Scala listing below).

Running uprun.py through spark-submit in 4 partitions
  • Sliding windows with full tails: 18.508s

  • Sliding windows using groupBy(): 10m44.039s

Running uprun.scala in a packaged jar through spark-submit in 4 partitions
  • Sliding windows by RDDFunctions: 9.184s

  • Sliding windows using groupBy(): 3m21.722s

Running uprun.scala through spark-shell in 4 partitions
  • Sliding windows by RDDFunctions: 21.759s

  • Sliding windows using groupBy(): 3m34.009s

The Scala app counterpart is listed below for completeness.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.rdd.RDDFunctions

object UpRun {
  def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("Up Run Counting")
      val sc = new SparkContext(conf)
      var manual = false
      var testing = false
      var P =  4
      var N = 80
      var K =  3
      for(i <- 0 until args.length) {
          if(args(i) == "-p" && i+1 < args.length ) P = args(i+1).toInt
          if(args(i) == "-n" && i+1 < args.length ) N = args(i+1).toInt
          if(args(i) == "-k" && i+1 < args.length ) K = args(i+1).toInt
          if(args(i) == "-m") manual = true
          if(args(i) == "-t") testing = true
      }
      printf("(P, N, K, manual, testing) = (%d, %d, %d, %s, %s)\n", P, N, K, manual, testing)
      val rdd = if(testing) sc.parallelize(1 to N, P).map(_.toFloat)
                else sc.textFile("/user/hadoop/eegsample", P).flatMap(_.split("\\s+")).map(_.toFloat)
      var count: Long = 0
      if(manual) {
          /* Hand-made sliding windows in Scala */
          count = rdd.zipWithIndex.
                  flatMap { case (x, i) => (0 until K).map(offset => (i - offset, i, x)) }.
                  groupBy { case (pos, _, _) => pos }.
                  mapValues(_.toList.sorted.map { case (_, _, x) => x }).
                  sortByKey().
                  values.
                  filter(_.size == K).
                  filter(w => (w zip w.drop(1)).forall { case (v1, v2) => v1 < v2 }).
                  count
      } else {
          count = RDDFunctions.fromRDD(rdd).
                  sliding(K).
                  filter(w => (w zip w.drop(1)).forall { case (v1, v2) => v1 < v2 }).
                  count
      }
      printf("######### Up runs[%d] = %d in %d partitions ##########\n", K, count, rdd.getNumPartitions)
      sc.stop()
  }
}
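
As noted in the performance comparison above, packaging the Scala app for spark-submit takes a little preparation. A minimal sbt build sketch is shown below; the Scala and Spark versions are assumptions that match the Spark 2.1.0 install used in these posts, not settings taken from an actual project.

build.sbt
// Minimal sbt build sketch for packaging UpRun (versions are assumptions).
name := "uprun"

version := "0.1"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"  % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.1.0" % "provided"
)

Run sbt package, then submit the resulting jar (by default target/scala-2.11/uprun_2.11-0.1.jar) with spark-submit --class UpRun together with the desired -p/-n/-k options.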

Scripting Scala with Apache Spark

07 March 2017

You might be used to running Python scripts through the spark-submit command. It is straightforward for prototyping and testing, though running Python applications through pyspark is not supported as of Spark 2.0. On the other hand, Scala scripts must be packaged as standalone applications for spark-submit to accept them. This can be a bit overwhelming for simple Scala scripts, even with build tools such as sbt or Gradle. It happens that spark-shell provides ways to load and evaluate a Scala script.

Method 1: load within the Spark shell

Type the built-in command starting with ':' to load the Scala script as follows:

scala> :load script.scala

Method 2: run spark-shell with options to specify the script and arguments

Type the following command line to run a Scala script with command line arguments.

$> spark-shell -i script.scala --conf spark.driver.args="arg1 arg2 ..."

Option -i specifies the script path. Option --conf allows parameters to be passed to the SparkConf instance. spark.driver.args is not a predefined Spark property; it is simply a spark.-prefixed key that Spark passes through to the SparkConf, where the script can read it.

Method 3: define shell function to run with command line arguments

A shortcut for running a Spark script in Scala with command line arguments is to go through the system shell. Edit .bash_profile to add a function spark-scala that captures the target Scala script and its command line arguments as follows.

bash_profile
function spark-scala {
  spark-shell -i "$1" --conf spark.driver.args="${*:2}"
}

The Spark script in Scala then retrieves the command line arguments via the SparkConf property spark.driver.args.

val args = sc.getConf.get("spark.driver.args").split("\\s+")
printf("args=%s\n", args.mkString(", "))
// ...

In this way, a Scala script can be run through spark-shell much like a Python script submitted through spark-submit, with command line arguments passed along.

$> spark-scala script.scala -k 3
args=-k, 3
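
Putting the pieces together, a complete script.scala might look like the sketch below. The -k option and the trivial job are placeholders for illustration only, mirroring the argument parsing used in the sliding-window post above.

// script.scala -- sketch of a spark-shell script driven by spark.driver.args.
val args = sc.getConf.get("spark.driver.args").split("\\s+")
printf("args=%s\n", args.mkString(", "))

// Parse a hypothetical -k option with a default value.
var k = 3
for (i <- 0 until args.length) if (args(i) == "-k" && i + 1 < args.length) k = args(i + 1).toInt

// Use the parsed value in a trivial Spark job.
printf("multiples of %d = %d\n", k, sc.parallelize(1 to 100).filter(_ % k == 0).count)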

Gotcha after Upgrading to Android Studio 2.2.x

09 November 2016

There are two issues you may encounter when upgrading from 2.1.x to 2.2.x. Simply disabling and re-enabling the affected plugins fixes them. Follow the links for details.

Monsoon Power Tool Web Service for Scripting

09 November 2016


Monsoon Power Monitor is commonly used by researchers and engineers to measure energy consumption at high frequency. However, the lack of support for Mac OS X and Linux is a big turnoff and inconvenience. The monsoon.py included in AOSP is problematic on Mac OS X: the serial port access over USB blocks indefinitely every so often until a reboot. With official support for Windows only, a virtual machine or a web service are the remaining alternatives. Unfortunately, accessing the device through a virtual machine is not guaranteed to work; for example, the device enumeration API may return nothing, unlike on a real Windows machine. Therefore, setting up the web service on real Windows for remote scripting is the expedient solution.


The Monsoon PowerTool Web Service for scripting that I created is hosted on GitHub. The project contains the server-side web service in C# and a Java client, plus a setup subproject for installation on Windows. The Java client can be built with Gradle to test the remote web service; its output looks as follows.

Test Java client output
PowerToolServiceTest > testPowerToolServiceSampling STANDARD_OUT
    Found connected Monsoon monitor of serial 6325
        Instant main channel: samples=14355, current=0.24863, voltage=3.69050
        Instant main channel: samples=24255, current=0.21813, voltage=3.69050

View build.gradle for details to set the remote WSDL URL.

The Web Service APIs are based on PowerTool 4.0.5.2, while the client APIs depend on the actual stub code generated by Apache CXF. Refer to the official developer’s guide for an API overview.

Parade

15 November 2015

DevDraft recently held its September 2015 challenge with an intriguing algorithmic problem. After days of contemplation, my solution reached 100% code correctness and a perfect algorithmic problem-solving score for runtime efficiency.

Problem

A nearby city has recently undergone a massive revitalization effort and, in order to celebrate and attract economic investment, is going to throw a parade. The mayor plans to deploy a number of security forces for the days leading up to the parade to keep the parade route free of vandalism. However, the budget is limited, so the mayor wants to make sure the security is deployed in such a way as to maximize effectiveness.

You are given a list of integers representing the threat of vandalism occurring on the city blocks along the parade route: 0 means vandalism will not occur on a block, and greater integers indicate a greater danger of vandalism occurring. The parade is planned to move in a straight line and pass by every block exactly once. You are also given several security forces, each of which can patrol a number of adjacent blocks, totally nullifying the threat on the blocks they patrol. The forces come in different types with different patrol lengths; for example, an officer on bike can patrol farther than an officer on foot. The forces are represented by a list of pairs of integers, where the first integer is the number of adjacent blocks a type of force can patrol and the second is how many forces are available of that type.

The number of forces available is limited so you must place them strategically to minimize the sum of threat levels of all blocks that are not patrolled. Because the minimum threat level may be achieved by multiple arrangements of security, we ask that you output only the minimum total threat level that can be achieved, and not the positions of the forces.

C++ Implementation

The original implementation was in Groovy, but its runtime performance can be 100X slower than the C++ counterpart. Therefore, I decided to submit the C++ version, even though many Groovy constructs are not supported by the C++ STL and had to be reimplemented. As expected, the effort paid off.

The C++ code implements a branch and bound dynamic programming algorithm to solve the problem. More information about the solution analysis and complexity is available in the repo.
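
The original C++ source is not reproduced here. As a stand-in to make the problem mechanics concrete, below is a small illustrative solver in Scala that exhaustively searches patrol placements with memoization; it is only a sketch with made-up sample data, not the branch and bound solution submitted to DevDraft.

// Illustrative parade solver: memoized recursion over (block index, remaining
// force counts). Exhaustive search, not the submitted branch and bound C++ code.
object ParadeSketch {
  // threat: threat level per block; forces: (patrol length, available count) pairs.
  def minThreat(threat: Vector[Int], forces: Vector[(Int, Int)]): Int = {
    val memo = scala.collection.mutable.Map[(Int, Vector[Int]), Int]()
    def solve(i: Int, counts: Vector[Int]): Int =
      if (i >= threat.length) 0
      else memo.getOrElseUpdate((i, counts), {
        // Either leave block i unpatrolled and pay its threat level...
        var best = threat(i) + solve(i + 1, counts)
        // ...or start a patrol of some available type at block i.
        for (j <- forces.indices if counts(j) > 0)
          best = best min solve(i + forces(j)._1, counts.updated(j, counts(j) - 1))
        best
      })
    solve(0, forces.map(_._2))
  }

  def main(args: Array[String]): Unit = {
    val threat = Vector(3, 0, 4, 1, 5)   // hypothetical threat levels
    val forces = Vector((2, 1), (1, 1))  // one length-2 patrol, one length-1 patrol
    println(minThreat(threat, forces))   // prints 3: blocks 0 and 1 stay unpatrolled
  }
}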


Older posts are available in the archive.