当前位置: 首页>资讯 >

天天新资讯:IO流中「线程」模型总结

来源: 博客园 | 时间: 2023-04-07 09:42:41 |

目录一、基础简介二、同步阻塞1、模型图解2、参考案例三、同步非阻塞1、模型图解2、参考案例四、异步非阻塞1、模型图解2、参考案例五、Reactor模型1、模型图解1.1 Reactor设计原理1.2 单Reactor单线程1.3 单Reactor多线程1.4 主从Reactor多线程2、参考案例六、参考源码

IO流模块:经常看、经常用、经常忘;


(资料图片仅供参考)

一、基础简介

在IO流的网络模型中,以常见的「客户端-服务端」交互场景为例;

客户端与服务端进行通信「交互」,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,当然也有自定义的业务流程需要执行,从处理逻辑看就是「读取数据-业务执行-应答写数据」的形式;

Java提供「三种」IO网络编程模型,即:「BIO同步阻塞」、「NIO同步非阻塞」、「AIO异步非阻塞」;

二、同步阻塞1、模型图解

BIO即同步阻塞,服务端收到客户端的请求时,会启动一个线程处理,「交互」会阻塞直到整个流程结束;

这种模式如果在高并发且流程复杂耗时的场景下,客户端的请求响应会存在严重的性能问题,并且占用过多资源;

2、参考案例

服务端】启动ServerSocket接收客户端的请求,经过一系列逻辑之后,向客户端发送消息,注意这里线程的10秒休眠;

public class SocketServer01 {    public static void main(String[] args) throws Exception {        // 1、创建Socket服务端        ServerSocket serverSocket = new ServerSocket(8080);        // 2、方法阻塞等待,直到有客户端连接        Socket socket = serverSocket.accept();        // 3、输入流,输出流        InputStream inStream = socket.getInputStream();        OutputStream outStream = socket.getOutputStream();        // 4、数据接收和响应        int readLen = 0;        byte[] buf = new byte[1024];        if ((readLen=inStream.read(buf)) != -1){            // 接收数据            String readVar = new String(buf, 0, readLen) ;            System.out.println("readVar======="+readVar);        }        // 响应数据        Thread.sleep(10000);        outStream.write("sever-8080-write;".getBytes());        // 5、资源关闭        IoClose.ioClose(outStream,inStream,socket,serverSocket);    }}

客户端】Socket连接,先向ServerSocket发送请求,再接收其响应,由于Server端模拟耗时,Client处于长时间阻塞状态;

public class SocketClient01 {    public static void main(String[] args) throws Exception {        // 1、创建Socket客户端        Socket socket = new Socket(InetAddress.getLocalHost(), 8080);        // 2、输入流,输出流        OutputStream outStream = socket.getOutputStream();        InputStream inStream = socket.getInputStream();        // 3、数据发送和响应接收        // 发送数据        outStream.write("client-hello".getBytes());        // 接收数据        int readLen = 0;        byte[] buf = new byte[1024];        if ((readLen=inStream.read(buf)) != -1){            String readVar = new String(buf, 0, readLen) ;            System.out.println("readVar======="+readVar);        }        // 4、资源关闭        IoClose.ioClose(inStream,outStream,socket);    }}
三、同步非阻塞1、模型图解

NIO即同步非阻塞,服务端可以实现一个线程,处理多个客户端请求连接,服务端的并发能力得到极大的提升;

这种模式下客户端的请求连接都会注册到Selector多路复用器上,多路复用器会进行轮询,对请求连接的IO流进行处理;

2、参考案例

服务端】单线程可以处理多个客户端请求,通过轮询多路复用器查看是否有IO请求;

public class SocketServer01 {    public static void main(String[] args) throws Exception {        try {            //启动服务开启监听            ServerSocketChannel socketChannel = ServerSocketChannel.open();            socketChannel.socket().bind(new InetSocketAddress("127.0.0.1", 8989));            // 设置非阻塞,接受客户端            socketChannel.configureBlocking(false);            // 打开多路复用器            Selector selector = Selector.open();            // 服务端Socket注册到多路复用器,指定兴趣事件            socketChannel.register(selector, SelectionKey.OP_ACCEPT);            // 多路复用器轮询            ByteBuffer buffer = ByteBuffer.allocateDirect(1024);            while (selector.select() > 0){                Set selectionKeys = selector.selectedKeys();                Iterator selectionKeyIter = selectionKeys.iterator();                while (selectionKeyIter.hasNext()){                    SelectionKey selectionKey = selectionKeyIter.next() ;                    selectionKeyIter.remove();                    if(selectionKey.isAcceptable()) {                        // 接受新的连接                        SocketChannel client = socketChannel.accept();                        // 设置读非阻塞                        client.configureBlocking(false);                        // 注册到多路复用器                        client.register(selector, SelectionKey.OP_READ);                    } else if (selectionKey.isReadable()) {                        // 通道可读                        SocketChannel client = (SocketChannel) selectionKey.channel();                        int len = client.read(buffer);                        if (len > 0){                            buffer.flip();                            byte[] readArr = new byte[buffer.limit()];                            buffer.get(readArr);                            System.out.println(client.socket().getPort() + "端口数据:" + new String(readArr));                            buffer.clear();                        }                    }                }            }        } catch (Exception e) {            e.printStackTrace();        }    }}

客户端】每隔3秒持续的向通道内写数据,服务端通过轮询多路复用器,持续的读取数据;

public class SocketClient01 {    public static void main(String[] args) throws Exception {        try {            // 连接服务端            SocketChannel socketChannel = SocketChannel.open();            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);            String conVar = "client-hello";            writeBuffer.put(conVar.getBytes());            writeBuffer.flip();            // 每隔3S发送一次数据            while (true) {                Thread.sleep(3000);                writeBuffer.rewind();                socketChannel.write(writeBuffer);                writeBuffer.clear();            }        } catch (Exception e) {            e.printStackTrace();        }    }}
四、异步非阻塞1、模型图解

AIO即异步非阻塞,对于通道内数据的「读」和「写」动作,都是采用异步的模式,对于性能的提升是巨大的;

这与常规的第三方对接模式很相似,本地服务在请求第三方服务时,请求过程耗时很大,会异步执行,第三方第一次回调,确认请求可以被执行;第二次回调则是推送处理结果,这种思想在处理复杂问题时,可以很大程度的提高性能,节省资源:

2、参考案例

服务端】各种「accept」、「read」、「write」动作是异步,通过Future来获取计算的结果;

public class SocketServer01 {    public static void main(String[] args) throws Exception {        // 启动服务开启监听        AsynchronousServerSocketChannel socketChannel = AsynchronousServerSocketChannel.open() ;        socketChannel.bind(new InetSocketAddress("127.0.0.1", 8989));        // 指定30秒内获取客户端连接,否则超时        Future acceptFuture = socketChannel.accept();        AsynchronousSocketChannel asyChannel = acceptFuture.get(30, TimeUnit.SECONDS);        if (asyChannel != null && asyChannel.isOpen()){            // 读数据            ByteBuffer inBuffer = ByteBuffer.allocate(1024);            Future readResult = asyChannel.read(inBuffer);            readResult.get();            System.out.println("read:"+new String(inBuffer.array()));            // 写数据            inBuffer.flip();            Future writeResult = asyChannel.write(ByteBuffer.wrap("server-hello".getBytes()));            writeResult.get();        }        // 关闭资源        asyChannel.close();    }}

客户端】相关「connect」、「read」、「write」方法调用是异步的,通过Future来获取计算的结果;

public class SocketClient01 {    public static void main(String[] args) throws Exception {        // 连接服务端        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();        Future result = socketChannel.connect(new InetSocketAddress("127.0.0.1", 8989));        result.get();        // 写数据        String conVar = "client-hello";        ByteBuffer reqBuffer = ByteBuffer.wrap(conVar.getBytes());        Future writeFuture = socketChannel.write(reqBuffer);        writeFuture.get();        // 读数据        ByteBuffer inBuffer = ByteBuffer.allocate(1024);        Future readFuture = socketChannel.read(inBuffer);        readFuture.get();        System.out.println("read:"+new String(inBuffer.array()));        // 关闭资源        socketChannel.close();    }}
五、Reactor模型1、模型图解

这部分内容,可以参考「Doug Lea的《IO》」文档,查看更多细节;

1.1 Reactor设计原理

Reactor模式基于事件驱动设计,也称为「反应器」模式或者「分发者」模式;服务端收到多个客户端请求后,会将请求分派给对应的线程处理;

Reactor:负责事件的监听和分发;Handler:负责处理事件,核心逻辑「read读」、「decode解码」、「compute业务计算」、「encode编码」、「send应答数据」;

1.2 单Reactor单线程

【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;

【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;

【4】在Handler中,会完成相应的业务流程;

这种模式将所有逻辑「连接、读写、业务」放在一个线程中处理,避免多线程的通信,资源竞争等问题,但是存在明显的并发和性能问题;

1.3 单Reactor多线程

【1】Reactor线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,并创建一个Handler对象来处理后续业务;

【3】如果不是连接请求事件,则Reactor会将该事件交由当前连接的Handler来处理;

【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;

【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;

这种模式将业务从Reactor单线程分离处理,可以让其更专注于事件的分发和调度,Handler使用多线程也充分的利用cpu的处理能力,导致逻辑变的更加复杂,Reactor单线程依旧存在高并发的性能问题;

1.4 主从Reactor多线程

【1】 MainReactor主线程通过select监听客户端的请求事件,收到事件后通过Dispatch进行分发;

【2】如果是建立连接请求事件,Acceptor通过「accept」方法获取连接,之后MainReactor将连接分配给SubReactor;

【3】如果不是连接请求事件,则MainReactor将连接分配给SubReactor,SubReactor调用当前连接的Handler来处理;

【4】在Handler中,只负责事件响应不处理具体业务,将数据发送给Worker线程池来处理;

【5】Worker线程池会分配具体的线程来处理业务,最后把结果返回给Handler做响应;

这种模式Reactor线程分工明确,MainReactor负责接收新的请求连接,SubReactor负责后续的交互业务,适应于高并发的处理场景,是Netty组件通信框架的所采用的模式;

2、参考案例

服务端】提供两个EventLoopGroup,「ParentGroup」主要是用来接收客户端的请求连接,真正的处理是转交给「ChildGroup」执行,即Reactor多线程模型;

@Slf4jpublic class NettyServer {    public static void main(String[] args) {        // EventLoop组,处理事件和IO        EventLoopGroup parentGroup = new NioEventLoopGroup();        EventLoopGroup childGroup = new NioEventLoopGroup();        try {            // 服务端启动引导类            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(parentGroup, childGroup)                    .channel(NioServerSocketChannel.class).childHandler(new ServerChannelInit());            // 异步IO的结果            ChannelFuture channelFuture = serverBootstrap.bind(8989).sync();            channelFuture.channel().closeFuture().sync();        } catch (Exception e){            e.printStackTrace();        } finally {            parentGroup.shutdownGracefully();            childGroup.shutdownGracefully();        }    }}class ServerChannelInit extends ChannelInitializer {    @Override    protected void initChannel(SocketChannel socketChannel) {        // 获取管道        ChannelPipeline pipeline = socketChannel.pipeline();        // 编码、解码器        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));        // 添加自定义的handler        pipeline.addLast("serverHandler", new ServerHandler());    }}class ServerHandler extends ChannelInboundHandlerAdapter {    /**     * 通道读和写     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("Server-Msg【"+msg+"】");        TimeUnit.MILLISECONDS.sleep(2000);        String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;        ctx.channel().writeAndFlush("hello-client;time:" + nowTime);        ctx.fireChannelActive();    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}

客户端】通过Bootstrap类,与服务器建立连接,服务端通过ServerBootstrap启动服务,绑定在8989端口,然后服务端和客户端进行通信;

public class NettyClient {    public static void main(String[] args) {        // EventLoop处理事件和IO        NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();        try {            // 客户端通道引导            Bootstrap bootstrap = new Bootstrap();            bootstrap.group(eventLoopGroup)                    .channel(NioSocketChannel.class).handler(new ClientChannelInit());            // 异步IO的结果            ChannelFuture channelFuture = bootstrap.connect("localhost", 8989).sync();            channelFuture.channel().closeFuture().sync();        } catch (Exception e){            e.printStackTrace();        } finally {            eventLoopGroup.shutdownGracefully();        }    }}class ClientChannelInit extends ChannelInitializer {    @Override    protected void initChannel(SocketChannel socketChannel) {        // 获取管道        ChannelPipeline pipeline = socketChannel.pipeline();        // 编码、解码器        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));        // 添加自定义的handler        pipeline.addLast("clientHandler", new ClientHandler());    }}class ClientHandler extends ChannelInboundHandlerAdapter {    /**     * 通道读和写     */    @Override    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("Client-Msg【"+msg+"】");        TimeUnit.MILLISECONDS.sleep(2000);        String nowTime = DateTime.now().toString(DatePattern.NORM_DATETIME_PATTERN) ;        ctx.channel().writeAndFlush("hello-server;time:" + nowTime);    }    @Override    public void channelActive(ChannelHandlerContext ctx) throws Exception {        ctx.channel().writeAndFlush("channel...active");    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception {        cause.printStackTrace();        ctx.close();    }}
六、参考源码
编程文档:https://gitee.com/cicadasmile/butte-java-note应用仓库:https://gitee.com/cicadasmile/butte-flyer-parent

关键词:

 

热文推荐

天天新资讯:IO流中「线程」模型总结

客户端与服务端进行通信交互,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,理逻辑看就是:读取数据-业务执行-应

2023-04-07

焦点快报!中国“拼经济”为世界经济注入暖流

中国全力“拼经济”正给世界经济注入暖流。近期,多家国际组织和机构认为中国经济加快恢复将带动区域乃至世界经济增长提速。  世界银行认为

2023-04-07

天天视点!张家界天门山三男一女跳崖?当地已成立专案组调查,年龄最小的00后女孩的同学:她曾是一个开朗女孩……

橙柿互动报道4月6日,张家界天门山景区游客跳崖事件引发公众关注。网传消息称,4月4日13时50分许,接天门山景区报告,有4名游客(3男1女)在景区

2023-04-07

怎样鉴别驱蚊香的好坏 鉴别驱蚊香的好坏的方法

1、看色泽。好的蚊香色泽比较均匀,加工比较精细,点燃的时间可达5个多小时;质量差的蚊香色泽深浅不匀,加工粗糙,易熄火。2、闻气味。好的蚊

2023-04-07

形容雨的成语不带雨字_形容雨的成语 全球聚焦

1、风调雨顺、五风十雨、雨旸时若:形容风雨适时,气候调和,遂人所愿;和风细雨、柔风甘雨、斜风细雨、雨丝风片:指春天的和风细雨。2、亦喻

2023-04-07

当前热点-利润持续走低,传统加油站如何走出新的“增长曲线”?

上世纪90年代,能源行业掀起了一阵轰轰烈烈的加油站经营之风。不少经营者乘此风而起,获得了丰厚的利润。然而,今时不同往日,传统加油站“躺

2023-04-06

天天日报丨天津调整残疾、孤老人员和烈属减征个人所得税政策

最近,天津发布关于调整残疾、孤老人员和烈属减征个人所得税的政策。

2023-04-06

头条焦点:连云港赣榆区二宗工业用地拍出高价,助力赣榆工业立区建设

近日,赣榆共挂牌成交7宗工业用地。具体情况,文中详解!赣榆区二宗工业用地拍出高价4月4日,由赣榆区资源和规划局举办的国有建设用地使用权网

2023-04-06

天天观焦点:充电热水袋里面的水怎么制作?

1、插电式热水袋(也叫暖水宝、电暖袋、贴身暖等)以其价格实惠、长期保温而畅销。2、这种热水袋出售时有两种状态,一种是厂家已经充进了导电

2023-04-06

最资讯丨推荐|2023慕尼黑上海电子生产设备展【论坛议程】大公布!

慕尼黑上海电子生产设备展作为电子制造行业重要的展示交流平台,将在2023年4月13-15日于上海新国际博览中心(N1-N5&W5馆)举办。不仅仅满足于

2023-04-06

世界快看:新任世粮署执行干事强调合作应对粮食危机

新华社罗马4月5日电(记者贺飞)美国常驻联合国粮农三机构——联合国粮食及农业组织、国际农业发展基金及世界粮食计划署(世粮署)大使辛迪·

2023-04-06

资讯:石家庄市新华区西三庄学校开展春季常见传染病预防知识培训

为保障全校师生身心健康,4月4日,西三庄学校健康副校长王文娟为学校师生开展了一次春季常见传染病预防知识讲座。

2023-04-06

要求日本撤回排污入海计划!韩议员将赴福岛考察-焦点快报

中新网4月6日电据韩联社报道,当地时间6日,韩国最大在野党共同民主党的“日本福岛核电站污水对策团”(简称“对策团”)将前往日本福岛,就核污

2023-04-06

天上的街市原文赏析_天上的街市原文 全球快资讯

1、天上的街市郭沫若(郭开贞)远远的 街灯 明了,好像 闪着 无数的 明星。2、天上的 明星 现了,好像 点着

2023-04-06

A股五大上市险企净利润呈“一升四降”,投资收益下降近两成 动态焦点

财报数据显示,中国人保旗下的人保财险在2022年实现承保利润100 63亿元,同比增长456 0%;综合成本率由2021年的99 5%压降至97 6%,下降1 9个

2023-04-06

天天观点:太突然!他宣布离婚,代价 90 亿

太突然!他宣布离婚,代价90亿,4月4日三六零发布公告称:公司获悉2023年4月4日周鸿祎先生与胡欢女士经友好协商,已办理解除婚姻关系手续,并就

2023-04-06

5.98万起售,五菱缤果,真的便宜吗?

3月29日,预热很久的五菱缤果(图片|配置|询价)终于上市了,该车定位五门四座纯电动车,共推出了5款车型,两个续航版本(203公里、333公里)。

2023-04-06

磁力链接怎么用的(磁力链接怎么用) 世界热资讯

1、是我们下载网上资源的常用软件,可以识别下载磁性链接。2、打开百度网盘,双击新建一个下载任务。3、在链接输入框中填写“

2023-04-06

【快播报】30多项服务一网打尽 长江航运有了“水上百事通”

30多项服务一网打尽长江航运有了“水上百事通”---湖北日报讯(记者戴辉、通讯员高妞、李璐)从航道电子地图,到“水上超市”下单……4月3日,

2023-04-06

特朗普刑事案标志美国党争和社会撕裂进一步加剧_消息

纽约 华盛顿4月4日电(国际观察)特朗普刑事案标志美国党争和社会撕裂进一步加剧新华社记者孙丁兴越刘亚南美国前总统特朗普4日在纽约州纽约市

2023-04-06

资讯

天天新资讯:IO流中「线程」模型总结

客户端与服务端进行通信交互,可能是同步或者异步,服务端进行「流」处理时,可能是阻塞或者非阻塞模式,理逻辑看就是:读取数据-业务执行-应

2023-04-07     
焦点快报!中国“拼经济”为世界经济注入暖流

中国全力“拼经济”正给世界经济注入暖流。近期,多家国际组织和机构认为中国经济加快恢复将带动区域乃至世界经济增长提速。  世界银行认为

2023-04-07     
天天视点!张家界天门山三男一女跳崖?当地已成立专案组调查,年龄最小的00后女孩的同学:她曾是一个开朗女孩……

橙柿互动报道4月6日,张家界天门山景区游客跳崖事件引发公众关注。网传消息称,4月4日13时50分许,接天门山景区报告,有4名游客(3男1女)在景区

2023-04-07     
怎样鉴别驱蚊香的好坏 鉴别驱蚊香的好坏的方法

1、看色泽。好的蚊香色泽比较均匀,加工比较精细,点燃的时间可达5个多小时;质量差的蚊香色泽深浅不匀,加工粗糙,易熄火。2、闻气味。好的蚊

2023-04-07     
形容雨的成语不带雨字_形容雨的成语 全球聚焦

1、风调雨顺、五风十雨、雨旸时若:形容风雨适时,气候调和,遂人所愿;和风细雨、柔风甘雨、斜风细雨、雨丝风片:指春天的和风细雨。2、亦喻

2023-04-07     
当前热点-利润持续走低,传统加油站如何走出新的“增长曲线”?

上世纪90年代,能源行业掀起了一阵轰轰烈烈的加油站经营之风。不少经营者乘此风而起,获得了丰厚的利润。然而,今时不同往日,传统加油站“躺

2023-04-06