The Spline (from Spark lineage) project helps people gain insight into data processing performed by Apache Spark.
The project consists of two parts:
- A core library that sits on the driver, capturing data lineages from the jobs being executed by analyzing Spark execution plans
- A Web UI application that visualizes the stored data lineages
In your POM file:
```xml
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-core</artifactId>
    <version>0.3.6</version>
</dependency>
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-core-spark-adapter-2.2</artifactId>
    <!-- For compatibility with Spark 2.3 change the above to spline-core-spark-adapter-2.3. -->
    <version>0.3.6</version>
</dependency>
<dependency>
    <groupId>za.co.absa.spline</groupId>
    <artifactId>spline-persistence-mongo</artifactId>
    <!-- You can use other types of persistence, including your own. See below for details. -->
    <version>0.3.6</version>
</dependency>
```
In your Spark job:
```scala
// given a Spark session ...
val sparkSession: SparkSession = ???

// ... enable data lineage tracking with Spline
import za.co.absa.spline.core.SparkLineageInitializer._
sparkSession.enableLineageTracking()

// ... then run some Dataset computations as usual.
// The data lineage of the job will be captured and stored in the
// configured Mongo database for further visualization by the Spline Web UI.
```
Download the Spline Web UI executable JAR and run:
java -jar spline-web-0.3.6-exec-war.jar -Dspline.mongodb.url=...
In your browser, open http://localhost:8080.
Spline should fill a big gap within the Apache Hadoop ecosystem. Spark jobs shouldn't be treated as magic black boxes; people should have a chance to understand what happens with their data. Our main focus is to solve the following particular problems:
Regulatory requirement for SA banks (BCBS 239)
By 2020, all South African banks will have to be able to prove to the regulatory authority how the numbers in their reports are calculated.
Documentation of business logic
Business analysts should get a chance to verify whether Spark jobs were written according to the rules they provided. Moreover, it would be beneficial for them to have up-to-date documentation where they can refresh their knowledge about a project.
Identification of performance bottlenecks
Our focus is not only business-oriented. We also see Spline as a development tool that should be able to help developers with the performance optimization of their Spark jobs.
Setup for your Spark job:
Include the Spline core JAR on your Spark job's classpath (it is enough to have it on the driver only; executors don't need it). One way to do this with spark-submit is sketched after these steps.
Configure database connection properties (see Configuration section)
Enable data lineage tracking on a Spark session before calling any action method:
```scala
import za.co.absa.spline.core.SparkLineageInitializer._
sparkSession.enableLineageTracking()
```
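For example, if you submit the job with spark-submit, the Spline artifacts from the Maven section above can be pulled onto the classpath via --packages (a sketch; the job class and JAR names are placeholders, and it assumes the artifacts are resolvable from your Maven repositories):

```shell
# Placeholders: com.example.MyLineageTrackedJob and my-spark-job.jar are hypothetical
spark-submit \
  --packages za.co.absa.spline:spline-core:0.3.6,za.co.absa.spline:spline-core-spark-adapter-2.2:0.3.6,za.co.absa.spline:spline-persistence-mongo:0.3.6 \
  --class com.example.MyLineageTrackedJob \
  my-spark-job.jar
```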
Web UI application:
There are two ways to run the Spline Web UI:
Standalone application (executable JAR)
java -jar spline-web-0.3.6-exec-war.jar -Dspline.mongodb.url=...
and then point your browser to http://localhost:8080.
To change the port number from 8080 to, say, 1234, add `-httpPort 1234` to the command line (for more details see the Generated executable jar/war section).
Standard Java web application (WAR)
- In your Java web container (e.g. Tomcat), set up the Spline database connection properties, either via system environment variables or JVM system properties (see the Configuration section for the property names); a Tomcat example follows this list
- Deploy the Spline WAR file to your Java web container (tested on Tomcat 7, but other containers should also work)
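For example, with Tomcat, one common way to pass JVM system properties is a bin/setenv.sh script (a sketch; the connection URL is a placeholder):

```shell
# bin/setenv.sh -- sourced by Tomcat's startup scripts
# The URL below is a placeholder; point it at your Spline Mongo database
export CATALINA_OPTS="$CATALINA_OPTS -Dspline.mongodb.url=mongodb://localhost:27017/spline"
```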
To build Spline from the source code, run:
mvn install -DskipTests -Pspark-2.3
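The Maven section above also shows a Spark 2.2 adapter, so presumably the build offers an analogous profile; the profile name below is an assumption modeled on the spark-2.3 profile:

```shell
# Assumed profile name, by analogy with -Pspark-2.3
mvn install -DskipTests -Pspark-2.2
```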
Spline can persist harvested lineages in various ways. It uses a PersistenceFactory to obtain instances of DataLineageReader and DataLineageWriter, which it uses to store and access the data lineages. Out of the box, Spline supports the following persistence factories:

- MongoPersistenceFactory (stores lineages in MongoDB)
- HdfsPersistenceFactory (stores lineages as JSON files on HDFS)

There is also a ParallelCompositeFactory that works as a proxy and delegates work to other persistence factories, so you can, for example, store the lineages to Mongo and HDFS simultaneously.
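As a sketch, a composite setup could be configured like this (the property keys match the Configuration table below, but the fully qualified names of ParallelCompositeFactory and HdfsPersistenceFactory are assumptions modeled on the MongoPersistenceFactory name):

```properties
# Illustrative only -- package names of the composite and HDFS factories are assumed
spline.persistence.factory=za.co.absa.spline.persistence.api.composition.ParallelCompositeFactory
spline.persistence.composition.factories=za.co.absa.spline.persistence.mongo.MongoPersistenceFactory,za.co.absa.spline.persistence.hdfs.HdfsPersistenceFactory
```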
When enabling data lineage tracking for a Spark session in your Spark job, a SplineConfigurer instance can be passed as an argument to the enableLineageTracking() method. The method signature is the following:
```scala
def enableLineageTracking(configurer: SplineConfigurer = new DefaultSplineConfigurer(defaultSplineConfiguration)): SparkSession
```
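For example, a custom configuration could be passed like this (a sketch: it assumes DefaultSplineConfigurer accepts an Apache Commons Configuration object, as the defaultSplineConfiguration default suggests, and the import path of DefaultSplineConfigurer is a guess):

```scala
import org.apache.commons.configuration.BaseConfiguration
import org.apache.spark.sql.SparkSession
import za.co.absa.spline.core.SparkLineageInitializer._
import za.co.absa.spline.core.conf.DefaultSplineConfigurer // package is assumed

val sparkSession: SparkSession = ??? // given a Spark session, as above

// Build a custom Commons Configuration and hand it to the configurer
val splineConf = new BaseConfiguration
splineConf.setProperty("spline.mongodb.url", "mongodb://localhost:27017/spline")

sparkSession.enableLineageTracking(new DefaultSplineConfigurer(splineConf))
```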
DefaultSplineConfigurer looks up the configuration parameters in the given Configuration object. The defaultSplineConfiguration object combines several configuration sources (ordered by priority):
- Hadoop configuration (`sc.hadoopConfiguration`)
- JVM system properties
- `spline.properties` file in the classpath
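For example, to supply a parameter as a JVM system property on the driver when using spark-submit (a sketch; the JAR name and URL value are placeholders):

```shell
# my-spark-job.jar is a placeholder
spark-submit \
  --driver-java-options "-Dspline.mongodb.url=mongodb://localhost:27017/spline" \
  my-spark-job.jar
```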
Spline can run in one of the following modes:

- DISABLED: Lineage tracking is completely disabled and Spline is unhooked from Spark.
- REQUIRED: If Spline fails to initialize itself (e.g. wrong configuration, no database connection), the Spark application aborts with an error.
- BEST_EFFORT: Spline tries to initialize itself, but if it fails, it switches to DISABLED mode, allowing the Spark application to proceed normally without lineage tracking.
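Assuming the mode is selected via a `spline.mode` configuration property (an assumption based on the mode names above), making lineage tracking mandatory could look like this in spline.properties:

```properties
# spline.properties on the driver classpath -- the spline.mode key is an assumption
spline.mode=REQUIRED
```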
| Property | Description | Example |
|---|---|---|
| `spline.persistence.factory` | Fully qualified name of the `PersistenceFactory` implementation to be used by Spline | `za.co.absa.spline.persistence.mongo.MongoPersistenceFactory` |

Mongo Persistence Only

| Property | Description |
|---|---|
| `spline.mongodb.url` | Mongo connection URL |

Composition Factories Only

| Property | Description |
|---|---|
| `spline.persistence.composition.factories` | Comma-separated list of factories to delegate to |
Optional: Async Timeout
Depending on your persistence setup and requirements, you may consider increasing the HTTP request timeout to avoid premature failures (AsyncRequestTimeoutException). You can increase Tomcat's global asyncTimeout property in conf/server.xml from the default 30 seconds to a desired value, e.g. to 60 seconds as in the example below.
<Connector port="8080" protocol="HTTP/1.1" asyncTimeout="60000" ... />
The sample folder contains some sample Spline-enabled Spark jobs. Once the lineage data is captured and stored in the database, it can be visualized and explored via the Spline Web UI application.
Sample job 1
```scala
val sparkBuilder = SparkSession.builder().appName("Sample Job 1")
val spark = sparkBuilder.getOrCreate()

// Enable data lineage tracking with Spline
import za.co.absa.spline.core.SparkLineageInitializer._
spark.enableLineageTracking()

// The business logic of the Spark job ...
import spark.implicits._

val sourceDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/input/wikidata.csv")
  .as("source")
  .filter($"total_response_size" > 1000)
  .filter($"count_views" > 10)

val domainMappingDS = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("data/input/domain.csv")
  .as("mapping")

val joinedDS = sourceDS
  .join(domainMappingDS, $"domain_code" === $"d_code", "left_outer")
  .select($"page_title".as("page"), $"d_name".as("domain"), $"count_views")

joinedDS.write.mode(SaveMode.Overwrite).parquet("data/results/job1_results")
```
Release Migration: MongoDB

Please follow the instructions in the readme file to migrate your MongoDB database from your current version to the desired version.
Copyright 2017 ABSA Group Limited

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.