Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extensions/common/azure/azure-blob-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ plugins {
}

dependencies {
api(libs.edc.spi.core)
api(libs.edc.controlplane.spi)
implementation(libs.azure.storageblob)
implementation(libs.azure.identity)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.SecretToken;

@JsonTypeName("dataspaceconnector:azuretoken")
public class AzureSasToken implements SecretToken {
public class AzureSasToken {
private final String sas;
private final long expiration;

Expand All @@ -32,8 +31,4 @@ public String getSas() {
return sas;
}

@Override
public long getExpiration() {
return expiration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.edc.azure.blob.validator;


import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.util.string.StringUtils;

import java.util.regex.Pattern;
Expand Down Expand Up @@ -135,9 +136,9 @@ public static void validateMetadata(String metadata) {
* @param keyName A string representing blob key secret.
* @throws IllegalArgumentException if the string is null or blank.
*/
public static void validateKeyName(String keyName) {
if (StringUtils.isNullOrBlank(keyName)) {
throw new IllegalArgumentException(String.format(INVALID_RESOURCE_NAME, KEY_NAME));
public static void validateKeyNameOrSecret(DataAddress keyName) {
if (StringUtils.isNullOrBlank(keyName.getKeyName()) && StringUtils.isNullOrBlank(keyName.getStringProperty(DataAddress.EDC_DATA_ADDRESS_SECRET))) {
throw new IllegalArgumentException(String.format(INVALID_RESOURCE_NAME, KEY_NAME + " or " + DataAddress.EDC_DATA_ADDRESS_SECRET));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package org.eclipse.edc.azure.blob.validator;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
Expand Down Expand Up @@ -64,18 +63,6 @@ void validateContainerName_fail(String input) {
.isThrownBy(() -> AzureStorageValidator.validateContainerName(input));
}

@ParameterizedTest
@NullAndEmptySource
void validateKeyName_fail(String input) {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> AzureStorageValidator.validateKeyName(input));
}

@Test
void validateKeyName_success() {
AzureStorageValidator.validateKeyName("test random key name");
}

@ParameterizedTest
@ArgumentsSource(ValidBlobNameProvider.class)
void validateBlobName_success(String input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.eclipse.edc.azure.testfixtures.annotations.PostgresCosmosTest;
import org.eclipse.edc.connector.controlplane.store.sql.transferprocess.store.SqlTransferProcessStore;
import org.eclipse.edc.connector.controlplane.store.sql.transferprocess.store.schema.postgres.PostgresDialectStatements;
import org.eclipse.edc.connector.controlplane.transfer.spi.testfixtures.store.TestFunctions;
import org.eclipse.edc.connector.controlplane.transfer.spi.testfixtures.store.TransferProcessStoreTestBase;
import org.eclipse.edc.json.JacksonTypeManager;
import org.eclipse.edc.policy.model.PolicyRegistrationTypes;
Expand Down Expand Up @@ -67,7 +66,6 @@ static void dropTables(CosmosPostgresTestExtension.SqlHelper helper) {
void setUp(DataSourceRegistry reg, PostgresqlStoreSetupExtension extension, TransactionContext transactionContext, QueryExecutor queryExecutor, CosmosPostgresTestExtension.SqlHelper helper, DataSource datasource) {

var typeManager = new JacksonTypeManager();
typeManager.registerTypes(TestFunctions.TestResourceDef.class, TestFunctions.TestProvisionedResource.class);
typeManager.registerTypes(PolicyRegistrationTypes.TYPES.toArray(Class<?>[]::new));

var leaseContextBuilder = SqlLeaseContextBuilderImpl.with(extension.getTransactionContext(), CONNECTOR_NAME, STATEMENTS.getTransferProcessTableName(), LEASE_STATEMENTS, clock, queryExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateAccountName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateBlobName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateContainerName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateKeyName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateKeyNameOrSecret;

/**
* Validator for {@link AzureDataFactoryTransferService}.
Expand Down Expand Up @@ -63,6 +63,6 @@ private void validateDestination(DataAddress destination) {
private void validateCommon(DataAddress dataAddress) {
validateAccountName(dataAddress.getStringProperty(AzureBlobStoreSchema.ACCOUNT_NAME));
validateContainerName(dataAddress.getStringProperty(AzureBlobStoreSchema.CONTAINER_NAME));
validateKeyName(dataAddress.getKeyName());
validateKeyNameOrSecret(dataAddress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.concurrent.ExecutorService;

Expand All @@ -38,7 +40,7 @@
import static org.eclipse.edc.azure.blob.AzureBlobStoreSchema.CONTAINER_NAME;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateAccountName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateContainerName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateKeyName;
import static org.eclipse.edc.azure.blob.validator.AzureStorageValidator.validateKeyNameOrSecret;

/**
* Instantiates {@link AzureStorageDataSink}s for requests whose source data type is {@link AzureBlobStoreSchema#TYPE}.
Expand Down Expand Up @@ -78,15 +80,11 @@ public DataSink createSink(DataFlowStartMessage request) {
}

var dataAddress = request.getDestinationDataAddress();
var requestId = request.getId();

var participantContext = singleParticipantContextSupplier.get()
.orElseThrow(f -> new EdcException("Failed to obtain participant context for data sink creation"));

var secret = vault.resolveSecret(participantContext.getParticipantContextId(), dataAddress.getKeyName());
var secret = getSecret(dataAddress);

if (secret == null) {
throw new EdcException("SAS token for the Azure Blob DataSink not found in Vault (alias = '%s')".formatted(dataAddress.getKeyName()));
throw new EdcException("SAS token for the Azure Blob DataSink not found neither in DataAddresss (property = '%s') nor Vault (alias = '%s')"
.formatted(DataAddress.EDC_DATA_ADDRESS_SECRET, dataAddress.getKeyName()));
}

var token = typeManager.readValue(secret, AzureSasToken.class);
Expand All @@ -99,7 +97,7 @@ public DataSink createSink(DataFlowStartMessage request) {
.containerName(dataAddress.getStringProperty(AzureBlobStoreSchema.CONTAINER_NAME))
.destinationBlobName(destinationBlobName)
.sharedAccessSignature(token.getSas())
.requestId(requestId)
.requestId(request.getId())
.partitionSize(partitionSize)
.blobStoreApi(blobStoreApi)
.executorService(executorService)
Expand All @@ -117,7 +115,7 @@ public DataSink createSink(DataFlowStartMessage request) {
try {
validateAccountName(dataAddress.getStringProperty(ACCOUNT_NAME));
validateContainerName(dataAddress.getStringProperty(CONTAINER_NAME));
validateKeyName(dataAddress.getKeyName());
validateKeyNameOrSecret(dataAddress);
if (dataSourceAddress.hasProperty(BLOB_PREFIX)) {
if (dataSourceAddress.hasProperty(BLOB_NAME)) {
monitor.warning("Folder transfer (property '%s' is present), will ignore the blob name (property '%s')".formatted(BLOB_PREFIX, BLOB_NAME));
Expand All @@ -128,4 +126,21 @@ public DataSink createSink(DataFlowStartMessage request) {
}
return Result.success();
}

private @Nullable String getSecret(DataAddress dataAddress) {
var addressSecret = dataAddress.getStringProperty(DataAddress.EDC_DATA_ADDRESS_SECRET);
if (addressSecret != null) {
return addressSecret;
}

var keyName = dataAddress.getKeyName();
if (keyName == null) {
return null;
}

var participantContext = singleParticipantContextSupplier.get()
.orElseThrow(f -> new EdcException("Failed to obtain participant context for data sink creation"));

return vault.resolveSecret(participantContext.getParticipantContextId(), keyName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.jetbrains.annotations.NotNull;

import java.util.Optional;

import static org.eclipse.edc.azure.blob.AzureBlobStoreSchema.ACCOUNT_NAME;
import static org.eclipse.edc.azure.blob.AzureBlobStoreSchema.BLOB_NAME;
import static org.eclipse.edc.azure.blob.AzureBlobStoreSchema.BLOB_PREFIX;
Expand Down Expand Up @@ -72,12 +74,12 @@ public DataSource createSource(DataFlowStartMessage request) {
.retryPolicy(retryPolicy)
.monitor(monitor);

if (null != dataAddress.getKeyName() && !dataAddress.getKeyName().isEmpty()) {
monitor.debug("Attempting to use shared key authentication for Azure Storage data source");
builder.sharedKey(vault.resolveSecret(dataAddress.getKeyName()));
} else {
monitor.debug("Attempting to use default identity for Azure Storage data source");
}
Optional.ofNullable(dataAddress.getKeyName())
.filter(it -> !it.isBlank())
.ifPresentOrElse(keyName -> {
monitor.debug("Attempting to use shared key authentication for Azure Storage data source");
builder.sharedKey(vault.resolveSecret(dataAddress.getKeyName()));
}, () -> monitor.debug("Attempting to use default identity for Azure Storage data source"));

return builder.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@
import static org.eclipse.edc.azure.blob.testfixtures.AzureStorageTestFixtures.createBlobPrefix;
import static org.eclipse.edc.azure.blob.testfixtures.AzureStorageTestFixtures.createContainerName;
import static org.eclipse.edc.azure.blob.testfixtures.AzureStorageTestFixtures.createRequest;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class AzureStorageDataSinkFactoryTest {
private final BlobStoreApi blobStoreApi = mock();
Expand All @@ -63,7 +61,6 @@ class AzureStorageDataSinkFactoryTest {
private final String accountName = createAccountName();
private final String containerName = createContainerName();


private final String blobPrefix = createBlobPrefix();
private final String keyName = "test-keyname";
private final AzureSasToken token = new AzureSasToken("test-writeonly-sas", new Random().nextLong());
Expand Down Expand Up @@ -123,10 +120,10 @@ void validate_whenMissingKeyName_fails() {

@Test
void createSink_whenValidRequest_succeeds() {
when(vault.resolveSecret(participantContext.getParticipantContextId(), keyName)).thenReturn(typeManager.writeValueAsString(token));
var validRequest = request.destinationDataAddress(dataAddress
.property(AzureBlobStoreSchema.ACCOUNT_NAME, accountName)
.property(AzureBlobStoreSchema.CONTAINER_NAME, containerName)
.property(DataAddress.EDC_DATA_ADDRESS_SECRET, typeManager.writeValueAsString(token))
.keyName(keyName)
.build());
assertThat(factory.createSink(validRequest.build())).isNotNull();
Expand All @@ -141,14 +138,13 @@ void createSink_whenInvalidRequest_fails() {

@Test
void createSink_whenSecretNotFoundRequest_fails() {
when(vault.resolveSecret(anyString())).thenReturn(null);
var validRequest = request.destinationDataAddress(dataAddress
.property(AzureBlobStoreSchema.ACCOUNT_NAME, accountName)
.property(AzureBlobStoreSchema.CONTAINER_NAME, containerName)
.keyName(keyName)
.build());
assertThatThrownBy(() -> factory.createSink(validRequest.build()))
.isInstanceOf(EdcException.class)
.hasMessageStartingWith("SAS token for the Azure Blob DataSink not found in Vault");
.hasMessageStartingWith("SAS token for the Azure Blob DataSink not found");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import dev.failsafe.RetryPolicy;
import org.eclipse.edc.azure.blob.AzureSasToken;
import org.eclipse.edc.azure.blob.api.BlobStoreApi;
import org.eclipse.edc.connector.controlplane.transfer.spi.provision.Provisioner;
import org.eclipse.edc.connector.dataplane.provision.azure.blob.ObjectStorageConsumerProvisionResourceGenerator;
import org.eclipse.edc.connector.dataplane.provision.azure.blob.ObjectStorageDeprovisioner;
import org.eclipse.edc.connector.dataplane.provision.azure.blob.ObjectStorageProvisioner;
Expand All @@ -27,13 +26,12 @@
import org.eclipse.edc.runtime.metamodel.annotation.Configuration;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.security.Vault;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;

/**
* Provides data transfer {@link Provisioner}s backed by Azure services.
* Provides data transfer provisioners backed by Azure services.
*/
public class AzureProvisionExtension implements ServiceExtension {

Expand All @@ -52,9 +50,6 @@ public class AzureProvisionExtension implements ServiceExtension {
@Inject
private TypeManager typeManager;

@Inject
private Vault vault;

@Inject
private ProvisionerManager provisionManager;

Expand All @@ -71,7 +66,7 @@ public String name() {

@Override
public void initialize(ServiceExtensionContext context) {
provisionManager.register(new ObjectStorageProvisioner(retryPolicy, context.getMonitor(), blobStoreApi, azureProvisionConfiguration, vault, typeManager, participantContextSupplier));
provisionManager.register(new ObjectStorageProvisioner(retryPolicy, context.getMonitor(), blobStoreApi, azureProvisionConfiguration, typeManager, participantContextSupplier));
provisionManager.register(new ObjectStorageDeprovisioner());
manifestGenerator.registerConsumerGenerator(new ObjectStorageConsumerProvisionResourceGenerator(monitor.withPrefix("AzureStorageProvisioner")));

Expand Down
Loading