metalus

This project aims to make writing Spark applications easier by abstracting the effort to assemble the driver into reusable steps and pipelines.

Pipeline Mapping

Metalus strives to make pipelines reusable by allowing pipeline step parameters to be mapped at runtime. This allows the execution flow of the pipeline to operate independently of the data used by the steps. To abstract the pipeline definition in a way that allows reuse without duplicating metadata, Metalus uses pipeline step mapping, performed by an implementation of the PipelineStepMapper class.

The following diagram demonstrates the default mapping flow:

Figure: Default Pipeline Mapping Flow

A parameter value may be static or dynamic. When the value begins with one of the special characters below, the executor uses that character to determine where to pull the mapped value from:

  • ! - When the value begins with this character, the system will search the globals for the named parameter and pass that value to the step function.
  • $ - When the value begins with this character, the system will search the parameters for the named parameter and pass that value to the step function.
  • ? - When the value begins with this character, the system will search the parameters for the named parameter and pass that value to the step function. Additionally, the value returned will be recursively processed and template replacement performed.
  • @ - When the value begins with this character, the system will search the parameters for the named parameter and pass the primaryReturn value to the step function.
  • # - When the value begins with this character, the system will search the parameters for the named parameter and pass the namedReturns value to the step function. Embedded object mapping may be used here to directly access a single named value.
  • & - When the value begins with this character, the system will search the pipelineManager for the named parameter and pass the pipeline or None to the step function. This is usually used in a step-group.
  • % - When the value begins with this character, the system will search the CredentialProvider on the PipelineContext for the named credential.
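The prefix rules above amount to a dispatch on the first character of the value. The following Scala sketch illustrates only that idea. `PrefixDispatch`, `MappingSource` and `classify` are hypothetical names and are not part of the Metalus API. The real mapper also performs the lookups, templating and type handling.

```scala
object PrefixDispatch {
  // Hypothetical model of where a mapped value is looked up (not Metalus code).
  sealed trait MappingSource
  case object Globals extends MappingSource            // !
  case object Parameters extends MappingSource         // $
  case object TemplatedParameter extends MappingSource // ? (parameters, then template replacement)
  case object PrimaryReturn extends MappingSource      // @
  case object NamedReturns extends MappingSource       // #
  case object PipelineManager extends MappingSource    // &
  case object Credential extends MappingSource         // %
  case object Static extends MappingSource             // no special character

  // Returns the source to search and the name to search for.
  def classify(value: String): (MappingSource, String) =
    if (value.isEmpty) (Static, value)
    else {
      val name = value.substring(1)
      value.charAt(0) match {
        case '!' => (Globals, name)
        case '$' => (Parameters, name)
        case '?' => (TemplatedParameter, name)
        case '@' => (PrimaryReturn, name)
        case '#' => (NamedReturns, name)
        case '&' => (PipelineManager, name)
        case '%' => (Credential, name)
        case _   => (Static, value) // static text is passed through unchanged
      }
    }
}
```

For example, `classify("!SOME_GLOBAL")` yields the globals source with the name `SOME_GLOBAL`.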

The @ and # symbols are shortcuts that assume the value in parameters is a PipelineStepResponse. A special named parameter, LastStepId, can be used with these symbols to retrieve the PipelineStepResponse of the last step executed.

In addition to searching the parameters of the current pipeline, the user can reference a value from any previous pipeline by prefixing the name with a pipelineId when using @, ? or $. Example: @p1.StepOne refers to the primary return of StepOne in pipeline p1.
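The following minimal sketch makes the @ and # shortcuts, LastStepId and the pipelineId prefix concrete. It rests on several assumptions. `StepResponse` is a simplified stand-in for PipelineStepResponse. The store is hypothetical and keyed by pipelineId, then stepId. A leading segment that names a known pipeline is treated as a pipelineId. None of these names come from Metalus.

```scala
object PipelineLookup {
  // Simplified stand-in for PipelineStepResponse (hypothetical).
  case class StepResponse(primaryReturn: Option[Any], namedReturns: Option[Map[String, Any]])

  // Hypothetical store: pipelineId -> (stepId -> response).
  type ParameterStore = Map[String, Map[String, StepResponse]]

  // Resolves "p1.StepOne" or "StepOne"; "LastStepId" resolves to the last executed step.
  def findResponse(ref: String, currentPipeline: String, lastStepId: Option[String],
                   store: ParameterStore): Option[StepResponse] = {
    val (pipelineId, stepRef) = ref.split('.').toList match {
      case p :: step :: Nil if store.contains(p) => (p, step) // assumed pipelineId qualifier
      case _                                     => (currentPipeline, ref)
    }
    val stepId = if (stepRef == "LastStepId") lastStepId else Some(stepRef)
    for {
      id       <- stepId
      steps    <- store.get(pipelineId)
      response <- steps.get(id)
    } yield response
  }

  // '@' passes the primaryReturn value.
  def primary(ref: String, current: String, last: Option[String], store: ParameterStore): Option[Any] =
    findResponse(ref, current, last, store).flatMap(_.primaryReturn)

  // '#' passes the namedReturns value.
  def named(ref: String, current: String, last: Option[String], store: ParameterStore): Option[Map[String, Any]] =
    findResponse(ref, current, last, store).flatMap(_.namedReturns)
}
```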

Types

The type attribute on a pipeline step parameter is used to parse the value or defaultValue when the type cannot be determined, or to provide additional information to the mapper.

  • text - The most common type when using mappings. The value will be parsed for special characters.
  • string - The value will be parsed for special characters like the text type, but the toString method of the result will be called.
  • boolean - The value will be parsed as a boolean.
  • integer - The value will be parsed as an int.
  • script - This indicates that the value contains a script and should not be parsed. The value will be returned unaltered.
  • scalascript - This indicates the value contains scala code that will be compiled and run, with the result returned. More information on this syntax can be found here.
  • list - Indicates that the value is a list.
  • object - Indicates that the value contains an object. This should be stored as a map.
  • result - Indicates that this parameter should be matched against the output of a branch step. The value will contain either nothing or the nextStepId.
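The following sketch shows how the type attribute could steer parsing for a few of the simpler types. `parseByType` is a hypothetical helper, and its `resolve` callback stands in for special-character mapping. The list, object, scalascript and result types are not modelled here and simply fall through to the text behaviour.

```scala
object TypeParsing {
  // Hypothetical helper; `resolve` stands in for special-character mapping.
  def parseByType(value: String, paramType: String, resolve: String => Any): Any =
    paramType match {
      case "boolean" => value.trim.toBoolean    // parsed as a boolean
      case "integer" => value.trim.toInt        // parsed as an int
      case "script"  => value                   // returned unaltered, never mapped
      case "string"  => resolve(value).toString // mapped, then toString is called
      case _         => resolve(value)          // "text" (and, in this sketch, anything else): mapped as-is
    }
}
```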

Additional information about step parameters can be found here.

Alternate Value Mapping

Metalus allows a step parameter mapping to have more than one value. The most common use case is a single step that must handle the different outcomes of a branch step. Depending on the path taken, the primary mapped value may not exist. In this case, Metalus checks the next mapped value, continuing until it finds a match or throws an error.

Example syntax:

@STEP_ONE || @STEP_TWO || !SOME_GLOBAL || default string

In the example above, Metalus first attempts to map the primary output of STEP_ONE. If no value is found, it checks the output of STEP_TWO. Failing that, it searches the globals map for SOME_GLOBAL, and finally it uses the provided static text, default string.
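The fallback behaviour can be sketched as splitting on `||` and taking the first alternative that resolves. This sketch rests on two assumptions. `lookup` is a hypothetical resolver that returns None when a mapping has no value. Static text is treated as always matching.

```scala
object AlternateMapping {
  private val MappingChars = "!$?@#&%"

  private def isMapped(candidate: String): Boolean =
    candidate.nonEmpty && MappingChars.indexOf(candidate.charAt(0)) >= 0

  // Tries each "||"-separated alternative in order; the iterator is lazy, so
  // lookups stop at the first alternative that yields a value.
  def resolveAlternates(value: String, lookup: String => Option[Any]): Any =
    value.split("\\|\\|").iterator
      .map(_.trim)
      .map(c => if (isMapped(c)) lookup(c) else Some(c)) // static text always matches
      .collectFirst { case Some(v) => v }
      .getOrElse(throw new IllegalArgumentException(s"No value could be mapped for: $value"))
}
```

Because static text always matches, placing it last (as in the example above) guarantees a value. Without it, an unmatched chain ends in an exception.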

Embedded Values

Users can reference properties embedded within top-level objects. Given an object (obj) that contains a sub-object (subObj), which in turn contains a name, the name field can be accessed using this syntax:

$obj.subObj.name

Here is the object described as JSON:

{
	"subObj": {
		"name": "Spark"
	}
} 

Arrays and Lists

Specific elements of Array and List objects can be extracted using square brackets around an index. Given an object:

{
   "obj": {
      "myList": [1, 2, 3]
   }
}

The middle element could be accessed using this syntax:

$obj.myList[1]

Zero-based indexing is used. An out-of-bounds index returns None as the result.
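Path navigation with indexes can be sketched as a fold over the dotted segments. This sketch assumes the values are plain Scala `Map`s and `Seq`s, as parsed JSON might be. `EmbeddedPath.extract` is a hypothetical helper. Reflection on object fields is not shown.

```scala
object EmbeddedPath {
  // Matches a segment such as "myList[1]", capturing the name and the index.
  private val Indexed = """(\w+)\[(\d+)\]""".r

  private def field(value: Any, name: String): Option[Any] = value match {
    case m: Map[_, _] => m.asInstanceOf[Map[String, Any]].get(name)
    case _            => None
  }

  // Zero-based; lift returns None for an out-of-bounds index.
  private def element(value: Any, index: Int): Option[Any] = value match {
    case s: Seq[_] => s.lift(index)
    case _         => None
  }

  // Walks a dotted path such as "obj.myList[1]" through nested Maps and Seqs.
  def extract(root: Any, path: String): Option[Any] =
    path.split('.').foldLeft(Option(root)) { (current, segment) =>
      current.flatMap { value =>
        segment match {
          case Indexed(name, index) => field(value, name).flatMap(element(_, index.toInt))
          case name                 => field(value, name)
        }
      }
    }
}
```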

Embedded Values and String Concatenation

The mapper also allows special values to be concatenated within a parameter value. Each special value must be wrapped in curly braces “{}” following its special character. For example, given the following string:

"some_string-${pipelineId.subObj.name}-another_string"

the mapper would return:

"some_string-Spark-another_string"

Multiple values may be embedded, as long as each resulting value is a string, boolean or number. If an embedded value resolves to an object, the mapper logs a warning, skips the string concatenation and returns the object value instead.
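The concatenation rule can be sketched with a regular expression over `<char>{path}` references. This sketch makes three assumptions:

- `lookup` is a hypothetical resolver keyed by the mapping character plus the path.
- A missing value is replaced with an empty string. Metalus' actual behaviour for that case is not described here.
- `println` stands in for the warning log.

```scala
import scala.util.matching.Regex

object TemplateConcat {
  // A mapping character followed by a {path}, e.g. ${pipelineId.subObj.name}.
  private val Embedded: Regex = """([!$?@#&%])\{([^}]+)\}""".r

  private def isScalar(v: Any): Boolean = v match {
    case _: String | _: Boolean | _: Int | _: Long | _: Double | _: Float => true
    case _ => false
  }

  private def resolve(m: Regex.Match, lookup: String => Option[Any]): Option[Any] =
    lookup(m.group(1) + m.group(2))

  def concat(template: String, lookup: String => Option[Any]): Any = {
    val resolved = Embedded.findAllMatchIn(template).map(resolve(_, lookup)).toList
    resolved.collectFirst { case Some(v) if !isScalar(v) => v } match {
      case Some(obj) =>
        // An object cannot be concatenated: warn and return the object itself.
        println("WARN: embedded value resolved to an object; skipping concatenation")
        obj
      case None =>
        Embedded.replaceAllIn(template, (m: Regex.Match) =>
          Regex.quoteReplacement(resolve(m, lookup).map(_.toString).getOrElse("")))
    }
  }
}
```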

Escaping

Sometimes a mapping character must be used without being mapped. Adding a backslash before the mapping character tells the mapper to ignore that key.
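The following minimal sketch shows the escape check, using hypothetical names. It only handles a backslash at the start of the value. Escapes inside concatenated strings are not modelled.

```scala
object Escaping {
  private val MappingChars = "!$?@#&%"

  // Left(text): use the text literally. Right(reference): hand it to the mapper.
  def classify(value: String): Either[String, String] =
    if (value.length >= 2 && value.charAt(0) == '\\' && MappingChars.indexOf(value.charAt(1)) >= 0)
      Left(value.substring(1)) // drop the backslash and keep the mapping character as text
    else if (value.nonEmpty && MappingChars.indexOf(value.charAt(0)) >= 0)
      Right(value)
    else
      Left(value)
}
```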

Calling Methods

In addition to accessing fields, methods can be called using the same syntax:

"!obj.subObj.getName"

This feature is limited to methods without parameters and is disabled by default. To enable it, set the global “extractMethodsEnabled” to “true”.
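Field access versus method calls can be sketched with Java reflection. `extractMember` and `SampleSubObj` are hypothetical. The `extractMethodsEnabled` argument mirrors the global of the same name. Only zero-argument methods are invoked, matching the limitation above.

```scala
import scala.util.Try

// Example type for illustration only.
class SampleSubObj(val name: String) {
  def getName: String = "Name: " + name
}

object MethodExtraction {
  // Reads a declared field by name; if there is none and extractMethodsEnabled
  // is true, calls a public method of that name that takes no parameters.
  def extractMember(obj: AnyRef, name: String, extractMethodsEnabled: Boolean): Option[Any] = {
    val fieldValue = Try {
      val field = obj.getClass.getDeclaredField(name)
      field.setAccessible(true)
      field.get(obj)
    }.toOption
    fieldValue.orElse {
      if (extractMethodsEnabled) Try(obj.getClass.getMethod(name).invoke(obj)).toOption
      else None
    }
  }
}
```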

JSON Objects

JSON object values may also be embedded as a pipeline step value. The JSON must provide two attributes: className and object. className must be the fully qualified name of the case class to initialize, and that class must be on the classpath. object is the JSON object used to initialize the case class.

Below is the syntax:

{
  "className": "com.acxiom.pipeline.ParameterTest",
  "object": {
    "string": "some string",
    "num": 5
  }
}

List values may also be embedded as a pipeline step value. Variable expansion is supported for lists of maps, and for lists of objects when the className property has been set.

Syntax for a list of objects:

{
  "className": "com.acxiom.pipeline.ParameterTest",
  "value": [
    {
      "string": "some string",
      "num": 5
    },
    {
      "string": "some other string",
      "num": 10
    }
  ]
}

Syntax for a list of maps:

{
  "value": [
    {
      "string": "some string",
      "num": 5
    },
    {
      "string": "some other string",
      "num": 10
    }
  ]
}

JSON objects, maps and lists of maps/objects can use the special characters defined above, which allows predefined objects to reference dynamic values. When using the application framework, note that globals cannot use @ or #, since steps will not have produced values before initialization. Values may also use alternate value mapping, or the embedded className syntax to deserialize map values as classes within a Map.
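Because special characters may appear inside maps and lists, the mapper has to walk the structure. The following recursive sketch assumes a hypothetical `lookup` resolver. As a simplification, it leaves unresolved references unchanged.

```scala
object RecursiveMapping {
  private val MappingChars = "!$?@#&%"

  // Walks maps and lists, resolving any string that starts with a special character.
  def mapValue(value: Any, lookup: String => Option[Any]): Any = value match {
    case s: String if s.nonEmpty && MappingChars.indexOf(s.charAt(0)) >= 0 =>
      lookup(s).getOrElse(s) // simplification: unresolved references are left unchanged
    case m: Map[_, _] =>
      m.asInstanceOf[Map[Any, Any]].map { case (k, v) => k -> mapValue(v, lookup) }
    case l: List[_] =>
      l.map(mapValue(_, lookup))
    case other => other
  }
}
```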

Parameter Validation

Step parameter type checking can be enabled by passing the option “--validateStepParameterTypes true” as a global parameter. This validation is disabled by default.

Pipeline Context Updates

One advanced feature of Metalus is the ability for steps to update the PipelineContext using secondary returns. More information may be found here.

Reserved Globals

Below is a table of reserved globals with a summary of their usage.

| Global | Use |
|---|---|
| lastStepId | Always refers to the last step executed. |
| pipelineId | ID of the currently executing pipeline. |
| stepId | ID of the currently executing step. |
| validateStepParameterTypes | Enables/disables parameter type validation for steps. |
| extractMethodsEnabled | Enables/disables method support for field extraction. |
| dropNoneFromLists | When true, removes “None” options from inline lists. |