feat: 使用 lettuce-core 连接 redis 提升稳定性

This commit is contained in:
2025-07-03 02:36:20 +08:00
parent 95c0a23857
commit 25828267f6
4 changed files with 59 additions and 63 deletions

View File

@@ -20,11 +20,13 @@ import cn.hamster3.mc.plugin.core.common.data.DisplayMessage;
import cn.hamster3.mc.plugin.core.common.util.CoreUtils; import cn.hamster3.mc.plugin.core.common.util.CoreUtils;
import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus; import com.google.common.eventbus.EventBus;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import lombok.Getter; import lombok.Getter;
import net.kyori.adventure.text.Component; import net.kyori.adventure.text.Component;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.Nullable;
import redis.clients.jedis.Jedis;
import javax.sql.DataSource; import javax.sql.DataSource;
import java.sql.*; import java.sql.*;
@@ -69,7 +71,7 @@ public abstract class BallAPI {
private final Map<UUID, BallPlayerInfo> allPlayerInfo; private final Map<UUID, BallPlayerInfo> allPlayerInfo;
@NotNull @NotNull
private final Jedis redisSub; private final StatefulRedisPubSubConnection<String, String> redisPubSub;
@Nullable @Nullable
private ScheduledFuture<?> lockUpdater; private ScheduledFuture<?> lockUpdater;
@@ -92,7 +94,7 @@ public abstract class BallAPI {
eventBus.register(BallCommonListener.INSTANCE); eventBus.register(BallCommonListener.INSTANCE);
allServerInfo = new ConcurrentHashMap<>(); allServerInfo = new ConcurrentHashMap<>();
allPlayerInfo = new ConcurrentHashMap<>(); allPlayerInfo = new ConcurrentHashMap<>();
redisSub = CoreAPI.getInstance().getJedisPool().getResource(); redisPubSub = CoreAPI.getInstance().getRedisClient().connectPubSub();
getLogger().info("频道前缀: " + ballConfig.getChannelPrefix()); getLogger().info("频道前缀: " + ballConfig.getChannelPrefix());
getLogger().info("启用子服更新玩家状态: " + ballConfig.isGameServerUpdatePlayerInfo()); getLogger().info("启用子服更新玩家状态: " + ballConfig.isGameServerUpdatePlayerInfo());
if (ballConfig.isGameServerUpdatePlayerInfo()) { if (ballConfig.isGameServerUpdatePlayerInfo()) {
@@ -102,29 +104,31 @@ public abstract class BallAPI {
getLogger().warning("已启用调试模式"); getLogger().warning("已启用调试模式");
eventBus.register(BallDebugListener.INSTANCE); eventBus.register(BallDebugListener.INSTANCE);
} }
CoreAPI.getInstance().getExecutorService().submit(() -> redisSub.subscribe(BallRedisListener.INSTANCE, BALL_CHANNEL)); redisPubSub.addListener(BallRedisListener.INSTANCE);
redisPubSub.sync().subscribe(BALL_CHANNEL);
} }
protected void enable() throws SQLException, InterruptedException { protected void enable() throws SQLException, InterruptedException {
try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { try (StatefulRedisConnection<String, String> connect = CoreAPI.getInstance().getRedisClient().connect()) {
RedisCommands<String, String> redis = connect.sync();
String key = "HamsterBall:ServerInfo:" + localServerInfo.getId(); String key = "HamsterBall:ServerInfo:" + localServerInfo.getId();
if (jedis.exists(key) && ballConfig.isSingletonServerID()) { if (redis.exists(key) > 0 && ballConfig.isSingletonServerID()) {
throw new IllegalStateException("已经有一个服务器占用了该 ID"); throw new IllegalStateException("已经有一个服务器占用了该 ID");
} }
jedis.hset(key, "id", localServerInfo.getId()); redis.hset(key, "id", localServerInfo.getId());
jedis.hset(key, "name", localServerInfo.getName()); redis.hset(key, "name", localServerInfo.getName());
jedis.hset(key, "type", localServerInfo.getType().name()); redis.hset(key, "type", localServerInfo.getType().name());
jedis.hset(key, "host", localServerInfo.getHost()); redis.hset(key, "host", localServerInfo.getHost());
jedis.hset(key, "port", String.valueOf(localServerInfo.getPort())); redis.hset(key, "port", String.valueOf(localServerInfo.getPort()));
jedis.expire(key, 180); redis.expire(key, 180);
lockUpdater = CoreAPI.getInstance().getScheduledService().scheduleAtFixedRate(LockUpdateThread.INSTANCE, 1, 1, TimeUnit.MINUTES); lockUpdater = CoreAPI.getInstance().getScheduledService().scheduleAtFixedRate(LockUpdateThread.INSTANCE, 1, 1, TimeUnit.MINUTES);
for (String serverInfoKey : jedis.keys("HamsterBall:ServerInfo:*")) { for (String serverInfoKey : redis.keys("HamsterBall:ServerInfo:*")) {
BallServerInfo info = new BallServerInfo( BallServerInfo info = new BallServerInfo(
jedis.hget(serverInfoKey, "id"), redis.hget(serverInfoKey, "id"),
jedis.hget(serverInfoKey, "name"), redis.hget(serverInfoKey, "name"),
BallServerType.valueOf(jedis.hget(serverInfoKey, "type")), BallServerType.valueOf(redis.hget(serverInfoKey, "type")),
jedis.hget(serverInfoKey, "host"), redis.hget(serverInfoKey, "host"),
Integer.parseInt(jedis.hget(serverInfoKey, "port")) Integer.parseInt(redis.hget(serverInfoKey, "port"))
); );
allServerInfo.put(info.getId(), info); allServerInfo.put(info.getId(), info);
} }
@@ -192,9 +196,10 @@ public abstract class BallAPI {
if (lockUpdater != null) { if (lockUpdater != null) {
lockUpdater.cancel(true); lockUpdater.cancel(true);
lockUpdater = null; lockUpdater = null;
try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { try (StatefulRedisConnection<String, String> connect = CoreAPI.getInstance().getRedisClient().connect()) {
RedisCommands<String, String> redis = connect.sync();
String key = "HamsterBall:ServerInfo:" + localServerInfo.getId(); String key = "HamsterBall:ServerInfo:" + localServerInfo.getId();
jedis.del(key); redis.del(key);
} }
} }
try (Connection connection = getDatasource().getConnection()) { try (Connection connection = getDatasource().getConnection()) {
@@ -206,7 +211,7 @@ public abstract class BallAPI {
statement.executeUpdate(); statement.executeUpdate();
} }
} }
redisSub.close(); redisPubSub.close();
} }
/** /**
@@ -488,16 +493,12 @@ public abstract class BallAPI {
channel = ballConfig.getChannelPrefix() + channel; channel = ballConfig.getChannelPrefix() + channel;
} }
if (block) { if (block) {
try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { redisPubSub.sync().publish(channel, CoreAPI.getInstance().getGson().toJson(message));
jedis.publish(channel, CoreAPI.getInstance().getGson().toJson(message));
}
eventBus.post(new MessageSentEvent(channel, message)); eventBus.post(new MessageSentEvent(channel, message));
} else { } else {
@NotNull String finalChannel = channel; @NotNull String finalChannel = channel;
CoreAPI.getInstance().getExecutorService().submit(() -> { CoreAPI.getInstance().getExecutorService().execute(() -> {
try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { redisPubSub.sync().publish(finalChannel, CoreAPI.getInstance().getGson().toJson(message));
jedis.publish(finalChannel, CoreAPI.getInstance().getGson().toJson(message));
}
eventBus.post(new MessageSentEvent(finalChannel, message)); eventBus.post(new MessageSentEvent(finalChannel, message));
}); });
} }
@@ -523,14 +524,7 @@ public abstract class BallAPI {
* @param channels 频道名称 * @param channels 频道名称
*/ */
public void subscribeRaw(@NotNull String... channels) { public void subscribeRaw(@NotNull String... channels) {
BallRedisListener.INSTANCE.subscribe(channels); redisPubSub.sync().subscribe(channels);
// CoreAPI.getInstance().getExecutorService().submit(() -> {
// try {
// redisSub.subscribe(BallRedisListener.INSTANCE, channels);
// } catch (Exception | Error e) {
// e.printStackTrace();
// }
// });
} }
/** /**
@@ -539,10 +533,7 @@ public abstract class BallAPI {
* @param patterns 频道名称正则表达式 * @param patterns 频道名称正则表达式
*/ */
public void subscribePatterns(@NotNull String patterns) { public void subscribePatterns(@NotNull String patterns) {
BallRedisListener.INSTANCE.psubscribe(patterns); redisPubSub.sync().psubscribe(patterns);
// CoreAPI.getInstance().getExecutorService().submit(
// () -> redisSub.psubscribe(BallRedisListener.INSTANCE, patterns)
// );
} }
/** /**
@@ -565,7 +556,7 @@ public abstract class BallAPI {
* @param channels 频道名称 * @param channels 频道名称
*/ */
public void unsubscribeRaw(@NotNull String... channels) { public void unsubscribeRaw(@NotNull String... channels) {
BallRedisListener.INSTANCE.unsubscribe(channels); redisPubSub.sync().unsubscribe(channels);
} }
/** /**
@@ -574,7 +565,7 @@ public abstract class BallAPI {
* @param patterns 频道名称正则表达式 * @param patterns 频道名称正则表达式
*/ */
public void unsubscribePatterns(@NotNull String patterns) { public void unsubscribePatterns(@NotNull String patterns) {
BallRedisListener.INSTANCE.punsubscribe(patterns); redisPubSub.sync().punsubscribe(patterns);
} }
@NotNull @NotNull

View File

@@ -5,18 +5,17 @@ import cn.hamster3.mc.plugin.ball.common.data.BallMessage;
import cn.hamster3.mc.plugin.ball.common.event.message.MessageReceivedEvent; import cn.hamster3.mc.plugin.ball.common.event.message.MessageReceivedEvent;
import cn.hamster3.mc.plugin.core.common.api.CoreAPI; import cn.hamster3.mc.plugin.core.common.api.CoreAPI;
import com.google.common.eventbus.EventBus; import com.google.common.eventbus.EventBus;
import redis.clients.jedis.JedisPubSub; import io.lettuce.core.pubsub.RedisPubSubListener;
import java.util.logging.Level; import java.util.logging.Level;
public class BallRedisListener extends JedisPubSub { public class BallRedisListener implements RedisPubSubListener<String, String> {
public static final BallRedisListener INSTANCE = new BallRedisListener(); public static final BallRedisListener INSTANCE = new BallRedisListener();
private BallRedisListener() { private BallRedisListener() {
} }
@Override public void handle(String channel, String message) {
public void onMessage(String channel, String message) {
CoreAPI.getInstance().getExecutorService().submit(() -> { CoreAPI.getInstance().getExecutorService().submit(() -> {
try { try {
String finalChannel = channel; String finalChannel = channel;
@@ -40,27 +39,32 @@ public class BallRedisListener extends JedisPubSub {
} }
@Override @Override
public void onPMessage(String pattern, String channel, String message) { public void message(String channel, String message) {
onMessage(channel, message); handle(channel, message);
} }
@Override @Override
public void onSubscribe(String channel, int subscribedChannels) { public void message(String pattern, String channel, String message) {
handle(channel, message);
}
@Override
public void subscribed(String channel, long count) {
BallAPI.getInstance().getLogger().info("已订阅 redis 频道 " + channel); BallAPI.getInstance().getLogger().info("已订阅 redis 频道 " + channel);
} }
@Override @Override
public void onUnsubscribe(String channel, int subscribedChannels) { public void psubscribed(String pattern, long count) {
BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道 " + channel);
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
BallAPI.getInstance().getLogger().info("已订阅 redis 频道(正则) " + pattern); BallAPI.getInstance().getLogger().info("已订阅 redis 频道(正则) " + pattern);
} }
@Override @Override
public void onPUnsubscribe(String pattern, int subscribedChannels) { public void unsubscribed(String channel, long count) {
BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道 " + channel);
}
@Override
public void punsubscribed(String pattern, long count) {
BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道(正则) " + pattern); BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道(正则) " + pattern);
} }
} }

View File

@@ -2,7 +2,8 @@ package cn.hamster3.mc.plugin.ball.common.thread;
import cn.hamster3.mc.plugin.ball.common.api.BallAPI; import cn.hamster3.mc.plugin.ball.common.api.BallAPI;
import cn.hamster3.mc.plugin.core.common.api.CoreAPI; import cn.hamster3.mc.plugin.core.common.api.CoreAPI;
import redis.clients.jedis.Jedis; import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
public class LockUpdateThread implements Runnable { public class LockUpdateThread implements Runnable {
public static final LockUpdateThread INSTANCE = new LockUpdateThread(); public static final LockUpdateThread INSTANCE = new LockUpdateThread();
@@ -13,8 +14,9 @@ public class LockUpdateThread implements Runnable {
@Override @Override
public void run() { public void run() {
String key = "HamsterBall:ServerInfo:" + BallAPI.getInstance().getLocalServerInfo().getId(); String key = "HamsterBall:ServerInfo:" + BallAPI.getInstance().getLocalServerInfo().getId();
try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { try (StatefulRedisConnection<String, String> connect = CoreAPI.getInstance().getRedisClient().connect()) {
jedis.expire(key, 180); RedisCommands<String, String> redis = connect.sync();
redis.expire(key, 180);
} }
} }
} }

View File

@@ -11,7 +11,7 @@ plugins {
} }
group = "cn.hamster3.mc.plugin" group = "cn.hamster3.mc.plugin"
version = "1.7.2" version = "1.8.0"
description = "基于 Redis 的 Minecraft 服务端通用消息中间件" description = "基于 Redis 的 Minecraft 服务端通用消息中间件"
subprojects { subprojects {
@@ -65,10 +65,9 @@ subprojects {
repositories { repositories {
maven { maven {
url = uri("https://maven.airgame.net/public") url = uri("https://maven.airgame.net/public")
credentials { credentials {
username = rootProject.properties.getOrDefault("maven_username", "").toString() username = findProperty("MAVEN_AIRGAME_USERNAME")?.toString() ?: ""
password = rootProject.properties.getOrDefault("maven_password", "").toString() password = findProperty("MAVEN_AIRGAME_PASSWORD")?.toString() ?: ""
} }
} }
} }