浏览代码

feat: v1.0.1 放入web容器 微服务化

王智勇 5 年之前
父节点
当前提交
a9cd320450

+ 24 - 16
pom.xml

@@ -4,10 +4,17 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.1.5.RELEASE</version>
+        <relativePath/>
+    </parent>
 
     <groupId>dzdy</groupId>
     <artifactId>dim</artifactId>
     <version>1.0-SNAPSHOT</version>
+    <description> push service basic on netty</description>
 
     <properties>
         <java.version>1.8</java.version>
@@ -18,17 +25,33 @@
     </properties>
 
     <dependencies>
-
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <!-- netty -->
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
             <version>${netty.version}</version>
         </dependency>
+        <!-- lombok -->
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
             <version>${lombok.version}</version>
         </dependency>
+        <!-- fastjson -->
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
@@ -53,19 +76,4 @@
             <scope>runtime</scope>
         </dependency>
     </dependencies>
-
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <version>2.0.2</version>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
 </project>

+ 29 - 0
src/main/java/com/dzdy/dim/DimApplication.java

@@ -0,0 +1,29 @@
+package com.dzdy.dim;
+
+import com.dzdy.dim.config.NettyManager;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import javax.annotation.Resource;
+
+/**
+ * @author : wangzhiyong
+ * @date : 2019/5/29 16:24
+ * description :
+ */
+@SpringBootApplication
+public class DimApplication implements CommandLineRunner {
+
+    @Resource
+    NettyManager nettyManager;
+
+    public static void main(String[] args) {
+        SpringApplication.run(DimApplication.class, args);
+    }
+
+    @Override
+    public void run(String... args) {
+        nettyManager.run();
+    }
+}

+ 20 - 5
src/main/java/com/dzdy/dim/api/impl/ClientApiImpl.java

@@ -3,24 +3,39 @@ package com.dzdy.dim.api.impl;
 import com.dzdy.dim.api.ClientApi;
 import com.dzdy.dim.api.R;
 import com.dzdy.dim.session.Session;
+import com.dzdy.dim.util.SessionUtil;
+import io.netty.channel.Channel;
+import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
 
 /**
- * todo
- *
  * @author : wangzhiyong
  * @date : 2019/5/27 15:36
  * description :
  */
+@RestController
+@RequestMapping("client")
 public class ClientApiImpl implements ClientApi {
+
     @Override
+    @GetMapping("getAllSession")
     public R<List<Session>> getAllSession(String userId) {
-        return null;
+        List<Session> sessionList = SessionUtil.getSession(userId);
+        return R.ok(sessionList);
     }
 
     @Override
-    public R<Boolean> removeSession(String userId) {
-        return null;
+    @DeleteMapping("/userId/{sessionId}")
+    public R<Boolean> removeSession(@PathVariable String sessionId) {
+        Channel channel = SessionUtil.getChannel(sessionId);
+        channel.close().addListener(future -> {
+            if (future.isSuccess()) {
+                System.out.println("客户端关闭成功");
+            } else {
+                System.out.println("客户端关闭失败:" + future.cause());
+            }
+        });
+        return R.ok(true);
     }
 }

+ 15 - 3
src/main/java/com/dzdy/dim/api/impl/PushApiImpl.java

@@ -3,24 +3,36 @@ package com.dzdy.dim.api.impl;
 import com.dzdy.dim.api.PushApi;
 import com.dzdy.dim.api.R;
 import com.dzdy.dim.session.Session;
+import com.dzdy.dim.util.PushUtil;
+import com.dzdy.dim.util.SessionUtil;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
 
 import java.util.List;
 
 /**
- * todo
+ * 推送消息
+ * 异步推送 后续可拓展等待结果
  *
  * @author : wangzhiyong
  * @date : 2019/5/27 15:36
  * description :
  */
+@RestController
+@RequestMapping("push")
 public class PushApiImpl implements PushApi {
     @Override
+    @PostMapping("pushBySession")
     public R<Boolean> pushBySession(List<Session> sessions, Object data) {
-        return null;
+        sessions.forEach(session -> PushUtil.push(session, data));
+        return R.ok(true);
     }
 
     @Override
+    @PostMapping("pushByUserId")
     public R<Boolean> pushByUserId(String userId, Object data) {
-        return null;
+        SessionUtil.getSession(userId).forEach(session -> PushUtil.push(session, data));
+        return R.ok(true);
     }
 }

+ 0 - 32
src/main/java/com/dzdy/dim/baseIO/IOClient.java

@@ -1,32 +0,0 @@
-package com.dzdy.dim.baseIO;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.util.Date;
-
-/**
- * @author : wangzhiyong
- * @date : 2019/5/23 17:03
- * description :
- */
-public class IOClient {
-
-    public static void main(String[] args) throws Exception {
-
-        new Thread(() -> {
-            try {
-                Socket socket = new Socket("127.0.0.1", 9000);
-                while (true) {
-                    socket.getOutputStream().write((new Date() + ": hello world").getBytes());
-                    Thread.sleep(2000);
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-
-
-        }).start();
-    }
-}

+ 0 - 44
src/main/java/com/dzdy/dim/baseIO/IOSerive.java

@@ -1,44 +0,0 @@
-package com.dzdy.dim.baseIO;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-/**
- * @author : wangzhiyong
- * @date : 2019/5/23 16:56
- * description :
- */
-public class IOSerive {
-
-    public static void main(String[] args) throws Exception {
-        ServerSocket serverSocket = new ServerSocket(9000);
-
-        new Thread(() -> {
-            while (true) {
-                try {
-                    Socket socket = serverSocket.accept();
-
-                    new Thread(() -> {
-                        try {
-                            int len;
-                            byte[] data = new byte[1024];
-                            InputStream inputStream = socket.getInputStream();
-                            while ((len = inputStream.read(data)) != -1) {
-                                System.out.println(new String(data, 0, len));
-                            }
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                        }
-                    }).start();
-
-                } catch (IOException e) {
-                    e.printStackTrace();
-                }
-
-            }
-        }).start();
-
-    }
-}

+ 0 - 96
src/main/java/com/dzdy/dim/baseIO/NIOServer.java

@@ -1,96 +0,0 @@
-package com.dzdy.dim.baseIO;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * @author : wangzhiyong
- * @date : 2019/5/23 17:32
- * description :
- */
-public class NIOServer {
-    public static void main(String[] args) throws IOException {
-        Selector serverSelector = Selector.open();
-        Selector clientSelector = Selector.open();
-
-        new Thread(() -> {
-            try {
-                // 对应IO编程中服务端启动
-                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
-                listenerChannel.socket().bind(new InetSocketAddress(9000));
-                listenerChannel.configureBlocking(false);
-                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
-
-                while (true) {
-                    // 监测是否有新的连接,这里的1指的是阻塞的时间为 1ms
-                    if (serverSelector.select(1) > 0) {
-                        Set<SelectionKey> set = serverSelector.selectedKeys();
-                        Iterator<SelectionKey> keyIterator = set.iterator();
-
-                        while (keyIterator.hasNext()) {
-                            SelectionKey key = keyIterator.next();
-
-                            if (key.isAcceptable()) {
-                                try {
-                                    // (1) 每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
-                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
-                                    clientChannel.configureBlocking(false);
-                                    clientChannel.register(clientSelector, SelectionKey.OP_READ);
-                                } finally {
-                                    keyIterator.remove();
-                                }
-                            }
-
-                        }
-                    }
-                }
-            } catch (IOException ignored) {
-            }
-
-        }).start();
-
-
-        new Thread(() -> {
-            try {
-                while (true) {
-                    // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为 1ms
-                    if (clientSelector.select(1) > 0) {
-                        Set<SelectionKey> set = clientSelector.selectedKeys();
-                        Iterator<SelectionKey> keyIterator = set.iterator();
-
-                        while (keyIterator.hasNext()) {
-                            SelectionKey key = keyIterator.next();
-
-                            if (key.isReadable()) {
-                                try {
-                                    SocketChannel clientChannel = (SocketChannel) key.channel();
-                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
-                                    // (3) 面向 Buffer
-                                    clientChannel.read(byteBuffer);
-                                    byteBuffer.flip();
-                                    System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
-                                            .toString());
-                                } finally {
-                                    keyIterator.remove();
-                                    key.interestOps(SelectionKey.OP_READ);
-                                }
-                            }
-
-                        }
-                    }
-                }
-            } catch (IOException ignored) {
-            }
-        }).start();
-
-
-    }
-}

+ 1 - 1
src/main/java/com/dzdy/dim/client/NettyClient.java

@@ -59,7 +59,7 @@ public class NettyClient {
                     }
                 });
 
-        connect(bootstrap, "127.0.0.1", 9001, MAX_RETRY);
+        connect(bootstrap, "127.0.0.1", 9002, MAX_RETRY);
 
         //连接超时
         bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)

+ 10 - 0
src/main/java/com/dzdy/dim/client/handler/HeartBeatTimerHandler.java

@@ -3,6 +3,7 @@ package com.dzdy.dim.client.handler;
 import com.dzdy.dim.protocol.request.HeartBeatRequestPacket;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.EventLoop;
 
 import java.util.Date;
 import java.util.concurrent.TimeUnit;
@@ -39,4 +40,13 @@ public class HeartBeatTimerHandler extends ChannelInboundHandlerAdapter {
             }
         }, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
     }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        //如果运行过程中服务端挂了,执行重连机制
+        EventLoop eventLoop = ctx.channel().eventLoop();
+        // todo 客户端自行将服务放在容器中启动
+        eventLoop.schedule(() -> null, 10L, TimeUnit.SECONDS);
+        super.channelInactive(ctx);
+    }
 }

+ 0 - 4
src/main/java/com/dzdy/dim/client/handler/LogoutResponseHandler.java

@@ -16,8 +16,4 @@ public class LogoutResponseHandler extends SimpleChannelInboundHandler<LogoutRes
         SessionUtil.unBind(ctx.channel());
     }
 
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) {
-        System.out.println("客户端连接关闭 - LogoutResponseHandler");
-    }
 }

+ 82 - 0
src/main/java/com/dzdy/dim/config/NettyConfig.java

@@ -0,0 +1,82 @@
+package com.dzdy.dim.config;
+
+import com.dzdy.dim.handler.IMIdleStateHandler;
+import com.dzdy.dim.handler.PacketCodecHandler;
+import com.dzdy.dim.handler.Spliter;
+import com.dzdy.dim.service.handler.AuthHandler;
+import com.dzdy.dim.service.handler.ConfirmRequestHandler;
+import com.dzdy.dim.service.handler.HeartBeatRequestHandler;
+import com.dzdy.dim.service.handler.LoginRequestHandler;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+/**
+ * netty 配置
+ *
+ * @author : wangzhiyong
+ * @date : 2019/5/30 10:34
+ * description :
+ */
+@Data
+@Component
+@ConfigurationProperties(prefix = "dim.netty")
+public class NettyConfig {
+
+    /**
+     * boss 线程数 一般为1
+     */
+    private Integer boosCount = 1;
+    /**
+     * worker 线程数 默认为核心数*2
+     */
+    private Integer workerCount;
+    /**
+     * 监听端口
+     */
+    private Integer port = 9001;
+    /**
+     * 握手请求的队列的最大长度
+     */
+    private Integer soBackLog = 1024;
+
+    @Bean("serverBootstrap")
+    public ServerBootstrap serverBootstrap() {
+        return new ServerBootstrap();
+    }
+
+    @Bean("bossGroup")
+    public NioEventLoopGroup bossGroup() {
+        return new NioEventLoopGroup(boosCount);
+    }
+
+    @Bean("workerGroup")
+    public NioEventLoopGroup workerGroup() {
+        if (workerCount != null) {
+            return new NioEventLoopGroup(workerCount);
+        }
+        return new NioEventLoopGroup();
+    }
+
+    @Bean("channelHandler")
+    public ChannelHandler channelHandler() {
+        return new ChannelInitializer<NioSocketChannel>() {
+            @Override
+            protected void initChannel(NioSocketChannel ch) {
+                ch.pipeline().addLast(new IMIdleStateHandler());
+                ch.pipeline().addLast(new Spliter());
+                ch.pipeline().addLast(PacketCodecHandler.INSTANCE);
+                ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
+                ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);
+                ch.pipeline().addLast(AuthHandler.INSTANCE);
+                ch.pipeline().addLast(ConfirmRequestHandler.INSTANCE);
+            }
+        };
+    }
+}

+ 69 - 0
src/main/java/com/dzdy/dim/config/NettyManager.java

@@ -0,0 +1,69 @@
+package com.dzdy.dim.config;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+
+/**
+ * netty 服务方法管理
+ *
+ * @author : wangzhiyong
+ * @date : 2019/5/30 11:24
+ * description :
+ */
+@Configuration
+public class NettyManager {
+
+    @Resource
+    private NettyConfig netty;
+    @Resource
+    private ServerBootstrap serverBootstrap;
+    @Resource
+    private NioEventLoopGroup bossGroup;
+    @Resource
+    private NioEventLoopGroup workerGroup;
+    @Resource
+    private ChannelHandler channelHandler;
+
+    public void run() {
+        serverBootstrap
+                .group(bossGroup, workerGroup)
+                //指定我们服务端的 IO 模型为NIO
+                .channel(NioServerSocketChannel.class)
+                .option(ChannelOption.SO_BACKLOG, netty.getSoBackLog())
+                // TCP 心跳机制 true 开启
+                .childOption(ChannelOption.SO_KEEPALIVE, true)
+                // 是否关闭Nagle算法 true 高实时性,有消息立即发送 false 减少发送次数减少网络交互 提升性能(Nagle 算法)
+                .childOption(ChannelOption.TCP_NODELAY, true)
+                .childHandler(channelHandler);
+        bind(netty.getPort());
+    }
+
+    /**
+     * 异步监听 绑定端口是否成功 并加大端口号继续绑定
+     *
+     * @param port
+     */
+    private void bind(int port) {
+        serverBootstrap.bind(port).addListener(future -> {
+            if (future.isSuccess()) {
+                System.out.println("端口[" + port + "]绑定成功");
+            } else {
+                System.out.println("端口[" + port + "]绑定失败");
+                bind(port + 1);
+            }
+        });
+    }
+
+    @PreDestroy
+    public void stop() {
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+}

+ 82 - 82
src/main/java/com/dzdy/dim/service/NettyServer.java

@@ -1,82 +1,82 @@
-package com.dzdy.dim.service;
-
-import com.dzdy.dim.handler.IMIdleStateHandler;
-import com.dzdy.dim.handler.PacketCodecHandler;
-import com.dzdy.dim.handler.Spliter;
-import com.dzdy.dim.service.handler.AuthHandler;
-import com.dzdy.dim.service.handler.ConfirmRequestHandler;
-import com.dzdy.dim.service.handler.HeartBeatRequestHandler;
-import com.dzdy.dim.service.handler.LoginRequestHandler;
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-
-/**
- * @author : wangzhiyong
- * @date : 2019/5/23 17:36
- * description :
- */
-public class NettyServer {
-
-    private static final int PORT = 9001;
-
-    public static void main(String[] args) {
-        final ServerBootstrap serverBootstrap = new ServerBootstrap();
-
-        //接受连接
-        NioEventLoopGroup boss = new NioEventLoopGroup();
-        //异步非阻塞处理数据
-        NioEventLoopGroup worker = new NioEventLoopGroup();
-        serverBootstrap.group(boss, worker)
-                //指定我们服务端的 IO 模型为NIO
-                .channel(NioServerSocketChannel.class)
-                // 临时存放已完成三次握手的请求的队列的最大长度 如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
-                .option(ChannelOption.SO_BACKLOG, 1024)
-                // TCP 心跳机制 true 开启
-                .childOption(ChannelOption.SO_KEEPALIVE, true)
-                // 是否关闭Nagle算法 true 高实时性,有消息立即发送 false 减少发送次数减少网络交互 提升性能(Nagle 算法)
-                .childOption(ChannelOption.TCP_NODELAY, true)
-                //指定启动前的逻辑操作
-                .handler(new ChannelInitializer<NioServerSocketChannel>() {
-                    @Override
-                    protected void initChannel(NioServerSocketChannel ch) throws Exception {
-                        System.out.println("服务端启动中");
-                    }
-                })
-                //用于指定处理新连接数据
-                .childHandler(new ChannelInitializer<NioSocketChannel>() {
-                    @Override
-                    protected void initChannel(NioSocketChannel ch) {
-                        ch.pipeline().addLast(new IMIdleStateHandler());
-                        ch.pipeline().addLast(new Spliter());
-                        ch.pipeline().addLast(PacketCodecHandler.INSTANCE);
-                        ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
-                        ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);
-                        ch.pipeline().addLast(AuthHandler.INSTANCE);
-                        ch.pipeline().addLast(ConfirmRequestHandler.INSTANCE);
-                    }
-                });
-        bind(serverBootstrap, PORT);
-    }
-
-    /**
-     * 异步监听 绑定端口是否成功 并加大端口号继续绑定
-     *
-     * @param bootstrap
-     * @param port
-     */
-    private static void bind(ServerBootstrap bootstrap, int port) {
-        bootstrap.bind(port).addListener(future -> {
-            if (future.isSuccess()) {
-                System.out.println("端口[" + port + "]绑定成功");
-            } else {
-                System.out.println("端口[" + port + "]绑定失败");
-                bind(bootstrap, port + 1);
-            }
-        });
-    }
-
-}
+//package com.dzdy.dim.service;
+//
+//import com.dzdy.dim.handler.IMIdleStateHandler;
+//import com.dzdy.dim.handler.PacketCodecHandler;
+//import com.dzdy.dim.handler.Spliter;
+//import com.dzdy.dim.service.handler.AuthHandler;
+//import com.dzdy.dim.service.handler.ConfirmRequestHandler;
+//import com.dzdy.dim.service.handler.HeartBeatRequestHandler;
+//import com.dzdy.dim.service.handler.LoginRequestHandler;
+//import io.netty.bootstrap.ServerBootstrap;
+//import io.netty.channel.ChannelInitializer;
+//import io.netty.channel.ChannelOption;
+//import io.netty.channel.nio.NioEventLoopGroup;
+//import io.netty.channel.socket.nio.NioServerSocketChannel;
+//import io.netty.channel.socket.nio.NioSocketChannel;
+//
+///**
+// * @author : wangzhiyong
+// * @date : 2019/5/23 17:36
+// * description :
+// */
+//public class NettyServer {
+//
+//    private static final int PORT = 9001;
+//
+//    public static void main(String[] args) {
+//        final ServerBootstrap serverBootstrap = new ServerBootstrap();
+//
+//        //接受连接
+//        NioEventLoopGroup boss = new NioEventLoopGroup();
+//        //异步非阻塞处理数据
+//        NioEventLoopGroup worker = new NioEventLoopGroup();
+//        serverBootstrap.group(boss, worker)
+//                //指定我们服务端的 IO 模型为NIO
+//                .channel(NioServerSocketChannel.class)
+//                // 临时存放已完成三次握手的请求的队列的最大长度 如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
+//                .option(ChannelOption.SO_BACKLOG, 1024)
+//                // TCP 心跳机制 true 开启
+//                .childOption(ChannelOption.SO_KEEPALIVE, true)
+//                // 是否关闭Nagle算法 true 高实时性,有消息立即发送 false 减少发送次数减少网络交互 提升性能(Nagle 算法)
+//                .childOption(ChannelOption.TCP_NODELAY, true)
+//                //指定启动前的逻辑操作
+//                .handler(new ChannelInitializer<NioServerSocketChannel>() {
+//                    @Override
+//                    protected void initChannel(NioServerSocketChannel ch) {
+//                        System.out.println("服务端启动中");
+//                    }
+//                })
+//                //用于指定处理新连接数据
+//                .childHandler(new ChannelInitializer<NioSocketChannel>() {
+//                    @Override
+//                    protected void initChannel(NioSocketChannel ch) {
+//                        ch.pipeline().addLast(new IMIdleStateHandler());
+//                        ch.pipeline().addLast(new Spliter());
+//                        ch.pipeline().addLast(PacketCodecHandler.INSTANCE);
+//                        ch.pipeline().addLast(LoginRequestHandler.INSTANCE);
+//                        ch.pipeline().addLast(HeartBeatRequestHandler.INSTANCE);
+//                        ch.pipeline().addLast(AuthHandler.INSTANCE);
+//                        ch.pipeline().addLast(ConfirmRequestHandler.INSTANCE);
+//                    }
+//                });
+//        bind(serverBootstrap, PORT);
+//    }
+//
+//    /**
+//     * 异步监听 绑定端口是否成功 并加大端口号继续绑定
+//     *
+//     * @param bootstrap
+//     * @param port
+//     */
+//    private static void bind(ServerBootstrap bootstrap, int port) {
+//        bootstrap.bind(port).addListener(future -> {
+//            if (future.isSuccess()) {
+//                System.out.println("端口[" + port + "]绑定成功");
+//            } else {
+//                System.out.println("端口[" + port + "]绑定失败");
+//                bind(bootstrap, port + 1);
+//            }
+//        });
+//    }
+//
+//}

+ 16 - 10
src/main/java/com/dzdy/dim/service/handler/LoginRequestHandler.java

@@ -7,6 +7,7 @@ import com.dzdy.dim.session.Session;
 import com.dzdy.dim.util.IDUtil;
 import com.dzdy.dim.util.JWTUtil;
 import com.dzdy.dim.util.SessionUtil;
+import io.jsonwebtoken.ExpiredJwtException;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -26,20 +27,25 @@ public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginReques
     }
 
     @Override
-    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket packet) throws Exception {
+    protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket packet) {
         LoginResponsePacket responsePacket = new LoginResponsePacket();
         responsePacket.setVersion(packet.getVersion());
 
         String token = packet.getToken();
-        JWTUser user = JWTUtil.decode(token);
-        if (null != user) {
-            responsePacket.setSuccess(true);
-            String sessionId = IDUtil.randomId();
-            Session session = new Session(sessionId, user.getUser_name(), null);
-            responsePacket.setSession(session);
-            SessionUtil.bind(session, ctx.channel());
-            System.out.println(String.format("[%s]登录成功", user.getUser_name()));
-        } else {
+        try {
+            JWTUser user = JWTUtil.decode(token);
+            if (null != user) {
+                responsePacket.setSuccess(true);
+                String sessionId = IDUtil.randomId();
+                Session session = new Session(sessionId, user.getUser_name(), null);
+                responsePacket.setSession(session);
+                SessionUtil.bind(session, ctx.channel());
+                System.out.println(String.format("[%s]登录成功", user.getUser_name()));
+            }
+        } catch (ExpiredJwtException e) {
+            responsePacket.setSuccess(false);
+            responsePacket.setReason("token过期,连接失败");
+        } catch (Exception e) {
             responsePacket.setSuccess(false);
             responsePacket.setReason("token错误,连接失败");
         }

+ 5 - 11
src/main/java/com/dzdy/dim/util/JWTUtil.java

@@ -24,17 +24,11 @@ public class JWTUtil {
 
     private static final SecretKey SECRET_KEY = new SecretKeySpec("[B@11c20519222222222222222222222".getBytes(), SignatureAlgorithm.HS256.getJcaName());
 
-    public static JWTUser decode(String token) {
-        try {
-            if (null != token && token.length() > 0) {
-                Jws<Claims> jws = Jwts.parser().setSigningKey(SECRET_KEY).parseClaimsJws(token);
-                Claims body = jws.getBody();
-                return JSONObject.parseObject(JSONObject.toJSONString(body), JWTUser.class);
-            }
-        } catch (ExpiredJwtException e) {
-            System.out.println("token expired.");
-        } catch (Exception e) {
-            e.printStackTrace();
+    public static JWTUser decode(String token) throws ExpiredJwtException {
+        if (null != token && token.length() > 0) {
+            Jws<Claims> jws = Jwts.parser().setSigningKey(SECRET_KEY).parseClaimsJws(token);
+            Claims body = jws.getBody();
+            return JSONObject.parseObject(JSONObject.toJSONString(body), JWTUser.class);
         }
         return null;
     }

+ 30 - 0
src/main/java/com/dzdy/dim/util/PushUtil.java

@@ -0,0 +1,30 @@
+package com.dzdy.dim.util;
+
+import com.dzdy.dim.session.Session;
+import io.netty.channel.Channel;
+
+/**
+ * 推送工具
+ *
+ * @author : wangzhiyong
+ * @date : 2019/5/30 10:25
+ * description :
+ */
+public class PushUtil {
+
+    /**
+     * 发送消息
+     * @param session
+     * @param data
+     */
+    public static void push(Session session, Object data) {
+        Channel channel = SessionUtil.getChannel(session.getSessionId());
+        channel.writeAndFlush(data).addListener(future -> {
+            if (future.isSuccess()) {
+                System.out.println(session.getUserId() + " 发送成功");
+            } else {
+                System.out.println(session.getUserId() + " 发送失败");
+            }
+        });
+    }
+}

+ 16 - 0
src/main/java/com/dzdy/dim/util/SessionUtil.java

@@ -3,12 +3,16 @@ package com.dzdy.dim.util;
 import com.dzdy.dim.contants.AttrContants;
 import com.dzdy.dim.session.Session;
 import io.netty.channel.Channel;
+import io.netty.util.internal.StringUtil;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * session 管理
+ * todo 后续拓展其他存储方式 redis/mysql 等
  *
  * @author : wangzhiyong
  * @date : 2019/5/27 13:56
@@ -42,4 +46,16 @@ public class SessionUtil {
     public static Channel getChannel(String sessionId) {
         return USER_ID_CHANNEL_MAP.get(sessionId);
     }
+
+    public static List<Session> getSession(String userId) {
+        List<Session> sessionList = new ArrayList<>(USER_ID_CHANNEL_MAP.size());
+        for (Channel channel : USER_ID_CHANNEL_MAP.values()) {
+            Session session = getSession(channel);
+            if (!StringUtil.isNullOrEmpty(userId) && !userId.equals(session.getUserId())) {
+                continue;
+            }
+            sessionList.add(session);
+        }
+        return sessionList;
+    }
 }

+ 6 - 0
src/main/resources/application.yml

@@ -0,0 +1,6 @@
+dim:
+  netty:
+    port: 9002
+
+server:
+  port: 9090