Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.papermc.paper.util.concurrent;

public interface TickBoundTask {
long getNextRun();
void setNextRun(long next);
long getCreatedAt();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package io.papermc.paper.util.concurrent;

import org.jetbrains.annotations.NotNull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
* This class schedules tasks in ticks and executes them efficiently using a circular array (the wheel).
* Each slot in the wheel represents a specific tick modulo the wheel size.
* Tasks are placed into slots based on their target execution tick.
* On each tick, the wheel checks the current slot and runs any tasks whose execute tick has been reached.
*
* O(1) task scheduling and retrieval within a single wheel rotation.
* We are using power of 2 for faster operations than modulo.
*
*/
public class TimingWheel<T extends TickBoundTask> implements Iterable<T> {
private final int wheelSize;
private final long mask;
private final LinkedList<T>[] wheel;

private static final Comparator<TickBoundTask> ORDERING = Comparator.comparingLong(TickBoundTask::getCreatedAt);

@SuppressWarnings("unchecked")
public TimingWheel(int exponent) {
this.wheelSize = 1 << exponent;
this.mask = wheelSize - 1L;

this.wheel = (LinkedList<T>[]) new LinkedList[wheelSize];
for (int i = 0; i < wheelSize; i++) {
wheel[i] = new LinkedList<>();
}
}

public void add(T task, int currentTick) {
long nextRun = task.getNextRun();

if (nextRun <= currentTick) {
nextRun = currentTick;
task.setNextRun(nextRun);
}

int slot = (int) (nextRun & mask);
LinkedList<T> bucket = wheel[slot];
bucket.add(task);
}

public void addAll(Collection<? extends T> tasks, int currentTick) {
for (T task : tasks) {
this.add(task, currentTick);
}
}

public @NotNull List<T> popValid(int currentTick) {
int slot = (int) (currentTick & mask);
LinkedList<T> bucket = wheel[slot];
if (bucket.isEmpty()) return Collections.emptyList();

Iterator<T> iter = bucket.iterator();
List<T> list = new ArrayList<>();
while (iter.hasNext()) {
T task = iter.next();

if (task.getNextRun() <= currentTick) {
iter.remove();
list.add(task);
}
}

list.sort(ORDERING);
return list;
}

public boolean isReady(int currentTick) {
int slot = (int) (currentTick & mask);
LinkedList<T> bucket = wheel[slot];
if (bucket.isEmpty()) return false;

for (final T task : bucket) {
if (task.getNextRun() <= currentTick) {
return true;
}
}

return false;
}

public void removeIf(Predicate<T> apply) {
Iterator<T> itr = iterator();
while (itr.hasNext()) {
T next = itr.next();
if (apply.test(next)) {
itr.remove();
}
}
}

@SuppressWarnings("unchecked")
private class Itr implements Iterator<T> {
private int index = 0;
private Iterator<T> current = Collections.emptyIterator();
private Iterator<T> lastIterator = null;

@Override
public boolean hasNext() {
if (current.hasNext()) {
return true;
}

for (int i = index; i < wheelSize; i++) {
if (!wheel[i].isEmpty()) {
return true;
}
}

return false;
}

@Override
public T next() {
while (true) {
if (current.hasNext()) {
lastIterator = current;
return current.next();
}

if (index >= wheelSize) {
throw new NoSuchElementException();
}

current = wheel[index++].iterator();
}
}

@Override
public void remove() {
if (lastIterator == null) {
throw new NoSuchElementException();
}

lastIterator.remove();
lastIterator = null;
}
}


@Override
public @NotNull Iterator<T> iterator() {
return new Itr();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,24 @@ public void mainThreadHeartbeat() {

private synchronized void runTasks(int currentTick) {
parsePending();
while (!this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick) {
CraftTask task = this.pending.remove();
if (executeTask(task)) {
final long period = task.getPeriod();
if (period > 0) {
task.setNextRun(currentTick + period);
temp.add(task);
while (true) {
List<CraftTask> tasks = this.pending.popValid(currentTick);
if (tasks.isEmpty()) break;

for (CraftTask task : tasks) {
if (executeTask(task)) {
final long period = task.getPeriod();
if (period > 0) {
task.setNextRun(currentTick + period);
temp.add(task);
}
}
}

parsePending();
}
this.pending.addAll(temp);

this.pending.addAll(temp, this.currentTick);
temp.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.function.Consumer;
import java.util.function.IntUnaryOperator;
import java.util.logging.Level;
import io.papermc.paper.util.concurrent.TimingWheel;
import org.bukkit.plugin.IllegalPluginAccessException;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitRunnable;
Expand Down Expand Up @@ -75,16 +76,7 @@ public class CraftScheduler implements BukkitScheduler {
/**
* Main thread logic only
*/
final PriorityQueue<CraftTask> pending = new PriorityQueue<CraftTask>(10, // Paper
new Comparator<CraftTask>() {
@Override
public int compare(final CraftTask o1, final CraftTask o2) {
int value = Long.compare(o1.getNextRun(), o2.getNextRun());

// If the tasks should run on the same tick they should be run FIFO
return value != 0 ? value : Long.compare(o1.getCreatedAt(), o2.getCreatedAt());
}
});
final TimingWheel<CraftTask> pending = new TimingWheel<>(12);
/**
* Main thread logic only
*/
Expand Down Expand Up @@ -458,54 +450,59 @@ public void mainThreadHeartbeat() {
}
// Paper end
final List<CraftTask> temp = this.temp;
this.parsePending();
while (this.isReady(this.currentTick)) {
final CraftTask task = this.pending.remove();
if (task.getPeriod() < CraftTask.NO_REPEATING) {
if (task.isSync()) {
this.runners.remove(task.getTaskId(), task);
while (true) {
this.parsePending();

final List<CraftTask> tasks = this.pending.popValid(this.currentTick);
if (tasks.isEmpty()) break;

for (CraftTask task : tasks) {
if (task.getPeriod() < CraftTask.NO_REPEATING) {
if (task.isSync()) {
this.runners.remove(task.getTaskId(), task);
}
this.parsePending();
continue;
}
this.parsePending();
continue;
}
if (task.isSync()) {
this.currentTask = task;
try {
task.run();
} catch (final Throwable throwable) {
// Paper start
final String logMessage = String.format(
"Task #%s for %s generated an exception",
task.getTaskId(),
task.getOwner().getDescription().getFullName());
task.getOwner().getLogger().log(
if (task.isSync()) {
this.currentTask = task;
try {
task.run();
} catch (final Throwable throwable) {
// Paper start
final String logMessage = String.format(
"Task #%s for %s generated an exception",
task.getTaskId(),
task.getOwner().getDescription().getFullName());
task.getOwner().getLogger().log(
Level.WARNING,
logMessage,
logMessage,
throwable);
org.bukkit.Bukkit.getServer().getPluginManager().callEvent(
new com.destroystokyo.paper.event.server.ServerExceptionEvent(new com.destroystokyo.paper.exception.ServerSchedulerException(logMessage, throwable, task)));
// Paper end
} finally {
this.currentTask = null;
org.bukkit.Bukkit.getServer().getPluginManager().callEvent(
new com.destroystokyo.paper.event.server.ServerExceptionEvent(new com.destroystokyo.paper.exception.ServerSchedulerException(logMessage, throwable, task)));
// Paper end
} finally {
this.currentTask = null;
}
this.parsePending();
} else {
// this.debugTail = this.debugTail.setNext(new CraftAsyncDebugger(this.currentTick + CraftScheduler.RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper
task.getOwner().getLogger().log(Level.SEVERE, "Unexpected Async Task in the Sync Scheduler. Report this to Paper"); // Paper
// We don't need to parse pending
// (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
}
final long period = task.getPeriod(); // State consistency
if (period > 0) {
task.setNextRun(this.currentTick + period);
temp.add(task);
} else if (task.isSync()) {
this.runners.remove(task.getTaskId());
}
this.parsePending();
} else {
// this.debugTail = this.debugTail.setNext(new CraftAsyncDebugger(this.currentTick + CraftScheduler.RECENT_TICKS, task.getOwner(), task.getTaskClass())); // Paper
task.getOwner().getLogger().log(Level.SEVERE, "Unexpected Async Task in the Sync Scheduler. Report this to Paper"); // Paper
// We don't need to parse pending
// (async tasks must live with race-conditions if they attempt to cancel between these few lines of code)
}
final long period = task.getPeriod(); // State consistency
if (period > 0) {
task.setNextRun(this.currentTick + period);
temp.add(task);
} else if (task.isSync()) {
this.runners.remove(task.getTaskId());
}
this.pending.addAll(temp, this.currentTick);
temp.clear();
//this.debugHead = this.debugHead.getNextHead(this.currentTick); // Paper
}
this.pending.addAll(temp);
temp.clear();
//this.debugHead = this.debugHead.getNextHead(this.currentTick); // Paper
}

protected void addTask(final CraftTask task) {
Expand Down Expand Up @@ -550,7 +547,7 @@ void parsePending() { // Paper
if (task.getTaskId() == -1) {
task.run();
} else if (task.getPeriod() >= CraftTask.NO_REPEATING) {
this.pending.add(task);
this.pending.add(task, this.currentTick);
this.runners.put(task.getTaskId(), task);
}
}
Expand All @@ -564,7 +561,7 @@ void parsePending() { // Paper
}

private boolean isReady(final int currentTick) {
return !this.pending.isEmpty() && this.pending.peek().getNextRun() <= currentTick;
return this.pending.isReady(currentTick);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import java.util.function.Consumer;

import io.papermc.paper.util.concurrent.TickBoundTask;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import org.bukkit.scheduler.BukkitTask;

public class CraftTask implements BukkitTask, Runnable { // Spigot
public class CraftTask implements BukkitTask, Runnable, TickBoundTask {

private volatile CraftTask next = null;
public static final int ERROR = 0;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void run() {
}
}

long getCreatedAt() {
public long getCreatedAt() {
return this.createdAt;
}

Expand All @@ -93,11 +94,11 @@ void setPeriod(long period) {
this.period = period;
}

long getNextRun() {
public long getNextRun() {
return this.nextRun;
}

void setNextRun(long nextRun) {
public void setNextRun(long nextRun) {
this.nextRun = nextRun;
}

Expand Down
Loading