Fix: discard chat queue and chat state when switching servers (#1534)
This has two effects: - Will no longer send queued chat packets from previous server after switch (race condition) - The offset in 'last seen' updates will be corrected, as the internal ChatState will be reset (only applied if the player had not sent a message in a while, and >20 messages had been received)
This commit is contained in:
@@ -170,6 +170,7 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void deactivated() {
|
public void deactivated() {
|
||||||
|
player.discardChatQueue();
|
||||||
for (PluginMessagePacket message : loginPluginMessages) {
|
for (PluginMessagePacket message : loginPluginMessages) {
|
||||||
ReferenceCountUtil.release(message);
|
ReferenceCountUtil.release(message);
|
||||||
}
|
}
|
||||||
@@ -444,6 +445,13 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean handle(JoinGamePacket packet) {
|
||||||
|
// Forward the packet as normal, but discard any chat state we have queued - the client will do this too
|
||||||
|
player.discardChatQueue();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleGeneric(MinecraftPacket packet) {
|
public void handleGeneric(MinecraftPacket packet) {
|
||||||
VelocityServerConnection serverConnection = player.getConnectedServer();
|
VelocityServerConnection serverConnection = player.getConnectedServer();
|
||||||
|
@@ -190,7 +190,7 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player,
|
|||||||
private @Nullable Locale effectiveLocale;
|
private @Nullable Locale effectiveLocale;
|
||||||
private final @Nullable IdentifiedKey playerKey;
|
private final @Nullable IdentifiedKey playerKey;
|
||||||
private @Nullable ClientSettingsPacket clientSettingsPacket;
|
private @Nullable ClientSettingsPacket clientSettingsPacket;
|
||||||
private final ChatQueue chatQueue;
|
private volatile ChatQueue chatQueue;
|
||||||
private final ChatBuilderFactory chatBuilderFactory;
|
private final ChatBuilderFactory chatBuilderFactory;
|
||||||
|
|
||||||
ConnectedPlayer(VelocityServer server, GameProfile profile, MinecraftConnection connection,
|
ConnectedPlayer(VelocityServer server, GameProfile profile, MinecraftConnection connection,
|
||||||
@@ -236,6 +236,17 @@ public class ConnectedPlayer implements MinecraftConnectionAssociation, Player,
|
|||||||
return chatQueue;
|
return chatQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Discards any messages still being processed by the {@link ChatQueue}, and creates a fresh state for future packets.
|
||||||
|
* This should be used on server switches, or whenever the client resets its own 'last seen' state.
|
||||||
|
*/
|
||||||
|
public void discardChatQueue() {
|
||||||
|
// No need for atomic swap, should only be called from event loop
|
||||||
|
final ChatQueue oldChatQueue = chatQueue;
|
||||||
|
chatQueue = new ChatQueue(this);
|
||||||
|
oldChatQueue.close();
|
||||||
|
}
|
||||||
|
|
||||||
public BundleDelimiterHandler getBundleHandler() {
|
public BundleDelimiterHandler getBundleHandler() {
|
||||||
return this.bundleHandler;
|
return this.bundleHandler;
|
||||||
}
|
}
|
||||||
|
@@ -32,13 +32,15 @@ import java.util.function.Function;
|
|||||||
* A precisely ordered queue which allows for outside entries into the ordered queue through
|
* A precisely ordered queue which allows for outside entries into the ordered queue through
|
||||||
* piggybacking timestamps.
|
* piggybacking timestamps.
|
||||||
*/
|
*/
|
||||||
public class ChatQueue {
|
public class ChatQueue implements AutoCloseable {
|
||||||
|
|
||||||
private final Object internalLock = new Object();
|
private final Object internalLock = new Object();
|
||||||
private final ConnectedPlayer player;
|
private final ConnectedPlayer player;
|
||||||
private final ChatState chatState = new ChatState();
|
private final ChatState chatState = new ChatState();
|
||||||
private CompletableFuture<Void> head = CompletableFuture.completedFuture(null);
|
private CompletableFuture<Void> head = CompletableFuture.completedFuture(null);
|
||||||
|
|
||||||
|
private volatile boolean closed;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}.
|
* Instantiates a {@link ChatQueue} for a specific {@link ConnectedPlayer}.
|
||||||
*
|
*
|
||||||
@@ -50,8 +52,14 @@ public class ChatQueue {
|
|||||||
|
|
||||||
private void queueTask(Task task) {
|
private void queueTask(Task task) {
|
||||||
synchronized (internalLock) {
|
synchronized (internalLock) {
|
||||||
|
if (closed) {
|
||||||
|
throw new IllegalStateException("ChatQueue has already been closed");
|
||||||
|
}
|
||||||
MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected();
|
MinecraftConnection smc = player.ensureAndGetCurrentServer().ensureConnected();
|
||||||
head = head.thenCompose(v -> {
|
head = head.thenCompose(v -> {
|
||||||
|
if (closed) {
|
||||||
|
return CompletableFuture.completedFuture(null);
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
return task.update(chatState, smc).exceptionally(ignored -> null);
|
return task.update(chatState, smc).exceptionally(ignored -> null);
|
||||||
} catch (Throwable ignored) {
|
} catch (Throwable ignored) {
|
||||||
@@ -102,9 +110,9 @@ public class ChatQueue {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T extends MinecraftPacket> CompletableFuture<Void> writePacket(T packet, MinecraftConnection smc) {
|
private <T extends MinecraftPacket> CompletableFuture<Void> writePacket(T packet, MinecraftConnection smc) {
|
||||||
return CompletableFuture.runAsync(() -> {
|
return CompletableFuture.runAsync(() -> {
|
||||||
if (!smc.isClosed()) {
|
if (!closed && !smc.isClosed()) {
|
||||||
ChannelFuture future = smc.write(packet);
|
ChannelFuture future = smc.write(packet);
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
future.awaitUninterruptibly();
|
future.awaitUninterruptibly();
|
||||||
@@ -113,6 +121,11 @@ public class ChatQueue {
|
|||||||
}, smc.eventLoop());
|
}, smc.eventLoop());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
|
|
||||||
private interface Task {
|
private interface Task {
|
||||||
CompletableFuture<Void> update(ChatState chatState, MinecraftConnection smc);
|
CompletableFuture<Void> update(ChatState chatState, MinecraftConnection smc);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user