Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
import static org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders.newThingPlaceholder;

import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.IntPredicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.service.EnforcementFactoryFactory;
import org.eclipse.ditto.connectivity.service.placeholders.SourceAddressPlaceholder;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.Enforcement;
Expand All @@ -41,6 +41,7 @@
import org.eclipse.ditto.connectivity.service.messaging.Resolvers;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.common.InvalidMqttQosCodeException;
import org.eclipse.ditto.connectivity.service.messaging.validation.AbstractProtocolValidator;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.placeholders.PlaceholderFilter;

import com.hivemq.client.mqtt.datatypes.MqttQos;
Expand All @@ -60,7 +61,7 @@ public abstract class AbstractMqttValidator extends AbstractProtocolValidator {
protected static final Collection<String> SECURE_SCHEMES = List.of("ssl");

private static final String ERROR_DESCRIPTION =
"''{0}'' is not a valid value for MQTT enforcement. Valid values are: ''{1}''.";
"''{0}'' is not a valid value for MQTT enforcement. Valid placeholders are: ''{1}''.";

private final MqttConfig mqttConfig;

Expand All @@ -81,7 +82,18 @@ protected void validateSource(final Source source, final DittoHeaders dittoHeade
}

validateSourceQoS(qos.get(), dittoHeaders, sourceDescription);
validateSourceEnforcement(source.getEnforcement().orElse(null), dittoHeaders, sourceDescription);
validateSourceEnforcement(source.getEnforcement().orElse(null), dittoHeaders, sourceDescription,
getSourceEnforcementInputPlaceholders());
}

/**
* Returns the placeholders allowed for the enforcement input of MQTT sources.
* Subclasses may override to allow additional placeholders (e.g. header placeholders for MQTT 5).
*
* @return the allowed enforcement input placeholders.
*/
protected Placeholder<?>[] getSourceEnforcementInputPlaceholders() {
return new Placeholder<?>[]{newSourceAddressPlaceholder()};
}

@Override
Expand Down Expand Up @@ -116,25 +128,35 @@ public static MqttQos getHiveQoS(final int qos) {
}

protected static void validateSourceEnforcement(@Nullable final Enforcement enforcement,
final DittoHeaders dittoHeaders, final Supplier<String> sourceDescription) {
final DittoHeaders dittoHeaders, final Supplier<String> sourceDescription,
final Placeholder<?>... inputPlaceholders) {
if (enforcement != null) {

validateEnforcementInput(enforcement, sourceDescription, dittoHeaders);
validateEnforcementInput(enforcement, sourceDescription, dittoHeaders, inputPlaceholders);
validateEnforcementFilters(enforcement.getFilters(), sourceDescription, dittoHeaders);
}
}

protected static void validateEnforcementInput(final Enforcement enforcement,
final Supplier<String> sourceDescription, final DittoHeaders dittoHeaders) {
final SourceAddressPlaceholder sourceAddressPlaceholder = newSourceAddressPlaceholder();
private static void validateEnforcementInput(final Enforcement enforcement,
final Supplier<String> sourceDescription, final DittoHeaders dittoHeaders,
final Placeholder<?>... inputPlaceholders) {
try {
EnforcementFactoryFactory.newEnforcementFilterFactory(enforcement, sourceAddressPlaceholder)
.getFilter("dummyTopic");
PlaceholderFilter.validate(enforcement.getInput(), inputPlaceholders);
} catch (final DittoRuntimeException e) {
final String validPlaceholders = Arrays.stream(inputPlaceholders)
.map(p -> {
final List<String> names = p.getSupportedNames();
if (names.isEmpty()) {
return p.getPrefix() + ":<name>";
}
return names.stream()
.map(n -> p.getPrefix() + ":" + n)
.collect(Collectors.joining(", "));
})
.collect(Collectors.joining(", "));
throw invalidValueForConfig(enforcement.getInput(), "input", sourceDescription.get())
.cause(e)
.description(MessageFormat.format(ERROR_DESCRIPTION, enforcement.getInput(),
sourceAddressPlaceholder.getSupportedNames()))
validPlaceholders))
.dittoHeaders(dittoHeaders)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt;

import static org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders.newSourceAddressPlaceholder;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.config.MqttConfig;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.placeholders.PlaceholderFactory;

import org.apache.pekko.actor.ActorSystem;

Expand Down Expand Up @@ -47,6 +51,11 @@ public ConnectionType type() {
return ConnectionType.MQTT_5;
}

@Override
protected Placeholder<?>[] getSourceEnforcementInputPlaceholders() {
return new Placeholder<?>[]{newSourceAddressPlaceholder(), PlaceholderFactory.newHeadersPlaceholder()};
}

@Override
public void validate(final Connection connection, final DittoHeaders dittoHeaders, final ActorSystem actorSystem,
final ConnectivityConfig connectivityConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,30 @@ public void testInvalidSourceEnforcementFilters() {
}

@Test
public void testInvalidEnforcementOrigin() {
final Source mqttSourceWithInvalidFilter =
public void testHeaderEnforcementInputIsInvalidForMqtt3() {
// header-based enforcement input is only supported for MQTT 5, not for MQTT 3.1.1
final Source mqttSourceWithHeaderEnforcement =
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.enforcement(newEnforcement("{{ header:device_id }}", "things/{{ thing:id }}/+"))
.enforcement(newEnforcement("{{ header:device_id }}", "{{ thing:id }}"))
.address("#")
.qos(1)
.build();

testInvalidSourceEnforcementFilters(mqttSourceWithInvalidFilter);
testInvalidSourceEnforcementFilters(mqttSourceWithHeaderEnforcement);
}

@Test
public void testInvalidEnforcementInputWithThingPlaceholder() {
final Source mqttSourceWithInvalidInput =
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.enforcement(newEnforcement("{{ thing:id }}", "{{ thing:id }}"))
.address("#")
.qos(1)
.build();

testInvalidSourceEnforcementFilters(mqttSourceWithInvalidInput);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,62 @@ public void testInvalidSourceTopicFilters() {
}

@Test
public void testInvalidEnforcementOrigin() {
public void testValidHeaderEnforcementInput() {
final Source mqttSourceWithHeaderEnforcement =
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.enforcement(newEnforcement("{{ header:device_id }}", "{{ thing:id }}"))
.address("#")
.qos(1)
.build();

final Source mqttSourceWithInvalidFilter =
final Connection connection = connectionWithSource(mqttSourceWithHeaderEnforcement);
Mqtt5Validator.newInstance(mqttConfig).validate(connection, DittoHeaders.empty(), actorSystem,
connectivityConfig);
}

@Test
public void testValidHeaderEnforcementInputWithArbitraryHeaderName() {
final Source mqttSourceWithHeaderEnforcement =
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.enforcement(newEnforcement("{{ header:device_id }}", "things/{{ thing:id }}/+"))
.enforcement(newEnforcement("{{ header:device }}", "{{ thing:id }}"))
.address("#")
.qos(1)
.build();

testInvalidSourceEnforcementFilters(mqttSourceWithInvalidFilter);
final Connection connection = connectionWithSource(mqttSourceWithHeaderEnforcement);
Mqtt5Validator.newInstance(mqttConfig).validate(connection, DittoHeaders.empty(), actorSystem,
connectivityConfig);
}

@Test
public void testValidSourceAddressEnforcementInput() {
final Source mqttSourceWithSourceAddressEnforcement =
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.enforcement(newSourceAddressEnforcement("things/{{ thing:id }}"))
.address("#")
.qos(1)
.build();

final Connection connection = connectionWithSource(mqttSourceWithSourceAddressEnforcement);
Mqtt5Validator.newInstance(mqttConfig).validate(connection, DittoHeaders.empty(), actorSystem,
connectivityConfig);
}

@Test
public void testInvalidEnforcementInputWithThingPlaceholder() {
final Source mqttSourceWithInvalidInput =
ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(AUTHORIZATION_CONTEXT)
.enforcement(newEnforcement("{{ thing:id }}", "{{ thing:id }}"))
.address("#")
.qos(1)
.build();

testInvalidSourceEnforcementFilters(mqttSourceWithInvalidInput);
}

@Test
public void testValidLastWill() {
Expand Down
Loading