diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java index b41a24cc..8678431c 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ClientPlaySessionHandler.java @@ -170,6 +170,7 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { @Override public void deactivated() { + player.discardChatQueue(); for (PluginMessagePacket message : loginPluginMessages) { ReferenceCountUtil.release(message); } @@ -444,6 +445,13 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler { return true; } + @Override + public boolean handle(JoinGamePacket packet) { + // Forward the packet as normal, but discard any chat state we have queued - the client will do this too + player.discardChatQueue(); + return false; + } + @Override public void handleGeneric(MinecraftPacket packet) { VelocityServerConnection serverConnection = player.getConnectedServer(); diff --git a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java index be7a82b2..ccd09218 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/connection/client/ConnectedPlayer.java @@ -190,7 +190,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, private @Nullable Locale effectiveLocale; private final @Nullable IdentifiedKey playerKey; private @Nullable ClientSettingsPacket clientSettingsPacket; - private final ChatQueue chatQueue; + private volatile ChatQueue chatQueue; private final ChatBuilderFactory chatBuilderFactory; ConnectedPlayer(VelocityServer server, GameProfile profile, MinecraftConnection connection, @@ -236,6 +236,17 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player, return chatQueue; } + /** + * Discards any messages still being processed by the {@link ChatQueue}, and creates a fresh state for future packets. + * This should be used on server switches, or whenever the client resets its own 'last seen' state. + */ + public void discardChatQueue() { + // No need for atomic swap, should only be called from event loop + final ChatQueue oldChatQueue = chatQueue; + chatQueue = new ChatQueue(this); + oldChatQueue.close(); + } + public BundleDelimiterHandler getBundleHandler() { return this.bundleHandler; } diff --git a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java index 5928bf36..93ace258 100644 --- a/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java +++ b/proxy/src/main/java/com/velocitypowered/proxy/protocol/packet/chat/ChatQueue.java @@ -32,13 +32,15 @@ import java.util.function.Function; * A precisely ordered queue which allows for outside entries into the ordered queue through * piggybacking timestamps. */ -public class ChatQueue { +public class ChatQueue implements AutoCloseable { private final Object internalLock = new Object(); private final ConnectedPlayer player; private final ChatState chatState = new ChatState(); private CompletableFuture head = CompletableFuture.completedFuture(null); + private volatile boolean closed; + /** * Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}. * @@ -50,8 +52,14 @@ public class ChatQueue { private void queueTask(Task task) { synchronized (internalLock) { + if (closed) { + throw new IllegalStateException("ChatQueue has already been closed"); + } MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected(); head = head.thenCompose(v -> { + if (closed) { + return CompletableFuture.completedFuture(null); + } try { return task.update(chatState, smc).exceptionally(ignored -> null); } catch (Throwable ignored) { @@ -102,9 +110,9 @@ public class ChatQueue { }); } - private static CompletableFuture writePacket(T packet, MinecraftConnection smc) { + private CompletableFuture writePacket(T packet, MinecraftConnection smc) { return CompletableFuture.runAsync(() -> { - if (!smc.isClosed()) { + if (!closed && !smc.isClosed()) { ChannelFuture future = smc.write(packet); if (future != null) { future.awaitUninterruptibly(); @@ -113,6 +121,11 @@ public class ChatQueue { }, smc.eventLoop()); } + @Override + public void close() { + closed = true; + } + private interface Task { CompletableFuture update(ChatState chatState, MinecraftConnection smc); }