Skip to content

Commit

Permalink
Add the ability to intercept File IO by specifying a wrapper class (#155
Browse files Browse the repository at this point in the history
)

* Add FileIOWrapper to allow intercepting File IO

* Clean up issues after squash

* Clean up after rebase

* Update copyrights, other minor fixes from rebase

* Apply spotless
  • Loading branch information
andrew4699 authored Aug 29, 2024
1 parent 5354333 commit 07c8444
Show file tree
Hide file tree
Showing 17 changed files with 451 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.core.monitor;

/** Allows setting a configured instance of {@link PolarisMetricRegistry} */
public interface MetricRegistryAware {
void setMetricRegistry(PolarisMetricRegistry metricRegistry);
}
2 changes: 2 additions & 0 deletions polaris-server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 91,8 @@ metaStoreManager:
# type: eclipse-link # uncomment to use eclipse-link as metastore
# persistence-unit: polaris

io:
factoryType: default

# TODO - avoid duplicating token broker config
oauth2:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 74,15 @@
import org.apache.polaris.core.auth.PolarisAuthorizer;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.monitor.MetricRegistryAware;
import org.apache.polaris.core.monitor.PolarisMetricRegistry;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.service.admin.PolarisServiceImpl;
import org.apache.polaris.service.admin.api.PolarisCatalogsApi;
import org.apache.polaris.service.admin.api.PolarisPrincipalRolesApi;
import org.apache.polaris.service.admin.api.PolarisPrincipalsApi;
import org.apache.polaris.service.auth.DiscoverableAuthenticator;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.catalog.IcebergCatalogAdapter;
import org.apache.polaris.service.catalog.api.IcebergRestCatalogApi;
import org.apache.polaris.service.catalog.api.IcebergRestConfigurationApi;
Expand Down Expand Up @@ -185,10 187,22 @@ public void run(PolarisApplicationConfig configuration, Environment environment)
"realmContext", new ContextResolverFilter(realmContextResolver, callContextResolver))
.addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, "/*");

FileIOFactory fileIOFactory = configuration.getFileIOFactory();
if (fileIOFactory instanceof MetricRegistryAware mrAware) {
mrAware.setMetricRegistry(polarisMetricRegistry);
}
if (fileIOFactory instanceof OpenTelemetryAware otAware) {
otAware.setOpenTelemetry(openTelemetry);
}
if (fileIOFactory instanceof ConfigurationStoreAware csAware) {
csAware.setConfigurationStore(configurationStore);
}

TaskHandlerConfiguration taskConfig = configuration.getTaskHandler();
TaskExecutorImpl taskExecutor =
new TaskExecutorImpl(taskConfig.executorService(), metaStoreManagerFactory);
TaskFileIOSupplier fileIOSupplier = new TaskFileIOSupplier(metaStoreManagerFactory);
TaskFileIOSupplier fileIOSupplier =
new TaskFileIOSupplier(metaStoreManagerFactory, fileIOFactory);
taskExecutor.addTaskHandler(
new TableCleanupTaskHandler(taskExecutor, metaStoreManagerFactory, fileIOSupplier));
taskExecutor.addTaskHandler(
Expand All @@ -199,7 213,7 @@ public void run(PolarisApplicationConfig configuration, Environment environment)
"Initializing PolarisCallContextCatalogFactory for metaStoreManagerType {}",
metaStoreManagerFactory);
CallContextCatalogFactory catalogFactory =
new PolarisCallContextCatalogFactory(entityManagerFactory, taskExecutor);
new PolarisCallContextCatalogFactory(entityManagerFactory, taskExecutor, fileIOFactory);

PolarisAuthorizer authorizer = new PolarisAuthorizer(configurationStore);
IcebergCatalogAdapter catalogAdapter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 42,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
Expand Down Expand Up @@ -175,6 173,7 @@ public class BasePolarisCatalog extends BaseMetastoreViewCatalog
private CloseableGroup closeableGroup;
private Map<String, String> catalogProperties;
private Map<String, String> tableDefaultProperties;
private final FileIOFactory fileIOFactory;

/**
* @param entityManager provides handle to underlying PolarisMetaStoreManager with which to
Expand All @@ -189,7 188,8 @@ public BasePolarisCatalog(
CallContext callContext,
PolarisResolutionManifestCatalogView resolvedEntityView,
AuthenticatedPolarisPrincipal authenticatedPrincipal,
TaskExecutor taskExecutor) {
TaskExecutor taskExecutor,
FileIOFactory fileIOFactory) {
this.entityManager = entityManager;
this.callContext = callContext;
this.resolvedEntityView = resolvedEntityView;
Expand All @@ -199,6 199,7 @@ public BasePolarisCatalog(
this.taskExecutor = taskExecutor;
this.catalogId = catalogEntity.getId();
this.catalogName = catalogEntity.getName();
this.fileIOFactory = fileIOFactory;
}

@Override
Expand Down Expand Up @@ -1967,8 1968,7 @@ private FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
Map<String, String> propertiesWithS3CustomizedClientFactory = new HashMap<>(properties);
propertiesWithS3CustomizedClientFactory.put(
S3FileIOProperties.CLIENT_FACTORY, PolarisS3FileIOClientFactory.class.getName());
return CatalogUtil.loadFileIO(
ioImpl, propertiesWithS3CustomizedClientFactory, new Configuration());
return fileIOFactory.loadFileIO(ioImpl, propertiesWithS3CustomizedClientFactory);
}

private void blockedUserSpecifiedWriteLocation(Map<String, String> properties) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;

import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;

/** A simple FileIOFactory implementation that defers all the work to the Iceberg SDK */
@JsonTypeName("default")
public class DefaultFileIOFactory implements FileIOFactory {
@Override
public FileIO loadFileIO(String impl, Map<String, String> properties) {
return CatalogUtil.loadFileIO(impl, properties, new Configuration());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.service.catalog;

import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.dropwizard.jackson.Discoverable;
import java.util.Map;
import org.apache.iceberg.io.FileIO;

/** Interface for providing a way to construct FileIO objects, such as for reading/writing S3. */
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "factoryType")
public interface FileIOFactory extends Discoverable {
FileIO loadFileIO(String impl, Map<String, String> properties);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 28,7 @@
import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.service.auth.DiscoverableAuthenticator;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.context.CallContextResolver;
import org.apache.polaris.service.context.RealmContextResolver;
import org.slf4j.LoggerFactory;
Expand All @@ -52,6 53,7 @@ public class PolarisApplicationConfig extends Configuration {
private List<String> defaultRealms;
private String awsAccessKey;
private String awsSecretKey;
private FileIOFactory fileIOFactory;

@JsonProperty("metaStoreManager")
public void setMetaStoreManagerFactory(MetaStoreManagerFactory metaStoreManagerFactory) {
Expand All @@ -63,6 65,16 @@ public MetaStoreManagerFactory getMetaStoreManagerFactory() {
return metaStoreManagerFactory;
}

@JsonProperty("io")
public void setFileIOFactory(FileIOFactory fileIOFactory) {
this.fileIOFactory = fileIOFactory;
}

@JsonProperty("io")
public FileIOFactory getFileIOFactory() {
return fileIOFactory;
}

@JsonProperty("authenticator")
public void setPolarisAuthenticator(
DiscoverableAuthenticator<String, AuthenticatedPolarisPrincipal> polarisAuthenticator) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 31,7 @@
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
import org.apache.polaris.service.catalog.FileIOFactory;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
import org.apache.polaris.service.task.TaskExecutor;
import org.slf4j.Logger;
Expand All @@ -45,11 46,15 @@ public class PolarisCallContextCatalogFactory implements CallContextCatalogFacto

private final RealmEntityManagerFactory entityManagerFactory;
private final TaskExecutor taskExecutor;
private final FileIOFactory fileIOFactory;

public PolarisCallContextCatalogFactory(
RealmEntityManagerFactory entityManagerFactory, TaskExecutor taskExecutor) {
RealmEntityManagerFactory entityManagerFactory,
TaskExecutor taskExecutor,
FileIOFactory fileIOFactory) {
this.entityManagerFactory = entityManagerFactory;
this.taskExecutor = taskExecutor;
this.fileIOFactory = fileIOFactory;
}

@Override
Expand All @@ -70,7 75,12 @@ public Catalog createCallContextCatalog(

BasePolarisCatalog catalogInstance =
new BasePolarisCatalog(
entityManager, context, resolvedManifest, authenticatedPrincipal, taskExecutor);
entityManager,
context,
resolvedManifest,
authenticatedPrincipal,
taskExecutor,
fileIOFactory);

context.contextVariables().put(CallContext.REQUEST_PATH_CATALOG_INSTANCE_KEY, catalogInstance);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 22,23 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.PolarisTaskConstants;
import org.apache.polaris.core.entity.TaskEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.service.catalog.FileIOFactory;

public class TaskFileIOSupplier implements Function<TaskEntity, FileIO> {
private final MetaStoreManagerFactory metaStoreManagerFactory;
private final FileIOFactory fileIOFactory;

public TaskFileIOSupplier(MetaStoreManagerFactory metaStoreManagerFactory) {
public TaskFileIOSupplier(
MetaStoreManagerFactory metaStoreManagerFactory, FileIOFactory fileIOFactory) {
this.metaStoreManagerFactory = metaStoreManagerFactory;
this.fileIOFactory = fileIOFactory;
}

@Override
Expand All @@ -60,6 62,6 @@ public FileIO apply(TaskEntity task) {
String ioImpl =
properties.getOrDefault(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO");
return CatalogUtil.loadFileIO(ioImpl, properties, new Configuration());
return fileIOFactory.loadFileIO(ioImpl, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 22,5 @@ org.apache.polaris.core.persistence.MetaStoreManagerFactory
org.apache.polaris.service.config.OAuth2ApiService
org.apache.polaris.service.context.RealmContextResolver
org.apache.polaris.service.context.CallContextResolver
org.apache.polaris.service.auth.TokenBrokerFactory
org.apache.polaris.service.auth.TokenBrokerFactory
org.apache.polaris.service.catalog.FileIOFactory
Original file line number Diff line number Diff line change
@@ -0,0 1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

org.apache.polaris.service.catalog.DefaultFileIOFactory
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 61,7 @@
import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
import org.apache.polaris.core.storage.cache.StorageCredentialCache;
import org.apache.polaris.service.catalog.BasePolarisCatalog;
import org.apache.polaris.service.catalog.DefaultFileIOFactory;
import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
import org.apache.polaris.service.config.DefaultConfigurationStore;
import org.apache.polaris.service.config.RealmEntityManagerFactory;
Expand Down Expand Up @@ -353,7 354,12 @@ private void initBaseCatalog() {
callContext, entityManager, authenticatedRoot, CATALOG_NAME);
this.baseCatalog =
new BasePolarisCatalog(
entityManager, callContext, passthroughView, authenticatedRoot, Mockito.mock());
entityManager,
callContext,
passthroughView,
authenticatedRoot,
Mockito.mock(),
new DefaultFileIOFactory());
this.baseCatalog.initialize(
CATALOG_NAME,
ImmutableMap.of(
Expand All @@ -369,7 375,8 @@ public PolarisEntityManager getOrCreateEntityManager(RealmContext realmContext)
return entityManager;
}
},
Mockito.mock());
Mockito.mock(),
new DefaultFileIOFactory());
}

@Override
Expand Down
Loading

0 comments on commit 07c8444

Please sign in to comment.