Merge branch 'decode-multiple' into dev/1.1.0

@@ -48,8 +48,7 @@ dependencies {
compile "io.netty:netty-handler:${nettyVersion}"
compile "io.netty:netty-transport-native-epoll:${nettyVersion}"
compile "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-x86_64"
compile "io.netty:netty-transport-native-kqueue:${nettyVersion}"
compile "io.netty:netty-transport-native-kqueue:${nettyVersion}:osx-x86_64"
compile "io.netty:netty-transport-native-epoll:${nettyVersion}:linux-aarch64"
compile "io.netty:netty-resolver-dns:${nettyVersion}"

compile "org.apache.logging.log4j:log4j-api:${log4jVersion}"

@@ -120,12 +120,12 @@ public class VelocityServer implements ProxyServer {
}

public KeyPair getServerKeyPair() {
return ensureInitialized(serverKeyPair);
return serverKeyPair;
}

@Override
public VelocityConfiguration getConfiguration() {
return ensureInitialized(this.configuration);
return this.configuration;
}

@Override

@@ -229,7 +229,6 @@ public class VelocityServer implements ProxyServer {
Metrics.VelocityMetrics.startMetrics(this, configuration.getMetrics());
}

@RequiresNonNull({"pluginManager", "eventManager"})
private void loadPlugins() {
logger.info("Loading plugins...");

@@ -443,18 +442,11 @@ public class VelocityServer implements ProxyServer {
}

public AsyncHttpClient getAsyncHttpClient() {
return ensureInitialized(cm).getHttpClient();
return cm.getHttpClient();
}

public Ratelimiter getIpAttemptLimiter() {
return ensureInitialized(ipAttemptLimiter);
}

private static <T> T ensureInitialized(T o) {
if (o == null) {
throw new IllegalStateException("The proxy isn't fully initialized.");
}
return o;
return ipAttemptLimiter;
}

/**

@@ -465,7 +465,7 @@ public class VelocityConfiguration extends AnnotatedConfig implements ProxyConfi
Advanced advanced = new Advanced(toml.getTable("advanced"));
Query query = new Query(toml.getTable("query"));
Metrics metrics = new Metrics(toml.getTable("metrics"));
byte[] forwardingSecret = toml.getString("forwarding-secret", "5up3r53cr3t")
byte[] forwardingSecret = toml.getString("forwarding-secret", generateRandomString(12))
.getBytes(StandardCharsets.UTF_8);

String forwardingModeName = toml.getString("player-info-forwarding-mode", "MODERN")
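
The new default replaces the hardcoded "5up3r53cr3t" secret with a freshly generated value, so every new install gets a unique forwarding secret. The generateRandomString helper itself is not shown in this diff; a minimal sketch of what such a helper could look like, assuming an alphanumeric alphabet and a SecureRandom source:

import java.security.SecureRandom;

// Hypothetical sketch only; the actual generateRandomString used by
// VelocityConfiguration is not part of this hunk.
private static final String RANDOM_CHARS =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";

static String generateRandomString(int length) {
  SecureRandom random = new SecureRandom();
  StringBuilder builder = new StringBuilder(length);
  for (int i = 0; i < length; i++) {
    builder.append(RANDOM_CHARS.charAt(random.nextInt(RANDOM_CHARS.length())));
  }
  return builder.toString();
}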

@@ -16,7 +16,6 @@ import com.velocitypowered.natives.encryption.VelocityCipher;
import com.velocitypowered.natives.encryption.VelocityCipherFactory;
import com.velocitypowered.natives.util.Natives;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.network.netty.DiscardHandler;
import com.velocitypowered.proxy.protocol.MinecraftPacket;
import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.netty.MinecraftCipherDecoder;

@@ -125,6 +124,13 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
if (sessionHandler != null) {
sessionHandler.readCompleted();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (ctx.channel().isActive()) {

@@ -145,7 +151,6 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
}
}

installDiscardHandler(ctx);
ctx.close();
}
}

@@ -161,18 +166,6 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
Preconditions.checkState(this.channel.eventLoop().inEventLoop(), "Not in event loop");
}

private void installDiscardHandler(ChannelHandlerContext ctx) {
if (ctx.pipeline().get("discard") == null) {
ctx.pipeline().addBefore(MINECRAFT_DECODER, "discard", DiscardHandler.HANDLER);
}
}

private void installDiscardHandler() {
if (channel.pipeline().get("discard") == null) {
channel.pipeline().addBefore(MINECRAFT_DECODER, "discard", DiscardHandler.HANDLER);
}
}

public EventLoop eventLoop() {
return channel.eventLoop();
}

@@ -212,9 +205,10 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
*/
public void closeWith(Object msg) {
if (channel.isActive()) {
knownDisconnect = true;
installDiscardHandler();
channel.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
channel.eventLoop().execute(() -> {
knownDisconnect = true;
channel.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
});
}
}

@@ -223,7 +217,6 @@ public class MinecraftConnection extends ChannelInboundHandlerAdapter {
*/
public void close() {
if (channel.isActive()) {
installDiscardHandler();
channel.close();
}
}
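
The rewritten closeWith() no longer touches knownDisconnect from the calling thread; both the flag write and the writeAndFlush() now happen on the event loop that owns the channel. This is the standard Netty idiom for thread-confined channel state, sketched in isolation (runOnEventLoop is an illustrative name, not part of the diff):

import io.netty.channel.Channel;

// Run a channel operation on the channel's own event loop: inline if we
// are already on that thread, otherwise scheduled onto it.
static void runOnEventLoop(Channel channel, Runnable task) {
  if (channel.eventLoop().inEventLoop()) {
    task.run();
  } else {
    channel.eventLoop().execute(task);
  }
}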

@@ -70,6 +70,10 @@ public interface MinecraftSessionHandler {

}

default void readCompleted() {

}

default boolean handle(AvailableCommands commands) {
return false;
}

@@ -188,12 +188,17 @@ public class BackendPlaySessionHandler implements MinecraftSessionHandler {
if (packet instanceof PluginMessage) {
((PluginMessage) packet).retain();
}
playerConnection.write(packet);
playerConnection.delayedWrite(packet);
}

@Override
public void handleUnknown(ByteBuf buf) {
playerConnection.write(buf.retain());
playerConnection.delayedWrite(buf.retain());
}

@Override
public void readCompleted() {
playerConnection.flush();
}

@Override
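
Together with the channelReadComplete() hook added to MinecraftConnection above, this batches writes toward the player: each packet is queued with delayedWrite(), and the whole batch is flushed once per read burst via readCompleted(). A minimal sketch of the pattern, assuming delayedWrite() is a plain Netty write() and flush() maps to Channel#flush():

import io.netty.channel.Channel;

// Sketch of the write-batching pattern (illustrative class, not from the diff).
final class BatchingWriter {
  private final Channel channel;

  BatchingWriter(Channel channel) {
    this.channel = channel;
  }

  void delayedWrite(Object msg) {
    channel.write(msg); // queued in the outbound buffer, not yet flushed
  }

  void flush() {
    channel.flush(); // one flush per read burst instead of one per packet
  }
}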

@@ -170,59 +170,45 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
public boolean handle(PluginMessage packet) {
VelocityServerConnection serverConn = player.getConnectedServer();
MinecraftConnection backendConn = serverConn != null ? serverConn.getConnection() : null;
if (serverConn != null && backendConn != null) {
if (backendConn.getState() != StateRegistry.PLAY) {
logger.warn("A plugin message was received while the backend server was not "
+ "ready. Channel: {}. Packet discarded.", packet.getChannel());
} else if (PluginMessageUtil.isRegister(packet)) {
player.getKnownChannels().addAll(PluginMessageUtil.getChannels(packet));
backendConn.write(packet.retain());
} else if (PluginMessageUtil.isUnregister(packet)) {
player.getKnownChannels().removeAll(PluginMessageUtil.getChannels(packet));
backendConn.write(packet.retain());
} else if (PluginMessageUtil.isMcBrand(packet)) {
backendConn.write(PluginMessageUtil
.rewriteMinecraftBrand(packet, server.getVersion(), player.getProtocolVersion()));
} else {
if (serverConn.getPhase() == BackendConnectionPhases.IN_TRANSITION) {
// We must bypass the currently-connected server when forwarding Forge packets.
VelocityServerConnection inFlight = player.getConnectionInFlight();
if (inFlight != null) {
player.getPhase().handle(player, packet, inFlight);
}
return true;
}

if (!player.getPhase().handle(player, packet, serverConn)) {
if (!player.getPhase().consideredComplete() || !serverConn.getPhase()
.consideredComplete()) {
// The client is trying to send messages too early. This is primarily caused by mods,
// but further aggravated by Velocity. To work around these issues, we will queue any
// non-FML handshake messages to be sent once the FML handshake has completed or the
// JoinGame packet has been received by the proxy, whichever comes first.
//
// We also need to make sure to retain these packets so they can be flushed
// appropriately.
loginPluginMessages.add(packet.retain());
} else {
ChannelIdentifier id = server.getChannelRegistrar().getFromId(packet.getChannel());
if (id == null) {
backendConn.write(packet.retain());
} else {
byte[] copy = ByteBufUtil.getBytes(packet.content());
PluginMessageEvent event = new PluginMessageEvent(player, serverConn, id,
ByteBufUtil.getBytes(packet.content()));
server.getEventManager().fire(event).thenAcceptAsync(pme -> {
PluginMessage message = new PluginMessage(packet.getChannel(),
Unpooled.wrappedBuffer(copy));
backendConn.write(message);
}, backendConn.eventLoop());
}
}
}
}
if (serverConn == null || backendConn == null) {
return true;
}

if (backendConn.getState() != StateRegistry.PLAY) {
logger.warn("A plugin message was received while the backend server was not "
+ "ready. Channel: {}. Packet discarded.", packet.getChannel());
return true;
}

if (this.tryHandleVanillaPluginMessageChannel(packet, backendConn)) {
return true;
}

if (serverConn.getPhase() == BackendConnectionPhases.IN_TRANSITION) {
// We must bypass the currently-connected server when forwarding Forge packets.
VelocityServerConnection inFlight = player.getConnectionInFlight();
if (inFlight != null) {
if (player.getPhase().handle(player, packet, inFlight)) {
return true;
}
}
} else if (!this.tryHandleForgeMessage(packet, serverConn)) {
return true;
}

ChannelIdentifier id = server.getChannelRegistrar().getFromId(packet.getChannel());
if (id == null) {
backendConn.write(packet.retain());
} else {
byte[] copy = ByteBufUtil.getBytes(packet.content());
PluginMessageEvent event = new PluginMessageEvent(player, serverConn, id,
ByteBufUtil.getBytes(packet.content()));
server.getEventManager().fire(event).thenAcceptAsync(pme -> {
PluginMessage message = new PluginMessage(packet.getChannel(),
Unpooled.wrappedBuffer(copy));
backendConn.write(message);
}, backendConn.eventLoop());
}
return true;
}

@@ -277,9 +263,15 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {

@Override
public void writabilityChanged() {
boolean writable = player.getConnection().getChannel().isWritable();

if (!writable) {
// We might have packets queued for the server, so flush them now to free up memory.
player.getConnection().flush();
}

VelocityServerConnection serverConn = player.getConnectedServer();
if (serverConn != null) {
boolean writable = player.getConnection().getChannel().isWritable();
MinecraftConnection smc = serverConn.getConnection();
if (smc != null) {
smc.setAutoReading(writable);
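
The writability hook implements backpressure: when the player's channel fills up, the proxy flushes what it has already queued and pauses reads from the backend until the client drains. A sketch of the idea, assuming setAutoReading() wraps Netty's ChannelConfig#setAutoRead():

import io.netty.channel.Channel;

// Backpressure sketch (illustrative helper, not from the diff): throttle
// reads from the backend while the client-facing channel is not writable.
static void onClientWritabilityChanged(Channel client, Channel backend) {
  boolean writable = client.isWritable();
  if (!writable) {
    client.flush(); // free memory held by already-queued packets
  }
  backend.config().setAutoRead(writable);
}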

@@ -287,6 +279,44 @@ public class ClientPlaySessionHandler implements MinecraftSessionHandler {
}
}

private boolean tryHandleVanillaPluginMessageChannel(PluginMessage packet,
MinecraftConnection backendConn) {
if (PluginMessageUtil.isRegister(packet)) {
player.getKnownChannels().addAll(PluginMessageUtil.getChannels(packet));
backendConn.write(packet.retain());
return true;
} else if (PluginMessageUtil.isUnregister(packet)) {
player.getKnownChannels().removeAll(PluginMessageUtil.getChannels(packet));
backendConn.write(packet.retain());
return true;
} else if (PluginMessageUtil.isMcBrand(packet)) {
backendConn.write(PluginMessageUtil.rewriteMinecraftBrand(packet, server.getVersion(),
player.getProtocolVersion()));
return true;
}
return false;
}

private boolean tryHandleForgeMessage(PluginMessage packet, VelocityServerConnection serverConn) {
if (player.getPhase().handle(player, packet, serverConn)) {
return true;
}

if (!player.getPhase().consideredComplete() || !serverConn.getPhase().consideredComplete()) {
// The client is trying to send messages too early. This is primarily caused by mods,
// but further aggravated by Velocity. To work around these issues, we will queue any
// non-FML handshake messages to be sent once the FML handshake has completed or the
// JoinGame packet has been received by the proxy, whichever comes first.
//
// We also need to make sure to retain these packets so they can be flushed
// appropriately.
loginPluginMessages.add(packet.retain());
return true;
} else {
return false;
}
}

/**
* Handles the {@code JoinGame} packet. This function is responsible for handling the client-side
* switching servers in Velocity.

@@ -6,7 +6,7 @@ import static org.asynchttpclient.Dsl.config;
import com.google.common.base.Preconditions;
import com.velocitypowered.natives.util.Natives;
import com.velocitypowered.proxy.VelocityServer;
import com.velocitypowered.proxy.network.netty.DnsAddressResolverGroupNameResolverAdapter;
import com.velocitypowered.proxy.network.netty.SeparatePoolInetNameResolver;
import com.velocitypowered.proxy.protocol.netty.GS4QueryHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;

@@ -18,6 +18,7 @@ import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

@@ -48,7 +49,7 @@ public final class ConnectionManager {
@SuppressWarnings("WeakerAccess")
public final BackendChannelInitializerHolder backendChannelInitializer;

private final DnsAddressResolverGroup resolverGroup;
private final SeparatePoolInetNameResolver resolver;
private final AsyncHttpClient httpClient;

/**

@@ -65,21 +66,16 @@ public final class ConnectionManager {
new ServerChannelInitializer(this.server));
this.backendChannelInitializer = new BackendChannelInitializerHolder(
new BackendChannelInitializer(this.server));
this.resolverGroup = new DnsAddressResolverGroup(new DnsNameResolverBuilder()
.channelType(this.transportType.datagramChannelClass)
.negativeTtl(15)
.ndots(1));
this.resolver = new SeparatePoolInetNameResolver(GlobalEventExecutor.INSTANCE);
this.httpClient = asyncHttpClient(config()
.setEventLoopGroup(this.workerGroup)
.setUserAgent(server.getVersion().getName() + "/" + server.getVersion().getVersion())
.addRequestFilter(new RequestFilter() {
@Override
public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
public <T> FilterContext<T> filter(FilterContext<T> ctx) {
return new FilterContextBuilder<>(ctx)
.request(new RequestBuilder(ctx.getRequest())
.setNameResolver(
new DnsAddressResolverGroupNameResolverAdapter(resolverGroup, workerGroup)
)
.setNameResolver(resolver)
.build())
.build();
}

@@ -162,7 +158,7 @@ public final class ConnectionManager {
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
this.server.getConfiguration().getConnectTimeout())
.group(group == null ? this.workerGroup : group)
.resolver(this.resolverGroup);
.resolver(this.resolver.asGroup());
if (transportType == TransportType.EPOLL && server.getConfiguration().useTcpFastOpen()) {
bootstrap.option(EpollChannelOption.TCP_FASTOPEN_CONNECT, true);
}

@@ -194,6 +190,8 @@ public final class ConnectionManager {
Thread.currentThread().interrupt();
}
}

this.resolver.shutdown();
}

public EventLoopGroup getBossGroup() {

@@ -7,11 +7,6 @@ import io.netty.channel.epoll.EpollDatagramChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDatagramChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;

@@ -27,11 +22,7 @@ enum TransportType {
(name, type) -> new NioEventLoopGroup(0, createThreadFactory(name, type))),
EPOLL("epoll", EpollServerSocketChannel.class, EpollSocketChannel.class,
EpollDatagramChannel.class,
(name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type))),
KQUEUE("Kqueue", KQueueServerSocketChannel.class, KQueueSocketChannel.class,
KQueueDatagramChannel.class,
(name, type) -> new KQueueEventLoopGroup(0, createThreadFactory(name, type)));

(name, type) -> new EpollEventLoopGroup(0, createThreadFactory(name, type)));

final String name;
final Class<? extends ServerSocketChannel> serverSocketChannelClass;

@@ -71,8 +62,6 @@ enum TransportType {

if (Epoll.isAvailable()) {
return EPOLL;
} else if (KQueue.isAvailable()) {
return KQUEUE;
} else {
return NIO;
}

@@ -1,17 +0,0 @@
package com.velocitypowered.proxy.network.netty;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

@Sharable
public class DiscardHandler extends ChannelInboundHandlerAdapter {

public static final DiscardHandler HANDLER = new DiscardHandler();

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ReferenceCountUtil.release(msg);
}
}

@@ -1,68 +0,0 @@
package com.velocitypowered.proxy.network.netty;

import io.netty.channel.EventLoopGroup;
import io.netty.resolver.InetNameResolver;
import io.netty.resolver.dns.DnsAddressResolverGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.ThreadExecutorMap;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

public class DnsAddressResolverGroupNameResolverAdapter extends InetNameResolver {

private final DnsAddressResolverGroup resolverGroup;
private final EventLoopGroup group;

/**
* Creates a DnsAddressResolverGroupNameResolverAdapter.
* @param resolverGroup the resolver group to use
* @param group the event loop group
*/
public DnsAddressResolverGroupNameResolverAdapter(
DnsAddressResolverGroup resolverGroup, EventLoopGroup group) {
super(ImmediateEventExecutor.INSTANCE);
this.resolverGroup = resolverGroup;
this.group = group;
}

@Override
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
EventExecutor executor = this.findExecutor();
resolverGroup.getResolver(executor).resolve(InetSocketAddress.createUnresolved(inetHost, 17))
.addListener((FutureListener<InetSocketAddress>) future -> {
if (future.isSuccess()) {
promise.trySuccess(future.getNow().getAddress());
} else {
promise.tryFailure(future.cause());
}
});
}

@Override
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise)
throws Exception {
EventExecutor executor = this.findExecutor();
resolverGroup.getResolver(executor).resolveAll(InetSocketAddress.createUnresolved(inetHost, 17))
.addListener((FutureListener<List<InetSocketAddress>>) future -> {
if (future.isSuccess()) {
List<InetAddress> addresses = new ArrayList<>(future.getNow().size());
for (InetSocketAddress address : future.getNow()) {
addresses.add(address.getAddress());
}
promise.trySuccess(addresses);
} else {
promise.tryFailure(future.cause());
}
});
}

private EventExecutor findExecutor() {
EventExecutor current = ThreadExecutorMap.currentExecutor();
return current == null ? group.next() : current;
}
}

@@ -0,0 +1,79 @@
package com.velocitypowered.proxy.network.netty;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.resolver.AddressResolver;
import io.netty.resolver.AddressResolverGroup;
import io.netty.resolver.DefaultNameResolver;
import io.netty.resolver.InetNameResolver;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public final class SeparatePoolInetNameResolver extends InetNameResolver {

private final ExecutorService resolveExecutor;
private final InetNameResolver delegate;
private AddressResolverGroup<InetSocketAddress> resolverGroup;

/**
* Creates a new instance of {@code SeparatePoolInetNameResolver}.
*
* @param executor the {@link EventExecutor} which is used to notify the listeners of the {@link
*     Future} returned by {@link #resolve(String)}
*/
public SeparatePoolInetNameResolver(EventExecutor executor) {
super(executor);
this.resolveExecutor = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("Velocity DNS Resolver")
.setDaemon(true)
.build());
this.delegate = new DefaultNameResolver(executor);
}

@Override
protected void doResolve(String inetHost, Promise<InetAddress> promise) throws Exception {
try {
resolveExecutor.execute(() -> this.delegate.resolve(inetHost, promise));
} catch (RejectedExecutionException e) {
promise.setFailure(e);
}
}

@Override
protected void doResolveAll(String inetHost, Promise<List<InetAddress>> promise)
throws Exception {
try {
resolveExecutor.execute(() -> this.delegate.resolveAll(inetHost, promise));
} catch (RejectedExecutionException e) {
promise.setFailure(e);
}
}

public void shutdown() {
this.resolveExecutor.shutdown();
}

/**
* Returns a view of this resolver as a AddressResolverGroup.
*
* @return a view of this resolver as a AddressResolverGroup
*/
public AddressResolverGroup<InetSocketAddress> asGroup() {
if (this.resolverGroup == null) {
this.resolverGroup = new AddressResolverGroup<InetSocketAddress>() {
@Override
protected AddressResolver<InetSocketAddress> newResolver(EventExecutor executor) {
return asAddressResolver();
}
};
}
return this.resolverGroup;
}
}
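
With this resolver, lookups run on a dedicated single-thread executor, so a slow DNS server can no longer stall a Netty event loop. A small usage sketch (class name and hostname are illustrative):

import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetAddress;

public class ResolverDemo {
  public static void main(String[] args) throws Exception {
    SeparatePoolInetNameResolver resolver =
        new SeparatePoolInetNameResolver(GlobalEventExecutor.INSTANCE);
    // resolve() returns a Netty Future; sync() blocks until it completes.
    InetAddress address = resolver.resolve("mc.example.com").sync().getNow();
    System.out.println(address);
    resolver.shutdown();
  }
}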

@@ -22,7 +22,6 @@ public class MinecraftCipherDecoder extends MessageToMessageDecoder<ByteBuf> {
try {
cipher.process(compatible);
out.add(compatible);
in.skipBytes(in.readableBytes());
} catch (Exception e) {
compatible.release(); // compatible will never be used if we throw an exception
throw e;

@@ -13,9 +13,13 @@ import java.util.List;

public class MinecraftCompressDecoder extends MessageToMessageDecoder<ByteBuf> {

private static final int SOFT_MAXIMUM_UNCOMPRESSED_SIZE = 2 * 1024 * 1024; // 2MiB
private static final int VANILLA_MAXIMUM_UNCOMPRESSED_SIZE = 2 * 1024 * 1024; // 2MiB
private static final int HARD_MAXIMUM_UNCOMPRESSED_SIZE = 16 * 1024 * 1024; // 16MiB

private static final int UNCOMPRESSED_CAP =
Boolean.getBoolean("velocity.increased-compression-cap")
? HARD_MAXIMUM_UNCOMPRESSED_SIZE : VANILLA_MAXIMUM_UNCOMPRESSED_SIZE;

private final int threshold;
private final VelocityCompressor compressor;

@@ -28,21 +32,21 @@ public class MinecraftCompressDecoder extends MessageToMessageDecoder<ByteBuf> {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int claimedUncompressedSize = ProtocolUtils.readVarInt(in);
if (claimedUncompressedSize == 0) {
// Strip the now-useless uncompressed size, this message is already uncompressed.
// This message is not compressed.
out.add(in.retainedSlice());
in.skipBytes(in.readableBytes());
return;
}

checkFrame(claimedUncompressedSize >= threshold, "Uncompressed size %s is less than"
+ " threshold %s", claimedUncompressedSize, threshold);
int allowedMax = Math.min(claimedUncompressedSize, HARD_MAXIMUM_UNCOMPRESSED_SIZE);
int initialCapacity = Math.min(claimedUncompressedSize, SOFT_MAXIMUM_UNCOMPRESSED_SIZE);
checkFrame(claimedUncompressedSize <= UNCOMPRESSED_CAP,
"Uncompressed size %s exceeds hard threshold of %s", claimedUncompressedSize,
UNCOMPRESSED_CAP);

ByteBuf compatibleIn = ensureCompatible(ctx.alloc(), compressor, in);
ByteBuf uncompressed = preferredBuffer(ctx.alloc(), compressor, initialCapacity);
ByteBuf uncompressed = preferredBuffer(ctx.alloc(), compressor, claimedUncompressedSize);
try {
compressor.inflate(compatibleIn, uncompressed, allowedMax);
compressor.inflate(compatibleIn, uncompressed, claimedUncompressedSize);
out.add(uncompressed);
} catch (Exception e) {
uncompressed.release();
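
The rewrite collapses the old soft/hard limit pair into a single cap chosen at class-load time: 2 MiB as in vanilla, or 16 MiB when the JVM is started with -Dvelocity.increased-compression-cap=true (Boolean.getBoolean reads a system property, not an environment variable). A compact sketch of the size validation the new decode() performs, with illustrative exception types:

// Sketch of the new bounds checks; the real code uses checkFrame().
static int validateClaimedSize(int claimedUncompressedSize, int threshold) {
  int cap = Boolean.getBoolean("velocity.increased-compression-cap")
      ? 16 * 1024 * 1024 : 2 * 1024 * 1024;
  if (claimedUncompressedSize < threshold) {
    throw new IllegalArgumentException("uncompressed size below threshold");
  }
  if (claimedUncompressedSize > cap) {
    throw new IllegalArgumentException("uncompressed size exceeds cap");
  }
  return claimedUncompressedSize;
}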

@@ -38,8 +38,11 @@ public class MinecraftCompressEncoder extends MessageToByteEncoder<ByteBuf> {
@Override
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
throws Exception {
int initialBufferSize = msg.readableBytes() <= threshold ? msg.readableBytes() + 1 :
msg.readableBytes() / 3;
// Follow the advice of https://github.com/ebiggers/libdeflate/blob/master/libdeflate.h#L103
// here for compression. The maximum buffer size if the data compresses well (which is almost
// always the case) is one less the input buffer.
int offset = msg.readableBytes() < threshold ? 1 : -1;
int initialBufferSize = msg.readableBytes() + offset;
return MoreByteBufUtils.preferredBuffer(ctx.alloc(), compressor, initialBufferSize);
}
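
The old heuristic guessed a third of the input size for compressed packets; the new one follows libdeflate's guidance that data which compresses at all fits in one byte less than the input, while data below the threshold is sent uncompressed plus one byte for the leading zero VarInt. A worked example of the new arithmetic (a threshold of 256 is illustrative):

// initialBufferSize(100, 256)  -> 100 + 1 = 101  (uncompressed, +1 for the 0 VarInt)
// initialBufferSize(4096, 256) -> 4096 - 1 = 4095 (assumed compressed upper bound)
static int initialBufferSize(int readableBytes, int threshold) {
  int offset = readableBytes < threshold ? 1 : -1;
  return readableBytes + offset;
}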

@@ -1,43 +1,88 @@
package com.velocitypowered.proxy.protocol.netty;

import com.velocitypowered.proxy.protocol.ProtocolUtils;
import com.velocitypowered.proxy.util.except.QuietException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.util.ByteProcessor;
import java.util.List;

public class MinecraftVarintFrameDecoder extends ByteToMessageDecoder {

private static final QuietException BAD_LENGTH_CACHED = new QuietException("Bad packet length");
private static final QuietException VARINT_BIG_CACHED = new QuietException("VarInt too big");
private final VarintByteDecoder reader = new VarintByteDecoder();

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (!in.isReadable()) {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (!ctx.channel().isActive()) {
in.skipBytes(in.readableBytes());
return;
}

int origReaderIndex = in.readerIndex();
for (int i = 0; i < 3; i++) {
if (!in.isReadable()) {
in.readerIndex(origReaderIndex);
while (in.isReadable()) {
reader.reset();

int varintEnd = in.forEachByte(reader);
if (varintEnd == -1) {
// We tried to go beyond the end of the buffer. This is probably a good sign that the
// buffer was too short to hold a proper varint.
return;
}

byte read = in.readByte();
if (read >= 0) {
// Make sure reader index of length buffer is returned to the beginning
in.readerIndex(origReaderIndex);
int packetLength = ProtocolUtils.readVarInt(in);

if (in.readableBytes() >= packetLength) {
out.add(in.readRetainedSlice(packetLength));
if (reader.result == DecodeResult.SUCCESS) {
if (reader.readVarint < 0) {
throw BAD_LENGTH_CACHED;
} else if (reader.readVarint == 0) {
// skip over the empty packet and ignore it
in.readerIndex(varintEnd + 1);
} else {
in.readerIndex(origReaderIndex);
int minimumRead = reader.bytesRead + reader.readVarint;
if (in.isReadable(minimumRead)) {
out.add(in.retainedSlice(varintEnd + 1, reader.readVarint));
in.skipBytes(minimumRead);
} else {
return;
}
}

return;
} else if (reader.result == DecodeResult.TOO_BIG) {
throw VARINT_BIG_CACHED;
} else if (reader.result == DecodeResult.TOO_SHORT) {
// No-op: we couldn't get a useful result.
break;
}
}
}

throw new CorruptedFrameException("VarInt too big");
private static class VarintByteDecoder implements ByteProcessor {
private int readVarint;
private int bytesRead;
private DecodeResult result = DecodeResult.TOO_SHORT;

@Override
public boolean process(byte k) {
readVarint |= (k & 0x7F) << bytesRead++ * 7;
if (bytesRead > 3) {
result = DecodeResult.TOO_BIG;
return false;
}
if ((k & 0x80) != 128) {
result = DecodeResult.SUCCESS;
return false;
}
return true;
}

void reset() {
readVarint = 0;
bytesRead = 0;
result = DecodeResult.TOO_SHORT;
}
}

private enum DecodeResult {
SUCCESS,
TOO_SHORT,
TOO_BIG
}
}
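
The new frame decoder scans the length prefix with a reusable ByteProcessor and keeps looping, so one read burst can now yield multiple frames, which is what the branch name decode-multiple refers to. Packet lengths are VarInts: seven payload bits per byte, least-significant group first, with the high bit as a continuation flag; anything longer than three bytes is rejected as TOO_BIG, since 21 bits already covers the maximum frame length. A worked example of the arithmetic in VarintByteDecoder.process():

// Worked VarInt example: the bytes 0xAC 0x02 decode to 300.
//   byte 0: (0xAC & 0x7F) << (0 * 7) = 0x2C   (high bit set, continue)
//   byte 1: (0x02 & 0x7F) << (1 * 7) = 0x100  (high bit clear, stop)
//   0x2C | 0x100 = 0x12C = 300
static int readVarint(byte[] bytes) {
  int result = 0;
  int bytesRead = 0;
  for (byte k : bytes) {
    result |= (k & 0x7F) << (bytesRead++ * 7);
    if ((k & 0x80) == 0) {
      break;
    }
  }
  return result;
}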