教师个人网站建设/南京seo公司
概述
在上一节学习中,我们了解到了Netty作为HTTP服务端是怎么建立的,请求的执行流程是怎么样的,对Netty作为HTTP服务端的能力有了一个初步的认识,这一帖主要来学习一下基于Netty框架的客户端与服务端的长连接通讯。
示例
socket服务端与之前作为HTTP服务端的建立基本一致,都遵循如下流程:
- 建立接收和处理的线程组
- 使用ServerBootstrap绑定处理器并启动
- 建立连接并调用initChannel方法初始化各种处理器
服务端如下:
package com.leolee.netty.secondExample;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @ClassName MySocketServer* @Description: socket服务端* @Author LeoLee* @Date 2020/8/23* @Version V1.0**/
public class MySocketServer {public static void main(String[] args) throws InterruptedException {//定义线程组 EventLoopGroup为死循环//boss线程组一直在接收客户端发起的请求,但是不对请求做处理,boss会将接收到的请i交给worker线程组来处理//实际可以用一个线程组来做客户端的请求接收和处理两件事,但是不推荐EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {//启动类定义ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new MySocketServerInitializer());//子处理器,自定义处理器//绑定监听端口ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();//定义关闭监听channelFuture.channel().closeFuture().sync();} finally {//Netty提供的优雅关闭bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
package com.leolee.netty.secondExample;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;/*** @ClassName MySocketServerInitializer* @Description: 一旦客户端和服务端建立联系之后initChannel就会被调用* @Author LeoLee* @Date 2020/8/23* @Version V1.0**/
public class MySocketServerInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//声明管道ChannelPipeline pipeline = ch.pipeline();//绑定自带的解码器,就是对二进制数据的解析工具,至于解码器构造方法的参数之后详细分析pipeline.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//编码器pipeline.addLast("lengthFieldPrepender", new LengthFieldPrepender(4));//由于涉及到服务端和客户端的字符串数据,需要绑定字符串的编解码pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));//自定义处理器pipeline.addLast("mySocketServerHandler", new MySocketServerHandler());}}
package com.leolee.netty.secondExample;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.UUID;/*** @ClassName MySocketServerHandler* @Description: websocket服务端自定义处理器* @Author LeoLee* @Date 2020/8/23* @Version V1.0**/
public class MySocketServerHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(ctx.channel().remoteAddress() + ":" + msg);//返回数据ctx.channel().writeAndFlush("from server:" + UUID.randomUUID());}/*** 功能描述: <br> 重写异常时的处理回调方法* 〈〉在这里遇到异常直接关闭掉链接* @Param: [ctx, cause]* @Return: void* @Author: LeoLee* @Date: 2020/8/23 15:04*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}
}
客户端如下:
package com.leolee.netty.secondExample;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;/*** @ClassName MyClient* @Description: socket客户端* @Author LeoLee* @Date 2020/8/23* @Version V1.0**/
public class MyClient {public static void main(String[] args) throws InterruptedException {//客户端只需要一个线程组EventLoopGroup eventLoopGroup = new NioEventLoopGroup();try {//声明客户端启动类Bootstrap bootstrap = new Bootstrap();bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).handler(new MyClientInitializer());ChannelFuture channelFuture = bootstrap.connect("localhost", 8899).sync();channelFuture.channel().close().sync();} finally {//优雅关闭eventLoopGroup.shutdownGracefully();}}
}
package com.leolee.netty.secondExample;import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;/*** @ClassName MyClientInitializer* @Description:* @Author LeoLee* @Date 2020/8/23* @Version V1.0**/
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//声明管道ChannelPipeline pipeline = ch.pipeline();//绑定自带的解码器,就是对二进制数据的解析工具,至于解码器构造方法的参数之后详细分析pipeline.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//编码器pipeline.addLast("lengthFieldPrepender", new LengthFieldPrepender(4));//由于涉及到服务端和客户端的字符串数据,需要绑定字符串的编解码pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));//自定义处理器pipeline.addLast("myClientHandler", new MyClientHandler());}
}
package com.leolee.netty.secondExample;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.time.LocalDateTime;/*** @ClassName MyClientHandler* @Description:* @Author LeoLee* @Date 2020/8/23* @Version V1.0**/
public class MyClientHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {System.out.println(ctx.channel().remoteAddress());System.out.println("Client output:" + msg);ctx.writeAndFlush("from client:" + LocalDateTime.now());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/*** 功能描述: <br> 该回调方法是连接处理活跃状态* 〈〉由于demo没有办法模拟请求的发送,所以重写这个方法来模拟客户端的消息发送* @Param: [ctx]* @Return: void* @Author: LeoLee* @Date: 2020/8/23 16:55*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush("模拟客户端发送消息");}
}
先把服务端跑起来,再把客户端跑起来,会看到两个控制条一直在互相发送消息,并且可以运行多个客户端,都会收到服务端的消息回复。
但是由于我的电脑不知道怎么回事,一直是如下报错,不知道怎么解决,知道的小伙伴请给我留言一哈,谢谢啦
需要代码的来这里拿嗷:demo项目地址