diff --git a/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/api/BallAPI.java b/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/api/BallAPI.java index 3f5ee77..783e27a 100644 --- a/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/api/BallAPI.java +++ b/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/api/BallAPI.java @@ -20,11 +20,13 @@ import cn.hamster3.mc.plugin.core.common.data.DisplayMessage; import cn.hamster3.mc.plugin.core.common.util.CoreUtils; import com.google.common.eventbus.AsyncEventBus; import com.google.common.eventbus.EventBus; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; import lombok.Getter; import net.kyori.adventure.text.Component; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import redis.clients.jedis.Jedis; import javax.sql.DataSource; import java.sql.*; @@ -69,7 +71,7 @@ public abstract class BallAPI { private final Map allPlayerInfo; @NotNull - private final Jedis redisSub; + private final StatefulRedisPubSubConnection redisPubSub; @Nullable private ScheduledFuture lockUpdater; @@ -92,7 +94,7 @@ public abstract class BallAPI { eventBus.register(BallCommonListener.INSTANCE); allServerInfo = new ConcurrentHashMap<>(); allPlayerInfo = new ConcurrentHashMap<>(); - redisSub = CoreAPI.getInstance().getJedisPool().getResource(); + redisPubSub = CoreAPI.getInstance().getRedisClient().connectPubSub(); getLogger().info("频道前缀: " + ballConfig.getChannelPrefix()); getLogger().info("启用子服更新玩家状态: " + ballConfig.isGameServerUpdatePlayerInfo()); if (ballConfig.isGameServerUpdatePlayerInfo()) { @@ -102,29 +104,31 @@ public abstract class BallAPI { getLogger().warning("已启用调试模式"); eventBus.register(BallDebugListener.INSTANCE); } - CoreAPI.getInstance().getExecutorService().submit(() -> redisSub.subscribe(BallRedisListener.INSTANCE, BALL_CHANNEL)); + redisPubSub.addListener(BallRedisListener.INSTANCE); + redisPubSub.sync().subscribe(BALL_CHANNEL); } protected void enable() throws SQLException, InterruptedException { - try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { + try (StatefulRedisConnection connect = CoreAPI.getInstance().getRedisClient().connect()) { + RedisCommands redis = connect.sync(); String key = "HamsterBall:ServerInfo:" + localServerInfo.getId(); - if (jedis.exists(key) && ballConfig.isSingletonServerID()) { + if (redis.exists(key) > 0 && ballConfig.isSingletonServerID()) { throw new IllegalStateException("已经有一个服务器占用了该 ID"); } - jedis.hset(key, "id", localServerInfo.getId()); - jedis.hset(key, "name", localServerInfo.getName()); - jedis.hset(key, "type", localServerInfo.getType().name()); - jedis.hset(key, "host", localServerInfo.getHost()); - jedis.hset(key, "port", String.valueOf(localServerInfo.getPort())); - jedis.expire(key, 180); + redis.hset(key, "id", localServerInfo.getId()); + redis.hset(key, "name", localServerInfo.getName()); + redis.hset(key, "type", localServerInfo.getType().name()); + redis.hset(key, "host", localServerInfo.getHost()); + redis.hset(key, "port", String.valueOf(localServerInfo.getPort())); + redis.expire(key, 180); lockUpdater = CoreAPI.getInstance().getScheduledService().scheduleAtFixedRate(LockUpdateThread.INSTANCE, 1, 1, TimeUnit.MINUTES); - for (String serverInfoKey : jedis.keys("HamsterBall:ServerInfo:*")) { + for (String serverInfoKey : redis.keys("HamsterBall:ServerInfo:*")) { BallServerInfo info = new BallServerInfo( - jedis.hget(serverInfoKey, "id"), - jedis.hget(serverInfoKey, "name"), - BallServerType.valueOf(jedis.hget(serverInfoKey, "type")), - jedis.hget(serverInfoKey, "host"), - Integer.parseInt(jedis.hget(serverInfoKey, "port")) + redis.hget(serverInfoKey, "id"), + redis.hget(serverInfoKey, "name"), + BallServerType.valueOf(redis.hget(serverInfoKey, "type")), + redis.hget(serverInfoKey, "host"), + Integer.parseInt(redis.hget(serverInfoKey, "port")) ); allServerInfo.put(info.getId(), info); } @@ -192,9 +196,10 @@ public abstract class BallAPI { if (lockUpdater != null) { lockUpdater.cancel(true); lockUpdater = null; - try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { + try (StatefulRedisConnection connect = CoreAPI.getInstance().getRedisClient().connect()) { + RedisCommands redis = connect.sync(); String key = "HamsterBall:ServerInfo:" + localServerInfo.getId(); - jedis.del(key); + redis.del(key); } } try (Connection connection = getDatasource().getConnection()) { @@ -206,7 +211,7 @@ public abstract class BallAPI { statement.executeUpdate(); } } - redisSub.close(); + redisPubSub.close(); } /** @@ -488,16 +493,12 @@ public abstract class BallAPI { channel = ballConfig.getChannelPrefix() + channel; } if (block) { - try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { - jedis.publish(channel, CoreAPI.getInstance().getGson().toJson(message)); - } + redisPubSub.sync().publish(channel, CoreAPI.getInstance().getGson().toJson(message)); eventBus.post(new MessageSentEvent(channel, message)); } else { @NotNull String finalChannel = channel; - CoreAPI.getInstance().getExecutorService().submit(() -> { - try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { - jedis.publish(finalChannel, CoreAPI.getInstance().getGson().toJson(message)); - } + CoreAPI.getInstance().getExecutorService().execute(() -> { + redisPubSub.sync().publish(finalChannel, CoreAPI.getInstance().getGson().toJson(message)); eventBus.post(new MessageSentEvent(finalChannel, message)); }); } @@ -523,14 +524,7 @@ public abstract class BallAPI { * @param channels 频道名称 */ public void subscribeRaw(@NotNull String... channels) { - BallRedisListener.INSTANCE.subscribe(channels); -// CoreAPI.getInstance().getExecutorService().submit(() -> { -// try { -// redisSub.subscribe(BallRedisListener.INSTANCE, channels); -// } catch (Exception | Error e) { -// e.printStackTrace(); -// } -// }); + redisPubSub.sync().subscribe(channels); } /** @@ -539,10 +533,7 @@ public abstract class BallAPI { * @param patterns 频道名称正则表达式 */ public void subscribePatterns(@NotNull String patterns) { - BallRedisListener.INSTANCE.psubscribe(patterns); -// CoreAPI.getInstance().getExecutorService().submit( -// () -> redisSub.psubscribe(BallRedisListener.INSTANCE, patterns) -// ); + redisPubSub.sync().psubscribe(patterns); } /** @@ -565,7 +556,7 @@ public abstract class BallAPI { * @param channels 频道名称 */ public void unsubscribeRaw(@NotNull String... channels) { - BallRedisListener.INSTANCE.unsubscribe(channels); + redisPubSub.sync().unsubscribe(channels); } /** @@ -574,7 +565,7 @@ public abstract class BallAPI { * @param patterns 频道名称正则表达式 */ public void unsubscribePatterns(@NotNull String patterns) { - BallRedisListener.INSTANCE.punsubscribe(patterns); + redisPubSub.sync().punsubscribe(patterns); } @NotNull diff --git a/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/listener/BallRedisListener.java b/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/listener/BallRedisListener.java index c075702..dfb18cc 100644 --- a/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/listener/BallRedisListener.java +++ b/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/listener/BallRedisListener.java @@ -5,18 +5,17 @@ import cn.hamster3.mc.plugin.ball.common.data.BallMessage; import cn.hamster3.mc.plugin.ball.common.event.message.MessageReceivedEvent; import cn.hamster3.mc.plugin.core.common.api.CoreAPI; import com.google.common.eventbus.EventBus; -import redis.clients.jedis.JedisPubSub; +import io.lettuce.core.pubsub.RedisPubSubListener; import java.util.logging.Level; -public class BallRedisListener extends JedisPubSub { +public class BallRedisListener implements RedisPubSubListener { public static final BallRedisListener INSTANCE = new BallRedisListener(); private BallRedisListener() { } - @Override - public void onMessage(String channel, String message) { + public void handle(String channel, String message) { CoreAPI.getInstance().getExecutorService().submit(() -> { try { String finalChannel = channel; @@ -40,27 +39,32 @@ public class BallRedisListener extends JedisPubSub { } @Override - public void onPMessage(String pattern, String channel, String message) { - onMessage(channel, message); + public void message(String channel, String message) { + handle(channel, message); } @Override - public void onSubscribe(String channel, int subscribedChannels) { + public void message(String pattern, String channel, String message) { + handle(channel, message); + } + + @Override + public void subscribed(String channel, long count) { BallAPI.getInstance().getLogger().info("已订阅 redis 频道 " + channel); } @Override - public void onUnsubscribe(String channel, int subscribedChannels) { - BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道 " + channel); - } - - @Override - public void onPSubscribe(String pattern, int subscribedChannels) { + public void psubscribed(String pattern, long count) { BallAPI.getInstance().getLogger().info("已订阅 redis 频道(正则) " + pattern); } @Override - public void onPUnsubscribe(String pattern, int subscribedChannels) { + public void unsubscribed(String channel, long count) { + BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道 " + channel); + } + + @Override + public void punsubscribed(String pattern, long count) { BallAPI.getInstance().getLogger().info("已取消订阅 redis 频道(正则) " + pattern); } } diff --git a/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/thread/LockUpdateThread.java b/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/thread/LockUpdateThread.java index f839426..c010f73 100644 --- a/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/thread/LockUpdateThread.java +++ b/ball-common/src/main/java/cn/hamster3/mc/plugin/ball/common/thread/LockUpdateThread.java @@ -2,7 +2,8 @@ package cn.hamster3.mc.plugin.ball.common.thread; import cn.hamster3.mc.plugin.ball.common.api.BallAPI; import cn.hamster3.mc.plugin.core.common.api.CoreAPI; -import redis.clients.jedis.Jedis; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.api.sync.RedisCommands; public class LockUpdateThread implements Runnable { public static final LockUpdateThread INSTANCE = new LockUpdateThread(); @@ -13,8 +14,9 @@ public class LockUpdateThread implements Runnable { @Override public void run() { String key = "HamsterBall:ServerInfo:" + BallAPI.getInstance().getLocalServerInfo().getId(); - try (Jedis jedis = CoreAPI.getInstance().getJedisPool().getResource()) { - jedis.expire(key, 180); + try (StatefulRedisConnection connect = CoreAPI.getInstance().getRedisClient().connect()) { + RedisCommands redis = connect.sync(); + redis.expire(key, 180); } } } diff --git a/build.gradle.kts b/build.gradle.kts index 36292a2..07576de 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ plugins { } group = "cn.hamster3.mc.plugin" -version = "1.7.2" +version = "1.8.0" description = "基于 Redis 的 Minecraft 服务端通用消息中间件" subprojects { @@ -65,10 +65,9 @@ subprojects { repositories { maven { url = uri("https://maven.airgame.net/public") - credentials { - username = rootProject.properties.getOrDefault("maven_username", "").toString() - password = rootProject.properties.getOrDefault("maven_password", "").toString() + username = findProperty("MAVEN_AIRGAME_USERNAME")?.toString() ?: "" + password = findProperty("MAVEN_AIRGAME_PASSWORD")?.toString() ?: "" } } }