
Data Lineage Tracking and Visualization tool for Apache Spark ™

The Spline (from Spark lineage) project helps people gain further insight into the data processing performed by Apache Spark.

The project consists of two parts:


Spline paper

Summary

in your POM file:

<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-core</artifactId>
    <version>0.3.8</version>
</dependency>
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-core-spark-adapter-2.2</artifactId>
    <!-- For compatibility with Spark 2.3 change above to spline-core-spark-adapter-2.3. -->
    <version>0.3.8</version>
</dependency>
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-persistence-mongo</artifactId>
    <!-- You can use other types of persistence including your own. -->
    <!-- See below for details. -->
    <version>0.3.8</version>
</dependency>

in your Spark job:

// given a Spark session ...
val sparkSession: SparkSession = ???

// ... enable data lineage tracking with Spline
import za.co.absa.spline.core.SparkLineageInitializer._
sparkSession.enableLineageTracking()

// ... then run some Dataset computations as usual.
// Data lineage of the job will be captured and stored in the
// configured Mongo database for further visualization by Spline Web UI

download Spline Web UI executable JAR and run:

java -jar spline-web-0.3.8-exec-war.jar -Dspline.mongodb.url=...

in your browser open localhost:8080 to see the Spline Web UI.

Motivation

Spline aims to fill a big gap within the Apache Hadoop ecosystem. Spark jobs shouldn’t be treated only as magic black boxes; people should be able to understand what happens with their data. Our main focus is to solve the following particular problems:

Getting started

Usage

Spline can be used in a number of configurations: enabled from within your code or entirely codeless, and either pre-bundled by us for ease of use or bundled by you for fine-grained dependency management.

Dependencies

Project Integration Options

Code based

Scala

// given a Spark session ...
val sparkSession: SparkSession = ???

// ... enable data lineage tracking with Spline
import za.co.absa.spline.core.SparkLineageInitializer._
sparkSession.enableLineageTracking()

sparkSession.read ...

Java

import org.apache.spark.sql.SparkSession;
import za.co.absa.spline.core.SparkLineageInitializer;

// given a Spark session ...
SparkSession session = ...;

// configure Spline to track lineage
SparkLineageInitializer.enableLineageTracking(session);
session.read() ...

Python


# Build the sample project with the shade profile to create a fat JAR containing all needed dependencies:
  mvn package -P spark-2.3,shade

# Run pyspark with the fat JAR on the classpath:
  pyspark --jars 'target/spline-sample-0.3.8-SNAPSHOT.jar'

# Enable Spline tracking inside PySpark:
sc._jvm.za.co.absa.spline.core\
    .SparkLineageInitializer.enableLineageTracking(spark._jsparkSession)

spark.read\ ...

Codeless

object CodelessSparkJob extends SparkApp(
  name = "Codeless Job name",
  // Spark configuration used to register the Spline listener for codeless init.
  conf = Seq(("spark.sql.queryExecutionListeners", "za.co.absa.spline.core.listener.QueryExecutionEventHandler"))
) {
  // The business logic of a Spark job ...
  spark.read ...
}

Bundle Based Integration

Use these dependencies in your POM file.
⚠ Manage your own exclusions

<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-core</artifactId>
    <version>0.3.8</version>
</dependency>
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-core-spark-adapter-2.2</artifactId>
    <!-- For compatibility with Spark 2.3 change above to spline-core-spark-adapter-2.3. -->
    <version>0.3.8</version>
</dependency>
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-persistence-mongo</artifactId>
    <!-- You can use other types of persistence including your own. -->
    <!-- See below for details. -->
    <version>0.3.8</version>
</dependency>

Pre-bundled by Us

Depending on the version of Spark you are using, you can find the matching bundle here: Bundles

<dependency>
  <groupId>za.co.absa.spline</groupId>
  <artifactId>{spline-bundle-version}</artifactId>
  <version>0.3.8</version>
</dependency>

Use maven dependency

Setup for your Spark job:
  1. Include the Spline core JAR in your Spark job classpath (it is needed on the driver only; executors don't need it)

  2. Configure database connection properties (see Configuration section)

  3. Enable data lineage tracking on a Spark session before calling any action method:

     import za.co.absa.spline.core.SparkLineageInitializer._
     sparkSession.enableLineageTracking()
    
Web UI application:

There are two ways to run the Spline Web UI:

Standalone application (executable JAR)

Execute:
java -jar spline-web-0.3.8-exec-war.jar -Dspline.mongodb.url=...

Then point your browser to http://localhost:8080.

To change the port number from 8080 to, say, 1234, add -httpPort 1234 to the command line.

(For more details see the section Generated executable jar/war.)

Standard Java web application (WAR)
  1. In your Java web container, e.g. Tomcat, set up the Spline database connection properties either via system environment variables or JVM system properties in the following format:
     spline.mongodb.url=mongodb://11.22.33.44/my_lineage_database_name
    
  2. Deploy the Spline WAR file to your Java web container (tested on Tomcat 7, but other containers should also work)

Build Spline from the source code

You will need:

mvn install -DskipTests -Pspark-2.3

Lineage persistence

Spline can persist harvested lineages in various ways. It uses PersistenceFactory to obtain instances of DataLineageReader and DataLineageWriter to persist and access the data lineages. Out of the box Spline supports three types of persisters:

There is also a ParallelCompositeFactory that works as a proxy and delegates work to other persisters. You can store the lineages to multiple sources simultaneously, e.g. Mongo and HDFS.
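The delegation idea can be illustrated with a minimal sketch. The LineageWriter trait, InMemoryWriter and ParallelCompositeWriter below are hypothetical stand-ins invented for this example; they are not Spline's actual DataLineageWriter or ParallelCompositeFactory classes, and lineages are modelled as plain strings.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a lineage writer (NOT Spline's real API).
trait LineageWriter {
  def store(lineage: String)(implicit ec: ExecutionContext): Future[Unit]
}

// Keeps everything it is asked to store, so the delegation can be observed.
final class InMemoryWriter extends LineageWriter {
  private val buffer = scala.collection.mutable.ListBuffer.empty[String]

  def store(lineage: String)(implicit ec: ExecutionContext): Future[Unit] =
    Future { buffer.synchronized { buffer += lineage }; () }

  def stored: List[String] = buffer.synchronized(buffer.toList)
}

// Forwards every write to all delegates and completes once all of them are done.
final class ParallelCompositeWriter(delegates: Seq[LineageWriter]) extends LineageWriter {
  def store(lineage: String)(implicit ec: ExecutionContext): Future[Unit] =
    Future.sequence(delegates.map(_.store(lineage))).map(_ => ())
}

object CompositeDemo {
  import ExecutionContext.Implicits.global

  // Two in-memory writers play the role of, e.g., a Mongo and an HDFS persister.
  val mongoLike = new InMemoryWriter
  val hdfsLike = new InMemoryWriter
  val composite = new ParallelCompositeWriter(Seq(mongoLike, hdfsLike))

  def run(): Unit = Await.result(composite.store("lineage-1"), 5.seconds)
}
```

After CompositeDemo.run(), both underlying writers hold the same lineage. A failure in any delegate fails the combined Future in this sketch; how Spline itself handles partial failures is not covered here.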

Configuration

When enabling data lineage tracking for a Spark session in your Spark job, a SplineConfigurer instance can be passed as an argument to the enableLineageTracking() method.

The method signature is as follows:

def enableLineageTracking(configurer: SplineConfigurer = new DefaultSplineConfigurer(defaultSplineConfiguration)): SparkSession

DefaultSplineConfigurer looks up the configuration parameters in the given Configuration object.

The defaultSplineConfiguration object combines several configuration sources (ordered by priority):

  1. Hadoop config (core-site.xml)
  2. JVM system properties
  3. spline.properties file in the classpath
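A prioritized lookup of this kind can be sketched as follows. This assumes the list above is ordered from highest to lowest priority; ConfigSource and LayeredConfig are hypothetical names for illustration and not part of Spline, and the values in each source are made up.

```scala
// Hypothetical model of one configuration source (NOT a Spline class).
final case class ConfigSource(name: String, values: Map[String, String])

object LayeredConfig {
  // Sources are consulted in order; the first one that defines the key wins.
  def lookup(key: String, sources: Seq[ConfigSource]): Option[String] =
    sources.collectFirst { case s if s.values.contains(key) => s.values(key) }

  // Illustrative contents only.
  val hadoop = ConfigSource("Hadoop config (core-site.xml)",
    Map("spline.mode" -> "REQUIRED"))
  val jvm = ConfigSource("JVM system properties",
    Map("spline.mode" -> "DISABLED",
        "spline.mongodb.url" -> "mongodb://1.2.3.4/my_lineage_database"))
  val file = ConfigSource("spline.properties",
    Map("spline.persistence.factory" ->
      "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory"))

  val sources: Seq[ConfigSource] = Seq(hadoop, jvm, file)
}
```

Here spline.mode is defined in two sources, and the one earlier in the sequence takes effect; a key defined in only one source is found regardless of its position.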

Configuration properties

Shared

Property: spline.mode
Description: One of the following modes:
  DISABLED: Lineage tracking is completely disabled and Spline is unhooked from Spark.
  REQUIRED: If Spline fails to initialize itself (e.g. wrong configuration, no DB connection), the Spark application aborts with an error.
  BEST_EFFORT (default): Spline tries to initialize itself, but if it fails it switches to DISABLED mode, allowing the Spark application to proceed normally without lineage tracking.
Example: BEST_EFFORT

Property: spline.persistence.factory
Description: Fully qualified name of the PersistenceFactory implementation to be used by Spline.
Example: za.co.absa.spline.persistence.mongo.MongoPersistenceFactory
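The three spline.mode values differ only in how an initialization failure is handled. The sketch below models that behaviour under stated assumptions: the Mode type, parse and initialize are hypothetical names, and init stands for whatever initialization step might fail. It is not Spline's actual implementation.

```scala
import scala.util.Try

object SplineModeSketch {
  sealed trait Mode
  case object Disabled extends Mode
  case object Required extends Mode
  case object BestEffort extends Mode

  // Parses the property value; an absent value falls back to the documented default.
  def parse(value: Option[String]): Mode = value.map(_.trim.toUpperCase) match {
    case Some("DISABLED")           => Disabled
    case Some("REQUIRED")           => Required
    case Some("BEST_EFFORT") | None => BestEffort
    case Some(other) =>
      throw new IllegalArgumentException(s"Unknown spline.mode: $other")
  }

  // Returns true if tracking ends up enabled, false if it is switched off.
  // `init` is a hypothetical initialization step that may throw.
  def initialize(mode: Mode)(init: () => Unit): Boolean = mode match {
    case Disabled   => false                 // init is never attempted
    case Required   => init(); true          // an exception propagates to the caller
    case BestEffort => Try(init()).isSuccess // a failure degrades to "disabled"
  }
}
```

With BestEffort a failing init yields false and the caller carries on; with Required the same failure surfaces as an exception, which corresponds to the Spark application aborting.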

Mongo Persistence Only

Property: spline.mongodb.url
Description: Mongo connection URL
Example: mongodb://1.2.3.4/my_lineage_database
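Since JVM system properties are one of the configuration sources, one possible way to supply these settings from within a job is to set them programmatically before enabling lineage tracking. The sketch below uses only the property names and example values documented here; the SplineSystemProperties object and its install() method are hypothetical helpers, and whether this approach suits your deployment is an assumption to verify.

```scala
object SplineSystemProperties {
  // Property names and example values taken from the configuration tables.
  val settings: Map[String, String] = Map(
    "spline.mode" -> "REQUIRED",
    "spline.persistence.factory" ->
      "za.co.absa.spline.persistence.mongo.MongoPersistenceFactory",
    "spline.mongodb.url" -> "mongodb://1.2.3.4/my_lineage_database"
  )

  // Intended to be called before enableLineageTracking(), so that the values
  // are already present when the configuration is read.
  def install(): Unit =
    settings.foreach { case (key, value) => System.setProperty(key, value) }
}
```

The same values could instead be passed as -D options on the JVM command line or placed in a spline.properties file on the classpath.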

Composition Factories Only

Property: spline.persistence.composition.factories
Description: Comma-separated list of factories to delegate to
Example: za.co.absa.spline.persistence.mongo.MongoPersistenceFactory, za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory

Optional: Async Timeout

Depending on your persistence setup and requirements, consider increasing the HTTP request timeout to avoid premature failures (AsyncRequestTimeoutException). You can increase Tomcat's asyncTimeout attribute on the Connector in conf/server.xml from the default 30 seconds to the desired value, e.g. to 60 seconds as in the example below (the value is given in milliseconds).

<Connector port="8080" protocol="HTTP/1.1" asyncTimeout="60000" ... />

Examples

The sample folder contains some sample Spline enabled Spark jobs.

Sample jobs read data from the /sample/data/input/ folder and write the result into /sample/data/results/

When the lineage data is captured and stored into the database, it can be visualized and explored via the Spline UI Web application.

Sample job 1
import org.apache.spark.sql.{SaveMode, SparkSession}

val sparkBuilder = SparkSession.builder().appName("Sample Job 1")
val spark = sparkBuilder.getOrCreate()

// Enable data lineage tracking with Spline
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()

// A business logic of a Spark job ...
import spark.implicits._

val sourceDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/input/wikidata.csv")
  .as("source")
  .filter($"total_response_size" > 1000)
  .filter($"count_views" > 10)

val domainMappingDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/input/domain.csv")
  .as("mapping")

val joinedDS = sourceDS
  .join(domainMappingDS, $"domain_code" === $"d_code", "left_outer")
  .select($"page_title".as("page"), $"d_name".as("domain"), $"count_views")

joinedDS.write.mode(SaveMode.Overwrite).parquet("data/results/job1_results")

Release Migration: MongoDB

Please follow the instructions in the readme file here to migrate your MongoDB database from your current Spline version to the desired version.

Contribution

TODO

License

Copyright 2017 ABSA Group Limited

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.