瀏覽代碼

feat:完善功能

王智勇 5 年之前
父節點
當前提交
0e032b021d

+ 21 - 21
README.md

@@ -10,27 +10,27 @@
 
 #### 基础功能
 
-- 动态调整服务端设置提高性能
-- 心跳检测
-- 闲时检测 清除或处理长时间无响应客户端
-- 自动编/解码
-- 自定义数据包编码 实现 256 种操作的解析(可拓展)
-- 支持任意类型数据的发送,配合操作解析
-- 基于JWT 的登录认证
-- 授权检查,清除未授权客户端
-- 消息确认ACK
-- 非主动下线的断线检测与重连
-- 支持客户端查询与下线等操作
-- 微服务化 支持其他服务动态调用
-- 支持多种序列化方式(现暂json)
-- ssl 通道加密 保证数据安全
-- data数据加密 保证数据安全
-- ID 规则设计 便于后期分布式拓展
-- session信息持久化或缓存
-- 异常监控 便于错误发现
-- 日志 便于错误处理
-- 线程池 提高执行效率 缓解通道压力
-- 数据压缩 提高传输效率
+- [x] 动态调整服务端设置提高性能
+- [x] 心跳检测
+- [x] 闲时检测 清除或处理长时间无响应客户端
+- [x] 自动编/解码
+- [x] 自定义数据包编码 实现 256 种操作的解析(可拓展)
+- [x] 支持任意类型数据的发送,配合操作解析
+- [x] 基于JWT 的登录认证
+- [x] 授权检查,清除未授权客户端
+- [x] 消息确认ACK
+- [x] 非主动下线的断线检测与重连
+- [x] 支持客户端查询与下线等操作
+- [ ] 微服务化 支持其他服务动态调用
+- [ ] 支持多种序列化方式(现暂json)
+- [ ] ssl 通道加密 保证数据安全
+- [ ] data数据加密 保证数据安全
+- [ ] ID 规则设计 便于后期分布式拓展
+- [ ] session信息持久化或缓存
+- [ ] 异常监控 便于错误发现
+- [ ] 日志 便于错误处理
+- [ ] 线程池 提高执行效率 缓解通道压力
+- [ ] 数据压缩 提高传输效率
 
 #### 拓展功能
 

+ 1 - 1
pom.xml

@@ -14,7 +14,7 @@
     <groupId>dzdy</groupId>
     <artifactId>dim</artifactId>
     <version>1.0-SNAPSHOT</version>
-    <description> push service basic on netty</description>
+    <description>push service power by netty</description>
 
     <properties>
         <java.version>1.8</java.version>

+ 1 - 1
src/main/java/com/dzdy/dim/api/ClientApi.java

@@ -1,6 +1,6 @@
 package com.dzdy.dim.api;
 
-import com.dzdy.dim.session.Session;
+import com.dzdy.dim.contants.Session;
 
 import java.util.List;
 

+ 7 - 9
src/main/java/com/dzdy/dim/api/PushApi.java

@@ -1,8 +1,6 @@
 package com.dzdy.dim.api;
 
-import com.dzdy.dim.session.Session;
-
-import java.util.List;
+import com.dzdy.dim.contants.PushParam;
 
 /**
  * 推送api
@@ -16,18 +14,18 @@ public interface PushApi {
     /**
      * 推送数据
      *
-     * @param sessions 设备集合
-     * @param data     数据
+     * @param pushParam #sessions 设备集合
+     * @param pushParam #data     数据
      * @return
      */
-    public R<Boolean> pushBySession(List<Session> sessions, Object data);
+    public R<Boolean> pushBySession(PushParam pushParam);
 
     /**
      * 推送数据
      *
-     * @param userId 用户id
-     * @param data   数据
+     * @param pushParam #userId 用户id
+     * @param pushParam #data   数据
      * @return
      */
-    public R<Boolean> pushByUserId(String userId, Object data);
+    public R<Boolean> pushByUserId(PushParam pushParam);
 }

+ 8 - 4
src/main/java/com/dzdy/dim/api/impl/ClientApiImpl.java

@@ -2,7 +2,8 @@ package com.dzdy.dim.api.impl;
 
 import com.dzdy.dim.api.ClientApi;
 import com.dzdy.dim.api.R;
-import com.dzdy.dim.session.Session;
+import com.dzdy.dim.contants.Session;
+import com.dzdy.dim.protocol.response.LogoutResponsePacket;
 import com.dzdy.dim.util.SessionUtil;
 import io.netty.channel.Channel;
 import org.springframework.web.bind.annotation.*;
@@ -29,11 +30,14 @@ public class ClientApiImpl implements ClientApi {
     @DeleteMapping("/userId/{sessionId}")
     public R<Boolean> removeSession(@PathVariable String sessionId) {
         Channel channel = SessionUtil.getChannel(sessionId);
-        channel.close().addListener(future -> {
+        SessionUtil.unBind(channel);
+        LogoutResponsePacket logoutResponsePacket = new LogoutResponsePacket();
+        logoutResponsePacket.setSuccess(true);
+        channel.writeAndFlush(logoutResponsePacket).addListener(future -> {
             if (future.isSuccess()) {
-                System.out.println("客户端关闭成功");
+                System.out.println("客户端下线成功");
             } else {
-                System.out.println("客户端关闭失败:" + future.cause());
+                System.out.println("客户端下线失败:" + future.cause());
             }
         });
         return R.ok(true);

+ 9 - 6
src/main/java/com/dzdy/dim/api/impl/PushApiImpl.java

@@ -2,14 +2,15 @@ package com.dzdy.dim.api.impl;
 
 import com.dzdy.dim.api.PushApi;
 import com.dzdy.dim.api.R;
-import com.dzdy.dim.session.Session;
+import com.dzdy.dim.contants.PushParam;
 import com.dzdy.dim.util.PushUtil;
 import com.dzdy.dim.util.SessionUtil;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.List;
+import javax.validation.Valid;
 
 /**
  * 推送消息
@@ -22,17 +23,19 @@ import java.util.List;
 @RestController
 @RequestMapping("push")
 public class PushApiImpl implements PushApi {
+
     @Override
     @PostMapping("pushBySession")
-    public R<Boolean> pushBySession(List<Session> sessions, Object data) {
-        sessions.forEach(session -> PushUtil.push(session, data));
+    public R<Boolean> pushBySession(@Valid @RequestBody PushParam pushParam) {
+        pushParam.getSessions().forEach(session -> PushUtil.push(session, pushParam.getCommand(), pushParam.getPushData()));
         return R.ok(true);
     }
 
     @Override
     @PostMapping("pushByUserId")
-    public R<Boolean> pushByUserId(String userId, Object data) {
-        SessionUtil.getSession(userId).forEach(session -> PushUtil.push(session, data));
+    public R<Boolean> pushByUserId(@Valid @RequestBody PushParam pushParam) {
+        pushParam.getUserIds().forEach(userId -> SessionUtil.getSession(userId)
+                .forEach(session -> PushUtil.push(session, pushParam.getCommand(), pushParam.getPushData())));
         return R.ok(true);
     }
 }

+ 14 - 16
src/main/java/com/dzdy/dim/client/NettyClient.java

@@ -1,9 +1,6 @@
 package com.dzdy.dim.client;
 
-import com.dzdy.dim.client.handler.HeartBeatTimerHandler;
-import com.dzdy.dim.client.handler.LoginResponseHandler;
-import com.dzdy.dim.client.handler.LogoutResponseHandler;
-import com.dzdy.dim.client.handler.SockDataRequestHandler;
+import com.dzdy.dim.client.handler.*;
 import com.dzdy.dim.handler.IMIdleStateHandler;
 import com.dzdy.dim.handler.PacketDecoder;
 import com.dzdy.dim.handler.PacketEncoder;
@@ -29,18 +26,20 @@ public class NettyClient {
     /**
      * 最大连接重试次数
      */
-    private static final int MAX_RETRY = 5;
+    public static final int MAX_RETRY = 5;
+    private static final int PORT = 9002;
+    private static final String HOST = "127.0.0.1";
+    private static Bootstrap BOOTSTRAP = new Bootstrap();
 
     public static void main(String[] args) {
         connect();
     }
 
     private static void connect() {
-        Bootstrap bootstrap = new Bootstrap();
 
         NioEventLoopGroup group = new NioEventLoopGroup();
 
-        bootstrap.group(group).channel(NioSocketChannel.class)
+        BOOTSTRAP.group(group).channel(NioSocketChannel.class)
                 // I/O 处理逻辑
                 .handler(new ChannelInitializer<Channel>() {
                     @Override
@@ -48,6 +47,8 @@ public class NettyClient {
                         ch.pipeline().addLast(new IMIdleStateHandler());
                         ch.pipeline().addLast(new Spliter());
                         ch.pipeline().addLast(new PacketDecoder());
+                        // 心跳检测响应
+                        ch.pipeline().addLast(new HeartBeatResponseHandler());
                         // 登录响应
                         ch.pipeline().addLast(new LoginResponseHandler());
                         // 登出响应
@@ -59,10 +60,10 @@ public class NettyClient {
                     }
                 });
 
-        connect(bootstrap, "127.0.0.1", 9002, MAX_RETRY);
+        connect(MAX_RETRY);
 
         //连接超时
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
+        BOOTSTRAP.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                 .option(ChannelOption.SO_KEEPALIVE, true)
                 .option(ChannelOption.TCP_NODELAY, true);
 
@@ -75,7 +76,7 @@ public class NettyClient {
      */
     private static void login(Channel channel) {
         LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
-        loginRequestPacket.setToken("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NTk1NDY5MDMsInVzZXJfbmFtZSI6ImpnemgwMSIsImF1dGhvcml0aWVzIjpbIlJPTEVfT1BFUkFUT1IiXSwianRpIjoiZjA2NmQxMGMtZWVmMy00Mzk4LTg0NjctOTNiNGYwNWRiMTEwIiwiY2xpZW50X2lkIjoiYW5waW4tb3BzLXBjIiwic2NvcGUiOlsic2VsZWN0Il19.jKTFpZitBmfnLsjssOa5EBNmSwjYu90AgcfDy35fqN4");
+        loginRequestPacket.setToken("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjI1NTk1NDY5MDMsInVzZXJfbmFtZSI6ImpnemgwMSIsImF1dGhvcml0aWVzIjpbIlJPTEVfT1BFUkFUT1IiXSwianRpIjoiZjA2NmQxMGMtZWVmMy00Mzk4LTg0NjctOTNiNGYwNWRiMTEwIiwiY2xpZW50X2lkIjoiYW5waW4tb3BzLXBjIiwic2NvcGUiOlsic2VsZWN0Il19.yB-K7s25d9cVUCT4Ec4ag5T4uJSa0X2hWNM5UJdinLk");
         channel.writeAndFlush(loginRequestPacket);
     }
 
@@ -83,13 +84,10 @@ public class NettyClient {
     /**
      * 连接重试
      *
-     * @param bootstrap
-     * @param host
-     * @param port
      * @param retry
      */
-    private static void connect(Bootstrap bootstrap, String host, int port, int retry) {
-        bootstrap.connect(host, port).addListener(future -> {
+    public static void connect(int retry) {
+        BOOTSTRAP.connect(HOST, PORT).addListener(future -> {
             if (future.isSuccess()) {
                 Channel channel = ((ChannelFuture) future).channel();
                 login(channel);
@@ -101,7 +99,7 @@ public class NettyClient {
                 // 本次重连的间隔
                 int delay = 1 << order;
                 System.err.println(System.currentTimeMillis() + ": 连接失败,第" + order + "次重连……");
-                bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit
+                BOOTSTRAP.config().group().schedule(() -> connect(retry - 1), delay, TimeUnit
                         .SECONDS);
             }
         });

+ 19 - 0
src/main/java/com/dzdy/dim/client/handler/HeartBeatResponseHandler.java

@@ -0,0 +1,19 @@
+package com.dzdy.dim.client.handler;
+
+import com.dzdy.dim.protocol.response.HeartBeatResponsePacket;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+
+import java.util.Date;
+
+/**
+ * @author : wangzhiyong
+ * @date : 2019/6/4 15:03
+ * description :
+ */
+public class HeartBeatResponseHandler extends SimpleChannelInboundHandler<HeartBeatResponsePacket> {
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, HeartBeatResponsePacket msg) throws Exception {
+        System.out.println(new Date() + " 客户端收到服务端心跳返回");
+    }
+}

+ 5 - 1
src/main/java/com/dzdy/dim/client/handler/LoginResponseHandler.java

@@ -1,10 +1,13 @@
 package com.dzdy.dim.client.handler;
 
+import com.dzdy.dim.client.NettyClient;
 import com.dzdy.dim.protocol.response.LoginResponsePacket;
 import com.dzdy.dim.util.SessionUtil;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 
+import static com.dzdy.dim.client.NettyClient.MAX_RETRY;
+
 /**
  * @author : wangzhiyong
  * @date : 2019/5/28 17:40
@@ -23,6 +26,7 @@ public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginRespo
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) {
-        System.out.println("客户端连接关闭 - LoginResponseHandler");
+        System.out.println("客户端连接关闭");
+        NettyClient.connect(MAX_RETRY);
     }
 }

+ 1 - 0
src/main/java/com/dzdy/dim/client/handler/LogoutResponseHandler.java

@@ -14,6 +14,7 @@ public class LogoutResponseHandler extends SimpleChannelInboundHandler<LogoutRes
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, LogoutResponsePacket msg) {
         SessionUtil.unBind(ctx.channel());
+        System.out.println("客户端下线成功");
     }
 
 }

+ 2 - 4
src/main/java/com/dzdy/dim/config/NettyConfig.java

@@ -3,10 +3,7 @@ package com.dzdy.dim.config;
 import com.dzdy.dim.handler.IMIdleStateHandler;
 import com.dzdy.dim.handler.PacketCodecHandler;
 import com.dzdy.dim.handler.Spliter;
-import com.dzdy.dim.service.handler.AuthHandler;
-import com.dzdy.dim.service.handler.ConfirmRequestHandler;
-import com.dzdy.dim.service.handler.HeartBeatRequestHandler;
-import com.dzdy.dim.service.handler.LoginRequestHandler;
+import com.dzdy.dim.service.handler.*;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
@@ -75,6 +72,7 @@ public class NettyConfig {
                 ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
                 ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);
                 ch.pipeline().addLast(AuthHandler.INSTANCE);
+                ch.pipeline().addLast(LogoutRequestHandler.INSTANCE);
                 ch.pipeline().addLast(ConfirmRequestHandler.INSTANCE);
             }
         };

+ 3 - 1
src/main/java/com/dzdy/dim/contants/AttrContants.java

@@ -1,6 +1,5 @@
 package com.dzdy.dim.contants;
 
-import com.dzdy.dim.session.Session;
 import io.netty.util.AttributeKey;
 
 /**
@@ -12,5 +11,8 @@ import io.netty.util.AttributeKey;
  */
 public class AttrContants {
 
+    /**
+     * session key
+     */
     public static final AttributeKey<Session> SESSION = AttributeKey.newInstance("session");
 }

+ 44 - 0
src/main/java/com/dzdy/dim/contants/PushParam.java

@@ -0,0 +1,44 @@
+package com.dzdy.dim.contants;
+
+import com.dzdy.dim.protocol.command.Command;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import javax.validation.constraints.NotNull;
+import java.util.List;
+
+/**
+ * 推送数据封装类
+ *
+ * @author : wangzhiyong
+ * @date : 2019/6/4 14:25
+ * description :
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class PushParam {
+
+    /**
+     * session 集合
+     */
+    List<Session> sessions;
+
+    /**
+     * 用户id 集合
+     */
+    List<String> userIds;
+
+    /**
+     * 推送数据
+     */
+    @NotNull(message = "推送数据不能为空")
+    Object pushData;
+
+    /**
+     * 推送指令
+     */
+    @NotNull(message = "推送指令不能为空")
+    Command command;
+}

+ 1 - 1
src/main/java/com/dzdy/dim/session/Session.java → src/main/java/com/dzdy/dim/contants/Session.java

@@ -1,4 +1,4 @@
-package com.dzdy.dim.session;
+package com.dzdy.dim.contants;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;

+ 1 - 1
src/main/java/com/dzdy/dim/protocol/response/LoginResponsePacket.java

@@ -1,7 +1,7 @@
 package com.dzdy.dim.protocol.response;
 
+import com.dzdy.dim.contants.Session;
 import com.dzdy.dim.protocol.Packet;
-import com.dzdy.dim.session.Session;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 

+ 1 - 1
src/main/java/com/dzdy/dim/service/handler/LoginRequestHandler.java

@@ -1,9 +1,9 @@
 package com.dzdy.dim.service.handler;
 
+import com.dzdy.dim.contants.Session;
 import com.dzdy.dim.protocol.command.JWTUser;
 import com.dzdy.dim.protocol.request.LoginRequestPacket;
 import com.dzdy.dim.protocol.response.LoginResponsePacket;
-import com.dzdy.dim.session.Session;
 import com.dzdy.dim.util.IDUtil;
 import com.dzdy.dim.util.JWTUtil;
 import com.dzdy.dim.util.SessionUtil;

+ 6 - 14
src/main/java/com/dzdy/dim/service/handler/LogoutRequestHandler.java

@@ -1,9 +1,8 @@
 package com.dzdy.dim.service.handler;
 
-import com.dzdy.dim.protocol.SockData;
-import com.dzdy.dim.protocol.command.Command;
-import com.dzdy.dim.protocol.command.JWTUser;
 import com.dzdy.dim.protocol.request.LogoutRequestPacket;
+import com.dzdy.dim.protocol.response.LogoutResponsePacket;
+import com.dzdy.dim.util.SessionUtil;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -24,16 +23,9 @@ public class LogoutRequestHandler extends SimpleChannelInboundHandler<LogoutRequ
 
     @Override
     protected void channelRead0(ChannelHandlerContext ctx, LogoutRequestPacket msg) {
-        //测试后期删除
-//        SessionUtil.unBind(ctx.channel());
-//        LogoutResponsePacket logoutResponsePacket = new LogoutResponsePacket();
-//        logoutResponsePacket.setSuccess(true);
-//        ctx.writeAndFlush(logoutResponsePacket);
-        JWTUser user = new JWTUser();
-        user.setClient_id("2342342342");
-        user.setExp(323423L);
-        user.setUser_name("AAA");
-        SockData<JWTUser> data = SockData.newData(Command.MESSAGE, user);
-        ctx.writeAndFlush(data);
+        SessionUtil.unBind(ctx.channel());
+        LogoutResponsePacket logoutResponsePacket = new LogoutResponsePacket();
+        logoutResponsePacket.setSuccess(true);
+        ctx.writeAndFlush(logoutResponsePacket);
     }
 }

+ 1 - 1
src/main/java/com/dzdy/dim/util/JWTUtil.java

@@ -49,7 +49,7 @@ public class JWTUtil {
 //        System.out.println(encode(encode));
         String encode = encode("123");
         System.out.println(encode);
-        JWTUser decode = decode("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NTk1NDY5MDMsInVzZXJfbmFtZSI6ImpnemgwMSIsImF1dGhvcml0aWVzIjpbIlJPTEVfT1BFUkFUT1IiXSwianRpIjoiZjA2NmQxMGMtZWVmMy00Mzk4LTg0NjctOTNiNGYwNWRiMTEwIiwiY2xpZW50X2lkIjoiYW5waW4tb3BzLXBjIiwic2NvcGUiOlsic2VsZWN0Il19.jKTFpZitBmfnLsjssOa5EBNmSwjYu90AgcfDy35fqN4");
+        JWTUser decode = decode("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjI1NTk1NDY5MDMsInVzZXJfbmFtZSI6ImpnemgwMSIsImF1dGhvcml0aWVzIjpbIlJPTEVfT1BFUkFUT1IiXSwianRpIjoiZjA2NmQxMGMtZWVmMy00Mzk4LTg0NjctOTNiNGYwNWRiMTEwIiwiY2xpZW50X2lkIjoiYW5waW4tb3BzLXBjIiwic2NvcGUiOlsic2VsZWN0Il19.yB-K7s25d9cVUCT4Ec4ag5T4uJSa0X2hWNM5UJdinLk");
         System.out.println("111");
     }
 }

+ 7 - 3
src/main/java/com/dzdy/dim/util/PushUtil.java

@@ -1,6 +1,8 @@
 package com.dzdy.dim.util;
 
-import com.dzdy.dim.session.Session;
+import com.dzdy.dim.contants.Session;
+import com.dzdy.dim.protocol.SockData;
+import com.dzdy.dim.protocol.command.Command;
 import io.netty.channel.Channel;
 
 /**
@@ -14,12 +16,14 @@ public class PushUtil {
 
     /**
      * 发送消息
+     *
      * @param session
      * @param data
      */
-    public static void push(Session session, Object data) {
+    public static void push(Session session, Command command, Object data) {
+        SockData<Object> sockData = SockData.newData(command, data);
         Channel channel = SessionUtil.getChannel(session.getSessionId());
-        channel.writeAndFlush(data).addListener(future -> {
+        channel.writeAndFlush(sockData).addListener(future -> {
             if (future.isSuccess()) {
                 System.out.println(session.getUserId() + " 发送成功");
             } else {

+ 1 - 1
src/main/java/com/dzdy/dim/util/SessionUtil.java

@@ -1,7 +1,7 @@
 package com.dzdy.dim.util;
 
 import com.dzdy.dim.contants.AttrContants;
-import com.dzdy.dim.session.Session;
+import com.dzdy.dim.contants.Session;
 import io.netty.channel.Channel;
 import io.netty.util.internal.StringUtil;