Split the DataSourceV2ScanRelationInputDatasetBuilder into two classes, focused on different Spark lifecycle events

These are:

1. DataSourceV2ScanRelationOnEndInputDatasetBuilder
2. DataSourceV2ScanRelationOnStartInputDatasetBuilder

Also refactored the tests

Signed-off-by: Damien Hawes <[email protected]>
d-m-h committed Aug 20, 2024
1 parent a95734b commit 891d5fd
Showing 15 changed files with 306 additions and 126 deletions.
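For orientation before the diff: the two new builders split the old class's work by Spark SQL lifecycle event. Condensed from the classes added below (constructors, the isDefinedAtLogicalPlan override, and the dataset-extraction bodies are omitted here; the full sources follow in the diff), the distinguishing checks are:

    // DataSourceV2ScanRelationOnStartInputDatasetBuilder: reacts to the start of a SQL execution
    @Override
    public boolean isDefinedAt(SparkListenerEvent event) {
      return event instanceof SparkListenerSQLExecutionStart;
    }

    // DataSourceV2ScanRelationOnEndInputDatasetBuilder: reacts to the end of a SQL execution
    @Override
    public boolean isDefinedAt(SparkListenerEvent event) {
      return event instanceof SparkListenerSQLExecutionEnd;
    }

The replaced DataSourceV2ScanRelationInputDatasetBuilder matched both events in a single class; each input-dataset builder factory now registers both new builders instead.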
@@ -21,7 +21,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.CreateReplaceDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -56,7 +57,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
.add(new DataSourceV2RelationInputDatasetBuilder(context, datasetFactory));
@@ -18,7 +18,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -55,7 +56,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new CreateReplaceInputDatasetBuilder(context))
.add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
@@ -18,7 +18,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -54,7 +55,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new CreateReplaceInputDatasetBuilder(context))
.add(new SparkExtensionV1InputDatasetBuilder(context))
@@ -19,7 +19,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.CreateReplaceDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -49,7 +50,8 @@ public Collection<PartialFunction<Object, List<OpenLineage.InputDataset>>> getIn
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2RelationInputDatasetBuilder(context, datasetFactory))
.add(new MergeIntoCommandEdgeInputDatasetBuilder(context))
.add(new SubqueryAliasInputDatasetBuilder(context));
@@ -18,7 +18,8 @@
import io.openlineage.spark3.agent.lifecycle.plan.AppendDataDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2RelationOutputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnEndInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.DataSourceV2ScanRelationOnStartInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.InMemoryRelationInputDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.LogicalRelationDatasetBuilder;
import io.openlineage.spark3.agent.lifecycle.plan.MergeIntoCommandEdgeInputDatasetBuilder;
@@ -55,7 +56,8 @@ public Collection<PartialFunction<Object, List<InputDataset>>> getInputBuilders(
.add(new LogicalRelationDatasetBuilder(context, datasetFactory, true))
.add(new InMemoryRelationInputDatasetBuilder(context))
.add(new CommandPlanVisitor(context))
.add(new DataSourceV2ScanRelationInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnStartInputDatasetBuilder(context, datasetFactory))
.add(new DataSourceV2ScanRelationOnEndInputDatasetBuilder(context, datasetFactory))
.add(new SubqueryAliasInputDatasetBuilder(context))
.add(new CreateReplaceInputDatasetBuilder(context))
.add(new SparkExtensionV1InputDatasetBuilder(context))
@@ -111,7 +111,7 @@ void beforeEach() throws IOException {
@AfterEach
void afterEach() throws IOException {
server.close();
server = null;
server = null; // NOPMD
}

@AfterAll
@@ -247,7 +247,7 @@ void readIcebergMetadataJsonOutsideConfiguredCatalog() {
}

@Test
void readIcebergMetadataJsonWithoutAConfiguredIcebergCatalog() throws IOException {
void readIcebergMetadataJsonWithoutAConfiguredIcebergCatalog() {
final String testName = "read_iceberg_metadata_json_without_a_configured_iceberg_catalog";
Map<String, String> props = new TreeMap<>(Comparator.naturalOrder());
props.put("spark.app.name", testName);
@@ -192,7 +192,7 @@ void testKafkaSourceToKafkaSink() throws TimeoutException, StreamingQueryExcepti
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> spark.sparkContext().isStopped());

List<RunEvent> events =
handler.eventMap().getOrDefault("test_kafka_source_to_kafka_sink", new ArrayList<>());
handler.getEventsMap().getOrDefault("test_kafka_source_to_kafka_sink", new ArrayList<>());

List<RunEvent> sqlEvents =
events.stream()
@@ -281,7 +281,7 @@ void testKafkaSourceToBatchSink() throws TimeoutException, StreamingQueryExcepti
.start();

streamingQuery.awaitTermination(Duration.ofSeconds(20).toMillis());
List<RunEvent> events = handler.eventMap().get("test_kafka_source_to_batch_sink");
List<RunEvent> events = handler.getEventsMap().get("test_kafka_source_to_batch_sink");

assertThat(events).isNotEmpty();

@@ -357,7 +357,7 @@ void testKafkaSourceToJdbcBatchSink()

streamingQuery.awaitTermination(Duration.ofSeconds(20).toMillis());

List<RunEvent> events = handler.eventMap().get("test_kafka_source_to_jdbc_batch_sink");
List<RunEvent> events = handler.getEventsMap().get("test_kafka_source_to_jdbc_batch_sink");

assertTrue(events.size() > 1);

@@ -418,7 +418,9 @@ void testKafkaClusterResolveNamespace()
.awaitTermination(Duration.ofSeconds(10).toMillis());

List<RunEvent> events =
handler.eventMap().getOrDefault("test_kafka_cluster_resolve_namespace", new ArrayList<>());
handler
.getEventsMap()
.getOrDefault("test_kafka_cluster_resolve_namespace", new ArrayList<>());

assertTrue(events.stream().anyMatch(x -> !x.getInputs().isEmpty()));

@@ -456,7 +458,8 @@ void readFromCsvFilesInAStreamingMode()
.start()
.awaitTermination(Duration.ofSeconds(10).toMillis());

List<RunEvent> events = handler.eventMap().get("test_read_from_csv_files_in_a_streaming_mode");
List<RunEvent> events =
handler.getEventsMap().get("test_read_from_csv_files_in_a_streaming_mode");

List<RunEvent> csvInputEventsUsingStreaming =
events.stream().filter(x -> !x.getInputs().isEmpty()).collect(Collectors.toList());
@@ -21,10 +21,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;

@Getter
public final class OpenLineageHttpHandler implements HttpHandler {
private final List<String> eventsContainer = new ArrayList<>();
private final Map<String, List<RunEvent>> events = new HashMap<>();
private final List<String> events = new ArrayList<>();
private final Map<String, List<RunEvent>> eventsMap = new HashMap<>();

@Override
public void handle(HttpExchange exchange) throws IOException {
@@ -33,7 +35,7 @@ public void handle(HttpExchange exchange) throws IOException {
BufferedReader br = new BufferedReader(isr);
String value = br.readLine();

eventsContainer.add(value);
events.add(value);

RunEvent runEvent = OpenLineageClientUtils.runEventFromJson(value);
String jobName = runEvent.getJob().getName();
@@ -46,11 +48,11 @@ public void handle(HttpExchange exchange) throws IOException {

String jobNameShortString = jobNameShort.get();

if (!events.containsKey(jobNameShortString)) {
events.put(jobNameShortString, new ArrayList<>());
if (!eventsMap.containsKey(jobNameShortString)) {
eventsMap.put(jobNameShortString, new ArrayList<>());
}

events.get(jobNameShortString).add(runEvent);
eventsMap.get(jobNameShortString).add(runEvent);

exchange.sendResponseHeaders(200, 0);
try (Writer writer =
@@ -59,16 +61,8 @@ public void handle(HttpExchange exchange) throws IOException {
}
}

public List<String> events() {
return eventsContainer;
}

public Map<String, List<RunEvent>> eventMap() {
return events;
}

public void clear() {
eventsContainer.clear();
events.clear();
eventsMap.clear();
}
}
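Note on the accessor rename above: the Lombok @Getter annotation on OpenLineageHttpHandler generates getters for the renamed fields, roughly equivalent to

    public List<String> getEvents() { return events; }
    public Map<String, List<RunEvent>> getEventsMap() { return eventsMap; }

which is why the hand-written events() and eventMap() methods are removed and the streaming tests now call handler.getEvents() and handler.getEventsMap().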
@@ -45,7 +45,7 @@ public static StatefulHttpServer create(String path, OpenLineageHttpHandler hand
}

public List<String> events() {
return new ArrayList<>(handler.events());
return new ArrayList<>(handler.getEvents());
}

public String getHost() {
@@ -0,0 +1,57 @@
/*
/* Copyright 2018-2024 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.spark3.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.InputDataset;
import io.openlineage.spark.api.AbstractQueryPlanInputDatasetBuilder;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;

@Slf4j
public final class DataSourceV2ScanRelationOnEndInputDatasetBuilder
extends AbstractQueryPlanInputDatasetBuilder<DataSourceV2ScanRelation> {
private final DatasetFactory<InputDataset> factory;

public DataSourceV2ScanRelationOnEndInputDatasetBuilder(
OpenLineageContext context, DatasetFactory<InputDataset> factory) {
super(context, true);
this.factory = Objects.requireNonNull(factory, "parameter: factory");
}

@Override
public boolean isDefinedAt(SparkListenerEvent event) {
return event instanceof SparkListenerSQLExecutionEnd;
}

@Override
protected boolean isDefinedAtLogicalPlan(LogicalPlan plan) {
return plan instanceof DataSourceV2ScanRelation;
}

@Override
public List<InputDataset> apply(DataSourceV2ScanRelation plan) {
DataSourceV2Relation relation = plan.relation();
OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
context.getOpenLineage().newDatasetFacetsBuilder();
return DataSourceV2RelationDatasetExtractor.extract(
factory, context, relation, datasetFacetsBuilder);
}

@Override
public String toString() {
return this.getClass().getSimpleName();
}
}
@@ -6,53 +6,45 @@
package io.openlineage.spark3.agent.lifecycle.plan;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.InputDataset;
import io.openlineage.spark.api.AbstractQueryPlanInputDatasetBuilder;
import io.openlineage.spark.api.DatasetFactory;
import io.openlineage.spark.api.OpenLineageContext;
import io.openlineage.spark3.agent.utils.DataSourceV2RelationDatasetExtractor;
import io.openlineage.spark3.agent.utils.DatasetVersionDatasetFacetUtils;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;

@Slf4j
public class DataSourceV2ScanRelationInputDatasetBuilder
public final class DataSourceV2ScanRelationOnStartInputDatasetBuilder
extends AbstractQueryPlanInputDatasetBuilder<DataSourceV2ScanRelation> {
private final DatasetFactory<InputDataset> factory;

private final DatasetFactory<OpenLineage.InputDataset> factory;

public DataSourceV2ScanRelationInputDatasetBuilder(
OpenLineageContext context, DatasetFactory<OpenLineage.InputDataset> factory) {
public DataSourceV2ScanRelationOnStartInputDatasetBuilder(
OpenLineageContext context, DatasetFactory<InputDataset> factory) {
super(context, true);
this.factory = factory;
this.factory = Objects.requireNonNull(factory, "parameter: factory");
}

@Override
public boolean isDefinedAt(SparkListenerEvent event) {
/* (2024-08-15) I implemented this method because it deferred to super.isDefinedAt(SparkListenerEvent),
which only looks at SparkListenerJobStart and SparkListenerSQLExecutionStart. However, due to
https://github.com/OpenLineage/OpenLineage/issues/2935, openlineage-spark was not adding the actual
input dataset, and debugging the code eventually led me to this point. For some reason, the
SparkListenerSQLExecutionStart event didn't have the necessary details to create an ExecutionContext.
Therefore, this code was never evaluated for the SparkListenerSQLExecutionStart. I'm keeping the
SparkListenerSQLExecutionStart defined here, just in case there is a code path that uses it.*/
return event instanceof SparkListenerSQLExecutionStart
|| event instanceof SparkListenerSQLExecutionEnd;
return event instanceof SparkListenerSQLExecutionStart;
}

@Override
public boolean isDefinedAtLogicalPlan(LogicalPlan logicalPlan) {
return logicalPlan instanceof DataSourceV2ScanRelation;
protected boolean isDefinedAtLogicalPlan(LogicalPlan plan) {
return plan instanceof DataSourceV2ScanRelation;
}

@Override
public List<OpenLineage.InputDataset> apply(DataSourceV2ScanRelation scanRelation) {
DataSourceV2Relation relation = scanRelation.relation();
public List<InputDataset> apply(DataSourceV2ScanRelation plan) {
DataSourceV2Relation relation = plan.relation();
OpenLineage.DatasetFacetsBuilder datasetFacetsBuilder =
context.getOpenLineage().newDatasetFacetsBuilder();

@@ -21,7 +21,8 @@
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;

@Slf4j
public class DatasetVersionDatasetFacetUtils {
public final class DatasetVersionDatasetFacetUtils {
private DatasetVersionDatasetFacetUtils() {}

private static final String DELTA = "delta";

@@ -31,13 +32,13 @@ public class DatasetVersionDatasetFacetUtils {
public static Optional<String> extractVersionFromDataSourceV2Relation(
OpenLineageContext context, DataSourceV2Relation table) {
if (table.identifier().isEmpty()) {
log.warn("Couldn't find identifier for dataset in plan " table);
log.warn("Couldn't find identifier for dataset in plan {}", table);
return Optional.empty();
}
Identifier identifier = table.identifier().get();

if (table.catalog().isEmpty() || !(table.catalog().get() instanceof TableCatalog)) {
log.warn("Couldn't find catalog for dataset in plan " table);
log.warn("Couldn't find catalog for dataset in plan {}", table);
return Optional.empty();
}
TableCatalog tableCatalog = (TableCatalog) table.catalog().get();