shihuan830619
  • 浏览: 573910 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

Netty5 AIO

    博客分类:
  • J2SE
阅读更多
工程结构图:


TimeServer.java文件内容如下:
package com.shihuan.netty.server;

/**
 * Command-line entry point that launches the asynchronous time server.
 */
public class TimeServer {

	/**
	 * Starts an {@code AsyncTimeServerHandler} on a thread named
	 * "AIO-AsyncTimeServerHandler-001".
	 *
	 * @param args optional; when at least one argument is present, the first one
	 *             is parsed as the listening port. Port 8080 is used when no
	 *             argument is given or the argument is not a valid integer.
	 */
	public static void main(String[] args) {
		final int listenPort = resolvePort(args, 8080);
		final AsyncTimeServerHandler serverTask = new AsyncTimeServerHandler(listenPort);
		final Thread serverThread = new Thread(serverTask, "AIO-AsyncTimeServerHandler-001");
		serverThread.start();
	}

	/** Returns the port parsed from the first argument, or {@code fallback} when absent or unparsable. */
	private static int resolvePort(String[] args, int fallback) {
		if (args == null || args.length == 0) {
			return fallback;
		}
		try {
			return Integer.parseInt(args[0]);
		} catch (NumberFormatException ignored) {
			// Not a number: keep the default port.
			return fallback;
		}
	}

}


AsyncTimeServerHandler.java文件内容如下:
package com.shihuan.netty.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

/**
 * Runnable that owns the asynchronous server socket of the time server.
 *
 * <p>The constructor opens the channel and binds it to the given port;
 * {@link #run()} arms the first accept and then blocks on {@link #latch} until
 * the accept handler counts it down. The {@code latch} and
 * {@code asynchronousServerSocketChannel} fields are package-private because
 * {@code AcceptCompletionHandler} reads them through its attachment.
 */
public class AsyncTimeServerHandler implements Runnable {

	private int port;

	/** Counted down by the accept handler when accepting fails; created in {@link #run()}. */
	CountDownLatch latch;

	/** The listening channel, or {@code null} when it could not be opened and bound. */
	AsynchronousServerSocketChannel asynchronousServerSocketChannel;

	/**
	 * Opens the server channel and binds it to {@code port}.
	 *
	 * <p>An {@link IOException} is reported on standard error rather than thrown.
	 * In that case any half-initialised channel is closed and the field is left
	 * {@code null}, so {@link #run()} can detect the failure.
	 *
	 * @param port the TCP port to listen on
	 */
	public AsyncTimeServerHandler(int port) {
		this.port = port;
		try {
			asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
			asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
			System.out.println("The time server is start in port : " + port);
		} catch (IOException e) {
			e.printStackTrace();
			// Do not keep an opened-but-unbound channel around: accept() on it would throw.
			closeServerChannel();
			asynchronousServerSocketChannel = null;
		}
	}

	/** Initiates one asynchronous accept, passing this handler as the attachment. */
	public void doAccept() {
		asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler());
	}

	/**
	 * Arms the first accept and blocks until the latch is released, then closes
	 * the server channel. Returns immediately when the constructor failed to set
	 * up the channel.
	 */
	@Override
	public void run() {
		if (asynchronousServerSocketChannel == null) {
			System.err.println("The time server could not be started on port : " + port);
			return;
		}
		latch = new CountDownLatch(1);
		doAccept();
		try {
			latch.await();
		} catch (InterruptedException e) {
			e.printStackTrace();
			// Restore the flag so code further up the stack can see the interruption.
			Thread.currentThread().interrupt();
		} finally {
			closeServerChannel();
		}
	}

	/** Closes the server channel if one was opened, reporting (not propagating) close errors. */
	private void closeServerChannel() {
		if (asynchronousServerSocketChannel == null) {
			return;
		}
		try {
			asynchronousServerSocketChannel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

}


AcceptCompletionHandler.java文件内容如下:
package com.shihuan.netty.server;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Completion handler for accepts on the time server's listening channel.
 * The attachment is the {@code AsyncTimeServerHandler} that owns that channel.
 */
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> {

	/**
	 * Re-arms the accept on the server channel, then starts an asynchronous read
	 * on the newly accepted connection into a fresh 1024-byte buffer.
	 *
	 * @param result     the newly accepted client channel
	 * @param attachment the server handler owning the listening channel
	 */
	@Override
	public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) {
		// Each accept() call yields a single connection, so request the next one first.
		attachment.asynchronousServerSocketChannel.accept(attachment, this);

		final ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
		final ReadCompletionHandler onRead = new ReadCompletionHandler(result);
		result.read(requestBuffer, requestBuffer, onRead);
	}

	/**
	 * Prints the failure and counts down the server's latch.
	 *
	 * @param exc        the cause of the accept failure
	 * @param attachment the server handler whose latch is counted down
	 */
	@Override
	public void failed(Throwable exc, AsyncTimeServerHandler attachment) {
		exc.printStackTrace();
		attachment.latch.countDown();
	}

}


ReadCompletionHandler.java文件内容如下:
package com.shihuan.netty.server;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
 * Completion handler for reads on one accepted client connection of the time
 * server. The attachment is the buffer the read filled.
 *
 * <p>Each completed read is decoded as one request. The request
 * "QUERY TIME ORDER" (case-insensitive) is answered with the current date
 * string; anything else is answered with "BAD ORDER". Once the response has been
 * written completely, the next read is armed on the same buffer. End-of-stream
 * or an I/O failure closes the channel.
 */
public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {

	/** Charset used for both decoding requests and encoding responses. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

	private final AsynchronousSocketChannel channel;

	/**
	 * @param channel the accepted client channel this handler reads from and writes to
	 */
	public ReadCompletionHandler(AsynchronousSocketChannel channel) {
		this.channel = channel;
	}

	/**
	 * Writes {@code currentTime} to the client, re-issuing the write until the
	 * buffer is drained, then arms the next read. A null or blank response is not
	 * written; the next read is armed straight away.
	 */
	private void doWrite(String currentTime, final ByteBuffer readBuffer) {
		if (currentTime == null || currentTime.trim().length() == 0) {
			readNext(readBuffer);
			return;
		}
		ByteBuffer writeBuffer = ByteBuffer.wrap(currentTime.getBytes(UTF_8));
		channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer buffer) {
				if (buffer.hasRemaining()) {
					// The buffer was not fully sent yet: keep writing.
					channel.write(buffer, buffer, this);
				} else {
					// Response delivered: wait for the next request or for the peer to close.
					readNext(readBuffer);
				}
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				closeChannel();
			}
		});
	}

	/** Resets {@code readBuffer} and starts the next asynchronous read with this handler. */
	private void readNext(ByteBuffer readBuffer) {
		readBuffer.clear();
		channel.read(readBuffer, readBuffer, this);
	}

	/** Closes the client channel, reporting (not propagating) close errors. */
	private void closeChannel() {
		try {
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Handles one completed read.
	 *
	 * @param result     number of bytes read, or a negative value at end-of-stream
	 * @param attachment the buffer that was filled by the read
	 */
	@Override
	public void completed(Integer result, ByteBuffer attachment) {
		if (result < 0) {
			// The peer closed the connection: release the channel instead of answering.
			closeChannel();
			return;
		}
		attachment.flip();
		byte[] body = new byte[attachment.remaining()];
		attachment.get(body);
		String req = new String(body, UTF_8);
		System.out.println("The time server receive order : " + req);
		String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date(System.currentTimeMillis()).toString() : "BAD ORDER";
		doWrite(currentTime, attachment);
	}

	/**
	 * Closes the channel when a read fails.
	 *
	 * @param exc        the cause of the read failure
	 * @param attachment the buffer passed to the failed read
	 */
	@Override
	public void failed(Throwable exc, ByteBuffer attachment) {
		closeChannel();
	}

}


TimeClient.java文件内容如下:
package com.shihuan.netty.client;

/**
 * Command-line entry point that launches the asynchronous time client.
 */
public class TimeClient {

	/**
	 * Starts an {@code AsyncTimeClientHandler} targeting 127.0.0.1 on a thread
	 * named "AIO-AsyncTimeClientHandler-001".
	 *
	 * @param args optional; when at least one argument is present, the first one
	 *             is parsed as the server port. Port 8080 is used when no
	 *             argument is given or the argument is not a valid integer.
	 */
	public static void main(String[] args) {
		final int serverPort = resolvePort(args, 8080);
		final AsyncTimeClientHandler clientTask = new AsyncTimeClientHandler("127.0.0.1", serverPort);
		final Thread clientThread = new Thread(clientTask, "AIO-AsyncTimeClientHandler-001");
		clientThread.start();
	}

	/** Returns the port parsed from the first argument, or {@code fallback} when absent or unparsable. */
	private static int resolvePort(String[] args, int fallback) {
		if (args == null || args.length == 0) {
			return fallback;
		}
		try {
			return Integer.parseInt(args[0]);
		} catch (NumberFormatException ignored) {
			// Not a number: keep the default port.
			return fallback;
		}
	}

}


AsyncTimeClientHandler.java文件内容如下:
package com.shihuan.netty.client;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
 * Asynchronous time client: connects to the server, sends "QUERY TIME ORDER",
 * prints the response and terminates.
 *
 * <p>The instance is both the {@link Runnable} that drives the exchange and the
 * completion handler for the connect operation. {@link #run()} blocks on a latch
 * that is released once the response was read or any step failed.
 */
public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

	/** Charset used for both encoding the request and decoding the response. */
	private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

	/** The client channel, or {@code null} when it could not be opened. */
	private AsynchronousSocketChannel client;
	private String host;
	private int port;
	/** Released when the exchange is over; created in {@link #run()}. */
	private CountDownLatch latch;

	/**
	 * Opens the client channel. An {@link IOException} is reported on standard
	 * error rather than thrown; {@link #run()} then returns without connecting.
	 *
	 * @param host the server host name or address
	 * @param port the server TCP port
	 */
	public AsyncTimeClientHandler(String host, int port) {
		this.host = host;
		this.port = port;
		try {
			client = AsynchronousSocketChannel.open();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Initiates the connect, waits until the exchange finishes and closes the
	 * channel.
	 */
	@Override
	public void run() {
		if (client == null) {
			System.err.println("The time client could not open a channel to " + host + ":" + port);
			return;
		}
		latch = new CountDownLatch(1);
		client.connect(new InetSocketAddress(host, port), this, this);
		try {
			latch.await();
		} catch (InterruptedException e1) {
			e1.printStackTrace();
			// Restore the flag so code further up the stack can see the interruption.
			Thread.currentThread().interrupt();
		}
		try {
			client.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Called when the connection is established: sends the request, re-issuing
	 * the write until the buffer is drained, then reads the response.
	 *
	 * @param result     always {@code null} for a connect operation
	 * @param attachment this handler (passed as the connect attachment)
	 */
	@Override
	public void completed(Void result, AsyncTimeClientHandler attachment) {
		ByteBuffer writeBuffer = ByteBuffer.wrap("QUERY TIME ORDER".getBytes(UTF_8));
		client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer buffer) {
				if (buffer.hasRemaining()) {
					// The request was not fully sent yet: keep writing.
					client.write(buffer, buffer, this);
				} else {
					readResponse();
				}
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				closeAndRelease();
			}
		});
	}

	/** Reads the response into a 1024-byte buffer, prints it and releases the latch. */
	private void readResponse() {
		ByteBuffer readBuffer = ByteBuffer.allocate(1024);
		client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
			@Override
			public void completed(Integer result, ByteBuffer buffer) {
				try {
					if (result < 0) {
						System.err.println("The time server closed the connection without answering");
						return;
					}
					buffer.flip();
					byte[] bytes = new byte[buffer.remaining()];
					buffer.get(bytes);
					System.out.println("Now is : " + new String(bytes, UTF_8));
				} finally {
					// Always wake up run(), whatever happened while handling the response.
					latch.countDown();
				}
			}

			@Override
			public void failed(Throwable exc, ByteBuffer attachment) {
				closeAndRelease();
			}
		});
	}

	/**
	 * Closes the channel and releases the latch. The latch is released in a
	 * {@code finally} block so that a failing {@code close()} cannot leave
	 * {@link #run()} blocked forever.
	 */
	private void closeAndRelease() {
		try {
			client.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			latch.countDown();
		}
	}

	/**
	 * Called when the connect fails: reports the cause, closes the channel and
	 * releases the latch.
	 *
	 * @param exc        the cause of the connect failure
	 * @param attachment this handler (passed as the connect attachment)
	 */
	@Override
	public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
		exc.printStackTrace();
		closeAndRelease();
	}

}


运行截图:



testnetty5.rar是源代码
  • 大小: 34.6 KB
  • 大小: 21.3 KB
  • 大小: 17.3 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics