基于 Selector + Channel + 线程池的 TimeServer
/**
 * Time server built on a single {@link Selector} thread plus a worker pool.
 *
 * <p>The main thread owns the selector: it accepts new connections and detects
 * which client channels have data to read. Reading the request and writing the
 * response is handed to {@link TimeServerTask} instances running on the pool.
 */
public class TimeServer {

    /**
     * Worker pool for request handling. {@code CallerRunsPolicy} makes the
     * selector thread run a task itself when the pool and its queue are full,
     * which throttles dispatching instead of letting a
     * {@code RejectedExecutionException} terminate the server loop.
     */
    private static final ExecutorService executor = new ThreadPoolExecutor(
            5, 10, 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * Binds to port 8080 and runs the selector loop forever.
     *
     * @param args unused
     * @throws IOException if the server socket or selector cannot be opened,
     *                     or if selecting/accepting fails
     */
    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(8080));
        ssc.configureBlocking(false);
        Selector selector = Selector.open();
        // The only valid operation of a ServerSocketChannel is OP_ACCEPT.
        ssc.register(selector, ssc.validOps());
        while (true) {
            int readyCount = selector.select(1000);
            if (readyCount == 0) {
                continue;
            }
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
            while (keyIterator.hasNext()) {
                SelectionKey selectionKey = keyIterator.next();
                // Always drop the key from the selected set, valid or not;
                // otherwise stale keys pile up and are re-examined every pass.
                keyIterator.remove();
                if (!selectionKey.isValid()) {
                    continue;
                }
                if (selectionKey.isAcceptable()) {
                    // Key belongs to the ServerSocketChannel.
                    acceptClient(selector, selectionKey);
                } else if (selectionKey.isReadable()) {
                    // Key belongs to a client SocketChannel.
                    dispatchRead(selectionKey);
                }
            }
        }
    }

    /** Accepts one pending connection and registers it for read events only. */
    private static void acceptClient(Selector selector, SelectionKey serverKey) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) serverKey.channel();
        SocketChannel socketChannel = server.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is pending.
            return;
        }
        socketChannel.configureBlocking(false);
        // Do not register OP_WRITE: a connected socket is almost always
        // writable, so select() would return immediately on every iteration.
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    /** Hands a readable channel to the pool, pausing read events while a worker owns it. */
    private static void dispatchRead(final SelectionKey key) {
        // Without this, the selector keeps reporting the channel as readable
        // until the worker drains it, and several tasks would read the same
        // channel concurrently.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        final Runnable task = new TimeServerTask(key);
        // execute() rather than submit(): a failure inside the task is reported
        // by the worker thread instead of being hidden in a discarded Future.
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    task.run();
                } finally {
                    resumeReads(key);
                }
            }
        });
    }

    /** Re-enables read events for a channel once its worker has finished. */
    private static void resumeReads(SelectionKey key) {
        try {
            key.interestOps(key.interestOps() | SelectionKey.OP_READ);
            // An in-progress select() does not see the new interest set; wake it up.
            key.selector().wakeup();
        } catch (java.nio.channels.CancelledKeyException ignored) {
            // The task cancelled the key or closed the channel; nothing to resume.
        }
    }
}
/**
 * Worker task that serves the requests currently readable on one client channel.
 *
 * <p>Each chunk returned by a single {@code read} is treated as one request:
 * the exact text {@code GET CURRENT TIME} is answered with the current time,
 * anything else with {@code BAD_REQUEST}. Text is converted with the platform
 * default charset, as before.
 */
public class TimeServerTask implements Runnable {

    private final SelectionKey selectionKey;

    /**
     * @param selectionKey key whose channel is the client {@link SocketChannel} to serve
     */
    public TimeServerTask(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    /**
     * Reads until the channel has no more data, answering every chunk read.
     * Closes the connection when the peer has closed its side or on an I/O error.
     */
    @Override
    public void run() {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        // A heap buffer is enough here; allocating a direct buffer per task is costly.
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try {
            int count;
            while ((count = channel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                byte[] request = new byte[byteBuffer.remaining()];
                byteBuffer.get(request);
                String requestStr = new String(request);
                // Make the whole buffer available again for the next read.
                byteBuffer.clear();

                byte[] response;
                if (!"GET CURRENT TIME".equals(requestStr)) {
                    response = "BAD_REQUEST".getBytes();
                } else {
                    response = Calendar.getInstance().getTime().toLocaleString().getBytes();
                }
                // wrap() yields a buffer that is already positioned for writing,
                // so no flip() can be forgotten.
                ByteBuffer out = ByteBuffer.wrap(response);
                // A non-blocking write may send only part of the data.
                // The responses are tiny, so retrying in place is acceptable.
                while (out.hasRemaining()) {
                    channel.write(out);
                }
            }
            if (count < 0) {
                // End of stream: the client closed the connection. Without closing
                // our side the key would be reported readable forever.
                closeConnection(channel);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeConnection(channel);
        }
    }

    /** Cancels the key and closes the channel so the socket is not leaked. */
    private void closeConnection(SocketChannel channel) {
        selectionKey.cancel();
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
这段代码的实现中:
- 把Channel的就绪选择放在了主线程(Acceptor线程)中来处理(等待数据准备阶段)
- 而真正读取请求并返回响应的工作,则作为任务提交到线程池中执行(处理数据阶段)
真正意义上实现了一个线程服务于多个client
TimeClient直接使用上一节的即可
网友评论