diff --git a/paper-server/src/main/java/io/papermc/paper/util/concurrent/TickBoundTask.java b/paper-server/src/main/java/io/papermc/paper/util/concurrent/TickBoundTask.java new file mode 100644 index 000000000000..f33bde0000e3 --- /dev/null +++ b/paper-server/src/main/java/io/papermc/paper/util/concurrent/TickBoundTask.java @@ -0,0 +1,7 @@ +package io.papermc.paper.util.concurrent; + +public interface TickBoundTask { + long getNextRun(); + void setNextRun(long next); + long getCreatedAt(); +} diff --git a/paper-server/src/main/java/io/papermc/paper/util/concurrent/TimingWheel.java b/paper-server/src/main/java/io/papermc/paper/util/concurrent/TimingWheel.java new file mode 100644 index 000000000000..512fee5df447 --- /dev/null +++ b/paper-server/src/main/java/io/papermc/paper/util/concurrent/TimingWheel.java @@ -0,0 +1,159 @@ +package io.papermc.paper.util.concurrent; + +import org.jetbrains.annotations.NotNull; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.ListIterator; +import java.util.NoSuchElementException; +import java.util.function.Predicate; + +/** + * This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel). + * Each slot in the wheel represents a specific tick modulo the wheel size. + * Tasks are placed into slots based on their target execution tick. + * On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached. + * + * O(1) task scheduling and retrieval within a single wheel rotation. + * We are using power of 2 for faster operations than modulo. + * + */ +public class TimingWheel implements Iterable { + private final int wheelSize; + private final long mask; + private final LinkedList[] wheel; + + private static final Comparator ORDERING = Comparator.comparingLong(TickBoundTask::getCreatedAt); + + @SuppressWarnings("unchecked") + public TimingWheel(int exponent) { + this.wheelSize = 1 << exponent; + this.mask = wheelSize - 1L; + + this.wheel = (LinkedList[]) new LinkedList[wheelSize]; + for (int i = 0; i < wheelSize; i++) { + wheel[i] = new LinkedList<>(); + } + } + + public void add(T task, int currentTick) { + long nextRun = task.getNextRun(); + + if (nextRun <= currentTick) { + nextRun = currentTick; + task.setNextRun(nextRun); + } + + int slot = (int) (nextRun & mask); + LinkedList bucket = wheel[slot]; + bucket.add(task); + } + + public void addAll(Collection tasks, int currentTick) { + for (T task : tasks) { + this.add(task, currentTick); + } + } + + public @NotNull List popValid(int currentTick) { + int slot = (int) (currentTick & mask); + LinkedList bucket = wheel[slot]; + if (bucket.isEmpty()) return Collections.emptyList(); + + Iterator iter = bucket.iterator(); + List list = new ArrayList<>(); + while (iter.hasNext()) { + T task = iter.next(); + + if (task.getNextRun() <= currentTick) { + iter.remove(); + list.add(task); + } + } + + list.sort(ORDERING); + return list; + } + + public boolean isReady(int currentTick) { + int slot = (int) (currentTick & mask); + LinkedList bucket = wheel[slot]; + if (bucket.isEmpty()) return false; + + for (final T task : bucket) { + if (task.getNextRun() <= currentTick) { + return true; + } + } + + return false; + } + + public void removeIf(Predicate apply) { + Iterator itr = iterator(); + while (itr.hasNext()) { + T next = itr.next(); + if (apply.test(next)) { + itr.remove(); + } + } + } + + @SuppressWarnings("unchecked") + private class Itr implements Iterator { + private int index = 0; + private Iterator current = Collections.emptyIterator(); + private Iterator lastIterator = null; + + @Override + public boolean hasNext() { + if (current.hasNext()) { + return true; + } + + for (int i = index; i < wheelSize; i++) { + if (!wheel[i].isEmpty()) { + return true; + } + } + + return false; + } + + @Override + public T next() { + while (true) { + if (current.hasNext()) { + lastIterator = current; + return current.next(); + } + + if (index >= wheelSize) { + throw new NoSuchElementException(); + } + + current = wheel[index++].iterator(); + } + } + + @Override + public void remove() { + if (lastIterator == null) { + throw new NoSuchElementException(); + } + + lastIterator.remove(); + lastIterator = null; + } + } + + + @Override + public @NotNull Iterator iterator() { + return new Itr(); + } +} diff --git a/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java b/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java index 27562fd66ae9..0a731ef2445d 100644 --- a/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java +++ b/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftAsyncScheduler.java @@ -75,18 +75,24 @@ public void mainThreadHeartbeat() { private synchronized void runTasks(int currentTick) { parsePending(); - while (!this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick) { - CraftTask task = this.pending.remove(); - if (executeTask(task)) { - final long period = task.getPeriod(); - if (period > 0) { - task.setNextRun(currentTick + period); - temp.add(task); + while (true) { + List tasks = this.pending.popValid(currentTick); + if (tasks.isEmpty()) break; + + for (CraftTask task : tasks) { + if (executeTask(task)) { + final long period = task.getPeriod(); + if (period > 0) { + task.setNextRun(currentTick + period); + temp.add(task); + } } } + parsePending(); } - this.pending.addAll(temp); + + this.pending.addAll(temp, this.currentTick); temp.clear(); } diff --git a/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java b/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java index 7ffb7a210bf8..ef322e1592f0 100644 --- a/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java +++ b/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftScheduler.java @@ -17,6 +17,7 @@ import java.util.function.Consumer; import java.util.function.IntUnaryOperator; import java.util.logging.Level; +import io.papermc.paper.util.concurrent.TimingWheel; import org.bukkit.plugin.IllegalPluginAccessException; import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitRunnable; @@ -75,16 +76,7 @@ public class CraftScheduler implements BukkitScheduler { /** * Main thread logic only */ - final PriorityQueue pending = new PriorityQueue(10, // Paper - new Comparator() { - @Override - public int compare(final CraftTask o1, final CraftTask o2) { - int value = Long.compare(o1.getNextRun(), o2.getNextRun()); - - // If the tasks should run on the same tick they should be run FIFO - return value != 0 ? value : Long.compare(o1.getCreatedAt(), o2.getCreatedAt()); - } - }); + final TimingWheel pending = new TimingWheel<>(12); /** * Main thread logic only */ @@ -458,54 +450,59 @@ public void mainThreadHeartbeat() { } // Paper end final List temp = this.temp; - this.parsePending(); - while (this.isReady(this.currentTick)) { - final CraftTask task = this.pending.remove(); - if (task.getPeriod() < CraftTask.NO_REPEATING) { - if (task.isSync()) { - this.runners.remove(task.getTaskId(), task); + while (true) { + this.parsePending(); + + final List tasks = this.pending.popValid(this.currentTick); + if (tasks.isEmpty()) break; + + for (CraftTask task : tasks) { + if (task.getPeriod() < CraftTask.NO_REPEATING) { + if (task.isSync()) { + this.runners.remove(task.getTaskId(), task); + } + this.parsePending(); + continue; } - this.parsePending(); - continue; - } - if (task.isSync()) { - this.currentTask = task; - try { - task.run(); - } catch (final Throwable throwable) { - // Paper start - final String logMessage = String.format( - "Task #%s for %s generated an exception", - task.getTaskId(), - task.getOwner().getDescription().getFullName()); - task.getOwner().getLogger().log( + if (task.isSync()) { + this.currentTask = task; + try { + task.run(); + } catch (final Throwable throwable) { + // Paper start + final String logMessage = String.format( + "Task #%s for %s generated an exception", + task.getTaskId(), + task.getOwner().getDescription().getFullName()); + task.getOwner().getLogger().log( Level.WARNING, - logMessage, + logMessage, throwable); - org.bukkit.Bukkit.getServer().getPluginManager().callEvent( - new com.destroystokyo.paper.event.server.ServerExceptionEvent(new com.destroystokyo.paper.exception.ServerSchedulerException(logMessage, throwable, task))); - // Paper end - } finally { - this.currentTask = null; + org.bukkit.Bukkit.getServer().getPluginManager().callEvent( + new com.destroystokyo.paper.event.server.ServerExceptionEvent(new com.destroystokyo.paper.exception.ServerSchedulerException(logMessage, throwable, task))); + // Paper end + } finally { + this.currentTask = null; + } + this.parsePending(); + } else { + // this.debugTail = this.debugTail.setNext(new CraftAsyncDebugger(this.currentTick + CraftScheduler.RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper + task.getOwner().getLogger().log(Level.SEVERE, "Unexpected Async Task in the Sync Scheduler. Report this to Paper"); // Paper + // We don't need to parse pending + // (async tasks must live with race-conditions if they attempt to cancel between these few lines of code) + } + final long period = task.getPeriod(); // State consistency + if (period > 0) { + task.setNextRun(this.currentTick + period); + temp.add(task); + } else if (task.isSync()) { + this.runners.remove(task.getTaskId()); } - this.parsePending(); - } else { - // this.debugTail = this.debugTail.setNext(new CraftAsyncDebugger(this.currentTick + CraftScheduler.RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper - task.getOwner().getLogger().log(Level.SEVERE, "Unexpected Async Task in the Sync Scheduler. Report this to Paper"); // Paper - // We don't need to parse pending - // (async tasks must live with race-conditions if they attempt to cancel between these few lines of code) - } - final long period = task.getPeriod(); // State consistency - if (period > 0) { - task.setNextRun(this.currentTick + period); - temp.add(task); - } else if (task.isSync()) { - this.runners.remove(task.getTaskId()); } + this.pending.addAll(temp, this.currentTick); + temp.clear(); + //this.debugHead = this.debugHead.getNextHead(this.currentTick); // Paper } - this.pending.addAll(temp); - temp.clear(); - //this.debugHead = this.debugHead.getNextHead(this.currentTick); // Paper } protected void addTask(final CraftTask task) { @@ -550,7 +547,7 @@ void parsePending() { // Paper if (task.getTaskId() == -1) { task.run(); } else if (task.getPeriod() >= CraftTask.NO_REPEATING) { - this.pending.add(task); + this.pending.add(task, this.currentTick); this.runners.put(task.getTaskId(), task); } } @@ -564,7 +561,7 @@ void parsePending() { // Paper } private boolean isReady(final int currentTick) { - return !this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick; + return this.pending.isReady(currentTick); } @Override diff --git a/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java b/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java index 17680f112d0c..fcb142b5a985 100644 --- a/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java +++ b/paper-server/src/main/java/org/bukkit/craftbukkit/scheduler/CraftTask.java @@ -2,11 +2,12 @@ import java.util.function.Consumer; +import io.papermc.paper.util.concurrent.TickBoundTask; import org.bukkit.Bukkit; import org.bukkit.plugin.Plugin; import org.bukkit.scheduler.BukkitTask; -public class CraftTask implements BukkitTask, Runnable { // Spigot +public class CraftTask implements BukkitTask, Runnable, TickBoundTask { private volatile CraftTask next = null; public static final int ERROR = 0; @@ -81,7 +82,7 @@ public void run() { } } - long getCreatedAt() { + public long getCreatedAt() { return this.createdAt; } @@ -93,11 +94,11 @@ void setPeriod(long period) { this.period = period; } - long getNextRun() { + public long getNextRun() { return this.nextRun; } - void setNextRun(long nextRun) { + public void setNextRun(long nextRun) { this.nextRun = nextRun; }