Improved Scheduler API (#696)

* Improved Scheduler API

- Added `Scheduler#builder(plugin)`
This method allows a more simplified builder while maintaining the main requirement of the executor plugin
- Added `Scheduler#taskByPlugin(plugin)`
Allows to obtain the tasks that a plugin has sent to execute and that are currently active
- Added `TaskBuilder#task(Consumer<SchuledTask>)`
Allows to specify a task with access to the task itself with the ability to cancel itself

* Applied requested changes

- Removed tasks builder method
- Added `Scheduler#buildTask(plugin, Consumer<ScheduledTask>)`

* Removed some unused imports

* Applied suggested change

* Fix possible test bug

* Applied more suggested changes

* Fixed tests inside tasks
This commit is contained in:
4drian3d
2022-06-09 02:27:06 -05:00
committed by GitHub
parent f088bd4443
commit e45ca5f357
9 changed files with 129 additions and 21 deletions

View File

@@ -24,7 +24,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.velocitypowered.api.plugin.PluginContainer;
import com.velocitypowered.api.plugin.PluginManager;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.Scheduler;
@@ -32,13 +31,17 @@ import com.velocitypowered.api.scheduler.TaskStatus;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
public class VelocityScheduler implements Scheduler {
@@ -68,7 +71,25 @@ public class VelocityScheduler implements Scheduler {
checkNotNull(plugin, "plugin");
checkNotNull(runnable, "runnable");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, runnable);
return new TaskBuilderImpl(plugin, runnable, null);
}
@Override
public TaskBuilder buildTask(Object plugin, Consumer<ScheduledTask> consumer) {
checkNotNull(plugin, "plugin");
checkNotNull(consumer, "consumer");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
return new TaskBuilderImpl(plugin, null, consumer);
}
@Override
public @NonNull Collection<ScheduledTask> tasksByPlugin(@NonNull Object plugin) {
checkNotNull(plugin, "plugin");
checkArgument(pluginManager.fromInstance(plugin).isPresent(), "plugin is not registered");
final Collection<ScheduledTask> tasks = tasksByPlugin.get(plugin);
synchronized (tasksByPlugin) {
return Set.copyOf(tasks);
}
}
/**
@@ -93,12 +114,14 @@ public class VelocityScheduler implements Scheduler {
private final Object plugin;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private long delay; // ms
private long repeat; // ms
private TaskBuilderImpl(Object plugin, Runnable runnable) {
private TaskBuilderImpl(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer) {
this.plugin = plugin;
this.runnable = runnable;
this.consumer = consumer;
}
@Override
@@ -127,7 +150,7 @@ public class VelocityScheduler implements Scheduler {
@Override
public ScheduledTask schedule() {
VelocityTask task = new VelocityTask(plugin, runnable, delay, repeat);
VelocityTask task = new VelocityTask(plugin, runnable, consumer, delay, repeat);
tasksByPlugin.put(plugin, task);
task.schedule();
return task;
@@ -138,14 +161,16 @@ public class VelocityScheduler implements Scheduler {
private final Object plugin;
private final Runnable runnable;
private final Consumer<ScheduledTask> consumer;
private final long delay;
private final long repeat;
private @Nullable ScheduledFuture<?> future;
private volatile @Nullable Thread currentTaskThread;
private VelocityTask(Object plugin, Runnable runnable, long delay, long repeat) {
private VelocityTask(Object plugin, Runnable runnable, Consumer<ScheduledTask> consumer, long delay, long repeat) {
this.plugin = plugin;
this.runnable = runnable;
this.consumer = consumer;
this.delay = delay;
this.repeat = repeat;
}
@@ -200,7 +225,11 @@ public class VelocityScheduler implements Scheduler {
taskService.execute(() -> {
currentTaskThread = Thread.currentThread();
try {
runnable.run();
if (runnable != null) {
runnable.run();
} else {
consumer.accept(this);
}
} catch (Throwable e) {
//noinspection ConstantConditions
if (e instanceof InterruptedException) {

View File

@@ -22,9 +22,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import com.velocitypowered.api.scheduler.ScheduledTask;
import com.velocitypowered.api.scheduler.TaskStatus;
import com.velocitypowered.proxy.testutil.FakePluginManager;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.jupiter.api.Test;
class VelocitySchedulerTest {
@@ -65,4 +69,65 @@ class VelocitySchedulerTest {
task.cancel();
}
@Test
void obtainTasksFromPlugin() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
AtomicInteger i = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
scheduler.buildTask(FakePluginManager.PLUGIN_A, task -> {
if (i.getAndIncrement() >= 1) {
task.cancel();
latch.countDown();
}
}).delay(50, TimeUnit.MILLISECONDS)
.repeat(Duration.ofMillis(5))
.schedule();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 1);
latch.await();
assertEquals(scheduler.tasksByPlugin(FakePluginManager.PLUGIN_A).size(), 0);
}
@Test
void testConsumerCancel() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
CountDownLatch latch = new CountDownLatch(1);
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_B, actualTask -> {
actualTask.cancel();
latch.countDown();
})
.repeat(5, TimeUnit.MILLISECONDS)
.schedule();
assertEquals(TaskStatus.SCHEDULED, task.status());
latch.await();
assertEquals(TaskStatus.CANCELLED, task.status());
}
@Test
void testConsumerEquality() throws Exception {
VelocityScheduler scheduler = new VelocityScheduler(new FakePluginManager());
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<ScheduledTask> consumerTask = new AtomicReference<>();
AtomicReference<ScheduledTask> initialTask = new AtomicReference<>();
ScheduledTask task = scheduler.buildTask(FakePluginManager.PLUGIN_A, scheduledTask -> {
consumerTask.set(scheduledTask);
latch.countDown();
}).delay(60, TimeUnit.MILLISECONDS).schedule();
initialTask.set(task);
latch.await();
assertEquals(consumerTask.get(), initialTask.get());
}
}