Add support for sending and receiving login plugin messages from players and servers (#587)

This commit is contained in:
Andrew Steinborn
2021-10-31 16:27:03 -04:00
committed by GitHub
parent 922c001b59
commit cb8781b3c9
12 changed files with 433 additions and 24 deletions

View File

@@ -682,7 +682,7 @@ public class VelocityServer implements ProxyServer, ForwardingAudience {
}
@Override
public EventManager getEventManager() {
public VelocityEventManager getEventManager() {
return eventManager;
}

View File

@@ -17,6 +17,8 @@
package com.velocitypowered.proxy.connection.backend;
import com.velocitypowered.api.event.player.ServerLoginPluginMessageEvent;
import com.velocitypowered.api.proxy.messages.MinecraftChannelIdentifier;
import com.velocitypowered.api.util.GameProfile;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.config.PlayerInfoForwarding;
@@ -36,8 +38,8 @@ import com.velocitypowered.proxy.protocol.packet.ServerLoginSuccess;
import com.velocitypowered.proxy.protocol.packet.SetCompression;
import com.velocitypowered.proxy.util.except.QuietRuntimeException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.net.InetSocketAddress;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
@@ -45,7 +47,6 @@ import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.TextComponent;
public class LoginSessionHandler implements MinecraftSessionHandler {
@@ -82,8 +83,25 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
mc.write(response);
informationForwarded = true;
} else {
// Don't understand
mc.write(new LoginPluginResponse(packet.getId(), false, Unpooled.EMPTY_BUFFER));
// Don't understand, fire event if we have subscribers
if (!this.server.getEventManager().hasSubscribers(ServerLoginPluginMessageEvent.class)) {
mc.write(new LoginPluginResponse(packet.getId(), false, Unpooled.EMPTY_BUFFER));
return true;
}
final byte[] contents = ByteBufUtil.getBytes(packet.content());
final MinecraftChannelIdentifier identifier = MinecraftChannelIdentifier
.from(packet.getChannel());
this.server.getEventManager().fire(new ServerLoginPluginMessageEvent(serverConn, identifier,
contents, packet.getId()))
.thenAcceptAsync(event -> {
if (event.getResult().isAllowed()) {
mc.write(new LoginPluginResponse(packet.getId(), true, Unpooled
.wrappedBuffer(event.getResult().getResponse())));
} else {
mc.write(new LoginPluginResponse(packet.getId(), false, Unpooled.EMPTY_BUFFER));
}
}, mc.eventLoop());
}
return true;
}

View File

@@ -142,8 +142,9 @@ public class HandshakeSessionHandler implements MinecraftSessionHandler {
return;
}
server.getEventManager().fireAndForget(new ConnectionHandshakeEvent(ic));
connection.setSessionHandler(new LoginSessionHandler(server, connection, ic));
LoginInboundConnection lic = new LoginInboundConnection(ic);
server.getEventManager().fireAndForget(new ConnectionHandshakeEvent(lic));
connection.setSessionHandler(new LoginSessionHandler(server, connection, lic));
}
private ConnectionType getHandshakeConnectionType(Handshake handshake) {

View File

@@ -74,6 +74,10 @@ public final class InitialInboundConnection implements InboundConnection,
return "[initial connection] " + connection.getRemoteAddress().toString();
}
public MinecraftConnection getConnection() {
return connection;
}
/**
* Disconnects the connection from the server.
* @param reason the reason for disconnecting

View File

@@ -0,0 +1,147 @@
/*
* Copyright (C) 2018 Velocity Contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.velocitypowered.proxy.connection.client;
import com.velocitypowered.api.network.ProtocolVersion;
import com.velocitypowered.api.proxy.LoginPhaseConnection;
import com.velocitypowered.api.proxy.messages.ChannelIdentifier;
import com.velocitypowered.proxy.protocol.packet.LoginPluginMessage;
import com.velocitypowered.proxy.protocol.packet.LoginPluginResponse;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import net.kyori.adventure.text.Component;
import space.vectrix.flare.fastutil.Int2ObjectSyncMap;
public class LoginInboundConnection implements LoginPhaseConnection {
private static final AtomicIntegerFieldUpdater<LoginInboundConnection> SEQUENCE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(LoginInboundConnection.class, "sequenceCounter");
private final InitialInboundConnection delegate;
private final Int2ObjectMap<MessageConsumer> outstandingResponses;
private volatile int sequenceCounter;
private final Queue<LoginPluginMessage> loginMessagesToSend;
private volatile Runnable onAllMessagesHandled;
private volatile boolean loginEventFired;
LoginInboundConnection(
InitialInboundConnection delegate) {
this.delegate = delegate;
this.outstandingResponses = Int2ObjectSyncMap.hashmap();
this.loginMessagesToSend = new ArrayDeque<>();
}
@Override
public InetSocketAddress getRemoteAddress() {
return delegate.getRemoteAddress();
}
@Override
public Optional<InetSocketAddress> getVirtualHost() {
return delegate.getVirtualHost();
}
@Override
public boolean isActive() {
return delegate.isActive();
}
@Override
public ProtocolVersion getProtocolVersion() {
return delegate.getProtocolVersion();
}
@Override
public void sendLoginPluginMessage(ChannelIdentifier identifier, byte[] contents,
MessageConsumer consumer) {
if (identifier == null) {
throw new NullPointerException("identifier");
}
if (contents == null) {
throw new NullPointerException("contents");
}
if (consumer == null) {
throw new NullPointerException("consumer");
}
if (delegate.getProtocolVersion().compareTo(ProtocolVersion.MINECRAFT_1_13) < 0) {
throw new IllegalStateException("Login plugin messages can only be sent to clients running "
+ "Minecraft 1.13 and above");
}
final int id = SEQUENCE_UPDATER.incrementAndGet(this);
this.outstandingResponses.put(id, consumer);
final LoginPluginMessage message = new LoginPluginMessage(id, identifier.getId(),
Unpooled.wrappedBuffer(contents));
if (!this.loginEventFired) {
this.loginMessagesToSend.add(message);
} else {
this.delegate.getConnection().write(message);
}
}
/**
* Disconnects the connection from the server.
* @param reason the reason for disconnecting
*/
public void disconnect(Component reason) {
this.delegate.disconnect(reason);
this.cleanup();
}
void cleanup() {
this.loginMessagesToSend.clear();
this.outstandingResponses.clear();
this.onAllMessagesHandled = null;
}
void handleLoginPluginResponse(final LoginPluginResponse response) {
final MessageConsumer consumer = this.outstandingResponses.remove(response.getId());
if (consumer != null) {
try {
consumer.onMessageResponse(response.isSuccess() ? ByteBufUtil.getBytes(response.content())
: null);
} finally {
final Runnable onAllMessagesHandled = this.onAllMessagesHandled;
if (this.outstandingResponses.isEmpty() && onAllMessagesHandled != null) {
onAllMessagesHandled.run();
}
}
}
}
void loginEventFired(final Runnable onAllMessagesHandled) {
this.loginEventFired = true;
this.onAllMessagesHandled = onAllMessagesHandled;
if (!this.loginMessagesToSend.isEmpty()) {
LoginPluginMessage message;
while ((message = this.loginMessagesToSend.poll()) != null) {
this.delegate.getConnection().delayedWrite(message);
}
this.delegate.getConnection().flush();
} else {
onAllMessagesHandled.run();
}
}
}

View File

@@ -44,9 +44,9 @@ import com.velocitypowered.proxy.config.VelocityConfiguration;
import com.velocitypowered.proxy.connection.MinecraftConnection;
import com.velocitypowered.proxy.connection.MinecraftSessionHandler;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.packet.Disconnect;
import com.velocitypowered.proxy.protocol.packet.EncryptionRequest;
import com.velocitypowered.proxy.protocol.packet.EncryptionResponse;
import com.velocitypowered.proxy.protocol.packet.LoginPluginResponse;
import com.velocitypowered.proxy.protocol.packet.ServerLogin;
import com.velocitypowered.proxy.protocol.packet.ServerLoginSuccess;
import com.velocitypowered.proxy.protocol.packet.SetCompression;
@@ -56,7 +56,6 @@ import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.MessageDigest;
import java.util.Arrays;
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -64,7 +63,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import net.kyori.adventure.text.Component;
import net.kyori.adventure.text.format.NamedTextColor;
import net.kyori.adventure.translation.GlobalTranslator;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.asynchttpclient.ListenableFuture;
@@ -80,13 +78,13 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
private final VelocityServer server;
private final MinecraftConnection mcConnection;
private final InitialInboundConnection inbound;
private final LoginInboundConnection inbound;
private @MonotonicNonNull ServerLogin login;
private byte[] verify = EMPTY_BYTE_ARRAY;
private @MonotonicNonNull ConnectedPlayer connectedPlayer;
LoginSessionHandler(VelocityServer server, MinecraftConnection mcConnection,
InitialInboundConnection inbound) {
LoginInboundConnection inbound) {
this.server = Preconditions.checkNotNull(server, "server");
this.mcConnection = Preconditions.checkNotNull(mcConnection, "mcConnection");
this.inbound = Preconditions.checkNotNull(inbound, "inbound");
@@ -99,6 +97,12 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
return true;
}
@Override
public boolean handle(LoginPluginResponse packet) {
this.inbound.handleLoginPluginResponse(packet);
return true;
}
@Override
public boolean handle(EncryptionResponse packet) {
ServerLogin login = this.login;
@@ -197,15 +201,24 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
return;
}
if (!result.isForceOfflineMode() && (server.getConfiguration().isOnlineMode() || result
.isOnlineModeAllowed())) {
// Request encryption.
EncryptionRequest request = generateEncryptionRequest();
this.verify = Arrays.copyOf(request.getVerifyToken(), 4);
mcConnection.write(request);
} else {
initializePlayer(GameProfile.forOfflinePlayer(login.getUsername()), false);
}
inbound.loginEventFired(() -> {
if (mcConnection.isClosed()) {
// The player was disconnected
return;
}
mcConnection.eventLoop().execute(() -> {
if (!result.isForceOfflineMode() && (server.getConfiguration().isOnlineMode()
|| result.isOnlineModeAllowed())) {
// Request encryption.
EncryptionRequest request = generateEncryptionRequest();
this.verify = Arrays.copyOf(request.getVerifyToken(), 4);
mcConnection.write(request);
} else {
initializePlayer(GameProfile.forOfflinePlayer(login.getUsername()), false);
}
});
});
}, mcConnection.eventLoop())
.exceptionally((ex) -> {
logger.error("Exception in pre-login stage", ex);
@@ -354,5 +367,6 @@ public class LoginSessionHandler implements MinecraftSessionHandler {
if (connectedPlayer != null) {
connectedPlayer.teardown();
}
this.inbound.cleanup();
}
}

View File

@@ -434,11 +434,24 @@ public class VelocityEventManager implements EventManager {
.collect(Collectors.toList()));
}
/**
* Determines whether the given event class has any subscribers. This may bake the list of event
* handlers.
*
* @param eventClass the class of the event to check
* @return {@code true} if any subscribers were found, else {@code false}
*/
public boolean hasSubscribers(final Class<?> eventClass) {
requireNonNull(eventClass, "eventClass");
final HandlersCache handlersCache = this.handlersCache.get(eventClass);
return handlersCache != null && handlersCache.handlers.length > 0;
}
@Override
public void fireAndForget(final Object event) {
requireNonNull(event, "event");
final HandlersCache handlersCache = this.handlersCache.get(event.getClass());
if (handlersCache == null) {
if (handlersCache == null || handlersCache.handlers.length == 0) {
// Optimization: nobody's listening.
return;
}
@@ -449,7 +462,7 @@ public class VelocityEventManager implements EventManager {
public <E> CompletableFuture<E> fire(final E event) {
requireNonNull(event, "event");
final HandlersCache handlersCache = this.handlersCache.get(event.getClass());
if (handlersCache == null) {
if (handlersCache == null || handlersCache.handlers.length == 0) {
// Optimization: nobody's listening.
return CompletableFuture.completedFuture(event);
}

View File

@@ -71,7 +71,7 @@ public class LoginPluginResponse extends DeferredByteBufHolder implements Minecr
this.id = ProtocolUtils.readVarInt(buf);
this.success = buf.readBoolean();
if (buf.isReadable()) {
this.replace(buf.readSlice(buf.readableBytes()));
this.replace(buf.readRetainedSlice(buf.readableBytes()));
} else {
this.replace(Unpooled.EMPTY_BUFFER);
}