metalus

This project aims to make writing Spark applications easier by abstracting the effort to assemble the driver into reusable steps and pipelines.

Pipeline Manager

The pipeline manager is used to retrieve pipelines within an application or step group.

Cached Pipeline Manager

The default implementation of PipelineManager takes a list of pipelines and serves them by id. If the requested id is not in that list, this implementation scans the metadata/pipelines path of the step libraries on the classpath for the pipeline. The pipeline must be defined in a JSON file that follows the naming convention <pipeline.id>.json
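The lookup order described above can be sketched as follows. This is a minimal illustration, not the library's code: SketchPipeline and SketchCachedManager are hypothetical stand-ins, and the sketch keeps the raw JSON text instead of parsing it into a real pipeline.

```scala
import scala.io.Source

// Hypothetical stand-in for the library's Pipeline type; only the id and raw JSON are modeled.
final case class SketchPipeline(id: String, json: Option[String] = None)

// Sketch of "serve by id, then scan metadata/pipelines" (an assumption-based illustration).
class SketchCachedManager(pipelines: List[SketchPipeline]) {
  // Index the provided pipelines by id for direct lookup.
  private val byId: Map[String, SketchPipeline] = pipelines.map(p => p.id -> p).toMap

  // Classpath location implied by the <pipeline.id>.json naming convention.
  def resourcePath(id: String): String = s"metadata/pipelines/$id.json"

  def getPipeline(id: String): Option[SketchPipeline] =
    byId.get(id).orElse {
      // getResourceAsStream returns null when the resource is missing, so wrap it in Option.
      Option(getClass.getClassLoader.getResourceAsStream(resourcePath(id))).map { in =>
        val source = Source.fromInputStream(in)
        try SketchPipeline(id, Some(source.mkString)) finally source.close()
      }
    }
}
```

Under this convention, a pipeline with id p1 would be looked up at metadata/pipelines/p1.json whenever it is not in the provided list.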

Custom Implementation

Developers may create a custom implementation by implementing the PipelineManager trait and overriding the getPipeline function. An implementation that cannot resolve a pipeline should call super.getPipeline as a fallback so that the classpath is scanned.
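The override-then-fall-back pattern can be shown in isolation. The sketch below uses hypothetical stand-in types (DemoPipeline, DemoPipelineManager, MapPipelineManager) rather than the real trait, and the stand-in default simply returns None where the real trait would scan the classpath.

```scala
// Hypothetical stand-ins, NOT the metalus types.
final case class DemoPipeline(id: String)

trait DemoPipelineManager {
  // Stands in for the default lookup; the real trait would scan the classpath here.
  def getPipeline(id: String): Option[DemoPipeline] = None
}

// A custom manager that resolves pipelines from an in-memory map first.
class MapPipelineManager(known: Map[String, DemoPipeline]) extends DemoPipelineManager {
  override def getPipeline(id: String): Option[DemoPipeline] =
    // Try the custom source, then defer to the default behavior.
    known.get(id).orElse(super.getPipeline(id))
}
```

Keeping the call to super as the final step means a custom manager only adds a lookup source and never hides pipelines that the default behavior could have found.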

Example File Implementation

This example illustrates how to implement a new PipelineManager that loads pipelines from disk. The new class, LocalFilePipelineManager, extends PipelineManager and takes a single constructor parameter named path, which indicates the directory to search for pipeline JSON files. NOTE: Implementations could load pipeline classes instead of JSON-based pipelines.

Next the getPipeline function is overridden to look for pipelines in the path provided in the constructor.

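import scala.io.Source
// Pipeline, PipelineManager, and DriverUtils come from the metalus pipeline library.

class LocalFilePipelineManager(path: String) extends PipelineManager {
  override def getPipeline(id: String): Option[Pipeline] = {
    // Pipelines are expected at <path>/<id>.json
    val file = new java.io.File(path, s"$id.json")
    if (file.exists()) {
      // Read the whole file and release the handle afterwards
      val source = Source.fromFile(file)
      val json = try source.mkString finally source.close()
      // Use the first pipeline in the parsed list, if any
      DriverUtils.parsePipelineJson(json).flatMap(_.headOption)
    } else {
      // Not found on disk: fall back to the default lookup
      super.getPipeline(id)
    }
  }
}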