metalus

This project aims to make writing Spark applications easier by abstracting the effort to assemble the driver into reusable steps and pipelines.

Pipeline Drivers

Pipeline drivers are the entry point for any Metalus application. A default driver is provided for batch processing. The pipeline driver chosen requires a DriverSetup to configure the application prior to execution.

Default Pipeline Driver

The DefaultPipelineDriver is provided for most batch processing.

Command line parameters

Required parameters:

  • driverSetupClass - The class that handles all of the initial setup, such as building the pipelines and creating the PipelineContext.

Optional parameters:

  • maxRetryAttempts - [number] The number of times processing will be retried before failing. Default is 0.
  • terminateAfterFailures - [boolean] When true, fail the process once all retry attempts have been exhausted. Default is false.
  • streaming-job - [boolean] This flag tells the driver to run the execution plan continually, restarting after each execution. This is used for connector-based streaming.
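
One plausible reading of how maxRetryAttempts and terminateAfterFailures interact can be sketched in plain Scala. This is an illustration only, not the driver's actual implementation: RetrySketch, runWithRetries and the process callback are hypothetical names, and returning false (rather than throwing) when terminateAfterFailures is unset is an assumption.

```scala
object RetrySketch {
  // Runs `process` once, then retries up to maxRetryAttempts more times while it
  // reports failure (false). The attempt index (0 = first run) is passed to `process`.
  // Returns true as soon as an attempt succeeds. If every attempt fails, throws when
  // terminateAfterFailures is set; otherwise returns false (assumed behavior).
  def runWithRetries(maxRetryAttempts: Int = 0, terminateAfterFailures: Boolean = false)
                    (process: Int => Boolean): Boolean = {
    // Range.exists stops at the first attempt that returns true.
    val succeeded = (0 to maxRetryAttempts).exists(attempt => process(attempt))
    if (!succeeded && terminateAfterFailures) {
      throw new IllegalStateException(
        s"Processing failed after $maxRetryAttempts retry attempt(s)")
    }
    succeeded
  }
}
```

With the defaults (0 retries, no termination), a failing run is attempted once and the sketch simply reports the failure to the caller.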

DriverSetup

The DriverSetup is invoked by the chosen driver class with a map containing the application command line parameters from the ‘spark-submit’ command. The DriverSetup is then responsible for creating the SparkSession, PipelineContext and execution plan. When executing the ‘spark-submit’ command, one application parameter is required: driverSetupClass, which is used to initialize the DriverSetup implementation.
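
To make the parameter map concrete, here is a minimal sketch of turning `--name value` application arguments into a map and checking for the required driverSetupClass entry. It is not the Metalus implementation: DriverArgs and parse are hypothetical names, com.example.MyDriverSetup is a made-up class name, and treating a flag with no value as "true" is an assumption.

```scala
object DriverArgs {
  // Convert "--name value" pairs into a Map.
  // Assumption: a flag with no following value is stored as "true".
  def parse(args: Array[String]): Map[String, String] = {
    @scala.annotation.tailrec
    def loop(rest: List[String], acc: Map[String, String]): Map[String, String] =
      rest match {
        case name :: value :: tail if name.startsWith("--") && !value.startsWith("--") =>
          loop(tail, acc + (name.drop(2) -> value))
        case name :: tail if name.startsWith("--") =>
          loop(tail, acc + (name.drop(2) -> "true"))
        case _ :: tail => loop(tail, acc) // ignore stray tokens
        case Nil => acc
      }
    loop(args.toList, Map.empty)
  }

  def main(args: Array[String]): Unit = {
    val params = parse(Array(
      "--driverSetupClass", "com.example.MyDriverSetup", // hypothetical class name
      "--maxRetryAttempts", "2",
      "--streaming-job"))
    // The one required parameter must be present before anything else is built.
    require(params.contains("driverSetupClass"), "driverSetupClass is required")
    println(params)
  }
}
```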

This flow demonstrates how the chosen driver interacts with the DriverSetup:

[Figure: Default Driver Flow]

There are no special instructions for creating the SparkSession. Both the SparkSession and SparkConf are required by the PipelineContext.

CredentialProvider

The DriverSetup is responsible for providing a CredentialProvider that may be used by the driver to obtain any required credentials.

Logging Parameters

There are several command line parameters provided to help control the application log levels:

  • logLevel - This parameter will set the log level for all Metalus classes. Standard log4j levels are used. Default is INFO.
  • rootLogLevel - This parameter controls the global logging level. Default is WARN.
  • customLogLevels - This parameter allows controlling log levels for different packages. Example:

    --customLogLevels com.test:INFO,com.another.test:DEBUG,org.apache:WARN
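
The customLogLevels value is a comma-separated list of package:level pairs. The sketch below shows one way such a value could be split into a package-to-level map. CustomLogLevels and parse are hypothetical names, and silently skipping malformed entries is an assumption rather than documented behavior.

```scala
object CustomLogLevels {
  // Split "pkg:LEVEL,pkg:LEVEL" into a package -> level map.
  // Entries that do not have exactly one non-empty package and one non-empty
  // level are skipped (assumed behavior).
  def parse(value: String): Map[String, String] =
    value.split(",").toList
      .map(_.split(":").map(_.trim))
      .collect {
        case Array(pkg, level) if pkg.nonEmpty && level.nonEmpty =>
          pkg -> level.toUpperCase
      }
      .toMap
}
```

Applied to the example above, this yields three entries: com.test at INFO, com.another.test at DEBUG and org.apache at WARN.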

handleExecutionResult

This function will be called by the drivers to parse the results and determine if the execution was successful. The base functionality calls this utility.

DriverSetup - Streaming

Additional drivers provide streaming functionality using the Spark Streaming API. As data is consumed, it is converted to a DataFrame and the pipelines are executed to process the data. Application developers will need to create a step that processes the DataFrame to perform any additional conversions required before processing with existing steps.

In addition to the basic DriverSetup functions mentioned in the Metalus Pipeline Core, streaming applications should override the refreshExecutionPlan function. This function will be called prior to invoking the execution plan and gives the application a chance to reset any values in the context prior to processing data.

This flow demonstrates how the chosen driver interacts with the DriverSetup:

[Figure: Streaming Driver Flow]

Streaming Data Parsers

By default, the provided streaming drivers create a data collection with three fields: topic, key and value. The pipeline is responsible for retrieving the value from each row and parsing it. This behavior can be overridden by using the StreamDataParsers trait, which provides two functions, canParse and parseRDD. Both take the RDD provided by the streaming driver. The driver uses canParse to identify which parser to use when more than one is available, and calls parseRDD to create the DataFrame. Additional parsers are supplied with the streaming-parsers command line parameter, which takes a comma-separated list of the fully qualified class names of the parsers to use.
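
The canParse / parseRDD selection pattern can be illustrated without Spark. In the sketch below everything is a stand-in and none of it is the Metalus API: RecordParser is a hypothetical trait, a Seq[String] plays the role of the RDD, a Seq of maps plays the role of the DataFrame, and picking the first parser whose canParse returns true is an assumption about how the driver chooses.

```scala
object ParserSelection {
  // Hypothetical stand-in for the parser trait: In plays the role of the RDD,
  // Out the role of the DataFrame.
  trait RecordParser[In, Out] {
    def canParse(data: In): Boolean
    def parseRDD(data: In): Out
  }

  type Batch = Seq[String]
  type Rows = Seq[Map[String, String]]

  // Claims batches in which every record contains exactly one "=".
  object KeyValueParser extends RecordParser[Batch, Rows] {
    def canParse(data: Batch): Boolean =
      data.nonEmpty && data.forall(_.split("=", -1).length == 2)
    def parseRDD(data: Batch): Rows = data.map { line =>
      val parts = line.split("=", -1)
      Map("key" -> parts(0), "value" -> parts(1))
    }
  }

  // Fallback that accepts any batch and keeps each record as a raw value.
  object RawParser extends RecordParser[Batch, Rows] {
    def canParse(data: Batch): Boolean = true
    def parseRDD(data: Batch): Rows = data.map(line => Map("value" -> line))
  }

  // Use the first parser that claims the batch; None if no parser does.
  def parseWith(parsers: Seq[RecordParser[Batch, Rows]], data: Batch): Option[Rows] =
    parsers.find(_.canParse(data)).map(_.parseRDD(data))
}
```

Because the first matching parser wins in this sketch, the order of the list matters: a catch-all such as RawParser belongs last.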