博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java学习-NIO(三)Channel
阅读量:2212 次
发布时间:2019-05-06

本文共 9020 字,大约阅读时间需要 30 分钟。

通道(Channel)是java.nio的第二个主要创新。它们既不是一个扩展也不是一项增强,而是全新、极好的Java I/O示例,提供与I/O服务的直接连接。Channel用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。

channel介绍

通道是访问I/O服务的导管。I/O可以分为广义的两大类别:File I/O和Stream I/O。那么相应地有两种类型的通道也就不足为怪了,它们是文件(file)通道和套接字(socket)通道。我们看到在api里有一个FileChannel类和三个socket通道类:SocketChannel、ServerSocketChannel和DatagramChannel。

通道可以以多种方式创建。Socket通道有可以直接创建新socket通道的工厂方法。但是一个FileChannel对象却只能通过在一个打开的RandomAccessFile、FileInputStream或FileOutputStream对象上调用getChannel( )方法来获取。你不能直接创建一个FileChannel对象。

我们先来看一下FileChannel的用法:

// 创建文件输出字节流   FileOutputStream fos = new FileOutputStream("data.txt");   //得到文件通道   FileChannel fc = fos.getChannel();   //往通道写入ByteBuffer   fc.write(ByteBuffer.wrap("Some text ".getBytes()));   //关闭流   fos.close();   //随机访问文件   RandomAccessFile raf = new RandomAccessFile("data.txt", "rw");   //得到文件通道   fc = raf.getChannel();   //设置通道的文件位置 为末尾   fc.position(fc.size());    //往通道写入ByteBuffer   fc.write(ByteBuffer.wrap("Some more".getBytes()));   //关闭   raf.close();   //创建文件输入流   FileInputStream fs = new FileInputStream("data.txt");   //得到文件通道   fc = fs.getChannel();   //分配ByteBuffer空间大小   ByteBuffer buff = ByteBuffer.allocate(BSIZE);   //从通道中读取ByteBuffer   fc.read(buff);   //调用此方法为一系列通道写入或相对获取 操作做好准备   buff.flip();   //从ByteBuffer从依次读取字节并打印   while (buff.hasRemaining()){       System.out.print((char) buff.get());   }   fs.close();

再来看一下SocketChannel:

SocketChannel sc = SocketChannel.open( ); sc.connect (new InetSocketAddress ("somehost", someport));  ServerSocketChannel ssc = ServerSocketChannel.open( );  ssc.socket( ).bind (new InetSocketAddress (somelocalport));  DatagramChannel dc = DatagramChannel.open( );

可以设置 SocketChannel 为非阻塞模式(non-blocking mode).设置之后,就可以在异步模式下调用connect(), read() 和write()了。如果SocketChannel在非阻塞模式下,此时调用connect(),该方法可能在连接建立之前就返回了。为了确定连接是否建立,可以调用finishConnect()的方法。像这样:

socketChannel.configureBlocking(false);socketChannel.connect(new InetSocketAddress("http://jenkov.com", 80));while(! socketChannel.finishConnect() ){    //wait, or do something else...}

服务器端的使用经常会考虑到非阻塞socket通道,因为它们使同时管理很多socket通道变得更容易。但是,在客户端使用一个或几个非阻塞模式的socket通道也是有益处的,例如,借助非阻塞socket通道,GUI程序可以专注于用户请求并且同时维护与一个或多个服务器的会话。在很多程序上,非阻塞模式都是有用的。

调用finishConnect( )方法来完成连接过程,该方法任何时候都可以安全地进行调用。假如在一个非阻塞模式的SocketChannel对象上调用finishConnect( )方法,将可能出现下列情形之一:

  • connect( )方法尚未被调用。那么将产生NoConnectionPendingException异常。
  • 连接建立过程正在进行,尚未完成。那么什么都不会发生,finishConnect( )方法会立即返回false值。
  • 在非阻塞模式下调用connect( )方法之后,SocketChannel又被切换回了阻塞模式。那么如果有必要的话,调用线程会阻塞直到连接建立完成,finishConnect( )方法接着就会返回true值。在初次调用connect( )或最后一次调用finishConnect( )之后,连接建立过程已经完成。那么SocketChannel对象的内部状态将被更新到已连接状态,finishConnect( )方法会返回true值,然后SocketChannel对象就可以被用来传输数据了。
  • 连接已经建立。那么什么都不会发生,finishConnect( )方法会返回true值。

Socket通道是线程安全的。并发访问时无需特别措施来保护发起访问的多个线程,不过任何时候都只有一个读操作和一个写操作在进行中。请记住,sockets是面向流的而非包导向的。它们可以保证发送的字节会按照顺序到达但无法承诺维持字节分组。某个发送器可能给一个socket写入了20个字节而接收器调用read( )方法时却只收到了其中的3个字节。剩下的17个字节还是传输中。由于这个原因,让多个不配合的线程共享某个流socket的同一侧绝非一个好的设计选择。

最后再看一下DatagramChannel:

最后一个socket通道是DatagramChannel。正如SocketChannel对应Socket,ServerSocketChannel对应ServerSocket,每一个DatagramChannel对象也有一个关联的DatagramSocket对象。不过原命名模式在此并未适用:“DatagramSocketChannel”显得有点笨拙,因此采用了简洁的“DatagramChannel”名称。

正如SocketChannel模拟连接导向的流协议(如TCP/IP),DatagramChannel则模拟包导向的无连接协议(如UDP/IP):

创建DatagramChannel的模式和创建其他socket通道是一样的:调用静态的open( )方法来创建一个新实例。新DatagramChannel会有一个可以通过调用socket( )方法获取的对等DatagramSocket对象。DatagramChannel对象既可以充当服务器(监听者)也可以充当客户端(发送者)。如果你希望新创建的通道负责监听,那么通道必须首先被绑定到一个端口或地址/端口组合上。绑定DatagramChannel同绑定一个常规的DatagramSocket没什么区别,都是委托对等socket对象上的API实现的:

DatagramChannel channel = DatagramChannel.open( ); DatagramSocket socket = channel.socket( );  socket.bind (new InetSocketAddress (portNumber));

DatagramChannel是无连接的。每个数据报(datagram)都是一个自包含的实体,拥有它自己的目的地址及不依赖其他数据报的数据净荷。与面向流的的socket不同,DatagramChannel可以发送单独的数据报给不同的目的地址。同样,DatagramChannel对象也可以接收来自任意地址的数据包。每个到达的数据报都含有关于它来自何处的信息(源地址)。

一个未绑定的DatagramChannel仍能接收数据包。当一个底层socket被创建时,一个动态生成的端口号就会分配给它。绑定行为要求通道关联的端口被设置为一个特定的值(此过程可能涉及安全检查或其他验证)。不论通道是否绑定,所有发送的包都含有DatagramChannel的源地址(带端口号)。未绑定的DatagramChannel可以接收发送给它的端口的包,通常是来回应该通道之前发出的一个包。已绑定的通道接收发送给它们所绑定的熟知端口(wellknown port)的包。数据的实际发送或接收是通过send( )和receive( )方法来实现的。

注意:假如您提供的ByteBuffer没有足够的剩余空间来存放您正在接收的数据包,没有被填充的字节都会被悄悄地丢弃。

Scatter/Gather

通道提供了一种被称为Scatter/Gather的重要新功能(有时也被称为矢量I/O)。它是指在多个缓冲区上实现一个简单的I/O操作。对于一个write操作而言,数据是从几个缓冲区按顺序抽取(称为gather)并沿着通道发送的。缓冲区本身并不需要具备这种gather的能力(通常它们也没有此能力)。该gather过程的效果就好比全部缓冲区的内容被连结起来,并在发送数据前存放到一个大的缓冲区中。对于read操作而言,从通道读取的数据会按顺序被散布(称为scatter)到多个缓冲区,将每个缓冲区填满直至通道中的数据或者缓冲区的最大空间被消耗完。

scatter / gather经常用于需要将传输的数据分开处理的场合,例如传输一个由消息头和消息体组成的消息,你可能会将消息体和消息头分散到不同的buffer中,这样你可以方便的处理消息头和消息体。

Scattering Reads是指数据从一个channel读取到多个buffer中。如下图描述:

代码示例如下:

ByteBuffer header = ByteBuffer.allocateDirect (10); ByteBuffer body = ByteBuffer.allocateDirect (80); ByteBuffer [] buffers = { header, body }; int bytesRead = channel.read (buffers);

Gathering Writes是指数据从多个buffer写入到同一个channel。如下图描述:

代码示例如下:

ByteBuffer header = ByteBuffer.allocateDirect (10);  ByteBuffer body = ByteBuffer.allocateDirect (80);  ByteBuffer [] buffers = { header, body };  channel.write(bufferArray);

使用得当的话,Scatter/Gather会是一个极其强大的工具。它允许你委托操作系统来完成辛苦活:将读取到的数据分开存放到多个存储桶(bucket)或者将不同的数据区块合并成一个整体。这是一个巨大的成就,因为操作系统已经被高度优化来完成此类工作了。它节省了您来回移动数据的工作,也就避免了缓冲区拷贝和减少了您需要编写、调试的代码数量。既然您基本上通过提供数据容器引用来组合数据,那么按照不同的组合构建多个缓冲区阵列引用,各种数据区块就可以以不同的方式来组合了。下面的例子好地诠释了这一点:

public class GatheringTest {
private static final String DEMOGRAPHIC = "output.txt"; public static void main (String [] argv) throws Exception { int reps = 10; if (argv.length > 0) { reps = Integer.parseInt(argv[0]); } FileOutputStream fos = new FileOutputStream(DEMOGRAPHIC); GatheringByteChannel gatherChannel = fos.getChannel(); ByteBuffer[] bs = utterBS(reps); while (gatherChannel.write(bs) > 0) { // 不做操作,让通道把数据输出到文件写完 } System.out.println("Mindshare paradigms synergized to " + DEMOGRAPHIC); fos.close(); } private static String [] col1 = { "Aggregate", "Enable", "Leverage", "Facilitate", "Synergize", "Repurpose", "Strategize", "Reinvent", "Harness" }; private static String [] col2 = { "cross-platform", "best-of-breed", "frictionless", "ubiquitous", "extensible", "compelling", "mission-critical", "collaborative", "integrated" }; private static String [] col3 = { "methodologies", "infomediaries", "platforms", "schemas", "mindshare", "paradigms", "functionalities", "web services", "infrastructures" }; private static String newline = System.getProperty ("line.separator"); private static ByteBuffer [] utterBS (int howMany) throws Exception { List list = new LinkedList(); for (int i = 0; i < howMany; i++) { list.add(pickRandom(col1, " ")); list.add(pickRandom(col2, " ")); list.add(pickRandom(col3, newline)); } ByteBuffer[] bufs = new ByteBuffer[list.size()]; list.toArray(bufs); return (bufs); } private static Random rand = new Random( ); /** * 随机生成字符 * @param strings * @param suffix * @return * @throws Exception */ private static ByteBuffer pickRandom (String [] strings, String suffix) throws Exception { String string = strings [rand.nextInt (strings.length)]; int total = string.length() + suffix.length( ); ByteBuffer buf = ByteBuffer.allocate (total); buf.put (string.getBytes ("US-ASCII")); buf.put (suffix.getBytes ("US-ASCII")); buf.flip( ); return (buf); }}

输出为:

Reinvent integrated web services Aggregate best-of-breed platforms Harness frictionless platforms Repurpose extensible paradigms Facilitate ubiquitous methodologies Repurpose integrated methodologies Facilitate mission-critical paradigms Synergize compelling methodologies Reinvent compelling functionalities Facilitate extensible platforms

虽然这种输出没有什么意义,但是gather确是很容易的让我们把它输出出来。

Pipe

java.nio.channels包中含有一个名为Pipe(管道)的类。广义上讲,管道就是一个用来在两个实体之间单向传输数据的导管。

Java NIO 管道是2个线程之间的单向数据连接。Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。Pipe类创建一对提供环回机制的Channel对象。这两个通道的远端是连接起来的,以便任何写在SinkChannel对象上的数据都能出现在SourceChannel对象上。

下面我们来创建一条Pipe,并向Pipe中写数据:

//通过Pipe.open()方法打开管道Pipe pipe = Pipe.open();//要向管道写数据,需要访问sink通道Pipe.SinkChannel sinkChannel = pipe.sink();//通过调用SinkChannel的write()方法,将数据写入SinkChannelString newData = "New String to write to file..." + System.currentTimeMillis();ByteBuffer buf = ByteBuffer.allocate(48);buf.clear();buf.put(newData.getBytes());buf.flip();while(buf.hasRemaining()) {    sinkChannel.write(buf);}

再看如何从管道中读取数据:

读取管道的数据,需要访问source通道:

Pipe.SourceChannel sourceChannel = pipe.source();

调用source通道的read()方法来读取数据:

ByteBuffer buf = ByteBuffer.allocate(48);int bytesRead = sourceChannel.read(buf);

read()方法返回的int值会告诉我们多少字节被读进了缓冲区。

到此我们就把通道的简单用法讲完了,要想会用还是得多去练习,多模拟使用,这样才知道什么时候用以及怎么用,下节我们来讲选择器-Selectors。

转载于:https://www.cnblogs.com/rickiyang/p/11074241.html

你可能感兴趣的文章
搞懂分布式技术4:ZAB协议概述与选主流程详解
查看>>
搞懂分布式技术5:Zookeeper的配置与集群管理实战
查看>>
搞懂分布式技术6:Zookeeper典型应用场景及实践
查看>>
搞懂分布式技术10:LVS实现负载均衡的原理与实践
查看>>
搞懂分布式技术11:分布式session解决方案与一致性hash
查看>>
搞懂分布式技术12:分布式ID生成方案
查看>>
搞懂分布式技术13:缓存的那些事
查看>>
搞懂分布式技术14:Spring Boot使用注解集成Redis缓存
查看>>
搞懂分布式技术15:缓存更新的套路
查看>>
搞懂分布式技术16:浅谈分布式锁的几种方案
查看>>
搞懂分布式技术17:浅析分布式事务
查看>>
搞懂分布式技术18:分布式事务常用解决方案
查看>>
搞懂分布式技术19:使用RocketMQ事务消息解决分布式事务
查看>>
搞懂分布式技术20:消息队列因何而生
查看>>
搞懂分布式技术21:浅谈分布式消息技术 Kafka
查看>>
后端技术杂谈1:搜索引擎基础倒排索引
查看>>
后端技术杂谈2:搜索引擎工作原理
查看>>
后端技术杂谈3:Lucene基础原理与实践
查看>>
后端技术杂谈4:Elasticsearch与solr入门实践
查看>>
后端技术杂谈5:云计算的前世今生
查看>>