From 891d5fde085572f9143c44c4ac38c308fa74b0a8 Mon Sep 17 00:00:00 2001
From: Damien Hawes
Date: Tue, 20 Aug 2024 13:38:52 +0200
Subject: [PATCH] Split DataSourceV2ScanRelationInputDatasetBuilder into two
 classes, each focused on a different Spark lifecycle event

These are:

1. DataSourceV2ScanRelationOnEndInputDatasetBuilder
2. DataSourceV2ScanRelationOnStartInputDatasetBuilder

The tests have been refactored accordingly.

Signed-off-by: Damien Hawes
---
 .../Spark32DatasetBuilderFactory.java         |  6 +-
 .../Spark34DatasetBuilderFactory.java         |  6 +-
 .../Spark35DatasetBuilderFactory.java         |  6 +-
 .../Spark3DatasetBuilderFactory.java          |  6 +-
 .../Spark40DatasetBuilderFactory.java         |  6 +-
 .../agent/SparkIcebergMetadataJsonTest.java   |  4 +-
 .../spark/agent/SparkStreamingTest.java       | 13 ++-
 .../agent/util/OpenLineageHttpHandler.java    | 24 ++---
 .../spark/agent/util/StatefulHttpServer.java  |  2 +-
 ...2ScanRelationOnEndInputDatasetBuilder.java | 57 +++++++++++
 ...anRelationOnStartInputDatasetBuilder.java} | 32 +++---
 .../DatasetVersionDatasetFacetUtils.java      |  7 +-
 ...V2ScanRelationInputDatasetBuilderTest.java | 70 -------------
 ...nRelationOnEndInputDatasetBuilderTest.java | 97 +++++++++++++++++++
 ...elationOnStartInputDatasetBuilderTest.java | 96 ++++++++++++++++++
 15 files changed, 306 insertions(+), 126 deletions(-)
 create mode 100644 integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilder.java
 rename integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/{DataSourceV2ScanRelationInputDatasetBuilder.java => DataSourceV2ScanRelationOnStartInputDatasetBuilder.java} (51%)
 delete mode 100644 integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilderTest.java
 create mode 100644 integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilderTest.java
 create mode 100644 integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilderTest.java

diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark32DatasetBuilderFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark32DatasetBuilderFactory.java
index cd04fde518..e44f57204e 100644
--- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark32DatasetBuilderFactory.java
+++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark32DatasetBuilderFactory.java
@@ -21,7 +21,8 @@
 import io.openlineage.spark3.agent.lifecycle.plan.CreateReplaceDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
-import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -56,7 +57,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
         .add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
         .add(new InMemoryRelationInputDatasetBuilder(context))
         .add(new CommandPlanVisitor(context))
-        .add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
         .add(new SubqueryAliasInputDatasetBuilder(context))
         .add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
         .add(new DataSourceV2RelationInputDatasetBuilder(context, datasetFactory));
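Each factory in this patch receives the same two-line change: the single registration of DataSourceV2ScanRelationInputDatasetBuilder becomes a registration pair, so input datasets can be collected both when a SQL execution starts and when it ends. The sketch below is editorial, not OpenLineage code: it reduces the pattern to a minimal runnable form, with builders gated on an event type and a dispatcher that offers every event to every registered builder.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    public final class BuilderDispatchSketch {
      // Stand-ins for SparkListenerSQLExecutionStart / SparkListenerSQLExecutionEnd.
      static final class StartEvent {}
      static final class EndEvent {}

      // Pairs an event gate (isDefinedAt) with a name; apply(...) is elided.
      static final class GatedBuilder {
        final String name;
        final Predicate<Object> gate;

        GatedBuilder(String name, Predicate<Object> gate) {
          this.name = name;
          this.gate = gate;
        }
      }

      public static void main(String[] args) {
        List<GatedBuilder> builders = new ArrayList<>();
        builders.add(new GatedBuilder("onStart", e -> e instanceof StartEvent));
        builders.add(new GatedBuilder("onEnd", e -> e instanceof EndEvent));

        // The dispatcher offers each listener event to each builder; only the
        // builder whose gate matches handles that event.
        for (Object event : new Object[] {new StartEvent(), new EndEvent()}) {
          for (GatedBuilder b : builders) {
            if (b.gate.test(event)) {
              System.out.println(b.name + " handles " + event.getClass().getSimpleName());
            }
          }
        }
      }
    }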
diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark34DatasetBuilderFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark34DatasetBuilderFactory.java
index 1a527a3b72..71d328f266 100644
--- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark34DatasetBuilderFactory.java
+++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark34DatasetBuilderFactory.java
@@ -18,7 +18,8 @@
 import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
-import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -55,7 +56,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
         .add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
         .add(new InMemoryRelationInputDatasetBuilder(context))
         .add(new CommandPlanVisitor(context))
-        .add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
         .add(new SubqueryAliasInputDatasetBuilder(context))
         .add(new CreateReplaceInputDatasetBuilder(context))
         .add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark35DatasetBuilderFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark35DatasetBuilderFactory.java
index 708e86fa3a..798a002c67 100644
--- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark35DatasetBuilderFactory.java
+++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark35DatasetBuilderFactory.java
@@ -18,7 +18,8 @@
 import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
-import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -54,7 +55,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
         .add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
         .add(new InMemoryRelationInputDatasetBuilder(context))
         .add(new CommandPlanVisitor(context))
-        .add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
         .add(new SubqueryAliasInputDatasetBuilder(context))
         .add(new CreateReplaceInputDatasetBuilder(context))
         .add(new SparkExtensionV1InputDatasetBuilder(context))

diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark3DatasetBuilderFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark3DatasetBuilderFactory.java
index 1798da0513..64cc6d40dc 100644
--- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark3DatasetBuilderFactory.java
+++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark3DatasetBuilderFactory.java
@@ -19,7 +19,8 @@
 import io.openlineage.spark3.agent.lifecycle.plan.CreateReplaceDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
-import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -49,7 +50,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
         .add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
         .add(new InMemoryRelationInputDatasetBuilder(context))
         .add(new CommandPlanVisitor(context))
-        .add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
         .add(new DataSourceV2RelationInputDatasetBuilder(context, datasetFactory))
         .add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
         .add(new SubqueryAliasInputDatasetBuilder(context));
diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark40DatasetBuilderFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark40DatasetBuilderFactory.java
index 823c604b33..b0efa4d0d8 100644
--- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark40DatasetBuilderFactory.java
+++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/Spark40DatasetBuilderFactory.java
@@ -18,7 +18,8 @@
 import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
-import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
+import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
 import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -55,7 +56,8 @@ public Collection<PartialFunction<Object, List<InputDataset>>> getInputBuilders(
         .add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
         .add(new InMemoryRelationInputDatasetBuilder(context))
         .add(new CommandPlanVisitor(context))
-        .add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
+        .add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
         .add(new SubqueryAliasInputDatasetBuilder(context))
         .add(new CreateReplaceInputDatasetBuilder(context))
         .add(new SparkExtensionV1InputDatasetBuilder(context))

diff --git a/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkIcebergMetadataJsonTest.java b/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkIcebergMetadataJsonTest.java
index 8e975be7b0..b389ccbd37 100644
--- a/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkIcebergMetadataJsonTest.java
+++ b/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkIcebergMetadataJsonTest.java
@@ -111,7 +111,7 @@ void beforeEach() throws IOException {
   @AfterEach
   void afterEach() throws IOException {
     server.close();
-    server = null;
+    server = null; // NOPMD
   }

   @AfterAll
@@ -247,7 +247,7 @@ void readIcebergMetadataJsonOutsideConfiguredCatalog() {
   }

   @Test
-  void readIcebergMetadataJsonWithoutAConfiguredIcebergCatalog() throws IOException {
+  void readIcebergMetadataJsonWithoutAConfiguredIcebergCatalog() {
     final String testName = "read_iceberg_metadata_json_without_a_configured_iceberg_catalog";
     Map<String, String> props = new TreeMap<>(Comparator.naturalOrder());
     props.put("spark.app.name", testName);
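The `// NOPMD` comment added in afterEach is PMD's standard line-level suppression marker: PMD ignores any rule violation reported on a line carrying that token (here, presumably the NullAssignment rule triggered by nulling the field). A minimal illustration, unrelated to the OpenLineage code:

    public class SuppressionExample {
      private Object resource = new Object();

      public void close() {
        // Without the marker, PMD's NullAssignment rule would flag this line.
        resource = null; // NOPMD - field is intentionally nulled between tests
      }
    }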
diff --git a/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkStreamingTest.java b/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkStreamingTest.java
index 35e0c151f2..9f2281f1f5 100644
--- a/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkStreamingTest.java
+++ b/integration/spark/app/src/test/java/io/openlineage/spark/agent/SparkStreamingTest.java
@@ -192,7 +192,7 @@ void testKafkaSourceToKafkaSink() throws TimeoutException, StreamingQueryExcepti
     Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> spark.sparkContext().isStopped());

     List<RunEvent> events =
-        handler.eventMap().getOrDefault("test_kafka_source_to_kafka_sink", new ArrayList<>());
+        handler.getEventsMap().getOrDefault("test_kafka_source_to_kafka_sink", new ArrayList<>());

     List<RunEvent> sqlEvents =
         events.stream()
@@ -281,7 +281,7 @@ void testKafkaSourceToBatchSink() throws TimeoutException, StreamingQueryExcepti
             .start();

     streamingQuery.awaitTermination(Duration.ofSeconds(20).toMillis());
-    List<RunEvent> events = handler.eventMap().get("test_kafka_source_to_batch_sink");
+    List<RunEvent> events = handler.getEventsMap().get("test_kafka_source_to_batch_sink");

     assertThat(events).isNotEmpty();

@@ -357,7 +357,7 @@ void testKafkaSourceToJdbcBatchSink()

     streamingQuery.awaitTermination(Duration.ofSeconds(20).toMillis());

-    List<RunEvent> events = handler.eventMap().get("test_kafka_source_to_jdbc_batch_sink");
+    List<RunEvent> events = handler.getEventsMap().get("test_kafka_source_to_jdbc_batch_sink");

     assertTrue(events.size() > 1);

@@ -418,7 +418,9 @@ void testKafkaClusterResolveNamespace()
             .awaitTermination(Duration.ofSeconds(10).toMillis());

     List<RunEvent> events =
-        handler.eventMap().getOrDefault("test_kafka_cluster_resolve_namespace", new ArrayList<>());
+        handler
+            .getEventsMap()
+            .getOrDefault("test_kafka_cluster_resolve_namespace", new ArrayList<>());

     assertTrue(events.stream().anyMatch(x -> !x.getInputs().isEmpty()));

@@ -456,7 +458,8 @@ void readFromCsvFilesInAStreamingMode()
             .start()
             .awaitTermination(Duration.ofSeconds(10).toMillis());

-    List<RunEvent> events = handler.eventMap().get("test_read_from_csv_files_in_a_streaming_mode");
+    List<RunEvent> events =
+        handler.getEventsMap().get("test_read_from_csv_files_in_a_streaming_mode");

     List<RunEvent> csvInputEventsUsingStreaming =
         events.stream().filter(x -> !x.getInputs().isEmpty()).collect(Collectors.toList());

diff --git a/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/OpenLineageHttpHandler.java b/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/OpenLineageHttpHandler.java
index 6d7bb87c6a..940aa402ed 100644
--- a/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/OpenLineageHttpHandler.java
+++ b/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/OpenLineageHttpHandler.java
@@ -21,10 +21,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import lombok.Getter;

+@Getter
 public final class OpenLineageHttpHandler implements HttpHandler {
-  private final List<String> eventsContainer = new ArrayList<>();
-  private final Map<String, List<RunEvent>> events = new HashMap<>();
+  private final List<String> events = new ArrayList<>();
+  private final Map<String, List<RunEvent>> eventsMap = new HashMap<>();

   @Override
   public void handle(HttpExchange exchange) throws IOException {
@@ -33,7 +35,7 @@ public void handle(HttpExchange exchange) throws IOException {
     BufferedReader br = new BufferedReader(isr);
     String value = br.readLine();

-    eventsContainer.add(value);
+    events.add(value);

     RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(value);
     String jobName = runEvent.getJob().getName();
@@ -46,11 +48,11 @@

     String jobNameShortString = jobNameShort.get();

-    if (!events.containsKey(jobNameShortString)) {
-      events.put(jobNameShortString, new ArrayList<>());
+    if (!eventsMap.containsKey(jobNameShortString)) {
+      eventsMap.put(jobNameShortString, new ArrayList<>());
     }

-    events.get(jobNameShortString).add(runEvent);
+    eventsMap.get(jobNameShortString).add(runEvent);

     exchange.sendResponseHeaders(200, 0);
     try (Writer writer =
@@ -59,16 +61,8 @@
     }
   }

-  public List<String> events() {
-    return eventsContainer;
-  }
-
-  public Map<String, List<RunEvent>> eventMap() {
-    return events;
-  }
-
   public void clear() {
-    eventsContainer.clear();
     events.clear();
+    eventsMap.clear();
   }
 }
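The field renames above work together with the new class-level Lombok @Getter: Lombok generates one getter per field, derived from the field name, so `events` and `eventsMap` are exposed as getEvents() and getEventsMap(). That is why the hand-written events() and eventMap() accessors could be deleted and why every call site in the tests switches to the generated names. Written out by hand, the expansion looks roughly like this (editorial sketch; value type simplified to String):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class GetterExpansionSketch {
      private final List<String> events = new ArrayList<>();
      private final Map<String, List<String>> eventsMap = new HashMap<>();

      // Equivalent of what the class-level @Getter generates for the two fields.
      public List<String> getEvents() {
        return events;
      }

      public Map<String, List<String>> getEventsMap() {
        return eventsMap;
      }
    }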
diff --git a/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/StatefulHttpServer.java b/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/StatefulHttpServer.java
index e8365270f4..da9bdbce22 100644
--- a/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/StatefulHttpServer.java
+++ b/integration/spark/app/src/test/java/io/openlineage/spark/agent/util/StatefulHttpServer.java
@@ -45,7 +45,7 @@ public static StatefulHttpServer create(String path, OpenLineageHttpHandler hand
   }

   public List<String> events() {
-    return new ArrayList<>(handler.events());
+    return new ArrayList<>(handler.getEvents());
   }

   public String getHost() {

diff --git a/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilder.java b/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilder.java
new file mode 100644
index 0000000000..f81665c05e
--- /dev/null
+++ b/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilder.java
@@ -0,0 +1,57 @@
+/*
+/* Copyright 2018-2024 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.spark3.agent.lifecycle.plan;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.client.OpenLineage.InputDataset;
+import io.openlineage.spark.api.AbstractQueryPlanInputDatasetBuilder;
+import io.openlineage.spark.api.DatasetFactory;
+import io.openlineage.spark.api.OpenLineageContext;
+import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
+import java.util.List;
+import java.util.Objects;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.scheduler.SparkListenerEvent;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
+
+@Slf4j
+public final class DataSourceV2ScanRelationOnEndInputDatasetBuilder
+    extends AbstractQueryPlanInputDatasetBuilder<DataSourceV2ScanRelation> {
+  private final DatasetFactory<InputDataset> factory;
+
+  public DataSourceV2ScanRelationOnEndInputDatasetBuilder(
+      OpenLineageContext context, DatasetFactory<InputDataset> factory) {
+    super(context, true);
+    this.factory = Objects.requireNonNull(factory, "parameter: factory");
+  }
+
+  @Override
+  public boolean isDefinedAt(SparkListenerEvent event) {
+    return event instanceof SparkListenerSQLExecutionEnd;
+  }
+
+  @Override
+  protected boolean isDefinedAtLogicalPlan(LogicalPlan plan) {
+    return plan instanceof DataSourceV2ScanRelation;
+  }
+
+  @Override
+  public List<InputDataset> apply(DataSourceV2ScanRelation plan) {
+    DataSourceV2Relation relation = plan.relation();
+    OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
+        context.getOpenLineage().newDatasetFacetsBuilder();
+    return DataSourceV2RelationDatasetExtractor.extract(
+        factory, context, relation, datasetFacetsBuilder);
+  }
+
+  @Override
+  public String toString() {
+    return this.getClass().getSimpleName();
+  }
+}
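The new builder shows the shape shared by both halves of the split. By all appearances, AbstractQueryPlanInputDatasetBuilder implementations are filtered twice: isDefinedAt decides whether a listener event is interesting at all, and isDefinedAtLogicalPlan selects the plan nodes the builder can handle; apply runs only on nodes that pass the second gate. A condensed, editorial rendering of that control flow with plain stand-in types (the real traversal lives inside OpenLineage's base class):

    import java.util.ArrayList;
    import java.util.List;

    public final class TwoLevelGateSketch {
      interface Node {}                               // stands in for LogicalPlan
      static final class ScanNode implements Node {}  // stands in for DataSourceV2ScanRelation

      // Mirrors isDefinedAt(event) -> traverse plan -> isDefinedAtLogicalPlan(node) -> apply(node).
      static List<String> inputDatasets(String eventType, List<Node> planNodes) {
        List<String> inputs = new ArrayList<>();
        if (!"SQLExecutionEnd".equals(eventType)) {   // isDefinedAt
          return inputs;
        }
        for (Node node : planNodes) {
          if (node instanceof ScanNode) {             // isDefinedAtLogicalPlan
            inputs.add("dataset extracted from scan"); // apply
          }
        }
        return inputs;
      }

      public static void main(String[] args) {
        List<Node> plan = new ArrayList<>();
        plan.add(new ScanNode());
        System.out.println(inputDatasets("SQLExecutionEnd", plan));   // [dataset extracted from scan]
        System.out.println(inputDatasets("SQLExecutionStart", plan)); // []
      }
    }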
diff --git a/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilder.java b/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilder.java
similarity index 51%
rename from integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilder.java
rename to integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilder.java
index df0e29d1aa..5b5038cb85 100644
--- a/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilder.java
+++ b/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilder.java
@@ -6,53 +6,45 @@
 package io.openlineage.spark3.agent.lifecycle.plan;

 import io.openlineage.client.OpenLineage;
+import io.openlineage.client.OpenLineage.InputDataset;
 import io.openlineage.spark.api.AbstractQueryPlanInputDatasetBuilder;
 import io.openlineage.spark.api.DatasetFactory;
 import io.openlineage.spark.api.OpenLineageContext;
 import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
 import io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils;
 import java.util.List;
+import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.scheduler.SparkListenerEvent;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
-import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

 @Slf4j
-public class DataSourceV2ScanRelationInputDatasetBuilder
+public final class DataSourceV2ScanRelationOnStartInputDatasetBuilder
     extends AbstractQueryPlanInputDatasetBuilder<DataSourceV2ScanRelation> {
+  private final DatasetFactory<InputDataset> factory;

-  private final DatasetFactory<OpenLineage.InputDataset> factory;
-
-  public DataSourceV2ScanRelationInputDatasetBuilder(
-      OpenLineageContext context, DatasetFactory<OpenLineage.InputDataset> factory) {
+  public DataSourceV2ScanRelationOnStartInputDatasetBuilder(
+      OpenLineageContext context, DatasetFactory<InputDataset> factory) {
     super(context, true);
-    this.factory = factory;
+    this.factory = Objects.requireNonNull(factory, "parameter: factory");
   }

   @Override
   public boolean isDefinedAt(SparkListenerEvent event) {
-    /* (2024-08-15) I implemented this method because it deferred to super.isDefinedAt(SparkListenerEvent),
-    which only looks at SparkListenerJobStart and SparkListenerSQLExecutionStart. However, due to
-    https://github.com/OpenLineage/OpenLineage/issues/2935, openlineage-spark was not adding the actual
-    input dataset, and debugging the code eventually led me to this point. For some reason, the
-    SparkListenerSQLExecutionStart event didn't have the necessary details to create an ExecutionContext.
-    Therefore, this code was never evaluated for the SparkListenerSQLExecutionStart. I'm keeping the
-    SparkListenerSQLExecutionStart defined here, just in case there is a code path that uses it.*/
-    return event instanceof SparkListenerSQLExecutionStart
-        || event instanceof SparkListenerSQLExecutionEnd;
+    return event instanceof SparkListenerSQLExecutionStart;
   }

   @Override
-  public boolean isDefinedAtLogicalPlan(LogicalPlan logicalPlan) {
-    return logicalPlan instanceof DataSourceV2ScanRelation;
+  protected boolean isDefinedAtLogicalPlan(LogicalPlan plan) {
+    return plan instanceof DataSourceV2ScanRelation;
   }

   @Override
-  public List<OpenLineage.InputDataset> apply(DataSourceV2ScanRelation scanRelation) {
-    DataSourceV2Relation relation = scanRelation.relation();
+  public List<InputDataset> apply(DataSourceV2ScanRelation plan) {
+    DataSourceV2Relation relation = plan.relation();
     OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
         context.getOpenLineage().newDatasetFacetsBuilder();

diff --git a/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/utils/DatasetVersionDatasetFacetUtils.java b/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/utils/DatasetVersionDatasetFacetUtils.java
index d3e7776a50..004041da2d 100644
--- a/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/utils/DatasetVersionDatasetFacetUtils.java
+++ b/integration/spark/spark3/src/main/java/io/openlineage/spark3/agent/utils/DatasetVersionDatasetFacetUtils.java
@@ -21,7 +21,8 @@
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;

 @Slf4j
-public class DatasetVersionDatasetFacetUtils {
+public final class DatasetVersionDatasetFacetUtils {
+  private DatasetVersionDatasetFacetUtils() {}

   private static final String DELTA = "delta";

@@ -31,13 +32,13 @@ public class DatasetVersionDatasetFacetUtils {
   public static Optional<String> extractVersionFromDataSourceV2Relation(
       OpenLineageContext context, DataSourceV2Relation table) {
     if (table.identifier().isEmpty()) {
-      log.warn("Couldn't find identifier for dataset in plan " + table);
+      log.warn("Couldn't find identifier for dataset in plan {}", table);
       return Optional.empty();
     }
     Identifier identifier = table.identifier().get();

     if (table.catalog().isEmpty() || !(table.catalog().get() instanceof TableCatalog)) {
-      log.warn("Couldn't find catalog for dataset in plan " + table);
+      log.warn("Couldn't find catalog for dataset in plan {}", table);
       return Optional.empty();
     }
     TableCatalog tableCatalog = (TableCatalog) table.catalog().get();
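The log.warn changes swap string concatenation for SLF4J's parameterized form. With `{}`, the message is only assembled (and the argument's toString() only invoked) if WARN is actually enabled; with `+`, concatenation happens unconditionally before the call. Both styles side by side, as a standalone example that works with any SLF4J binding:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public final class ParameterizedLoggingExample {
      private static final Logger log = LoggerFactory.getLogger(ParameterizedLoggingExample.class);

      public static void main(String[] args) {
        Object table = "my_table";
        log.warn("Couldn't find identifier for dataset in plan " + table); // eager concatenation
        log.warn("Couldn't find identifier for dataset in plan {}", table); // lazy formatting
      }
    }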
diff --git a/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilderTest.java b/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilderTest.java
deleted file mode 100644
index 1503e89da7..0000000000
--- a/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationInputDatasetBuilderTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-/* Copyright 2018-2024 contributors to the OpenLineage project
-/* SPDX-License-Identifier: Apache-2.0
-*/
-
-package io.openlineage.spark3.agent.lifecycle.plan;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.mockStatic;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.when;
-
-import io.openlineage.client.OpenLineage;
-import io.openlineage.spark.api.DatasetFactory;
-import io.openlineage.spark.api.OpenLineageContext;
-import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
-import io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils;
-import java.util.List;
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-
-class DataSourceV2ScanRelationInputDatasetBuilderTest {
-
-  OpenLineage openLineage = mock(OpenLineage.class);
-  OpenLineageContext context = mock(OpenLineageContext.class);
-  DatasetFactory<OpenLineage.InputDataset> factory = mock(DatasetFactory.class);
-  DataSourceV2ScanRelationInputDatasetBuilder builder =
-      new DataSourceV2ScanRelationInputDatasetBuilder(context, factory);
-
-  @Test
-  void testIsDefinedAt() {
-    Assertions.assertFalse(builder.isDefinedAtLogicalPlan(mock(LogicalPlan.class)));
-    Assertions.assertTrue(builder.isDefinedAtLogicalPlan(mock(DataSourceV2ScanRelation.class)));
-  }
-
-  @Test
-  void testApply() {
-    OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
-        mock(OpenLineage.DatasetFacetsBuilder.class);
-    List<OpenLineage.InputDataset> datasets = mock(List.class);
-    DataSourceV2ScanRelation scanRelation = mock(DataSourceV2ScanRelation.class);
-    DataSourceV2Relation relation = mock(DataSourceV2Relation.class);
-
-    when(openLineage.newDatasetFacetsBuilder()).thenReturn(datasetFacetsBuilder);
-    when(context.getOpenLineage()).thenReturn(openLineage);
-    when(scanRelation.relation()).thenReturn(relation);
-
-    try (MockedStatic<DataSourceV2RelationDatasetExtractor> planUtils3MockedStatic =
-        mockStatic(DataSourceV2RelationDatasetExtractor.class)) {
-      try (MockedStatic<DatasetVersionDatasetFacetUtils> facetUtilsMockedStatic =
-          mockStatic(DatasetVersionDatasetFacetUtils.class)) {
-        when(DataSourceV2RelationDatasetExtractor.extract(
-                factory, context, relation, datasetFacetsBuilder))
-            .thenReturn(datasets);
-
-        Assertions.assertEquals(datasets, builder.apply(scanRelation));
-
-        facetUtilsMockedStatic.verify(
-            () ->
-                DatasetVersionDatasetFacetUtils.includeDatasetVersion(
-                    context, datasetFacetsBuilder, relation),
-            times(1));
-      }
-    }
-  }
-}
diff --git a/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilderTest.java b/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilderTest.java
new file mode 100644
index 0000000000..25291a363c
--- /dev/null
+++ b/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnEndInputDatasetBuilderTest.java
@@ -0,0 +1,97 @@
+/*
+/* Copyright 2018-2024 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.spark3.agent.lifecycle.plan;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.client.OpenLineage.InputDataset;
+import io.openlineage.spark.api.DatasetFactory;
+import io.openlineage.spark.api.OpenLineageContext;
+import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
+import io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils;
+import java.util.List;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+class DataSourceV2ScanRelationOnEndInputDatasetBuilderTest {
+
+  private final OpenLineage openLineage = mock(OpenLineage.class);
+  private final OpenLineageContext context = mock(OpenLineageContext.class);
+  private final DatasetFactory<InputDataset> factory = mock(DatasetFactory.class);
+  private final DataSourceV2ScanRelationOnEndInputDatasetBuilder builder =
+      new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, factory);
+
+  @Test
+  void testIsDefinedAt() {
+    Assertions.assertThat(builder)
+        .returns(true, b -> b.isDefinedAt(mock(SparkListenerSQLExecutionEnd.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerSQLExecutionStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerApplicationStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerApplicationEnd.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerStageSubmitted.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerStageCompleted.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerTaskStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerTaskEnd.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerJobStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerJobEnd.class)));
+  }
+
+  @Test
+  void testIsDefinedAtLogicalPlan() {
+    Assertions.assertThat(builder)
+        .returns(false, b -> b.isDefinedAtLogicalPlan(mock(LogicalPlan.class)))
+        .returns(true, b -> b.isDefinedAtLogicalPlan(mock(DataSourceV2ScanRelation.class)));
+  }
+
+  @Test
+  void testApply() {
+    OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
+        mock(OpenLineage.DatasetFacetsBuilder.class);
+    List<InputDataset> datasets = mock(List.class);
+    DataSourceV2ScanRelation scanRelation = mock(DataSourceV2ScanRelation.class);
+    DataSourceV2Relation relation = mock(DataSourceV2Relation.class);
+
+    when(openLineage.newDatasetFacetsBuilder()).thenReturn(datasetFacetsBuilder);
+    when(context.getOpenLineage()).thenReturn(openLineage);
+    when(scanRelation.relation()).thenReturn(relation);
+
+    try (MockedStatic<DataSourceV2RelationDatasetExtractor> ignored =
+        mockStatic(DataSourceV2RelationDatasetExtractor.class)) {
+      try (MockedStatic<DatasetVersionDatasetFacetUtils> facetUtilsMockedStatic =
+          mockStatic(DatasetVersionDatasetFacetUtils.class)) {
+        when(DataSourceV2RelationDatasetExtractor.extract(
+                factory, context, relation, datasetFacetsBuilder))
+            .thenReturn(datasets);
+
+        Assertions.assertThat(builder.apply(scanRelation)).isEqualTo(datasets);
+
+        facetUtilsMockedStatic.verify(
+            () ->
+                DatasetVersionDatasetFacetUtils.includeDatasetVersion(
+                    context, datasetFacetsBuilder, relation),
+            times(0));
+      }
+    }
+  }
+}
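Both new tests stub the static extractor inside nested try-with-resources blocks. Mockito's mockStatic activates a static mock for the current thread only while the MockedStatic resource stays open, so closing the block restores the real method and the stubbing cannot leak into other tests. The same idiom against a plain JDK method, as an editorial sketch (requires Mockito's inline/static mock maker):

    import static org.mockito.Mockito.mockStatic;

    import java.util.UUID;
    import org.mockito.MockedStatic;

    public final class MockStaticScopeSketch {
      public static void main(String[] args) {
        try (MockedStatic<UUID> mocked = mockStatic(UUID.class)) {
          mocked.when(UUID::randomUUID).thenReturn(new UUID(0L, 0L));
          System.out.println(UUID.randomUUID()); // 00000000-0000-0000-0000-000000000000
        }
        System.out.println(UUID.randomUUID()); // real randomness again after close()
      }
    }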
diff --git a/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilderTest.java b/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilderTest.java
new file mode 100644
index 0000000000..95add2c238
--- /dev/null
+++ b/integration/spark/spark3/src/test/java/io/openlineage/spark3/agent/lifecycle/plan/DataSourceV2ScanRelationOnStartInputDatasetBuilderTest.java
@@ -0,0 +1,96 @@
+/*
+/* Copyright 2018-2024 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.spark3.agent.lifecycle.plan;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.when;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.spark.api.DatasetFactory;
+import io.openlineage.spark.api.OpenLineageContext;
+import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
+import io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils;
+import java.util.List;
+import org.apache.spark.scheduler.SparkListenerApplicationEnd;
+import org.apache.spark.scheduler.SparkListenerApplicationStart;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.SparkListenerTaskEnd;
+import org.apache.spark.scheduler.SparkListenerTaskStart;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+class DataSourceV2ScanRelationOnStartInputDatasetBuilderTest {
+
+  private final OpenLineage openLineage = mock(OpenLineage.class);
+  private final OpenLineageContext context = mock(OpenLineageContext.class);
+  private final DatasetFactory<OpenLineage.InputDataset> factory = mock(DatasetFactory.class);
+  private final DataSourceV2ScanRelationOnStartInputDatasetBuilder builder =
+      new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, factory);
+
+  @Test
+  void testIsDefinedAt() {
+    Assertions.assertThat(builder)
+        .returns(true, b -> b.isDefinedAt(mock(SparkListenerSQLExecutionStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerSQLExecutionEnd.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerApplicationStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerApplicationEnd.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerStageSubmitted.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerStageCompleted.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerTaskStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerTaskEnd.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerJobStart.class)))
+        .returns(false, b -> b.isDefinedAt(mock(SparkListenerJobEnd.class)));
+  }
+
+  @Test
+  void testIsDefinedAtLogicalPlan() {
+    Assertions.assertThat(builder)
+        .returns(false, b -> b.isDefinedAtLogicalPlan(mock(LogicalPlan.class)))
+        .returns(true, b -> b.isDefinedAtLogicalPlan(mock(DataSourceV2ScanRelation.class)));
+  }
+
+  @Test
+  void testApply() {
+    OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
+        mock(OpenLineage.DatasetFacetsBuilder.class);
+    List<OpenLineage.InputDataset> datasets = mock(List.class);
+    DataSourceV2ScanRelation scanRelation = mock(DataSourceV2ScanRelation.class);
+    DataSourceV2Relation relation = mock(DataSourceV2Relation.class);
+
+    when(openLineage.newDatasetFacetsBuilder()).thenReturn(datasetFacetsBuilder);
+    when(context.getOpenLineage()).thenReturn(openLineage);
+    when(scanRelation.relation()).thenReturn(relation);
+
+    try (MockedStatic<DataSourceV2RelationDatasetExtractor> ignored =
+        mockStatic(DataSourceV2RelationDatasetExtractor.class)) {
+      try (MockedStatic<DatasetVersionDatasetFacetUtils> facetUtilsMockedStatic =
+          mockStatic(DatasetVersionDatasetFacetUtils.class)) {
+        when(DataSourceV2RelationDatasetExtractor.extract(
+                factory, context, relation, datasetFacetsBuilder))
+            .thenReturn(datasets);
+
+        Assertions.assertThat(builder.apply(scanRelation)).isEqualTo(datasets);
+
+        facetUtilsMockedStatic.verify(
+            () ->
+                DatasetVersionDatasetFacetUtils.includeDatasetVersion(
+                    context, datasetFacetsBuilder, relation),
+            times(1));
+      }
+    }
+  }
+}
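Apart from the listener event each builder accepts, the two testApply methods differ in a single expectation: the OnStart test verifies that includeDatasetVersion runs exactly once, while the OnEnd test verifies it never runs. Read together with the OnEnd builder's apply, which builds its facets without touching DatasetVersionDatasetFacetUtils, the intent appears to be that the dataset-version facet is attached only on the start event. The OnStart apply body is truncated in the rename hunk above, so the following is an inference from its test, not confirmed source:

    // OnStart.apply(...) (inferred from times(1) in its test):
    //   DatasetVersionDatasetFacetUtils.includeDatasetVersion(context, datasetFacetsBuilder, relation);
    //   return DataSourceV2RelationDatasetExtractor.extract(factory, context, relation, datasetFacetsBuilder);
    //
    // OnEnd.apply(...) (shown in full earlier in this patch):
    //   return DataSourceV2RelationDatasetExtractor.extract(factory, context, relation, datasetFacetsBuilder);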