Data Connectors
Data Connectors provide an abstraction for loading and writing data using the DataConnectorSteps. This is useful for creating generic pipelines that can be used across providers without prior knowledge of the source or destination before runtime. Each connector is responsible for loading and writing a DataFrame based on the underlying system.
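At a high level, every connector implements load and write operations that DataConnectorSteps delegate to. The sketch below is illustrative only; the real Metalus trait likely carries additional parameters (such as the reader/writer options referenced later on this page), and its exact signatures may differ:
Scala
// Illustrative sketch only: this trait is NOT the actual Metalus API; the method
// signatures are simplified assumptions meant to show the load/write contract.
import org.apache.spark.sql.DataFrame

trait DataConnectorSketch {
  def name: String                   // the name of the connector
  def credentialName: Option[String] // the optional credential name used to authenticate
  // Load data from the underlying system into a DataFrame
  def load(source: Option[String]): DataFrame
  // Write a DataFrame back to the underlying system
  def write(dataFrame: DataFrame, destination: Option[String]): Unit
}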
Parameters
The following parameters are available to all data connectors:
- name - The name of the connector
- credentialName - The optional credential name to use to authenticate
- credential - The optional credential to use to authenticate
The following connectors are currently available:
- MongoDataConnector
- JDBCDataConnector
- JSONApiDataConnector
- KinesisDataConnector
- KafkaDataConnector
- HDFSDataConnector
- S3DataConnector
- GCSDataConnector
- BigQueryDataConnector
Traits
BatchDataConnector
This trait signifies that the DataConnector supports batch operations using DataFrameReader/Writers.
StreamingDataConnector
This trait signifies that the DataConnector supports structured streaming operations using DataStreamReader/Writers. Streaming connectors offer a way to use pipelines with Spark Structured Streaming without the need to write new drivers. When designing pipelines for streaming, take care not to inject steps that are batch oriented, such as performing a file copy. When using streaming connectors, the monitor step should be used, and the command line parameter streaming-job should be set to true when invoking the Default Pipeline Driver.
FileSystemDataConnector
This trait provides basic implementations of the load and write methods and is meant to be implemented by DataConnectors that will support reading and writing to Hadoop FileSystem implementations. These connectors support both streaming and batch operations, and use the streaming flag from the readOptions to determine which approach to take. When writing, the Dataset’s isStreaming flag is used to automatically select the correct operation. Examples include the HDFSDataConnector, S3DataConnector, and GCSDataConnector classes.
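A minimal usage sketch of the two modes follows. The load/write signatures, the option class names, and the streaming field are assumptions based on the description above (not the verbatim API), and pipelineContext is assumed to be in scope from the running pipeline:
Scala
// Hedged sketch: signatures and option field names are assumptions, not the verbatim API.
val connector = HDFSDataConnector("my-connector", None, None)

// Batch read: backed by DataFrameReader
val batchDf = connector.load(Some("hdfs:///data/input"), pipelineContext,
  DataFrameReaderOptions(format = "parquet"))

// Streaming read: the streaming flag on the read options switches to DataStreamReader
val streamDf = connector.load(Some("hdfs:///data/input"), pipelineContext,
  DataFrameReaderOptions(format = "parquet", streaming = true))

// Write: the Dataset's isStreaming flag selects the batch or streaming writer automatically
connector.write(batchDf, Some("hdfs:///data/output"), pipelineContext, DataFrameWriterOptions())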
Connectors
MongoDataConnector
This connector provides access to Mongo. Security is handled using the uri or a UserNameCredential. In addition to the standard parameters, the following parameters are available:
- uri - The connection URI
Scala
val connector = MongoDataConnector("mongodb://127.0.0.1/test", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
Globals JSON
{
"customMongoConnector": {
"className": "com.acxiom.metalus.pipeline.connectors.MongoDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"uri": "mongodb://127.0.0.1/test"
}
}
}
Read/Write support
| Operation | Read  | Write |
|-----------|-------|-------|
| Batch     | True  | True  |
| Streaming | False | True  |
JDBCDataConnector
This connector provides access to JDBC. It supports both structured streaming and batch Datasets. Security is handled using the url or a UserNameCredential. In addition to the standard parameters, the following parameters are available:
- url - The connection URL
Scala
val connector = JDBCDataConnector("jdbc:derby:memory:test", "table_name", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
Globals JSON
{
"customJDBCConnector": {
"className": "com.acxiom.pipeline.connectors.JDBCDataConnector",
"object": {
"name": "my-jdbc-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"url": "jdbc:derby:memory:test"
}
}
}
Read/Write support
| Operation | Read  | Write |
|-----------|-------|-------|
| Batch     | True  | True  |
| Streaming | False | True  |
JSONApiDataConnector (Experimental)
This connector provides the ability to interact with data in an API. The ApiHandler trait is used to allow extensibility. In addition to the standard parameters, the following parameters are available:
- apiHandler - The ApiHandler is used to handle parsing/writing the data to/from DataFrames
- hostUrl - The url that is hosting data. This doesn’t have to include the path since that can be passed in the load/write functions
- authorizationClass - This is the fully qualified class name to use when authenticating to the API
- allowSelfSignedCertificates - Flag indicating whether self-signed certificates should be accepted from the API
ApiHandler
The ApiHandler is used to handle parsing/writing the data to/from DataFrames. There are two ways to parse data from the JSON: as a list of maps or as a list of lists.
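The Scala example below references an apiHandler value. A hypothetical construction is sketched here, with field names mirrored from the Globals JSON that follows; the actual ApiHandler signature, and the Schema/Attribute helper classes shown, are assumptions and may differ from the library:
Scala
// Hypothetical sketch: field names are taken from the Globals JSON below; the real
// ApiHandler/Schema/Attribute signatures may differ.
val schema = Schema(Seq(
  Attribute("column1", AttributeType("string")),
  Attribute("column2", AttributeType("integer")),
  Attribute("column3", AttributeType("string"))))
val apiHandler = ApiHandler(
  jsonDocumentPath = "location.within.json.to.fetch.rows",
  useRowArray = true, // presumably selects the list-of-lists form (vs. a list of maps)
  hostUrl = "https://localhost:8080/",
  authorizationClass = "com.acxiom.pipeline.api.BasicAuthorization",
  allowSelfSignedCertificates = true,
  schema = schema)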
Scala
val connector = JSONApiDataConnector(apiHandler, "my-connector", Some("my-credential-name-for-secrets-manager"), None)
Globals JSON
{
"jsonApiConnector": {
"className": "com.acxiom.pipeline.connectors.JSONApiDataConnector",
"object": {
"apiHandler": {
"jsonDocumentPath": "location.within.json.to.fetch.rows",
"useRowArray": true,
"hostUrl": "https://localhost:8080/",
"authorizationClass": "com.acxiom.pipeline.api.BasicAuthorization",
"allowSelfSignedCertificates": true,
"schema": {
"attributes": [
{
"name": "column1",
"dataType": {
"baseType": "string"
}
},
{
"name": "column2",
"dataType": {
"baseType": "integer"
}
},
{
"name": "column3",
"dataType": {
"baseType": "string"
}
}
]
}
},
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
Read/Write support
| Operation | Read  | Write |
|-----------|-------|-------|
| Batch     | True  | True  |
| Streaming | False | True  |
KinesisDataConnector
This connector provides access to Kinesis. When using the connector to write a streaming DataFrame to Kinesis, the checkpointLocation option is required since it cannot be derived (a hedged write sketch follows the examples below). In addition to the standard parameters, the following parameters are available:
- streamName - The name of the Kinesis stream.
- region - The region containing the Kinesis stream
- partitionKey - The optional static partition key to use
- partitionKeyIndex - The optional field index in the DataFrame row containing the value to use as the partition key
- separator - The field separator to use when formatting the row data
- initialPosition - The starting point in the shard to begin reading data from (trim_horizon, latest, at_timestamp). trim_horizon and latest can be passed as strings; at_timestamp must be provided as a JSON object, as shown in the example below. The default is trim_horizon.
Below is an example setup that expects a secrets manager credential provider:
Scala
val connector = KinesisDataConnector("stream-name", "us-east-1", None, Some(15), ",",
"{\"at_timestamp\": \"06/25/2020 10:23:45 PDT\", \"format\": \"MM/dd/yyyy HH:mm:ss ZZZ\"}", "my-connector",
Some("my-credential-name-for-secrets-manager"))
Globals JSON
{
"kinesisConnector": {
"className": "com.acxiom.aws.pipeline.connectors.KinesisDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"streamName": "stream-name",
"region": "us-east-1",
"separator": ",",
"initialPosition": "{\"at_timestamp\": \"06/25/2020 10:23:45 PDT\", \"format\": \"MM/dd/yyyy HH:mm:ss ZZZ\"}"
}
}
}
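As referenced above, the checkpointLocation option must be supplied when writing a streaming DataFrame. A hedged sketch of what that might look like; the write signature, the options field on the writer options, and the bucket path are assumptions, and streamingDf/pipelineContext are assumed to be in scope:
Scala
// Hedged sketch: the write signature and DataFrameWriterOptions fields are assumptions;
// streamingDf is a streaming DataFrame and pipelineContext comes from the running pipeline.
connector.write(streamingDf, Some("stream-name"), pipelineContext,
  DataFrameWriterOptions(options = Some(Map(
    "checkpointLocation" -> "s3://my-bucket/checkpoints/kinesis-stream"))))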
Read/Write support
| Operation | Read  | Write |
|-----------|-------|-------|
| Batch     | False | False |
| Streaming | True  | True  |
KafkaDataConnector
This connector provides access to Kafka. In addition to the standard parameters, the following parameters are available:
- topics - A comma separated list of Kafka topics to read from or write to
- kafkaNodes - A comma separated list of Kafka broker nodes (host:port)
- key - The optional static key to use
- keyField - The optional field name in the DataFrame row containing the value to use as the key
- separator - The field separator to use when formatting the row data
Below is an example setup that expects a secrets manager credential provider:
Scala
val connector = KafkaDataConnector("topic-name1,topic-name2", "host1:port1,host2:port2", "message-key", None,
"my-connector", Some("my-credential-name-for-secrets-manager"))
Globals JSON
{
"kafkaConnector": {
"className": "com.acxiom.kafka.pipeline.connectors.KafkaDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"topics": "topic-name1,topic-name2",
"kafkaNodes": "host1:port1,host2:port2",
"key": "message-key"
}
}
}
Read/Write support
| Operation | Read  | Write |
|-----------|-------|-------|
| Batch     | False | False |
| Streaming | True  | True  |
HDFSDataConnector
This connector provides access to HDFS. The credentialName and credential parameters are not used in this implementation, instead relying on the permissions of the cluster. Below is an example setup:
Scala
val connector = HDFSDataConnector("my-connector", None, None)
Globals JSON
{
"myConnector": {
"className": "com.acxiom.pipeline.connectors.HDFSDataConnector",
"object": {
"name": "my-connector"
}
}
}
Read/Write support
| Operation | Read | Write |
|-----------|------|-------|
| Batch     | True | True  |
| Streaming | True | True  |
S3DataConnector
This connector provides access to S3. Below is an example setup that expects a secrets manager credential provider:
Scala
val connector = S3DataConnector("my-connector", Some("my-credential-name-for-secrets-manager"), None)
Globals JSON
{
"myS3Connector": {
"className": "com.acxiom.aws.pipeline.connectors.S3DataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
GCSDataConnector
This connector provides access to GCS. The source parameter of the load function can take multiple paths by providing a comma separated string (a hedged sketch follows the examples below). Below is an example setup that expects a secrets manager credential provider:
Scala
val connector = GCSDataConnector("my-connector", Some("my-credential-name-for-secrets-manager"), None)
Globals JSON
{
"myGCSConnector": {
"className": "com.acxiom.gcp.pipeline.connectors.GCSDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager"
}
}
}
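As noted above, multiple paths can be supplied to load as a single comma separated string. A hedged sketch follows; the load signature is an assumption, the paths are made up, and pipelineContext is assumed to be in scope:
Scala
// Hedged sketch: passes two GCS paths as one comma separated source string.
val df = connector.load(
  Some("gs://my-bucket/input/part1,gs://my-bucket/input/part2"),
  pipelineContext,
  DataFrameReaderOptions(format = "parquet"))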
Read/Write support
| Operation | Read | Write |
|-----------|------|-------|
| Batch     | True | True  |
| Streaming | True | True  |
BigQueryDataConnector
This connector provides access to BigQuery. When writing a streaming DataFrame, this connector will set the saveMode to Append. The checkpointLocation is required when streaming and will default to the tempWriteBucket with an object name of streaming_checkpoints_${table.replaceAll("(?U)[^\w\._]+", "_")}.
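For illustration, that expression simply replaces any run of characters other than word characters, dots, and underscores with a single underscore (the table value below is made up; the doubled backslashes are just Scala string escaping of the same regex):
Scala
// Illustrative only: demonstrates the sanitization used for the default checkpoint object name.
val table = "my-project:my_dataset.my_table"
val sanitized = table.replaceAll("(?U)[^\\w\\._]+", "_")
// sanitized == "my_project_my_dataset.my_table"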
Below is an example setup that expects a secrets manager credential provider:
Scala
val connector = BigQueryDataConnector("temp-bucket-name", "my-connector", Some("my-credential-name-for-secrets-manager"), None)
Globals JSON
{
"bigQueryConnector": {
"className": "com.acxiom.gcp.pipeline.connectors.BigQueryDataConnector",
"object": {
"name": "my-connector",
"credentialName": "my-credential-name-for-secrets-manager",
"tempWriteBucket": "temp-bucket-name"
}
}
}
Read/Write support
| Operation | Read | Write |
|-----------|------|-------|
| Batch     | True | True  |
| Streaming | True | True  |