Harp-DAAL for High-Performance Big Data Computing
The Key to Simultaneously Boosting Productivity and Performance
Large-scale data analytics is revolutionizing many business and scientific domains. And easy-to-use, scalable parallel techniques are vital to process big data and gain meaningful insights. This article introduces a novel high-performance computing (HPC)-cloud convergence framework named Harp-DAAL and demonstrates how the combination of big data (Hadoop*) and HPC techniques can simultaneously boost productivity and performance.
Harp* is a distributed Hadoop-based framework that orchestrates node synchronization1. Harp uses Intel® Data Analytics Acceleration Library (Intel® DAAL)2 for its highly-optimized kernels on Intel® Xeon® and Xeon Phi™ processor architectures. This way, the high-level API of big data tools can be combined with intranode, fine-grained parallelism that’s optimized for HPC platforms.
We illustrate this framework in detail with K-means clustering, a compute-bound algorithm used in image clustering. We also show the broad applicability of Harp-DAAL by discussing the performance of three other big data algorithms:
- Subgraph Counting by color coding
- Matrix Factorization
- Latent Dirichlet Allocation
These algorithms share characteristics such as load imbalance, irregular structure, and communication issues that create performance challenges.
The categories in Figure 1 illustrate a classification of data-intensive computation into five models that map into five distinct system architectures. It starts with Sequential, followed by centralized batch architectures corresponding exactly to the three forms of MapReduce: Map-Only, MapReduce, and Iterative MapReduce. Category five is the classic Message Passing Interface (MPI) model.
Harp brings Hadoop users the benefits of supporting all five classes of data-intensive computation, from naturally parallel to machine learning and simulation. It expands the applicability of Hadoop (with a Harp plugin) to more classes of big data applications, especially complex data analytics such as machine learning and graph analysis. The design consists of a modular software stack with native kernels (with Intel DAAL) to effectively utilize scale-up servers for machine learning and data analytics applications. Harp-DAAL shows how simulations and big data can use common programming environments with a runtime based on a rich set of collective operations and libraries.
Interfacing Harp and Intel DAAL
Intel DAAL provides a native C/C++ API but also provides interfaces to higher-level programming languages such as Java* and Python*. Harp is written in Java and extended from the Hadoop ecosystem, so Java was the natural choice to interface Harp and Intel DAAL.
In Harp-DAAL, data is stored in a hierarchical data structure called Harp-Table, which consists of tagged partitions. Each partition contains a partition ID (metadata) and a user-defined serializable Java object such as a primitive array. When doing communication, data is transferred among distributed cluster nodes via Harp collective communication operations. When doing local computation, data moves from Harp-Table (JVM heap memory) to Intel DAAL native kernels (off-JVM heap memory). Copying data between a Java object and C/C++ allocated memory space is unavoidable. Figure 2 illustrates two approaches to this data copy:
- Direct Bulk Copy: If an Intel DAAL kernel allocates a continuous memory space for dense problems, Harp-DAAL will launch a bulk copy operation between a Harp-Table and a native memory address.
- Multithreading Irregular Copy: If an Intel DAAL kernel uses an irregular and sparse data structure―which means the data will be stored in non-consecutive memory segments―Harp-DAAL performs a second copy operation using Java/OpenMP threads, where the threads transfer data segments concurrently.
Applying Harp-DAAL to Data Analytics
K-means is a widely-used and relatively simple clustering algorithm that provides a clear example of how to use Harp-DAAL. K-means uses cluster centers to model data and converges quickly via iterative refinement. K-means clustering was performed on a large image dataset from Flickr*, which includes 100 million images, each with 4,096 dimensional deep features extracted using a deep convolutional neural network model trained on ImageNet*. Data preprocessing includes format transformation and dimensionality reduction from 4,096 to 128 using Principal Component Analysis (PCA). (The Harp-DAAL tutorial contains a description of the data preparation.)3
Harp-DAAL provides modular Java functions for developers to customize the K-means algorithm as well as tuning parameters for end users. The programming model consists of map functions linked by collectives. The K-means example takes seven steps.
Step 1: Load Training Data (Feature Vectors) and Model Data (Cluster Centers)
Use this function to load training data from the Hadoop Distributed File System (HDFS):
// create a pointArray List<double[]> pointArrays = LoadTrainingData();
Similarly, create a Harp table object cenTable and load centers from HDFS. Since centers are requested by all the mappers, the master mapper will load and broadcast them to all other mappers. Different center initialization methods can be supported in this fashion:
// create a table to hold cluster centers Table<DoubleArray> cenTable = new Table<>(0, new DoubleArrPlus()); if (this.isMaster()) { createCenTable(cenTable); loadCentroids(cenTable); } // Bcast centers to other mappers bcastCentroids(cenTable, this.getMasterID());
Step 2: Convert Training Data from Harp to Intel DAAL
The training data loaded from HDFS is stored in the Java heap memory. To invoke the Intel DAAL kernel, this step converts the data to an Intel DAAL NumericTable, which includes allocating the native memory for the NumericTable and copying data from pointArrays to trainingdata_daal.
// convert training data from Harp to DAAL NumericTable trainingdata_daal = convertTrainData(pointArrays);
Step 3: Create and Set Up an Intel DAAL K-means Kernel
Intel DAAL provides Java APIs to invoke native kernels for K-means on each node. It is called with:
- A specification of the input training data object
- The number of centers
- The number of threads to be used by the thread scheduler (Intel® Threading Building Blocks and Intel® Math Kernel Library)
// create a DAAL K-means kernel object DistributedStep1Local kmeansLocal = new DistributedStep1Local(daal_Context, Double.class, Method.defaultDense, this.numCentroids); // set up input training data kmeansLocal.input.set(InputId.data, trainingdata_daal); // specify the number of threads used by DAAL kernel Environment.setNumberOfThreads(numThreads); // create cenTable on DAAL side NumericTable cenTable_daal = createCenTableDAAL();
Step 4: Convert Center Format from Harp to Intel DAAL
The centers are stored in the Harp table cenTable for inter-process (mapper) communication. The centers are converted to Intel DAAL format at each iteration.
// Convert center format from Harp to DAAL convertCenTableHarpToDAAL(cenTable, cenTable_daal);
Step 5: Local Computation by Intel DAAL Kernel
Call Intel DAAL K-means kernels of local computation at each iteration.
// specify cluster centers to DAAL kernel kmeansLocal.input.set(InputId.inputCentroids, cenTable_daal); // first step of local computation by using DAAL kernels to get a partial result PartialResult pres = kmeansLocal.compute();
Step 6: Inter-Mapper Communication
Harp-DAAL K-means uses an AllReduce computation model where each mapper keeps a local copy of the whole model data (cluster centers). However, Harp provides different communication operations to synchronize model data among mappers:
- regroup & allgather (default)
- allreduce
- broadcast & reduce
- push & pull
In a regroup & allgather operation, it first combines the same center from different mappers and redistributes them to mappers by a specified order. After averaging the centers, an allgather operation makes every mapper get a complete copy of the averaged centers.
comm_regroup_allgather(cenTable, pres);
In an allreduce operation, the centers are reduced and copied to every mapper. Then, on each mapper, an average operation is applied to the centers:
comm_allreduce(cenTable, pres);
At the end of each iteration, call printTable to check the clustering result:
// for iteration i, print the first ten centers of the first ten dimensions printTable(cenTable, 10, 10, i);
Step 7: Release Memory and Store Cluster Centers
After all of the iterations, release the memory allocated for Intel DAAL and for the Harp table object. The center values are stored on HDFS as the output:
// free memory and record time cenTable_daal.freeDataMemory(); trainingdata_daal.freeDataMemory(); // Write out the cluster centers if (this.isMaster()) { KMUtil.storeCentroids(this.conf, this.cenDir, cenTable, this.cenVecSize, “output”); } cenTable.release();
Performance Results
The performance for Harp-DAAL is illustrated by the results for four applications<sup>4,5</sup> with different algorithmic features:
- K-means: A dense clustering algorithm with regular memory access
- MF-SGD (Matrix Factorization for Stochastic Gradient Descent): A dense recommendation algorithm with irregular memory access and large model data
- Subgraph Counting: A sparse graph algorithm with irregular memory access
- Latent Dirichlet Allocation (LDA): A sparse, topic modeling algorithm with large model data and irregular memory access
The testbed has two clusters:
- One with Intel Xeon E5 2670 processors and the InfiniBand Interconnect*
- One with Intel Xeon Phi 7250 processors and the Intel® Omni-Path Interconnect
In Figure 3, Harp-DAAL achieves around a 30x speedup over Spark* for K-means on 30 nodes using its highly vectorized kernels from Intel Math Kernel Library, which is part of Intel DAAL. MF-SGD was run on up to 30 nodes and achieved a 3x speedup over NOMAD*, a state-of-the-art MPI C/C++ solution. The benefits come from Harp’s rotation collective operation that accelerates the communication of the big model data in the recommender system.
Harp-DAAL subgraph counting on 16 Intel Xeon E5 processor-based nodes has a 1.5x to 4x speedup over MPI-Fascia for large subtemplates with billion-edged Twitter graph data. The performance improvement comes from node-level pipeline overlapping of computation and communication. Single-node thread concurrency improved by neighbor list partitioning of the graph vertex.
Figure 4 shows that Harp LDA achieves better convergence and speedup over other state-of-the-art MPI implementations such as LightLDA* and NomadLDA*5. This advantage comes from two optimizations for parallel efficiency:
- Harp adopts the rotation computation model for inter-node communication of the latest model, and at the same time utilizes timer control to reduce the overhead of synchronization.
- At the intra-node level, a dynamic scheduling mechanism is developed to mitigate load imbalance.
The current Harp-DAAL system provides 13 distributed data analytics and machine learning algorithms leveraging the local computation kernels like K-means from the Intel DAAL 2018 release. In addition, Harp-DAAL is developing its own data-intensive kernels. This includes the large-scale subgraph counting algorithm given above, which can process a social network Twitter graph with billions of edges and subtemplates of 10 vertices in 15 minutes. The Harp-DAAL framework and machine learning algorithms are publicly accessible5 so you can download the software, explore the tutorials, and apply Harp-DAAL to other data-intensive applications.
References
1. Harp code repository, https://github.com/DSC-SPIDAL/harp
2. Intel DAAL, https://github.com/intel/daal
3. Harp-DAAL tutorial, https://github.com/DSC-SPIDAL/harp/tree/tutorial
4. Langshi Chen et. al, Benchmarking Harp-DAAL: High Performance Hadoop on KNL Clusters, in the
Proceedings of the 10th IEEE International Conference on Cloud Computing (IEEE Cloud 2017), June 25-30, 2017.
5. Bo Peng et. al, HarpLDA+: Optimizing Latent Dirichlet Allocation for Parallel Efficiency, the IEEE Big Data 2017 conference, December 11-14, 2017.
Software and workloads used in performance tests may have been optimized for performance only on Intel microprocessors. Performance tests, such as SYSmark and MobileMark, are measured using specific computer systems, components, software, operations and functions. Any change to any of those factors may cause the results to vary. You should consult other information and performance tests to assist you in fully evaluating your contemplated purchases, including the performance of that product when combined with other products. For more complete information visit http://www.intel.com/performance.
Intel’s compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.