diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java index 68ff6621265..2c714ef88f7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java @@ -202,7 +202,7 @@ public static String[] getIPs(String strInterface, /** - * Returns the first available IP address associated with the provided + * Returns the first available IP address with a resolvable hostname associated with the provided * network interface or the local host IP if "default" is given. * * @param strInterface The name of the network interface or subinterface to query @@ -214,7 +214,24 @@ public static String[] getIPs(String strInterface, public static String getDefaultIP(String strInterface) throws UnknownHostException { String[] ips = getIPs(strInterface); - return ips[0]; + UnknownHostException lastException = null; + for (String ip : ips) { + String resolved = null; + try { + resolved = InetAddress.getByName(ip).getHostName(); + } catch (UnknownHostException e) { + // skip ip addresses that cannot be resolved to a hostname + lastException = e; + continue; + } + if (resolved.equals(ip)) { + lastException = new UnknownHostException( + "IP address " + ip + " for " + strInterface + " interface cannot be resolved to a hostname"); + continue; + } + return ip; + } + throw lastException; } /** diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java index 9c6be197550..2be82159e7e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/replication/Auditor.java @@ -35,6 +35,8 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.stream.Collectors; import org.apache.bookkeeper.client.BKException; @@ -76,11 +78,23 @@ public class Auditor implements AutoCloseable { private LedgerManager ledgerManager; private LedgerUnderreplicationManager ledgerUnderreplicationManager; private final ScheduledExecutorService executor; - private List knownBookies = new ArrayList(); + // volatile so the executor thread always sees the reference set by start() + private volatile List knownBookies = new ArrayList(); private final String bookieIdentifier; protected volatile Future auditTask; private final Set bookiesToBeAudited = Sets.newHashSet(); private volatile int lostBookieRecoveryDelayBeforeChange; + // Latest bookie sets received from watcher callbacks (null until the first watcher fires). + // Written by ZK watcher callback threads, read by the executor thread. + private final AtomicReference> pendingWritableBookies = new AtomicReference<>(); + private final AtomicReference> pendingReadOnlyBookies = new AtomicReference<>(); + // Counts tasks (running + queued) per change type. Incremented when a task is + // submitted, decremented when it finishes. Capped at 2 (one in-progress + one + // queued): a queued task always reads the latest pending bookie set when it + // starts, so there is no value in queuing more than one additional task. + private static final int MAX_AUDIT_TASKS_PER_TYPE = 2; + private final AtomicInteger writableAuditTaskCount = new AtomicInteger(0); + private final AtomicInteger readOnlyAuditTaskCount = new AtomicInteger(0); protected AuditorBookieCheckTask auditorBookieCheckTask; protected AuditorTask auditorCheckAllLedgersTask; protected AuditorTask auditorPlacementPolicyCheckTask; @@ -237,6 +251,46 @@ private void submitShutdownTask() { } } + /** + * Submit an audit task triggered by a bookie change notification from the watcher. + * + *

At most {@value #MAX_AUDIT_TASKS_PER_TYPE} tasks (one in-progress + one + * queued) are allowed per change type at any time. The counter is incremented on + * submission and decremented when the task finishes, so the limit is enforced + * across the full task lifetime. A queued task always reads the latest pending + * bookie set when it starts, so queuing more than one additional task would only + * duplicate work without improving correctness. + */ + private synchronized void submitAuditTaskForBookieChange(boolean writableChange) { + if (executor.isShutdown()) { + return; + } + AtomicInteger count = writableChange ? writableAuditTaskCount : readOnlyAuditTaskCount; + if (count.get() >= MAX_AUDIT_TASKS_PER_TYPE) { + // One task is already running and one is queued. The queued task will + // pick up the latest pendingWritableBookies / pendingReadOnlyBookies + // when it executes, so no additional task is needed. + return; + } + count.incrementAndGet(); + executor.submit(() -> { + try { + runAuditTask(); + } finally { + count.decrementAndGet(); + } + }); + } + + /** + * Submit a full bookie-check audit task unconditionally. Used by tests and by the + * LostBookieRecoveryDelay-changed event handler. + * + *

Runs the full {@code auditBookies()} scan (which uses ledger metadata) rather + * than the lightweight {@link #runAuditTask()}, so that bookies which registered + * and died since the auditor started are correctly detected even if they were never + * added to {@code knownBookies}. + */ @VisibleForTesting synchronized Future submitAuditTask() { if (executor.isShutdown()) { @@ -244,78 +298,117 @@ synchronized Future submitAuditTask() { f.setException(new BKAuditException("Auditor shutting down")); return f; } - return executor.submit(() -> { - try { - waitIfLedgerReplicationDisabled(); - int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager - .getLostBookieRecoveryDelay(); - List availableBookies = getAvailableBookies(); - - // casting to String, as knownBookies and availableBookies - // contains only String values - // find new bookies(if any) and update the known bookie list - Collection newBookies = CollectionUtils.subtract( - availableBookies, knownBookies); - knownBookies.addAll(newBookies); - if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) { - // the bookie, which went down earlier and had an audit scheduled for, - // has come up. So let us stop tracking it and cancel the audit. Since - // we allow delaying of audit when there is only one failed bookie, - // bookiesToBeAudited should just have 1 element and hence containsAll - // check should be ok - if (auditTask != null && auditTask.cancel(false)) { - auditTask = null; - auditorStats.getNumDelayedBookieAuditsCancelled().inc(); - } - bookiesToBeAudited.clear(); + return executor.submit(() -> auditorBookieCheckTask.startAudit(false)); + } + + /** + * Core audit logic: determine which bookies have disappeared and trigger + * re-replication if needed. Runs on the single-threaded {@link #executor}. + * + *

Uses the bookie sets most recently received from the ZK watcher callbacks + * ({@link #pendingWritableBookies} / {@link #pendingReadOnlyBookies}) when + * both are available, avoiding a redundant ZK round-trip and the race where a + * fresh {@code getAvailableBookies()} call could see a different state than the + * event that triggered this task. + */ + private void runAuditTask() { + try { + waitIfLedgerReplicationDisabled(); + int lostBookieRecoveryDelay = Auditor.this.ledgerUnderreplicationManager + .getLostBookieRecoveryDelay(); + + // Use the bookie sets captured synchronously by the watcher callbacks. + // If only one type has fired so far, fetch only the other type from the + // admin API to avoid a redundant ZK round-trip for the known half. + // Fall back to a full getAvailableBookies() only when neither watcher + // has fired yet (e.g. direct test invocations via submitAuditTask()). + Set writableSnapshot = pendingWritableBookies.get(); + Set readOnlySnapshot = pendingReadOnlyBookies.get(); + List availableBookies; + if (writableSnapshot != null && readOnlySnapshot != null) { + availableBookies = new ArrayList<>(writableSnapshot); + availableBookies.addAll(readOnlySnapshot); + } else if (writableSnapshot != null) { + // Readonly watcher hasn't fired yet; fetch only readonly bookies. + availableBookies = new ArrayList<>(writableSnapshot); + for (BookieId id : admin.getReadOnlyBookies()) { + availableBookies.add(id.toString()); } + } else if (readOnlySnapshot != null) { + // Writable watcher hasn't fired yet; fetch only writable bookies. + availableBookies = new ArrayList<>(); + for (BookieId id : admin.getAvailableBookies()) { + availableBookies.add(id.toString()); + } + availableBookies.addAll(readOnlySnapshot); + } else { + availableBookies = getAvailableBookies(); + } - // find lost bookies(if any) - bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies)); - if (bookiesToBeAudited.size() == 0) { - return; + // casting to String, as knownBookies and availableBookies + // contains only String values + // find new bookies(if any) and update the known bookie list + Collection newBookies = CollectionUtils.subtract( + availableBookies, knownBookies); + knownBookies.addAll(newBookies); + if (!bookiesToBeAudited.isEmpty() && knownBookies.containsAll(bookiesToBeAudited)) { + // the bookie, which went down earlier and had an audit scheduled for, + // has come up. So let us stop tracking it and cancel the audit. Since + // we allow delaying of audit when there is only one failed bookie, + // bookiesToBeAudited should just have 1 element and hence containsAll + // check should be ok + if (auditTask != null && auditTask.cancel(false)) { + auditTask = null; + auditorStats.getNumDelayedBookieAuditsCancelled().inc(); } + bookiesToBeAudited.clear(); + } - knownBookies.removeAll(bookiesToBeAudited); - if (lostBookieRecoveryDelay == 0) { - auditorBookieCheckTask.startAudit(false); - bookiesToBeAudited.clear(); - return; + // find lost bookies(if any) + bookiesToBeAudited.addAll(CollectionUtils.subtract(knownBookies, availableBookies)); + if (bookiesToBeAudited.size() == 0) { + return; + } + + knownBookies.removeAll(bookiesToBeAudited); + if (lostBookieRecoveryDelay == 0) { + auditorBookieCheckTask.startAudit(false); + bookiesToBeAudited.clear(); + return; + } + if (bookiesToBeAudited.size() > 1) { + // if more than one bookie is down, start the audit immediately; + LOG.info("Multiple bookie failure; not delaying bookie audit. " + + "Bookies lost now: {}; All lost bookies: {}", + CollectionUtils.subtract(knownBookies, availableBookies), + bookiesToBeAudited); + if (auditTask != null && auditTask.cancel(false)) { + auditTask = null; + auditorStats.getNumDelayedBookieAuditsCancelled().inc(); } - if (bookiesToBeAudited.size() > 1) { - // if more than one bookie is down, start the audit immediately; - LOG.info("Multiple bookie failure; not delaying bookie audit. " - + "Bookies lost now: {}; All lost bookies: {}", - CollectionUtils.subtract(knownBookies, availableBookies), - bookiesToBeAudited); - if (auditTask != null && auditTask.cancel(false)) { - auditTask = null; - auditorStats.getNumDelayedBookieAuditsCancelled().inc(); - } + auditorBookieCheckTask.startAudit(false); + bookiesToBeAudited.clear(); + return; + } + if (auditTask == null) { + // if there is no scheduled audit, schedule one + auditTask = executor.schedule(() -> { auditorBookieCheckTask.startAudit(false); + auditTask = null; bookiesToBeAudited.clear(); - return; - } - if (auditTask == null) { - // if there is no scheduled audit, schedule one - auditTask = executor.schedule(() -> { - auditorBookieCheckTask.startAudit(false); - auditTask = null; - bookiesToBeAudited.clear(); - }, lostBookieRecoveryDelay, TimeUnit.SECONDS); - auditorStats.getNumBookieAuditsDelayed().inc(); - LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay, - bookiesToBeAudited); - } - } catch (BKException bke) { - LOG.error("Exception getting bookie list", bke); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - LOG.error("Interrupted while watching available bookies ", ie); - } catch (UnavailableException ue) { - LOG.error("Exception while watching available bookies", ue); + }, lostBookieRecoveryDelay, TimeUnit.SECONDS); + auditorStats.getNumBookieAuditsDelayed().inc(); + LOG.info("Delaying bookie audit by {} secs for {}", lostBookieRecoveryDelay, + bookiesToBeAudited); } - }); + } catch (BKException bke) { + LOG.error("Exception getting bookie list", bke); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while watching available bookies ", ie); + } catch (UnavailableException ue) { + LOG.error("Exception while watching available bookies", ue); + } } synchronized Future submitLostBookieRecoveryDelayChangedEvent() { @@ -386,13 +479,12 @@ public void start() { } try { - watchBookieChanges(); - // Start with all available bookies - // to handle situations where the auditor - // is started after some bookies have already failed + // Initialize knownBookies before registering watchers so that any + // watcher callback that fires immediately sees a consistent baseline. knownBookies = admin.getAllBookies().stream() .map(BookieId::toString) .collect(Collectors.toList()); + watchBookieChanges(); this.ledgerUnderreplicationManager .notifyLostBookieRecoveryDelayChanged(new LostBookieRecoveryDelayChangedCb()); } catch (BKException bke) { @@ -598,8 +690,18 @@ protected List getAvailableBookies() throws BKException { } private void watchBookieChanges() throws BKException { - admin.watchWritableBookiesChanged(bookies -> submitAuditTask()); - admin.watchReadOnlyBookiesChanged(bookies -> submitAuditTask()); + admin.watchWritableBookiesChanged(bookies -> { + pendingWritableBookies.set(bookies.getValue().stream() + .map(BookieId::toString) + .collect(Collectors.toSet())); + submitAuditTaskForBookieChange(true); + }); + admin.watchReadOnlyBookiesChanged(bookies -> { + pendingReadOnlyBookies.set(bookies.getValue().stream() + .map(BookieId::toString) + .collect(Collectors.toSet())); + submitAuditTaskForBookieChange(false); + }); } /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java index 4b1d64fc94b..b823649140e 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/conf/TestBKConfiguration.java @@ -23,10 +23,14 @@ import java.net.NetworkInterface; import java.net.SocketException; +import java.net.UnknownHostException; import java.util.Collections; import java.util.Enumeration; +import java.util.LinkedHashSet; +import java.util.Set; import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage; import org.apache.bookkeeper.common.allocator.PoolingPolicy; +import org.apache.bookkeeper.net.DNS; import org.apache.bookkeeper.util.PortManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,12 +73,27 @@ public static ServerConfiguration newServerConfiguration() { private static String getLoopbackInterfaceName() { try { + Set loopbackInterfaces = new LinkedHashSet<>(); Enumeration nifs = NetworkInterface.getNetworkInterfaces(); for (NetworkInterface nif : Collections.list(nifs)) { if (nif.isLoopback()) { - return nif.getName(); + String nifName = nif.getName(); + try { + DNS.getDefaultIP(nifName); + } catch (UnknownHostException e) { + // skip interfaces that don't have a resolvable hostname + continue; + } + loopbackInterfaces.add(nifName); } } + // prefer lo if available to avoid issues on Linux + if (loopbackInterfaces.contains("lo")) { + return "lo"; + } + if (!loopbackInterfaces.isEmpty()) { + return loopbackInterfaces.iterator().next(); + } } catch (SocketException se) { LOG.warn("Exception while figuring out loopback interface. Will use null.", se); return null;