Add support for SO_REUSEPORT

This commit is contained in:
Andrew Steinborn
2025-02-27 23:42:39 -05:00
parent 0afe061224
commit d4ea40a4a2
3 changed files with 93 additions and 40 deletions

View File

@@ -407,6 +407,10 @@ public class VelocityConfiguration implements ProxyConfig {
return forceKeyAuthentication; return forceKeyAuthentication;
} }
public boolean isEnableReusePort() {
return advanced.isEnableReusePort();
}
@Override @Override
public String toString() { public String toString() {
return MoreObjects.toStringHelper(this) return MoreObjects.toStringHelper(this)
@@ -716,6 +720,8 @@ public class VelocityConfiguration implements ProxyConfig {
private boolean logPlayerConnections = true; private boolean logPlayerConnections = true;
@Expose @Expose
private boolean acceptTransfers = false; private boolean acceptTransfers = false;
@Expose
private boolean enableReusePort = false;
private Advanced() { private Advanced() {
} }
@@ -741,6 +747,7 @@ public class VelocityConfiguration implements ProxyConfig {
this.logCommandExecutions = config.getOrElse("log-command-executions", false); this.logCommandExecutions = config.getOrElse("log-command-executions", false);
this.logPlayerConnections = config.getOrElse("log-player-connections", true); this.logPlayerConnections = config.getOrElse("log-player-connections", true);
this.acceptTransfers = config.getOrElse("accepts-transfers", false); this.acceptTransfers = config.getOrElse("accepts-transfers", false);
this.enableReusePort = config.getOrElse("enable-reuse-port", false);
} }
} }
@@ -804,6 +811,10 @@ public class VelocityConfiguration implements ProxyConfig {
return this.acceptTransfers; return this.acceptTransfers;
} }
public boolean isEnableReusePort() {
return enableReusePort;
}
@Override @Override
public String toString() { public String toString() {
return "Advanced{" return "Advanced{"
@@ -821,6 +832,7 @@ public class VelocityConfiguration implements ProxyConfig {
+ ", logCommandExecutions=" + logCommandExecutions + ", logCommandExecutions=" + logCommandExecutions
+ ", logPlayerConnections=" + logPlayerConnections + ", logPlayerConnections=" + logPlayerConnections
+ ", acceptTransfers=" + acceptTransfers + ", acceptTransfers=" + acceptTransfers
+ ", enableReusePort=" + enableReusePort
+ '}'; + '}';
} }
} }

View File

@@ -18,6 +18,8 @@
package com.velocitypowered.proxy.network; package com.velocitypowered.proxy.network;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.velocitypowered.api.event.proxy.ListenerBoundEvent; import com.velocitypowered.api.event.proxy.ListenerBoundEvent;
import com.velocitypowered.api.event.proxy.ListenerCloseEvent; import com.velocitypowered.api.event.proxy.ListenerCloseEvent;
import com.velocitypowered.api.network.ListenerType; import com.velocitypowered.api.network.ListenerType;
@@ -28,14 +30,17 @@ import com.velocitypowered.proxy.protocol.netty.GameSpyQueryHandler;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.unix.UnixChannelOption;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.MultithreadEventExecutorGroup;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.http.HttpClient; import java.net.http.HttpClient;
import java.util.HashMap; import java.util.Collection;
import java.util.Map; import java.util.Map;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -49,7 +54,7 @@ public final class ConnectionManager {
private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 20, private static final WriteBufferWaterMark SERVER_WRITE_MARK = new WriteBufferWaterMark(1 << 20,
1 << 21); 1 << 21);
private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class); private static final Logger LOGGER = LogManager.getLogger(ConnectionManager.class);
private final Map<InetSocketAddress, Endpoint> endpoints = new HashMap<>(); private final Multimap<InetSocketAddress, Endpoint> endpoints = HashMultimap.create();
private final TransportType transportType; private final TransportType transportType;
private final EventLoopGroup bossGroup; private final EventLoopGroup bossGroup;
private final EventLoopGroup workerGroup; private final EventLoopGroup workerGroup;
@@ -93,7 +98,6 @@ public final class ConnectionManager {
public void bind(final InetSocketAddress address) { public void bind(final InetSocketAddress address) {
final ServerBootstrap bootstrap = new ServerBootstrap() final ServerBootstrap bootstrap = new ServerBootstrap()
.channelFactory(this.transportType.serverSocketChannelFactory) .channelFactory(this.transportType.serverSocketChannelFactory)
.group(this.bossGroup, this.workerGroup)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, SERVER_WRITE_MARK)
.childHandler(this.serverChannelInitializer.get()) .childHandler(this.serverChannelInitializer.get())
.childOption(ChannelOption.TCP_NODELAY, true) .childOption(ChannelOption.TCP_NODELAY, true)
@@ -104,26 +108,50 @@ public final class ConnectionManager {
bootstrap.option(ChannelOption.TCP_FASTOPEN, 3); bootstrap.option(ChannelOption.TCP_FASTOPEN, 3);
} }
bootstrap.bind() if (server.getConfiguration().isEnableReusePort()) {
.addListener((ChannelFutureListener) future -> { // We don't need a boss group, since each worker will bind to the socket
final Channel channel = future.channel(); bootstrap.option(UnixChannelOption.SO_REUSEPORT, true)
if (future.isSuccess()) { .group(this.workerGroup);
this.endpoints.put(address, new Endpoint(channel, ListenerType.MINECRAFT)); } else {
bootstrap.group(this.bossGroup, this.workerGroup);
// Warn people with console access that HAProxy is in use, see PR: #1436 }
if (this.server.getConfiguration().isProxyProtocol()) {
LOGGER.warn("Using HAProxy and listening on {}, please ensure this listener is adequately firewalled.", channel.localAddress()); final int binds = server.getConfiguration().isEnableReusePort()
? ((MultithreadEventExecutorGroup) this.workerGroup).executorCount() : 1;
for (int bind = 0; bind < binds; bind++) {
// Wait for each bind to open. If we encounter any errors, don't try to bind again.
int finalBind = bind;
ChannelFuture f = bootstrap.bind()
.addListener((ChannelFutureListener) future -> {
final Channel channel = future.channel();
if (future.isSuccess()) {
this.endpoints.put(address, new Endpoint(channel, ListenerType.MINECRAFT));
LOGGER.info("Listening on {}", channel.localAddress());
if (finalBind == 0) {
// Warn people with console access that HAProxy is in use, see PR: #1436
if (this.server.getConfiguration().isProxyProtocol()) {
LOGGER.warn(
"Using HAProxy and listening on {}, please ensure this listener is adequately firewalled.",
channel.localAddress());
}
// Fire the proxy bound event after the socket is bound
server.getEventManager().fireAndForget(
new ListenerBoundEvent(address, ListenerType.MINECRAFT));
}
} else {
LOGGER.error("Can't bind to {}", address, future.cause());
} }
});
f.syncUninterruptibly();
LOGGER.info("Listening on {}", channel.localAddress()); if (!f.isSuccess()) {
break;
// Fire the proxy bound event after the socket is bound }
server.getEventManager().fireAndForget( }
new ListenerBoundEvent(address, ListenerType.MINECRAFT));
} else {
LOGGER.error("Can't bind to {}", address, future.cause());
}
});
} }
/** /**
@@ -181,17 +209,20 @@ public final class ConnectionManager {
* @param oldBind the endpoint to close * @param oldBind the endpoint to close
*/ */
public void close(InetSocketAddress oldBind) { public void close(InetSocketAddress oldBind) {
Endpoint endpoint = endpoints.remove(oldBind); Collection<Endpoint> endpoints = this.endpoints.removeAll(oldBind);
Preconditions.checkState(!endpoints.isEmpty(), "Endpoint was not registered");
ListenerType type = endpoints.iterator().next().getType();
// Fire proxy close event to notify plugins of socket close. We block since plugins // Fire proxy close event to notify plugins of socket close. We block since plugins
// should have a chance to be notified before the server stops accepting connections. // should have a chance to be notified before the server stops accepting connections.
server.getEventManager().fire(new ListenerCloseEvent(oldBind, endpoint.getType())).join(); server.getEventManager().fire(new ListenerCloseEvent(oldBind, type)).join();
Channel serverChannel = endpoint.getChannel(); for (Endpoint endpoint : endpoints) {
Channel serverChannel = endpoint.getChannel();
Preconditions.checkState(serverChannel != null, "Endpoint %s not registered", oldBind); LOGGER.info("Closing endpoint {}", serverChannel.localAddress());
LOGGER.info("Closing endpoint {}", serverChannel.localAddress()); serverChannel.close().syncUninterruptibly();
serverChannel.close().syncUninterruptibly(); }
} }
/** /**
@@ -200,24 +231,28 @@ public final class ConnectionManager {
* @param interrupt should closing forward interruptions * @param interrupt should closing forward interruptions
*/ */
public void closeEndpoints(boolean interrupt) { public void closeEndpoints(boolean interrupt) {
for (final Map.Entry<InetSocketAddress, Endpoint> entry : this.endpoints.entrySet()) { for (final Map.Entry<InetSocketAddress, Collection<Endpoint>> entry : this.endpoints.asMap()
.entrySet()) {
final InetSocketAddress address = entry.getKey(); final InetSocketAddress address = entry.getKey();
final Endpoint endpoint = entry.getValue(); final Collection<Endpoint> endpoints = entry.getValue();
ListenerType type = endpoints.iterator().next().getType();
// Fire proxy close event to notify plugins of socket close. We block since plugins // Fire proxy close event to notify plugins of socket close. We block since plugins
// should have a chance to be notified before the server stops accepting connections. // should have a chance to be notified before the server stops accepting connections.
server.getEventManager().fire(new ListenerCloseEvent(address, endpoint.getType())).join(); server.getEventManager().fire(new ListenerCloseEvent(address, type)).join();
LOGGER.info("Closing endpoint {}", address); for (Endpoint endpoint : endpoints) {
if (interrupt) { LOGGER.info("Closing endpoint {}", address);
try { if (interrupt) {
endpoint.getChannel().close().sync(); try {
} catch (final InterruptedException e) { endpoint.getChannel().close().sync();
LOGGER.info("Interrupted whilst closing endpoint", e); } catch (final InterruptedException e) {
Thread.currentThread().interrupt(); LOGGER.info("Interrupted whilst closing endpoint", e);
Thread.currentThread().interrupt();
}
} else {
endpoint.getChannel().close().syncUninterruptibly();
} }
} else {
endpoint.getChannel().close().syncUninterruptibly();
} }
} }
this.endpoints.clear(); this.endpoints.clear();

View File

@@ -145,6 +145,12 @@ log-player-connections = true
# Transfer packet (Minecraft 1.20.5) to be received. # Transfer packet (Minecraft 1.20.5) to be received.
accepts-transfers = false accepts-transfers = false
# Enables support for SO_REUSEPORT. This may help the proxy scale better on multicore systems
# with a lot of incoming connections, and provide better CPU utilization than the existing
# strategy of having a single thread accepting connections and distributing them to worker
# threads. Disabled by default. Requires Linux or macOS.
enable-reuse-port = false
[query] [query]
# Whether to enable responding to GameSpy 4 query responses or not. # Whether to enable responding to GameSpy 4 query responses or not.
enabled = false enabled = false