
[SPARK] Reading Apache Iceberg data from a filepath, instead of a Spark Catalog, results in no input datasets being present in the OpenLineage event (#2937)

* Implemented functionality that allows lineage to be derived from Iceberg datasets that are not located within the configured Iceberg SparkCatalog.

1. Modified the IcebergHandler to build these paths
2. Split the DataSourceV2ScanRelationInputDatasetBuilder into two classes that focus on different parts of the Spark lifecycle.

Regarding (1)

Docker-based container tests were created in the SparkIcebergMetadataJsonTest class. These tests launch Spark applications located in the "scala-fixtures" module.
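Under (1), the handler derives a dataset identifier from the table's location on disk or in object storage, rather than from catalog metadata. A minimal sketch of that idea follows; the class and method names are illustrative stand-ins, not the actual IcebergHandler API:

```java
import java.net.URI;

// Illustrative sketch only: derive an OpenLineage-style (namespace, name)
// pair from an Iceberg table location that is not registered in a catalog.
public class PathDatasetIdentifier {
  static String[] fromLocation(String location) {
    URI uri = URI.create(location);
    // Fall back to "file" for bare local paths such as /tmp/warehouse/events.
    String scheme = uri.getScheme() == null ? "file" : uri.getScheme();
    String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
    return new String[] {scheme + "://" + authority, uri.getPath()};
  }

  public static void main(String[] args) {
    String[] id = fromLocation("s3://bucket/warehouse/db/events");
    System.out.println(id[0] + " " + id[1]); // s3://bucket /warehouse/db/events
  }
}
```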

Regarding (2)

These classes are:

1. DataSourceV2ScanRelationOnEndInputDatasetBuilder
2. DataSourceV2ScanRelationOnStartInputDatasetBuilder

The relevant tests were also updated.
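The intent of the split in (2) can be sketched as follows: each builder answers only for the lifecycle event it owns, so inputs can be collected both when a SQL execution starts and when it ends. All types below are simplified stand-ins, not the real OpenLineage or Spark classes:

```java
// Illustrative sketch of splitting one builder into start/end variants.
public class BuilderSplitSketch {
  static class SparkListenerEvent {}
  static class SQLExecutionStart extends SparkListenerEvent {}
  static class SQLExecutionEnd extends SparkListenerEvent {}

  interface InputDatasetBuilder {
    // Mirrors the PartialFunction-style "is this builder defined here?" check.
    boolean isDefinedAt(SparkListenerEvent event);
  }

  static class OnStartBuilder implements InputDatasetBuilder {
    public boolean isDefinedAt(SparkListenerEvent event) {
      return event instanceof SQLExecutionStart;
    }
  }

  static class OnEndBuilder implements InputDatasetBuilder {
    public boolean isDefinedAt(SparkListenerEvent event) {
      return event instanceof SQLExecutionEnd;
    }
  }

  public static void main(String[] args) {
    SparkListenerEvent start = new SQLExecutionStart();
    System.out.println(new OnStartBuilder().isDefinedAt(start)); // true
    System.out.println(new OnEndBuilder().isDefinedAt(start));   // false
  }
}
```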

Signed-off-by: Damien Hawes <[email protected]>
d-m-h authored Aug 23, 2024
1 parent 9f9b074 commit 3cb3e1c
Showing 23 changed files with 1,083 additions and 157 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,8 @@
# Changelog

## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.20.5...HEAD)
## [Unreleased](https://github.com/OpenLineage/OpenLineage/compare/1.20.4...HEAD)
* **Spark: lineage is now present for Iceberg datasets that reside outside of Spark's catalog** [`#2937`](https://github.com/OpenLineage/OpenLineage/pull/2937) [@d-m-h](https://github.com/d-m-h)
*Previously, reading Iceberg datasets outside the configured Spark catalog prevented the datasets from being present in the `inputs` property of the `RunEvent`.*

## [1.20.5](https://github.com/OpenLineage/OpenLineage/compare/1.19.0...1.20.5) - 2024-08-23

25 changes: 15 additions & 10 deletions integration/spark/app/build.gradle
@@ -391,7 +391,8 @@ tasks.register("integrationTest", Test.class) {
excludeTags("iceberg")
}
}
systemProperties.put("test.results.dir", buildDirectory.dir("test-results/${name}/o").get().asFile.absolutePath)
systemProperties.put("test.output.dir", buildDirectory.dir("test-output/${name}").get().asFile.toString())
systemProperties.put("test.results.dir", buildDirectory.dir("test-results/${name}/o").get().asFile.toString())
}

tasks.register("databricksIntegrationTest", Test) {
@@ -477,25 +478,25 @@ List<Dependency> icebergDependencies(String spark, String scala) {

final def registry = [
"3.2.4": [
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.2_${scala}:0.14.0"),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.2_${scala}:1.4.3"),
dependencies.create("org.scala-lang:scala-library:${scalaVersion}"),
dependencies.create("org.scala-lang:scala-reflect:${scalaVersion}"),
dependencies.create("org.scala-lang.modules:scala-collection-compat_${scala}:2.11.0"),
],
"3.3.4": [
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.3_${scala}:0.14.0"),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.3_${scala}:1.6.0"),
dependencies.create("org.scala-lang:scala-library:${scalaVersion}"),
dependencies.create("org.scala-lang:scala-reflect:${scalaVersion}"),
dependencies.create("org.scala-lang.modules:scala-collection-compat_${scala}:2.11.0"),
],
"3.4.3": [
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.4_${scala}:1.3.0"),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.4_${scala}:1.6.0"),
dependencies.create("org.scala-lang:scala-library:${scalaVersion}"),
dependencies.create("org.scala-lang:scala-reflect:${scalaVersion}"),
dependencies.create("org.scala-lang.modules:scala-collection-compat_${scala}:2.11.0"),
],
"3.5.1": [
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.5_${scala}:1.4.0"),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.5_${scala}:1.6.0"),
dependencies.create("org.scala-lang:scala-library:${scalaVersion}"),
dependencies.create("org.scala-lang:scala-reflect:${scalaVersion}"),
dependencies.create("org.scala-lang.modules:scala-collection-compat_${scala}:2.12.0"),
@@ -583,30 +584,34 @@ List<Dependency> additionalJars(String spark, String scala) {
exclude(group: "org.slf4j")
}),
dependencies.create("org.apache.spark:spark-mllib_${scala}:${spark}", {
exclude(group: "org.slf4j")
transitive = false
}),
],
"3.2.4": [
dependencies.create("org.apache.spark:spark-mllib_${scala}:${spark}", {
exclude(group: "org.slf4j")
transitive = false
}),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.2_${scala}:1.4.3")
],
"3.3.4": [
dependencies.create("org.apache.spark:spark-mllib_${scala}:${spark}", {
exclude(group: "org.slf4j")
transitive = false
}),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.3_${scala}:1.6.0")
],
"3.4.3": [
dependencies.create("org.apache.spark:spark-mllib_${scala}:${spark}", {
exclude(group: "org.slf4j")
transitive = false
}),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.4_${scala}:1.6.0")
],
"3.5.1": [
dependencies.create("org.slf4j:slf4j-api:2.0.10"),
dependencies.create("org.slf4j:slf4j-reload4j:2.0.10"),
dependencies.create("org.apache.spark:spark-mllib_${scala}:${spark}", {
exclude(group: "org.slf4j")
transitive = false
}),
dependencies.create("org.apache.iceberg:iceberg-spark-runtime-3.5_${scala}:1.6.0")
],
]

@@ -21,7 +21,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.CreateReplaceDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -56,7 +57,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
.add(new DataSourceV2RelationInputDatasetBuilder(context, datasetFactory));
@@ -18,7 +18,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -55,7 +56,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new CreateReplaceInputDatasetBuilder(context))
.add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
@@ -18,7 +18,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -54,7 +55,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new CreateReplaceInputDatasetBuilder(context))
.add(new SparkExtensionV1InputDatasetBuilder(context))
@@ -19,7 +19,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.CreateReplaceDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -49,7 +50,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2RelationInputDatasetBuilder(context, datasetFactory))
.add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
.add(new SubqueryAliasInputDatasetBuilder(context));
@@ -18,7 +18,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -55,7 +56,8 @@ public Collection<PartialFunction<Object, List<InputDataset>>> getInputBuilders(
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new CreateReplaceInputDatasetBuilder(context))
.add(new SparkExtensionV1InputDatasetBuilder(context))
@@ -42,8 +42,7 @@

@Slf4j
public class SparkContainerUtils {
public static final String SPARK_DOCKER_CONTAINER_WAIT_MESSAGE =
".*ShutdownHookManager - Shutdown hook called.*";
public static final String SPARK_DOCKER_CONTAINER_WAIT_MESSAGE = ".*Shutdown hook called.*";

public static final DockerImageName MOCKSERVER_IMAGE =
DockerImageName.parse("mockserver/mockserver")
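The relaxed SPARK_DOCKER_CONTAINER_WAIT_MESSAGE constant above drops the logger-name prefix, so the container wait matches the shutdown line regardless of how the log layout renders the logger. A small check of that behavior (the log lines themselves are made-up examples):

```java
import java.util.regex.Pattern;

public class WaitMessageSketch {
  public static void main(String[] args) {
    // The relaxed pattern no longer requires the "ShutdownHookManager - "
    // prefix, so both example layouts below match.
    Pattern wait = Pattern.compile(".*Shutdown hook called.*");
    System.out.println(wait.matcher("INFO ShutdownHookManager - Shutdown hook called").matches()); // true
    System.out.println(wait.matcher("INFO ShutdownHookManager: Shutdown hook called").matches());  // true
  }
}
```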
