Westfield Case Study

by Zachary Zeus
June 27, 2017

Recently Pentaho announced the release of Pentaho 7.1. Don’t let the point release numbering make you think this is a small release. This is one of the most significant releases of Pentaho Data Integration! With the introduction of the Adaptive Execution Layer (AEL) and Spark, this release leapfrogs the competition for Spark application development!

The goal of AEL is to develop visually once and execute anywhere. AEL will future-proof your application against emerging engines. Today you can visually develop applications for Pentaho’s native Kettle engine and for a Spark cluster. As new technologies emerge, AEL will be implemented for other engines so developers don’t need to rewrite their applications.

With the initial implementation of AEL with Spark, Pentaho brings the power and ease of use of PDI’s visual development environment to Spark. Virtually all PDI steps can run in Spark. This allows a developer to build their entire application on their desktop without having to access and debug a Spark cluster. Once you are done testing on your desktop, you simply point the application to Spark resources and run. This saves a huge amount of development and testing time! (Many vendors claim they can do this, but have you actually tried developing an application on your desktop without a Spark cluster to test on and then moving that application to a Spark cluster? Good luck with that. With other vendors it is not possible without major changes to your application!)

AEL-Spark Application Development

One of the major benefits of developing a Spark application with PDI is that you do not need access to a Spark cluster (or local mode installation of Spark) during development time. As a developer, you can build and test your entire Spark application on your desktop with sample data. Once you are done you simply point the application to Spark resources and execute. If you have ever debugged a cluster application you will realize the huge productivity boost this gives a developer.

Let’s use the following simple application to understand how you would develop a PDI application on your desktop and then execute it in Spark. It may not be a real-world application, but it demonstrates the various concepts of AEL-Spark:

  1. Main Job
    1. Sets up general data input/output locations.
    2. Initiates transformations to process and load data.
  2. Lookup/Sort Transformation
    1. Filters the wanted data, enhances the data, and sorts it.
    2. Outputs the sorted data.
  3. Load RDBMS – Takes the output of the sorted data and loads it into an RDBMS.

Main PDI Job Flow

  • Use variables for all input and output files. In the Set variables job step, create variables that are used by all input and output file steps in the subsequent transformations. This allows us to easily point to different files when testing on the desktop and then switching to the Spark cluster.
  • Executes 2 transformations in sequence. During development time, this PDI Job and all of its transformations will execute on the desktop.
    • Transformation A: For records that have a Product ID, it will look up additional product info and sort the final results by product name.
    • Transformation B: Loads the error records to a database.

1.2 Transformation A: Lookup and Sort

  • Filters records: Records that do not have Product IDs are filtered out and saved in a separate file.
  • For all records that have a Product ID, we look up the Product ID, add additional product information to the row, sort the records by Product Name, and store the results.
  • Once we are done testing on our desktop, this will be the only transformation that will get configured to execute on the Spark cluster.

1.3 Transformation B: Load RDBMS

  • Loads the error records to a database.
  • For this discussion, this transformation will always execute on the same engine as the client. We will not execute it on Spark because it is not a good idea to connect to a database from hundreds of Spark worker nodes at the same time.

The above PDI jobs and transformations are developed and debugged with a sample data file on a laptop without any access to Spark technology by configuring the following:

  • Use variables for all input and output files. In the Set variables job step, create variables and use them for all input and output file steps in the subsequent transformations. During development, all of these should resolve to a file name on any VFS (Pentaho’s virtual file system) supported source or destination. This includes the following steps:
    • Transformation A:
      • Text file in: Raw Data
      • Text file in: Product Info
      • Text file out: Error Data
      • Text file out: Sorted Product Data
    • Transformation B: Text file in: Error Data
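
The variable-driven file locations above can be pictured with a small Python sketch. This is not PDI code; it only mimics how `${VAR}` references in step settings could resolve to different locations per environment. The variable names, paths, and the `namenode:8020` host are all hypothetical and chosen for illustration.

```python
import re

# Hypothetical variable sets. In PDI these values would be defined in the
# Set variables job step; the names and paths here are illustrative only.
LOCAL_VARS = {
    "RAW_DATA": "file:///tmp/sample/raw_data.csv",
    "PRODUCT_INFO": "file:///tmp/sample/product_info.csv",
    "ERROR_DATA": "file:///tmp/sample/error_data.csv",
    "SORTED_DATA": "file:///tmp/sample/sorted_product_data.csv",
}
CLUSTER_VARS = {
    "RAW_DATA": "hdfs://namenode:8020/data/raw",
    "PRODUCT_INFO": "hdfs://namenode:8020/data/product_info",
    "ERROR_DATA": "hdfs://namenode:8020/out/error_data",
    "SORTED_DATA": "hdfs://namenode:8020/out/sorted_product_data",
}

# Step settings reference variables instead of hard-coded file names.
STEP_SETTINGS = {
    "Text file in: Raw Data": "${RAW_DATA}",
    "Text file in: Product Info": "${PRODUCT_INFO}",
    "Text file out: Error Data": "${ERROR_DATA}",
    "Text file out: Sorted Product Data": "${SORTED_DATA}",
}

_VAR_PATTERN = re.compile(r"\$\{(\w+)\}")


def resolve(setting, variables):
    """Replace each ${NAME} in a setting; raise KeyError for unknown names."""
    def lookup(match):
        name = match.group(1)
        if name not in variables:
            raise KeyError(f"undefined variable: {name}")
        return variables[name]
    return _VAR_PATTERN.sub(lookup, setting)


def resolve_all(settings, variables):
    """Resolve every step setting against one variable set."""
    return {step: resolve(value, variables) for step, value in settings.items()}


local_settings = resolve_all(STEP_SETTINGS, LOCAL_VARS)
cluster_settings = resolve_all(STEP_SETTINGS, CLUSTER_VARS)
```

Because the step settings never change, switching from desktop testing to the cluster only means swapping the variable set.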

Set the Run Configuration for all the transformations to Pentaho local (this is the default setting):

Run Configuration for all transformations set to Pentaho local

Once you have completed testing on your local desktop, you then point the application to your Spark cluster and execute it by making the following changes:

Update all input and output files to reference HDFS locations:

  • Transformation A:
    • All Text file inputs (Raw Data, Product Info): Set to a file or directory in HDFS. If set to a directory, then all files in the directory will be read in.
    • Text file outputs (Error Data, Sorted Product Data): Set to a directory in HDFS. Spark will create a part file for each Spark executor that outputs a file for error data. All of the part files will be stored in the given directory.
    (Note: this behavior is a bit different from running the transformation on a local system and just outputting the data to HDFS. If you output data to HDFS from a transformation that is running in a Kettle engine via Spoon, Pan, Kitchen, or Pentaho server, you get a single file. However, if you write out to HDFS in Spark, it will always create a folder and put part files in it. This is because Spark will execute multiple instances of the output step in parallel and each instance will write out to a different file in the folder you give.)
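
The difference between single-file and part-file output can be sketched in plain Python. This is only an illustration on the local file system, not Spark or PDI code: the writers run one after another here, whereas Spark would run them in parallel, and the `part-00000` naming simply follows the usual Hadoop convention. The function names are hypothetical.

```python
import os
import tempfile


def write_single_file(rows, path):
    """Kettle-engine style: one writer produces one file."""
    with open(path, "w") as handle:
        for row in rows:
            handle.write(row + "\n")


def write_part_files(partitions, out_dir):
    """Spark style: each partition writes its own part file into a directory."""
    os.makedirs(out_dir, exist_ok=True)
    for index, rows in enumerate(partitions):
        part_path = os.path.join(out_dir, f"part-{index:05d}")
        with open(part_path, "w") as handle:
            for row in rows:
                handle.write(row + "\n")


def read_directory(in_dir):
    """Directory input: read every file in the directory, in name order."""
    rows = []
    for name in sorted(os.listdir(in_dir)):
        with open(os.path.join(in_dir, name)) as handle:
            rows.extend(line.rstrip("\n") for line in handle)
    return rows


with tempfile.TemporaryDirectory() as tmp:
    # One output step instance: a single file.
    single_path = os.path.join(tmp, "error_data.txt")
    write_single_file(["a", "b", "c"], single_path)
    single_is_file = os.path.isfile(single_path)

    # Two output step instances: a directory holding two part files.
    out_dir = os.path.join(tmp, "error_data")
    write_part_files([["a"], ["b", "c"]], out_dir)
    part_names = sorted(os.listdir(out_dir))
    rows_back = read_directory(out_dir)
```

Any downstream reader therefore has to be pointed at the directory rather than at a single file name, which is what `read_directory` stands in for.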

Once you are ready to execute the application, add a new Run Configuration for Spark for Transformation A by right-clicking Run Configurations in the View tab and selecting New:

The Spark host URL is the ZooKeeper host and port within the Spark Hadoop cluster.

The main Pentaho components of AEL-Spark are:

  • PDI Clients
    • This includes Spoon, Kitchen, and PDI Server. Pan cannot run a transformation on AEL-Spark because Run Configurations are associated with the Transformation job entry and so are not available in the KTRs.
    • Clients use a new AEL interface to communicate with remote clusters to execute PDI jobs and transformations. For 7.1, PDI Jobs will execute on the PDI client with the existing Kettle engine, and transformations that are identified to run on Spark will be sent to the AEL-Spark Daemon.
    • A PDI Job can contain a mix of locally executed transformations (via Kettle engine) and Spark executed transformations.
  • AEL-Spark Daemon
    • Currently (in 7.1) there is a single instance of this process/JVM, which runs on an edge node of the Hadoop/Spark cluster.
    • On startup, the Daemon registers with a ZooKeeper cluster. ZooKeeper must be deployed as part of the Spark/Hadoop cluster. Future enhancements will allow you to configure multiple Daemons to address fault tolerance and scalability.
    • The Daemon is responsible for starting and managing AEL-Spark Engines in a separate process/JVM for each PDI transformation that is executed on the Spark cluster.
    • The Daemon creates a two-way connection to the PDI client. This connection is used to relay transformation metrics and logs from the AEL-Spark Engine to PDI clients.
    • Daemon logs are local to where the Daemon is running and are not sent back to PDI.
  • AEL-Spark Engine
    • Executes a PDI transformation on the Spark cluster. There is one instance for each PDI transformation.
    • Works as a Spark Driver in Spark Client mode to execute a transformation (see Spark Resource Manager and YARN App Models for details on Spark client and driver).
    • Parses the PDI transformation and creates an execution plan to determine how each step will be executed (in parallel mode or as a single instance).
    • The first input step of a transformation is evaluated by the Engine to generate the initial RDD (unless the input is in HDFS; details are given below).
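
How a job mixes locally executed and Spark-executed transformations can be sketched as a simple routing loop. This is a minimal illustration under stated assumptions, not Pentaho's implementation: the `TransformationEntry` class, the `run_job` function, and the run configuration name `"Spark"` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class TransformationEntry:
    name: str
    run_configuration: str  # e.g. "Pentaho local" or a Spark run configuration


def run_job(entries, spark_configurations):
    """Process job entries in series, routing each by its run configuration."""
    log = []
    for entry in entries:
        if entry.run_configuration in spark_configurations:
            # In 7.1 the client would hand the transformation to the AEL-Spark Daemon.
            log.append(f"{entry.name}: sent to AEL-Spark Daemon")
        else:
            # Everything else stays on the client's own Kettle engine.
            log.append(f"{entry.name}: run by local Kettle engine")
    return log


job = [
    TransformationEntry("Transformation A: Lookup and Sort", "Spark"),
    TransformationEntry("Transformation B: Load RDBMS", "Pentaho local"),
]
job_log = run_job(job, spark_configurations={"Spark"})
```

The job itself always runs on the client; only the placement of each transformation changes.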

Note: Installation and configuration of the AEL-Spark components are beyond the scope of this article. I assume that the user has set up the PDI components as detailed in the 7.1 docs.

AEL-Spark Execution

Prior to 7.1, there was only one engine supported by Pentaho, the Kettle engine. The Kettle engine performed two major tasks:

  • Loaded plugins for all jobs and transformations.
  • Executed the job entries and transformation steps by using threads and passing data between the entries/steps.

With AEL-Spark, Pentaho has completely re-written the transformation execution engine and data movement so that it loads the same plugins, but uses Spark to execute the plugins and manage the data between the steps.

When you begin executing a PDI Job, each entry in the job is executed in series with the Kettle engine of the PDI Client.  When that Kettle engine encounters a transformation entry that is configured to run on a Spark cluster, the following steps occur for that transformation:

  1. The PDI client connects to ZooKeeper to request an AEL-Spark Daemon.
  2. The PDI client connects to the AEL-Spark Daemon and provides the Daemon with the transformation KTR and PDI application environment settings. This connection between the PDI client and the AEL-Spark Daemon remains open for the duration of the transformation's execution.
  3. The AEL-Spark Daemon then starts a new AEL-Spark Engine in a new process/JVM. The new AEL-Spark Engine is provided with the transformation KTR and PDI application environment settings.
  4. The AEL-Spark Engine creates an execution plan for the KTR. The Engine decides where to execute each PDI step based on the following characteristics:
    1. The first input step is executed within the AEL-Spark Engine:
      1. If the input step reads an HDFS directory or file, then it is given to Spark to convert it into RDD(s) and distribute the RDD partitions to the cluster.
      2. If the input step is not reading from HDFS (it can be reading from local disk, a DB, or Kafka stream), then AEL-Spark Engine will generate RDD(s) and distribute the partitions of the RDD to the Spark cluster.
    2. All other steps are then executed on one or more Spark worker nodes based on the following:
      1. If a step is not allowed to run in parallel, then Spark will run it on only a single worker node. This will force all the data to be brought back to that single node.
      2. If a step is allowed to run in parallel, then Spark will execute the plugin on worker nodes where the RDD partitions are cached.
        Note: The AEL-Spark Engine determines which steps cannot be run in parallel and must execute on a single worker node by looking at the list of plugins defined by the forceCoalesceSteps parameter in the following file on the AEL-Spark Daemon system: [AEL-SparkDaemonInstallDir]/data-integration/system/karaf/etc/org.pentaho.pdi.engine.spark.cfg
        The default list only contains EE-supported components that cannot be parallelized. Users can add additional steps they do not want to run in parallel (like some custom or marketplace steps).
  5. As Spark executes all of the plugins, the PDI logs are written to YARN logs and sent back to the client via the AEL-Spark Daemon's connection to the client. In addition, the AEL-Spark Daemon collects metrics from the AEL-Spark Engine and sends them back to the client.
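
The placement decision in step 4 can be sketched in a few lines of Python. This is a simplified model under stated assumptions, not the engine's actual planner: it treats the transformation as a linear list of steps, uses a plain set in place of the forceCoalesceSteps list, and the step names, plugin IDs, and `CustomAuditStep` are hypothetical. The `run_step` helper mimics, with ordinary lists, what it means for a non-parallel step to pull all data onto one node.

```python
def build_plan(steps, force_coalesce):
    """Assign a placement to each (step_name, plugin_id) pair, in order."""
    placements = []
    for index, (name, plugin_id) in enumerate(steps):
        if index == 0:
            where = "engine"            # first input step is evaluated by the Engine
        elif plugin_id in force_coalesce:
            where = "single worker"     # step must not run in parallel
        else:
            where = "parallel workers"  # step runs where the partitions live
        placements.append((name, where))
    return placements


def run_step(partitions, step_fn, parallel):
    """Apply step_fn per partition; non-parallel steps first merge all partitions."""
    if not parallel:
        # Bring every row back into one partition before running the step.
        partitions = [[row for part in partitions for row in part]]
    return [step_fn(part) for part in partitions]


# Hypothetical transformation and coalesce list, for illustration only.
steps = [
    ("Raw Data", "TextFileInput"),
    ("Has Product ID?", "FilterRows"),
    ("Audit", "CustomAuditStep"),
    ("Sorted Product Data", "TextFileOutput"),
]
force_coalesce = {"CustomAuditStep"}

plan = build_plan(steps, force_coalesce)
merged = run_step([[2, 1], [3]], sorted, parallel=False)
per_partition = run_step([[2, 1], [3]], sorted, parallel=True)
```

Comparing `merged` with `per_partition` shows the trade-off: the non-parallel run sees all rows at once but gives up the partitioning, which is why adding steps to the coalesce list has a performance cost.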
Zachary Zeus

Zachary Zeus is the Founder of BizCubed. He provides the business with more than 20 years' engineering experience and a solid background in providing large financial services with data capability. He maintains a passion for providing engineering solutions to real world problems, lending his considerable experience to enabling people to make better data driven decisions.
