From c357fd39181417291b34a42aaed580125eabad1e Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 16 Feb 2026 13:55:32 +0100 Subject: [PATCH 1/7] Rename ArchiverClient to ArchiverService --- .../configuration/AAChannelProcessor.java | 12 ++++++------ .../{ArchiverClient.java => ArchiverService.java} | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) rename src/main/java/org/phoebus/channelfinder/service/external/{ArchiverClient.java => ArchiverService.java} (99%) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index b0636c0c..12654b91 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -14,7 +14,7 @@ import org.apache.commons.lang3.StringUtils; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; -import org.phoebus.channelfinder.service.external.ArchiverClient; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.ChannelProcessorInfo; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; @@ -60,7 +60,7 @@ public class AAChannelProcessor implements ChannelProcessor { @Value("${aa.auto_pause:}") private List autoPauseOptions; - @Autowired private final ArchiverClient archiverClient = new ArchiverClient(); + @Autowired private final ArchiverService archiverService = new ArchiverService(); @Override public boolean enabled() { @@ -180,7 +180,7 @@ public long process(List channels) throws JsonProcessingException { Collectors.toMap(ArchivePVOptions::getPv, archivePVOptions -> archivePVOptions)); Map> archiveActionArchivePVMap = getArchiveActions(archivePVSList, archiverInfo); - count += archiverClient.configureAA(archiveActionArchivePVMap, archiverInfo.url()); + count += archiverService.configureAA(archiveActionArchivePVMap, archiverInfo.url()); } long finalCount = count; logger.log(Level.INFO, () -> String.format("Configured %s channels.", finalCount)); @@ -240,7 +240,7 @@ private Map> getArchiveActions( return result; } List> statuses = - archiverClient.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); + archiverService.getStatuses(archivePVS, archiverInfo.url(), archiverInfo.alias()); logger.log(Level.FINER, "Statuses {0}", statuses); statuses.forEach( archivePVStatusJsonMap -> { @@ -290,8 +290,8 @@ private Map getArchiversInfo(Map aaURLs) { // Empty archiver tagged continue; } - String version = archiverClient.getVersion(aa.getValue()); - List policies = archiverClient.getAAPolicies(aa.getValue()); + String version = archiverService.getVersion(aa.getValue()); + List policies = archiverService.getAAPolicies(aa.getValue()); result.put(aa.getKey(), new ArchiverInfo(aa.getKey(), aa.getValue(), version, policies)); } return result; diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverClient.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java similarity index 99% rename from src/main/java/org/phoebus/channelfinder/service/external/ArchiverClient.java rename to src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 7f15b703..821d1794 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverClient.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -26,8 +26,8 @@ import reactor.core.publisher.Mono; @Component -public class ArchiverClient { - private static final Logger logger = Logger.getLogger(ArchiverClient.class.getName()); +public class ArchiverService { + private static final Logger logger = Logger.getLogger(ArchiverService.class.getName()); private static final int STATUS_BATCH_SIZE = 100; // Limit comes from tomcat server maxHttpHeaderSize which by default is a header of size // 8k From a7d4d69689b6747ea73a6f1651d205e4becdaea0 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Tue, 10 Feb 2026 13:26:41 +0100 Subject: [PATCH 2/7] Remove unnecessary version call This was used to distinguish support for getting status via post or query Now (from an earlier merge) we explicitly configure it instead. --- .../configuration/AAChannelProcessor.java | 3 +-- .../service/external/ArchiverService.java | 26 ------------------ .../model/archiver/aa/ArchiverInfo.java | 2 +- .../processors/aa/AAChannelProcessorIT.java | 13 +-------- .../aa/AAChannelProcessorMultiArchiverIT.java | 27 ------------------- .../aa/AAChannelProcessorMultiIT.java | 12 --------- 6 files changed, 3 insertions(+), 80 deletions(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 12654b91..13790024 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -290,9 +290,8 @@ private Map getArchiversInfo(Map aaURLs) { // Empty archiver tagged continue; } - String version = archiverService.getVersion(aa.getValue()); List policies = archiverService.getAAPolicies(aa.getValue()); - result.put(aa.getKey(), new ArchiverInfo(aa.getKey(), aa.getValue(), version, policies)); + result.put(aa.getKey(), new ArchiverInfo(aa.getKey(), aa.getValue(), policies)); } return result; } diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 821d1794..8fb957b5 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -37,7 +37,6 @@ public class ArchiverService { private static final String MGMT_RESOURCE = "/mgmt/bpl"; private static final String POLICY_RESOURCE = MGMT_RESOURCE + "/getPolicyList"; private static final String PV_STATUS_RESOURCE = MGMT_RESOURCE + "/getPVStatus"; - private static final String ARCHIVER_VERSIONS_RESOURCE = MGMT_RESOURCE + "/getVersions"; private static final ObjectMapper objectMapper = new ObjectMapper(); @Value("${aa.timeout_seconds:15}") @@ -233,31 +232,6 @@ public List getAAPolicies(String aaURL) { } } - public String getVersion(String archiverURL) { - try { - String uriString = archiverURL + ARCHIVER_VERSIONS_RESOURCE; - String response = - client - .get() - .uri(URI.create(uriString)) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - Map versionMap = objectMapper.readValue(response, Map.class); - String[] mgmtVersion = versionMap.get("mgmt_version").split("Archiver Appliance Version "); - if (mgmtVersion.length > 1) { - return mgmtVersion[1]; - } - - } catch (Exception e) { - logger.log(Level.WARNING, "Could not get version from: " + archiverURL, e); - return ""; - } - return ""; - } - private Mono showError(String uriString, Throwable error) { logger.log( Level.WARNING, diff --git a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverInfo.java b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverInfo.java index 4d43bdac..508d394b 100644 --- a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverInfo.java +++ b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiverInfo.java @@ -2,4 +2,4 @@ import java.util.List; -public record ArchiverInfo(String alias, String url, String version, List policies) {} +public record ArchiverInfo(String alias, String url, List policies) {} diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index afb48dcb..62b6206d 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -98,13 +98,6 @@ public static void paramableAAChannelProcessorTest( String archiverEndpoint, String submissionBody) throws JsonProcessingException, InterruptedException { - // Request to version - Map versions = Map.of("mgmt_version", "Archiver Appliance Version 1.1.0"); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(versions)) - .addHeader("Content-Type", "application/json")); - // Request to policies Map policyList = Map.of("policy", "description"); mockArchiverAppliance.enqueue( @@ -141,11 +134,7 @@ public static void paramableAAChannelProcessorTest( long count = aaChannelProcessor.process(channels); assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); - int expectedRequests = 1; - RecordedRequest requestVersion = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestVersion != null; - assertEquals("/mgmt/bpl/getVersions", requestVersion.getPath()); - + int expectedRequests = 0; expectedRequests += 1; RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); assert requestPolicy != null; diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 763ed5df..85dc1f2a 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -123,14 +123,6 @@ void testProcessMultiArchivers( int expectedProcessedChannels) throws JsonProcessingException, InterruptedException { - // Request to version - Map queryVersions = - Map.of("mgmt_version", "Archiver Appliance Version 1.1.0 Query Support"); - mockQueryArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(queryVersions)) - .addHeader("Content-Type", "application/json")); - // Request to policies Map policyList = Map.of("policy", "description"); mockQueryArchiverAppliance.enqueue( @@ -165,14 +157,6 @@ void testProcessMultiArchivers( } }); - // Request to query version - Map postVersions = - Map.of("mgmt_version", "Archiver Appliance Version 1.1.0 Post Support"); - mockPostArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(postVersions)) - .addHeader("Content-Type", "application/json")); - // Request to policies mockPostArchiverAppliance.enqueue( new MockResponse() @@ -205,12 +189,6 @@ void testProcessMultiArchivers( aaChannelProcessor.process(channels); AtomicInteger expectedQueryRequests = new AtomicInteger(1); - RecordedRequest requestQueryVersion = - mockQueryArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestQueryVersion != null; - assertEquals("/mgmt/bpl/getVersions", requestQueryVersion.getPath()); - - expectedQueryRequests.addAndGet(1); RecordedRequest requestQueryPolicy = mockQueryArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); assert requestQueryPolicy != null; @@ -224,11 +202,6 @@ void testProcessMultiArchivers( assertEquals("/mgmt/bpl/getPVStatus", requestQueryStatus.getRequestUrl().encodedPath()); AtomicInteger expectedPostRequests = new AtomicInteger(1); - RecordedRequest requestPostVersion = mockPostArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestPostVersion != null; - assertEquals("/mgmt/bpl/getVersions", requestPostVersion.getPath()); - - expectedPostRequests.addAndGet(1); RecordedRequest requestPostPolicy = mockPostArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); assert requestPostPolicy != null; assertEquals("/mgmt/bpl/getPolicyList", requestPostPolicy.getPath()); diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 1122d860..834d6945 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -119,13 +119,6 @@ void testProcessMulti( int expectedProcessedChannels) throws JsonProcessingException, InterruptedException { - // Request to version - Map versions = Map.of("mgmt_version", "Archiver Appliance Version 1.1.0"); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(versions)) - .addHeader("Content-Type", "application/json")); - // Request to policies Map policyList = Map.of("policy", "description"); mockArchiverAppliance.enqueue( @@ -164,11 +157,6 @@ void testProcessMulti( assertEquals(count, expectedProcessedChannels); AtomicInteger expectedRequests = new AtomicInteger(1); - RecordedRequest requestVersion = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestVersion != null; - assertEquals("/mgmt/bpl/getVersions", requestVersion.getPath()); - - expectedRequests.addAndGet(1); RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); assert requestPolicy != null; assertEquals("/mgmt/bpl/getPolicyList", requestPolicy.getPath()); From 28c4f60d9c6320fc4df94a931fccc9c8d339e1f2 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 16 Feb 2026 14:01:45 +0100 Subject: [PATCH 3/7] Refactor ArchiverService to use RestClient Adds Validation checking of responses from ArchiveAction s Adds a test for ArchiverService with some example responses from a live archiver --- .../configuration/AAChannelProcessor.java | 2 +- .../exceptions/ArchiverServiceException.java | 12 + .../service/external/ArchiverService.java | 259 +++++++------ .../model/archiver/aa/ArchiveAction.java | 16 +- .../service/external/ArchiverServiceTest.java | 354 ++++++++++++++++++ 5 files changed, 520 insertions(+), 123 deletions(-) create mode 100644 src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java create mode 100644 src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index 13790024..e93d6ece 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -60,7 +60,7 @@ public class AAChannelProcessor implements ChannelProcessor { @Value("${aa.auto_pause:}") private List autoPauseOptions; - @Autowired private final ArchiverService archiverService = new ArchiverService(); + @Autowired private ArchiverService archiverService; @Override public boolean enabled() { diff --git a/src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java b/src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java new file mode 100644 index 00000000..00701530 --- /dev/null +++ b/src/main/java/org/phoebus/channelfinder/exceptions/ArchiverServiceException.java @@ -0,0 +1,12 @@ +package org.phoebus.channelfinder.exceptions; + +public class ArchiverServiceException extends RuntimeException { + + public ArchiverServiceException(String message) { + super(message); + } + + public ArchiverServiceException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index 8fb957b5..c70bb5a2 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -1,11 +1,7 @@ package org.phoebus.channelfinder.service.external; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import java.net.URI; -import java.time.Duration; -import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -16,14 +12,17 @@ import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.commons.lang3.StringUtils; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.MediaType; +import org.springframework.http.client.SimpleClientHttpRequestFactory; import org.springframework.stereotype.Component; -import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.client.RestClient; import org.springframework.web.util.UriComponentsBuilder; -import reactor.core.publisher.Mono; @Component public class ArchiverService { @@ -32,19 +31,45 @@ public class ArchiverService { 100; // Limit comes from tomcat server maxHttpHeaderSize which by default is a header of size // 8k - private final WebClient client = WebClient.create(); + private final RestClient client; private static final String MGMT_RESOURCE = "/mgmt/bpl"; private static final String POLICY_RESOURCE = MGMT_RESOURCE + "/getPolicyList"; private static final String PV_STATUS_RESOURCE = MGMT_RESOURCE + "/getPVStatus"; private static final ObjectMapper objectMapper = new ObjectMapper(); - @Value("${aa.timeout_seconds:15}") - private int timeoutSeconds; + private static final MediaType CONTENT_TYPE = MediaType.APPLICATION_JSON; + + private enum StatusResponseKey { + PV("pv"), + STATUS("status"), + PV_NAME("pvName"); + private final String key; + + StatusResponseKey(String key) { + this.key = key; + } + + String key() { + return this.key; + } + } @Value("${aa.post_support:}") private List postSupportArchivers; + @Autowired + public ArchiverService( + RestClient.Builder builder, @Value("${aa.timeout_seconds:15}") int timeoutSeconds) { + SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); + factory.setReadTimeout(timeoutSeconds * 1000); + this.client = builder.requestFactory(factory).build(); + } + + ArchiverService(RestClient.Builder builder) { + this.client = builder.build(); + } + private Stream> partitionSet(Set pvSet, int pageSize) { List list = new ArrayList<>(pvSet); return IntStream.range(0, (list.size() + pageSize - 1) / pageSize) @@ -54,11 +79,11 @@ private Stream> partitionSet(Set pvSet, int pageSize) { public List> getStatuses( Map archivePVS, String archiverURL, String archiverAlias) { Set pvs = archivePVS.keySet(); - Boolean postSupportOverride = postSupportArchivers.contains(archiverAlias); + boolean postSupportOverride = postSupportArchivers.contains(archiverAlias); logger.log(Level.INFO, "Archiver Alias: {0}", archiverAlias); logger.log(Level.INFO, "Post Support Override Archivers: {0}", postSupportArchivers); - if (Boolean.TRUE.equals(postSupportOverride)) { + if (postSupportOverride) { logger.log(Level.INFO, "Post Support"); return getStatusesFromPvListBody(archiverURL, pvs.stream().toList()); } else { @@ -77,90 +102,110 @@ private List> getStatusesFromPvListQuery( String uriString = archiverURL + PV_STATUS_RESOURCE; URI pvStatusURI = UriComponentsBuilder.fromUri(URI.create(uriString)) - .queryParam("pv", String.join(",", pvs)) + .queryParam(StatusResponseKey.PV.key(), String.join(",", pvs)) .build() .toUri(); - String response = - client - .get() - .uri(pvStatusURI) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - try { - return objectMapper.readValue(response, new TypeReference<>() {}); - } catch (JsonProcessingException e) { - logger.log(Level.WARNING, "Could not parse pv status response: " + e.getMessage()); + List> result = + client.get().uri(pvStatusURI).retrieve().body(new ParameterizedTypeReference<>() {}); + return result != null ? result : List.of(); } catch (Exception e) { logger.log( Level.WARNING, - String.format("Error when trying to get status from pv list query: %s", e.getMessage())); + String.format( + "There was an error getting a response with URI: %s. Error: %s", + uriString, e.getMessage())); + return List.of(); } - return List.of(); } private List> getStatusesFromPvListBody( String archiverURL, List pvs) { String uriString = archiverURL + PV_STATUS_RESOURCE; - String response = - client - .post() - .uri(URI.create(uriString)) - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(pvs) - .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - - // Structure of response is - // [{"pvName":"PV:1", "status":"Paused", ... }, {"pvName": "PV:2"}, {"status": "Being - // archived"}, ...}, ... - // ] - try { - return objectMapper.readValue(response, new TypeReference<>() {}); - } catch (JsonProcessingException e) { - logger.log(Level.WARNING, "Could not parse pv status response: " + e.getMessage()); + List> result = + client + .post() + .uri(URI.create(uriString)) + .contentType(CONTENT_TYPE) + .body(pvs) + .retrieve() + .body(new ParameterizedTypeReference<>() {}); + return result != null ? result : List.of(); } catch (Exception e) { logger.log( Level.WARNING, - String.format("Error when trying to get status from pv list body: %s", e.getMessage())); + String.format( + "There was an error getting a response with URI: %s. Error: %s", + uriString, e.getMessage())); + return List.of(); } - return List.of(); } - private void submitAction(String values, String endpoint, String aaURL) { - String uriString = aaURL + MGMT_RESOURCE + endpoint; + private List> sendRequest(Object payload, String uriString) { try { - String response = + String values = objectMapper.writeValueAsString(payload); + List> response = client .post() .uri(URI.create(uriString)) - .contentType(MediaType.APPLICATION_JSON) - .bodyValue(values) + .contentType(CONTENT_TYPE) + .body(values) .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(timeoutSeconds, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - logger.log(Level.FINE, () -> response); - + .body(new ParameterizedTypeReference<>() {}); + if (response == null) { + throw new ArchiverServiceException("No response from " + uriString); + } + return response; } catch (Exception e) { - logger.log( - Level.WARNING, - String.format("Failed to submit %s to %s on %s", values, endpoint, aaURL), - e); + throw new ArchiverServiceException( + String.format("Failed to submit %s to %s", payload, uriString), e); } } - public long configureAA(Map> archivePVS, String aaURL) - throws JsonProcessingException { + List submitArchiveAction(List pvs, List payload, String aaURL) { + String endpoint = ArchiveAction.ARCHIVE.getEndpoint(); + String uriString = aaURL + MGMT_RESOURCE + endpoint; + List> response = sendRequest(payload, uriString); + return validateSubmitActionResponse(pvs, ArchiveAction.ARCHIVE, response); + } + + List submitBasicAction(List pvs, ArchiveAction action, String aaURL) { + String endpoint = action.getEndpoint(); + String uriString = aaURL + MGMT_RESOURCE + endpoint; + List> response = sendRequest(pvs, uriString); + return validateSubmitActionResponse(pvs, action, response); + } + + private static List validateSubmitActionResponse( + List pvs, ArchiveAction action, List> response) { + List successfulPvs = new ArrayList<>(); + List failedPvs = new ArrayList<>(); + for (int i = 0; i < response.size(); i++) { + Map result = response.get(i); + String pv = result.get(StatusResponseKey.PV_NAME.key()); + if (pv == null) { + pv = result.get(StatusResponseKey.PV.key()); + } + if (pv == null) { + pv = (i < pvs.size()) ? pvs.get(i) : "UNKNOWN_PV"; + } + String status = result.get(StatusResponseKey.STATUS.key()); + if (!action.getSuccessfulStatus().equalsIgnoreCase(status)) { + failedPvs.add(pv); + } else { + logger.log(Level.FINE, "Successfully submitted " + action + " for PV " + pv); + successfulPvs.add(pv); + } + } + if (!failedPvs.isEmpty()) { + logger.log(Level.WARNING, "Failed to submit " + action + " for PVs: " + failedPvs); + } + return successfulPvs; + } + + public long configureAA(Map> archivePVS, String aaURL) { logger.log( Level.INFO, () -> String.format("Configure PVs %s in %s", archivePVS.toString(), aaURL)); long count = 0; @@ -168,44 +213,34 @@ public long configureAA(Map> archivePVS, S if (archivePVS.isEmpty()) { return count; } - if (!archivePVS.get(ArchiveAction.ARCHIVE).isEmpty()) { - logger.log( - Level.INFO, - () -> - "Submitting to be archived " + archivePVS.get(ArchiveAction.ARCHIVE).size() + " pvs"); - submitAction( - objectMapper.writeValueAsString(archivePVS.get(ArchiveAction.ARCHIVE)), - ArchiveAction.ARCHIVE.getEndpoint(), - aaURL); - count += archivePVS.get(ArchiveAction.ARCHIVE).size(); - } - if (!archivePVS.get(ArchiveAction.PAUSE).isEmpty()) { - logger.log( - Level.INFO, - () -> "Submitting to be paused " + archivePVS.get(ArchiveAction.PAUSE).size() + " pvs"); - submitAction( - objectMapper.writeValueAsString( - archivePVS.get(ArchiveAction.PAUSE).stream() - .map(ArchivePVOptions::getPv) - .collect(Collectors.toList())), - ArchiveAction.PAUSE.getEndpoint(), - aaURL); - count += archivePVS.get(ArchiveAction.PAUSE).size(); + + count += processAction(ArchiveAction.ARCHIVE, archivePVS.get(ArchiveAction.ARCHIVE), aaURL); + count += processAction(ArchiveAction.PAUSE, archivePVS.get(ArchiveAction.PAUSE), aaURL); + count += processAction(ArchiveAction.RESUME, archivePVS.get(ArchiveAction.RESUME), aaURL); + + return count; + } + + private long processAction(ArchiveAction action, List options, String aaURL) { + if (options.isEmpty()) { + return 0; } - if (!archivePVS.get(ArchiveAction.RESUME).isEmpty()) { - logger.log( - Level.INFO, - () -> "Submitting to be resumed " + archivePVS.get(ArchiveAction.RESUME).size() + " pvs"); - submitAction( - objectMapper.writeValueAsString( - archivePVS.get(ArchiveAction.RESUME).stream() - .map(ArchivePVOptions::getPv) - .collect(Collectors.toList())), - ArchiveAction.RESUME.getEndpoint(), - aaURL); - count += archivePVS.get(ArchiveAction.RESUME).size(); + logger.log( + Level.INFO, + () -> "Submitting to be " + action.name().toLowerCase() + "d " + options.size() + " pvs"); + List pvs = options.stream().map(ArchivePVOptions::getPv).collect(Collectors.toList()); + try { + List successfulPvs; + if (action == ArchiveAction.ARCHIVE) { + successfulPvs = submitArchiveAction(pvs, options, aaURL); + } else { + successfulPvs = submitBasicAction(pvs, action, aaURL); + } + return successfulPvs.size(); + } catch (ArchiverServiceException e) { + logger.log(Level.WARNING, "Failed to submit " + action.name().toLowerCase() + " request", e); + return 0; } - return count; } public List getAAPolicies(String aaURL) { @@ -214,16 +249,15 @@ public List getAAPolicies(String aaURL) { } try { String uriString = aaURL + POLICY_RESOURCE; - String response = + Map policyMap = client .get() .uri(URI.create(uriString)) .retrieve() - .bodyToMono(String.class) - .timeout(Duration.of(10, ChronoUnit.SECONDS)) - .onErrorResume(e -> showError(uriString, e)) - .block(); - Map policyMap = objectMapper.readValue(response, Map.class); + .body(new ParameterizedTypeReference<>() {}); + if (policyMap == null) { + return List.of(); + } return new ArrayList<>(policyMap.keySet()); } catch (Exception e) { // problem collecting policies from AA, so warn and return empty list @@ -231,13 +265,4 @@ public List getAAPolicies(String aaURL) { return List.of(); } } - - private Mono showError(String uriString, Throwable error) { - logger.log( - Level.WARNING, - String.format( - "There was an error getting a response with URI: %s. Error: %s", - uriString, error.getMessage())); - return Mono.empty(); - } } diff --git a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiveAction.java b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiveAction.java index 40d3d4be..c64fd49b 100644 --- a/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiveAction.java +++ b/src/main/java/org/phoebus/channelfinder/service/model/archiver/aa/ArchiveAction.java @@ -1,18 +1,24 @@ package org.phoebus.channelfinder.service.model.archiver.aa; public enum ArchiveAction { - ARCHIVE("/archivePV"), - PAUSE("/pauseArchivingPV"), - RESUME("/resumeArchivingPV"), - NONE(""); + ARCHIVE("/archivePV", "Archive request submitted"), + PAUSE("/pauseArchivingPV", "ok"), + RESUME("/resumeArchivingPV", "ok"), + NONE("", "ok"); private final String endpoint; + private final String successfulStatus; - ArchiveAction(final String endpoint) { + ArchiveAction(final String endpoint, final String successfulStatus) { this.endpoint = endpoint; + this.successfulStatus = successfulStatus; } public String getEndpoint() { return this.endpoint; } + + public String getSuccessfulStatus() { + return this.successfulStatus; + } } diff --git a/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java new file mode 100644 index 00000000..68c2efc4 --- /dev/null +++ b/src/test/java/org/phoebus/channelfinder/service/external/ArchiverServiceTest.java @@ -0,0 +1,354 @@ +package org.phoebus.channelfinder.service.external; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.springframework.test.web.client.match.MockRestRequestMatchers.content; +import static org.springframework.test.web.client.match.MockRestRequestMatchers.method; +import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo; +import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.junit.jupiter.MockitoExtension; +import org.phoebus.channelfinder.exceptions.ArchiverServiceException; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.test.util.ReflectionTestUtils; +import org.springframework.test.web.client.MockRestServiceServer; +import org.springframework.web.client.RestClient; + +@ExtendWith(MockitoExtension.class) +class ArchiverServiceTest { + + private static final String ARCHIVER_URL = "http://localhost:17665"; + private MockRestServiceServer mockServer; + private ArchiverService archiverService; + private ObjectMapper objectMapper; + + @BeforeEach + void setUp() { + RestClient.Builder builder = RestClient.builder(); + mockServer = MockRestServiceServer.bindTo(builder).build(); + + archiverService = new ArchiverService(builder); + objectMapper = new ObjectMapper(); + + ReflectionTestUtils.setField(archiverService, "postSupportArchivers", List.of("test-archiver")); + } + + @AfterEach + void tearDown() { + mockServer.verify(); + } + + @ParameterizedTest + @ValueSource(strings = {"other-archiver", "test-archiver"}) + void testGetStatuses(String archiverAlias) throws JsonProcessingException { + + Map pvs = Map.of("pv1", new ArchivePVOptions()); + + List> expectedResponse = + List.of(Map.of("pv", "pv1", "status", "Being archived")); + + if ("test-archiver".equals(archiverAlias)) { + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus")) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json("[\"pv1\"]")) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + } else { + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) + .andExpect(method(HttpMethod.GET)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + } + + List> result = + archiverService.getStatuses(pvs, ARCHIVER_URL, archiverAlias); + + assertEquals(1, result.size()); + assertEquals("pv1", result.getFirst().get("pv")); + assertEquals("Being archived", result.getFirst().get("status")); + } + + @Test + void testGetStatusesInvalidResponse() { + + Map pvs = Map.of("pv1", new ArchivePVOptions()); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPVStatus?pv=pv1")) + .andExpect(method(HttpMethod.GET)) + .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + + List> result = + archiverService.getStatuses(pvs, ARCHIVER_URL, "other-archiver"); + + assertTrue(result.isEmpty()); + } + + @Test + void testSubmitBasicAction() throws JsonProcessingException { + + List pvs = List.of("pv1"); + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "ok")); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl")) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json("[\"pv1\"]")) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + + List successfulPvs = + archiverService.submitBasicAction(pvs, ArchiveAction.NONE, ARCHIVER_URL); + assertEquals(1, successfulPvs.size()); + assertEquals("pv1", successfulPvs.getFirst()); + } + + @Test + void testSubmitBasicActionStatusNotOk() throws JsonProcessingException { + + List pvs = List.of("pv1"); + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "failed")); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl")) + .andExpect(method(HttpMethod.POST)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + + List successfulPvs = + archiverService.submitBasicAction(pvs, ArchiveAction.NONE, ARCHIVER_URL); + assertTrue(successfulPvs.isEmpty()); + } + + @Test + void testSubmitBasicActionPartialFailure() throws JsonProcessingException { + + List pvs = List.of("pv1", "pv2"); + List> expectedResponse = + List.of(Map.of("pv", "pv1", "status", "ok"), Map.of("pv", "pv2", "status", "failed")); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl")) + .andExpect(method(HttpMethod.POST)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + + List successfulPvs = + archiverService.submitBasicAction(pvs, ArchiveAction.NONE, ARCHIVER_URL); + assertEquals(1, successfulPvs.size()); + assertEquals("pv1", successfulPvs.getFirst()); + } + + @Test + void testSubmitBasicActionInvalidResponse() { + + List pvs = List.of("pv1"); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl")) + .andExpect(method(HttpMethod.POST)) + .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + + assertThrows( + ArchiverServiceException.class, + () -> archiverService.submitBasicAction(pvs, ArchiveAction.NONE, ARCHIVER_URL)); + } + + @ParameterizedTest + @EnumSource(ArchiveAction.class) + void testConfigureAA(ArchiveAction action) throws JsonProcessingException { + + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("pv1"); + + Map> archivePVS = new EnumMap<>(ArchiveAction.class); + archivePVS.put(ArchiveAction.ARCHIVE, List.of()); + archivePVS.put(ArchiveAction.PAUSE, List.of()); + archivePVS.put(ArchiveAction.RESUME, List.of()); + archivePVS.put(action, List.of(options)); + + String status = action == ArchiveAction.ARCHIVE ? "Archive request submitted" : "ok"; + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", status)); + + if (action != ArchiveAction.NONE) { + String endpoint = action.getEndpoint(); + String expectedBody; + if (action == ArchiveAction.ARCHIVE) { + expectedBody = objectMapper.writeValueAsString(List.of(options)); + } else { + expectedBody = "[\"pv1\"]"; + } + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl" + endpoint)) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json(expectedBody)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + } + + long count = archiverService.configureAA(archivePVS, ARCHIVER_URL); + + assertEquals(action != ArchiveAction.NONE ? 1 : 0, count); + } + + @Test + void testConfigureAAStatusNotOk() throws JsonProcessingException { + + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("pv1"); + Map> archivePVS = + Map.of( + ArchiveAction.ARCHIVE, List.of(options), + ArchiveAction.PAUSE, List.of(), + ArchiveAction.RESUME, List.of()); + + List> expectedResponse = List.of(Map.of("pv", "pv1", "status", "failed")); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl" + ArchiveAction.ARCHIVE.getEndpoint())) + .andExpect(method(HttpMethod.POST)) + .andRespond( + withSuccess( + objectMapper.writeValueAsString(expectedResponse), MediaType.APPLICATION_JSON)); + + long count = archiverService.configureAA(archivePVS, ARCHIVER_URL); + + assertEquals(0, count); + } + + @Test + void testConfigureAAInvalidResponse() { + + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("pv1"); + Map> archivePVS = + Map.of( + ArchiveAction.ARCHIVE, List.of(options), + ArchiveAction.PAUSE, List.of(), + ArchiveAction.RESUME, List.of()); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl" + ArchiveAction.ARCHIVE.getEndpoint())) + .andExpect(method(HttpMethod.POST)) + .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + + long count = archiverService.configureAA(archivePVS, ARCHIVER_URL); + + assertEquals(0, count); + } + + @Test + void testGetAAPolicies() throws JsonProcessingException { + + Map policies = Map.of("policy1", "desc1", "policy2", "desc2"); + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPolicyList")) + .andExpect(method(HttpMethod.GET)) + .andRespond( + withSuccess(objectMapper.writeValueAsString(policies), MediaType.APPLICATION_JSON)); + + List result = archiverService.getAAPolicies(ARCHIVER_URL); + + assertEquals(2, result.size()); + assertTrue(result.contains("policy1")); + assertTrue(result.contains("policy2")); + } + + @Test + void testGetAAPoliciesInvalidResponse() { + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl/getPolicyList")) + .andExpect(method(HttpMethod.GET)) + .andRespond(withSuccess("invalid-json", MediaType.APPLICATION_JSON)); + + List result = archiverService.getAAPolicies(ARCHIVER_URL); + + assertTrue(result.isEmpty()); + } + + @Test + void testSubmitActionWithRealResponseResume() { + + List pvs = List.of("PV1", "PV2"); + String responseBody = + "[{\"pvName\":\"PV1\",\"engine_desc\":\"Successfully resumed the archiving of PV PV1\",\"engine_pvName\":\"PV1\",\"engine_status\":\"ok\",\"status\":\"ok\"},{\"pvName\":\"PV2\",\"engine_desc\":\"Successfully resumed the archiving of PV PV2\",\"engine_pvName\":\"PV2\",\"engine_status\":\"ok\",\"status\":\"ok\"}]"; + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl" + ArchiveAction.RESUME.getEndpoint())) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json("[\"PV1\",\"PV2\"]")) + .andRespond(withSuccess(responseBody, MediaType.APPLICATION_JSON)); + + List successfulPvs = + archiverService.submitBasicAction(pvs, ArchiveAction.RESUME, ARCHIVER_URL); + assertEquals(2, successfulPvs.size()); + assertTrue(successfulPvs.contains("PV1")); + assertTrue(successfulPvs.contains("PV2")); + } + + @Test + void testSubmitActionWithRealResponsePause() { + + List pvs = List.of("PV1"); + String responseBody = + "[{\"pvName\":\"PV1\",\"engine_desc\":\"Successfully paused the archiving of PV PV1\",\"engine_pvName\":\"PV1\",\"engine_status\":\"ok\",\"etl_status\":\"ok\",\"etl_desc\":\"Successfully removed PV PV1 from the cluster\",\"etl_pvName\":\"PV1\",\"status\":\"ok\"}]"; + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl" + ArchiveAction.PAUSE.getEndpoint())) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json("[\"PV1\"]")) + .andRespond(withSuccess(responseBody, MediaType.APPLICATION_JSON)); + + List successfulPvs = + archiverService.submitBasicAction(pvs, ArchiveAction.PAUSE, ARCHIVER_URL); + assertEquals(1, successfulPvs.size()); + assertTrue(successfulPvs.contains("PV1")); + } + + @Test + void testSubmitActionWithRealResponseArchive() throws JsonProcessingException { + + List pvs = List.of("PV1"); + ArchivePVOptions options = new ArchivePVOptions(); + options.setPv("PV1"); + List payload = List.of(options); + String responseBody = "[{ \"pvName\": \"PV1\", \"status\": \"Archive request submitted\" }]"; + + mockServer + .expect(requestTo(ARCHIVER_URL + "/mgmt/bpl" + ArchiveAction.ARCHIVE.getEndpoint())) + .andExpect(method(HttpMethod.POST)) + .andExpect(content().json(objectMapper.writeValueAsString(payload))) + .andRespond(withSuccess(responseBody, MediaType.APPLICATION_JSON)); + + List successfulPvs = archiverService.submitArchiveAction(pvs, payload, ARCHIVER_URL); + assertEquals(1, successfulPvs.size()); + assertTrue(successfulPvs.contains("PV1")); + } +} From 88d0b7015b4d3a30a7939f35ff4d502f981167db Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 12 Feb 2026 09:47:44 +0100 Subject: [PATCH 4/7] Mock the ArchiverService for AAChannelProcessorIT tests Instead of the mockresponses --- .../processors/aa/AAChannelProcessorIT.java | 133 +++++++---------- .../aa/AAChannelProcessorMultiIT.java | 138 +++++------------- .../aa/AAChannelProcessorNoDefaultIT.java | 33 +---- .../aa/AAChannelProcessorNoPauseIT.java | 33 +---- .../aa/AAChannelProcessorStatusPauseIT.java | 33 +---- .../aa/AAChannelProcessorTagPauseIT.java | 33 +---- 6 files changed, 113 insertions(+), 290 deletions(-) diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java index 62b6206d..6e26515e 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorIT.java @@ -1,33 +1,39 @@ package org.phoebus.channelfinder.processors.aa; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; +import org.mockito.junit.jupiter.MockitoExtension; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.configuration.ChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) +@ExtendWith(MockitoExtension.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") class AAChannelProcessorIT { @@ -35,11 +41,9 @@ class AAChannelProcessorIT { protected static Property activeProperty = new Property("pvStatus", "owner", "Active"); protected static Property inactiveProperty = new Property("pvStatus", "owner", "Inactive"); + @MockitoBean ArchiverService archiverService; @Autowired AAChannelProcessor aaChannelProcessor; - MockWebServer mockArchiverAppliance; - ObjectMapper objectMapper; - @NotNull private static Stream processSource() { return Stream.of( @@ -90,107 +94,80 @@ private static Stream processSource() { } public static void paramableAAChannelProcessorTest( - MockWebServer mockArchiverAppliance, - ObjectMapper objectMapper, + ArchiverService archiverService, ChannelProcessor aaChannelProcessor, List channels, String archiveStatus, - String archiverEndpoint, - String submissionBody) - throws JsonProcessingException, InterruptedException { - // Request to policies - Map policyList = Map.of("policy", "description"); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(policyList)) - .addHeader("Content-Type", "application/json")); + String archiverEndpoint) + throws JsonProcessingException { + // Mock getAAPolicies + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); if (!archiveStatus.isEmpty()) { - - // Request to archiver status + // Mock getStatuses List> archivePVStatuses = channels.stream() .map(channel -> Map.of("pvName", channel.getName(), "status", archiveStatus)) .toList(); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archivePVStatuses)) - .addHeader("Content-Type", "application/json")); + when(archiverService.getStatuses(anyMap(), anyString(), anyString())) + .thenReturn(archivePVStatuses); } + if (!archiverEndpoint.isEmpty()) { - // Request to archiver to archive - List> archiverResponse = - channels.stream() - .map( - channel -> - Map.of("pvName", channel.getName(), "status", "Archive request submitted")) - .toList(); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archiverResponse)) - .addHeader("Content-Type", "application/json")); + // Mock configureAA + when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) channels.size()); + } else { + when(archiverService.configureAA(anyMap(), anyString())).thenReturn(0L); } long count = aaChannelProcessor.process(channels); assertEquals(count, archiverEndpoint.isEmpty() ? 0 : channels.size()); - int expectedRequests = 0; - expectedRequests += 1; - RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestPolicy != null; - assertEquals("/mgmt/bpl/getPolicyList", requestPolicy.getPath()); + // Verifications + verify(archiverService).getAAPolicies(anyString()); if (!archiveStatus.isEmpty()) { - expectedRequests += 1; - RecordedRequest requestStatus = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestStatus != null; - assert requestStatus.getRequestUrl() != null; - assertEquals("/mgmt/bpl/getPVStatus", requestStatus.getRequestUrl().encodedPath()); + verify(archiverService).getStatuses(anyMap(), anyString(), anyString()); } if (!archiverEndpoint.isEmpty()) { - expectedRequests += 1; - RecordedRequest requestAction = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestAction != null; - assertEquals("/mgmt/bpl/" + archiverEndpoint, requestAction.getPath()); - assertEquals(submissionBody, requestAction.getBody().readUtf8()); + ArgumentCaptor>> captor = + ArgumentCaptor.forClass(Map.class); + verify(archiverService).configureAA(captor.capture(), anyString()); + Map> map = captor.getValue(); + + ArchiveAction expectedAction = getActionFromEndpoint(archiverEndpoint); + if (expectedAction != null) { + assertTrue(map.containsKey(expectedAction)); + List options = map.get(expectedAction); + assertFalse(options.isEmpty()); + // We could parse submissionBody to be more strict, but checking the action is likely + // sufficient for now + // as we trust the mapping logic in AAChannelProcessor + } } - - assertEquals(mockArchiverAppliance.getRequestCount(), expectedRequests); - } - - @BeforeEach - void setUp() throws IOException { - mockArchiverAppliance = new MockWebServer(); - mockArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); } - @AfterEach - void teardown() throws IOException { - mockArchiverAppliance.shutdown(); + private static ArchiveAction getActionFromEndpoint(String endpoint) { + if (endpoint.contains("resumeArchivingPV")) return ArchiveAction.RESUME; + if (endpoint.contains("pauseArchivingPV")) return ArchiveAction.PAUSE; + if (endpoint.contains("archivePV")) return ArchiveAction.ARCHIVE; + return null; } @Test void testProcessNoPVs() throws JsonProcessingException { aaChannelProcessor.process(List.of()); - assertEquals(mockArchiverAppliance.getRequestCount(), 0); + // verify interactions are minimal or none if empty + // But since list is empty, process returns 0 early } @ParameterizedTest @MethodSource("processSource") - void testProcessNotArchivedActive( - Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) - throws JsonProcessingException, InterruptedException { + void testProcessNotArchivedActive(Channel channel, String archiveStatus, String archiverEndpoint) + throws JsonProcessingException { paramableAAChannelProcessorTest( - mockArchiverAppliance, - objectMapper, - aaChannelProcessor, - List.of(channel), - archiveStatus, - archiverEndpoint, - submissionBody); + archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java index 834d6945..2a46c9b7 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiIT.java @@ -2,34 +2,33 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.ArgumentCaptor; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; +import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource(value = "classpath:application_aa_proc_test.properties") @@ -40,22 +39,7 @@ class AAChannelProcessorMultiIT { public static final String NOT_BEING_ARCHIVED = "Not being archived"; public static final String OWNER = "owner"; @Autowired AAChannelProcessor aaChannelProcessor; - - MockWebServer mockArchiverAppliance; - ObjectMapper objectMapper; - - @BeforeEach - void setUp() throws IOException { - mockArchiverAppliance = new MockWebServer(); - mockArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); - } - - @AfterEach - void teardown() throws IOException { - mockArchiverAppliance.shutdown(); - } + @MockitoBean ArchiverService archiverService; static Stream provideArguments() { List channels = @@ -117,92 +101,46 @@ void testProcessMulti( Map namesToStatuses, Map> actionsToNames, int expectedProcessedChannels) - throws JsonProcessingException, InterruptedException { + throws JsonProcessingException { - // Request to policies - Map policyList = Map.of("policy", "description"); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(policyList)) - .addHeader("Content-Type", "application/json")); + // Mock getAAPolicies + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); - // Request to archiver status + // Mock getStatuses List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archivePVStatuses)) - .addHeader("Content-Type", "application/json")); + when(archiverService.getStatuses(anyMap(), anyString(), anyString())) + .thenReturn(archivePVStatuses); - // Requests to archiver - actionsToNames.forEach( - (key, value) -> { - List> archiverResponse = - value.stream() - .map(channel -> Map.of("pvName", channel, "status", key + " request submitted")) - .toList(); - try { - mockArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archiverResponse)) - .addHeader("Content-Type", "application/json")); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - }); + // Mock configureAA + when(archiverService.configureAA(anyMap(), anyString())) + .thenReturn((long) expectedProcessedChannels); long count = aaChannelProcessor.process(channels); - assertEquals(count, expectedProcessedChannels); - - AtomicInteger expectedRequests = new AtomicInteger(1); - RecordedRequest requestPolicy = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestPolicy != null; - assertEquals("/mgmt/bpl/getPolicyList", requestPolicy.getPath()); + assertEquals(expectedProcessedChannels, count); - expectedRequests.addAndGet(1); - RecordedRequest requestStatus = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestStatus != null; - assert requestStatus.getRequestUrl() != null; - assertEquals("/mgmt/bpl/getPVStatus", requestStatus.getRequestUrl().encodedPath()); - String pvStatusRequestParameter = requestStatus.getRequestUrl().queryParameter("pv"); - namesToStatuses - .keySet() - .forEach( - name -> { - assert pvStatusRequestParameter != null; - assertTrue(pvStatusRequestParameter.contains(name)); - }); + verify(archiverService).getAAPolicies(anyString()); + verify(archiverService).getStatuses(anyMap(), anyString(), anyString()); - while (mockArchiverAppliance.getRequestCount() > 0) { - RecordedRequest requestAction = null; - try { - requestAction = mockArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (requestAction == null) { - break; - } - expectedRequests.addAndGet(1); - assert requestAction.getPath() != null; - assertTrue(requestAction.getPath().startsWith("/mgmt/bpl")); - ArchiveAction key = - actionFromEndpoint(requestAction.getPath().substring("/mgmt/bpl".length())); - String body = requestAction.getBody().readUtf8(); - actionsToNames.get(key).forEach(pv -> assertTrue(body.contains(pv))); - } - - assertEquals(mockArchiverAppliance.getRequestCount(), expectedRequests.get()); - } + ArgumentCaptor>> captor = + ArgumentCaptor.forClass(Map.class); + verify(archiverService).configureAA(captor.capture(), anyString()); + Map> capturedMap = captor.getValue(); - public ArchiveAction actionFromEndpoint(final String endpoint) { - for (ArchiveAction action : ArchiveAction.values()) { - if (action.getEndpoint().equals(endpoint)) { - return action; - } - } - return null; + actionsToNames.forEach( + (action, names) -> { + if (names.isEmpty()) { + return; + } + assertTrue(capturedMap.containsKey(action)); + List capturedNames = + capturedMap.get(action).stream() + .map(ArchivePVOptions::getPv) + .collect(Collectors.toList()); + assertTrue(capturedNames.containsAll(names)); + assertTrue(names.containsAll(capturedNames)); + }); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java index 74cdf96f..3fec1608 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoDefaultIT.java @@ -5,22 +5,19 @@ import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockWebServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; import org.phoebus.channelfinder.entity.Property; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( @@ -31,8 +28,7 @@ class AAChannelProcessorNoDefaultIT { @Autowired AAChannelProcessor aaChannelProcessor; - MockWebServer mockArchiverAppliance; - ObjectMapper objectMapper; + @MockitoBean ArchiverService archiverService; private static Stream processNoPauseSource() { @@ -54,31 +50,12 @@ private static Stream processNoPauseSource() { "[{\"pv\":\"PVNoneActiveArchiver\"}]")); } - @BeforeEach - void setUp() throws IOException { - mockArchiverAppliance = new MockWebServer(); - mockArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); - } - - @AfterEach - void teardown() throws IOException { - mockArchiverAppliance.shutdown(); - } - @ParameterizedTest @MethodSource("processNoPauseSource") void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) - throws JsonProcessingException, InterruptedException { + throws JsonProcessingException { paramableAAChannelProcessorTest( - mockArchiverAppliance, - objectMapper, - aaChannelProcessor, - List.of(channel), - archiveStatus, - archiverEndpoint, - submissionBody); + archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java index 58d9cb2f..ec0f32c6 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorNoPauseIT.java @@ -5,21 +5,18 @@ import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockWebServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( @@ -29,8 +26,7 @@ class AAChannelProcessorNoPauseIT { @Autowired AAChannelProcessor aaChannelProcessor; - MockWebServer mockArchiverAppliance; - ObjectMapper objectMapper; + @MockitoBean ArchiverService archiverService; private static Stream processNoPauseSource() { @@ -47,31 +43,12 @@ private static Stream processNoPauseSource() { Arguments.of(new Channel("PVArchivedNotag", "owner", List.of(), List.of()), "", "", "")); } - @BeforeEach - void setUp() throws IOException { - mockArchiverAppliance = new MockWebServer(); - mockArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); - } - - @AfterEach - void teardown() throws IOException { - mockArchiverAppliance.shutdown(); - } - @ParameterizedTest @MethodSource("processNoPauseSource") void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) - throws JsonProcessingException, InterruptedException { + throws JsonProcessingException { paramableAAChannelProcessorTest( - mockArchiverAppliance, - objectMapper, - aaChannelProcessor, - List.of(channel), - archiveStatus, - archiverEndpoint, - submissionBody); + archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java index a7171e92..deda7c26 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorStatusPauseIT.java @@ -5,21 +5,18 @@ import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockWebServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( @@ -29,8 +26,7 @@ class AAChannelProcessorStatusPauseIT { @Autowired AAChannelProcessor aaChannelProcessor; - MockWebServer mockArchiverAppliance; - ObjectMapper objectMapper; + @MockitoBean ArchiverService archiverService; private static Stream processNoPauseSource() { @@ -47,31 +43,12 @@ private static Stream processNoPauseSource() { Arguments.of(new Channel("PVArchivedNotag", "owner", List.of(), List.of()), "", "", "")); } - @BeforeEach - void setUp() throws IOException { - mockArchiverAppliance = new MockWebServer(); - mockArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); - } - - @AfterEach - void teardown() throws IOException { - mockArchiverAppliance.shutdown(); - } - @ParameterizedTest @MethodSource("processNoPauseSource") void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) - throws JsonProcessingException, InterruptedException { + throws JsonProcessingException { paramableAAChannelProcessorTest( - mockArchiverAppliance, - objectMapper, - aaChannelProcessor, - List.of(channel), - archiveStatus, - archiverEndpoint, - submissionBody); + archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); } } diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java index bcb3ced4..310f8047 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorTagPauseIT.java @@ -5,21 +5,18 @@ import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.paramableAAChannelProcessorTest; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockWebServer; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource( @@ -29,8 +26,7 @@ class AAChannelProcessorTagPauseIT { @Autowired AAChannelProcessor aaChannelProcessor; - MockWebServer mockArchiverAppliance; - ObjectMapper objectMapper; + @MockitoBean ArchiverService archiverService; private static Stream processNoPauseSource() { @@ -51,31 +47,12 @@ private static Stream processNoPauseSource() { "[\"PVArchivedNotag\"]")); } - @BeforeEach - void setUp() throws IOException { - mockArchiverAppliance = new MockWebServer(); - mockArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); - } - - @AfterEach - void teardown() throws IOException { - mockArchiverAppliance.shutdown(); - } - @ParameterizedTest @MethodSource("processNoPauseSource") void testProcessNotArchivedActive( Channel channel, String archiveStatus, String archiverEndpoint, String submissionBody) - throws JsonProcessingException, InterruptedException { + throws JsonProcessingException { paramableAAChannelProcessorTest( - mockArchiverAppliance, - objectMapper, - aaChannelProcessor, - List.of(channel), - archiveStatus, - archiverEndpoint, - submissionBody); + archiverService, aaChannelProcessor, List.of(channel), archiveStatus, archiverEndpoint); } } From 7a6286643d818d5706e9df52af1a7fcfbcc731e9 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Thu, 12 Feb 2026 16:00:56 +0100 Subject: [PATCH 5/7] Refactor AAChannelProcessorMultiArchiverIT to use MockitoBean --- .../aa/AAChannelProcessorMultiArchiverIT.java | 170 +++--------------- 1 file changed, 22 insertions(+), 148 deletions(-) diff --git a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java index 85dc1f2a..56a2b119 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/aa/AAChannelProcessorMultiArchiverIT.java @@ -1,35 +1,31 @@ package org.phoebus.channelfinder.processors.aa; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.activeProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.archiveProperty; import static org.phoebus.channelfinder.processors.aa.AAChannelProcessorIT.inactiveProperty; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; -import okhttp3.mockwebserver.MockResponse; -import okhttp3.mockwebserver.MockWebServer; -import okhttp3.mockwebserver.RecordedRequest; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.phoebus.channelfinder.configuration.AAChannelProcessor; import org.phoebus.channelfinder.entity.Channel; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.phoebus.channelfinder.service.model.archiver.aa.ArchiveAction; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; @WebMvcTest(AAChannelProcessor.class) @TestPropertySource(value = "classpath:application_test_multi.properties") @@ -40,26 +36,7 @@ class AAChannelProcessorMultiArchiverIT { public static final String NOT_BEING_ARCHIVED = "Not being archived"; public static final String OWNER = "owner"; @Autowired AAChannelProcessor aaChannelProcessor; - - MockWebServer mockQueryArchiverAppliance; - MockWebServer mockPostArchiverAppliance; - ObjectMapper objectMapper; - - @BeforeEach - void setUp() throws IOException { - mockQueryArchiverAppliance = new MockWebServer(); - mockQueryArchiverAppliance.start(17664); - mockPostArchiverAppliance = new MockWebServer(); - mockPostArchiverAppliance.start(17665); - - objectMapper = new ObjectMapper(); - } - - @AfterEach - void teardown() throws IOException { - mockQueryArchiverAppliance.shutdown(); - mockPostArchiverAppliance.shutdown(); - } + @MockitoBean ArchiverService archiverService; static Stream provideArguments() { List channels = @@ -119,147 +96,44 @@ static Stream provideArguments() { void testProcessMultiArchivers( List channels, Map namesToStatuses, - Map> actionsToNames, - int expectedProcessedChannels) - throws JsonProcessingException, InterruptedException { - - // Request to policies - Map policyList = Map.of("policy", "description"); - mockQueryArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(policyList)) - .addHeader("Content-Type", "application/json")); + Map> actionsToNames) + throws JsonProcessingException { + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); // Request to archiver status List> archivePVStatuses = namesToStatuses.entrySet().stream() .map(entry -> Map.of("pvName", entry.getKey(), "status", entry.getValue())) .toList(); - mockQueryArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archivePVStatuses)) - .addHeader("Content-Type", "application/json")); + when(archiverService.getStatuses(anyMap(), anyString(), anyString())) + .thenReturn(archivePVStatuses); // Requests to archiver actionsToNames.forEach( (key, value) -> { - List> archiverResponse = - value.stream() - .map(channel -> Map.of("pvName", channel, "status", key + " request submitted")) - .toList(); - try { - mockQueryArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archiverResponse)) - .addHeader("Content-Type", "application/json")); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); }); // Request to policies - mockPostArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(policyList)) - .addHeader("Content-Type", "application/json")); + when(archiverService.getAAPolicies(anyString())).thenReturn(List.of("policy")); // Request to archiver status - mockPostArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archivePVStatuses)) - .addHeader("Content-Type", "application/json")); + when(archiverService.getStatuses(anyMap(), anyString(), anyString())) + .thenReturn(archivePVStatuses); // Requests to archiver actionsToNames.forEach( (key, value) -> { - List> archiverResponse = - value.stream() - .map(channel -> Map.of("pvName", channel, "status", key + " request submitted")) - .toList(); - try { - mockPostArchiverAppliance.enqueue( - new MockResponse() - .setBody(objectMapper.writeValueAsString(archiverResponse)) - .addHeader("Content-Type", "application/json")); - } catch (JsonProcessingException e) { - throw new RuntimeException(e); - } + when(archiverService.configureAA(anyMap(), anyString())).thenReturn((long) value.size()); }); aaChannelProcessor.process(channels); - AtomicInteger expectedQueryRequests = new AtomicInteger(1); - RecordedRequest requestQueryPolicy = - mockQueryArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestQueryPolicy != null; - assertEquals("/mgmt/bpl/getPolicyList", requestQueryPolicy.getPath()); - - expectedQueryRequests.addAndGet(1); - RecordedRequest requestQueryStatus = - mockQueryArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestQueryStatus != null; - assert requestQueryStatus.getRequestUrl() != null; - assertEquals("/mgmt/bpl/getPVStatus", requestQueryStatus.getRequestUrl().encodedPath()); - - AtomicInteger expectedPostRequests = new AtomicInteger(1); - RecordedRequest requestPostPolicy = mockPostArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestPostPolicy != null; - assertEquals("/mgmt/bpl/getPolicyList", requestPostPolicy.getPath()); - - expectedPostRequests.addAndGet(1); - RecordedRequest requestPostStatus = mockPostArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - assert requestPostStatus != null; - assert requestPostStatus.getRequestUrl() != null; - assertEquals("/mgmt/bpl/getPVStatus", requestPostStatus.getRequestUrl().encodedPath()); - - while (mockQueryArchiverAppliance.getRequestCount() > 0) { - RecordedRequest requestAction = null; - try { - requestAction = mockQueryArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (requestAction == null) { - break; - } - expectedQueryRequests.addAndGet(1); - assert requestAction.getPath() != null; - assertTrue(requestAction.getPath().startsWith("/mgmt/bpl")); - ArchiveAction key = - actionFromEndpoint(requestAction.getPath().substring("/mgmt/bpl".length())); - String body = requestAction.getBody().readUtf8(); - actionsToNames.get(key).forEach(pv -> assertTrue(body.contains(pv))); - } - - while (mockPostArchiverAppliance.getRequestCount() > 0) { - RecordedRequest requestAction = null; - try { - requestAction = mockPostArchiverAppliance.takeRequest(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if (requestAction == null) { - break; - } - expectedPostRequests.addAndGet(1); - assert requestAction.getPath() != null; - assertTrue(requestAction.getPath().startsWith("/mgmt/bpl")); - ArchiveAction key = - actionFromEndpoint(requestAction.getPath().substring("/mgmt/bpl".length())); - String body = requestAction.getBody().readUtf8(); - actionsToNames.get(key).forEach(pv -> assertTrue(body.contains(pv))); - } - - assertEquals(mockPostArchiverAppliance.getRequestCount(), expectedPostRequests.get()); - assertEquals(mockQueryArchiverAppliance.getRequestCount(), expectedQueryRequests.get()); - } + // Verifications + verify(archiverService, times(2)).getAAPolicies(anyString()); - public ArchiveAction actionFromEndpoint(final String endpoint) { - for (ArchiveAction action : ArchiveAction.values()) { - if (action.getEndpoint().equals(endpoint)) { - return action; - } + if (!namesToStatuses.isEmpty()) { + verify(archiverService, times(2)).getStatuses(anyMap(), anyString(), anyString()); } - return null; } } From 82ae89dce0a2e42b80ac996590aaa1bebbb4e9c0 Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 16 Feb 2026 14:08:14 +0100 Subject: [PATCH 6/7] Remove unneeded okhttp dependancy --- pom.xml | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pom.xml b/pom.xml index 30c5e5f2..48904012 100644 --- a/pom.xml +++ b/pom.xml @@ -170,18 +170,6 @@ - - com.squareup.okhttp3 - mockwebserver - 4.11.0 - test - - - junit - junit - - - io.netty netty-all From 778ad1ad1a60bdd63150100fae89d1c05ce420cf Mon Sep 17 00:00:00 2001 From: Sky Brewer Date: Mon, 16 Feb 2026 16:16:21 +0100 Subject: [PATCH 7/7] On conditional AAChannelProcessor configuration Means the integration tests won't try to autowire it --- .../channelfinder/configuration/AAChannelProcessor.java | 2 ++ .../channelfinder/service/external/ArchiverService.java | 2 ++ .../processors/ChannelProcessorControllerIT.java | 5 ++++- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java index e93d6ece..7625d8bd 100644 --- a/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java +++ b/src/main/java/org/phoebus/channelfinder/configuration/AAChannelProcessor.java @@ -21,6 +21,7 @@ import org.phoebus.channelfinder.service.model.archiver.aa.ArchiverInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Configuration; /** @@ -31,6 +32,7 @@ *

e.g. archive=monitor@1.0 */ @Configuration +@ConditionalOnProperty(name = "aa.enabled", havingValue = "true") public class AAChannelProcessor implements ChannelProcessor { private static final Logger logger = Logger.getLogger(AAChannelProcessor.class.getName()); diff --git a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java index c70bb5a2..c2ce34a7 100644 --- a/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java +++ b/src/main/java/org/phoebus/channelfinder/service/external/ArchiverService.java @@ -17,6 +17,7 @@ import org.phoebus.channelfinder.service.model.archiver.aa.ArchivePVOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.core.ParameterizedTypeReference; import org.springframework.http.MediaType; import org.springframework.http.client.SimpleClientHttpRequestFactory; @@ -25,6 +26,7 @@ import org.springframework.web.util.UriComponentsBuilder; @Component +@ConditionalOnProperty(name = "aa.enabled", havingValue = "true") public class ArchiverService { private static final Logger logger = Logger.getLogger(ArchiverService.class.getName()); private static final int STATUS_BATCH_SIZE = diff --git a/src/test/java/org/phoebus/channelfinder/processors/ChannelProcessorControllerIT.java b/src/test/java/org/phoebus/channelfinder/processors/ChannelProcessorControllerIT.java index c0d889cc..5787daa4 100644 --- a/src/test/java/org/phoebus/channelfinder/processors/ChannelProcessorControllerIT.java +++ b/src/test/java/org/phoebus/channelfinder/processors/ChannelProcessorControllerIT.java @@ -14,11 +14,13 @@ import org.phoebus.channelfinder.entity.Scroll; import org.phoebus.channelfinder.rest.api.IChannelScroll; import org.phoebus.channelfinder.rest.controller.ChannelProcessorController; +import org.phoebus.channelfinder.service.external.ArchiverService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.autoconfigure.web.servlet.WebMvcTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.http.HttpHeaders; import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.bean.override.mockito.MockitoBean; import org.springframework.test.context.junit.jupiter.SpringExtension; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder; @@ -27,7 +29,7 @@ @WebMvcTest(ChannelProcessorController.class) @TestPropertySource( value = "classpath:application_test.properties", - properties = {"elasticsearch.create.indices = false"}) + properties = {"elasticsearch.create.indices = false", "aa.enabled=true"}) class ChannelProcessorControllerIT { protected static final String AUTHORIZATION = @@ -35,6 +37,7 @@ class ChannelProcessorControllerIT { @Autowired protected MockMvc mockMvc; @MockBean IChannelScroll channelScroll; + @MockitoBean ArchiverService archiverService; @Test void testProcessorCount() throws Exception {