• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Tomcat NIO 实现

武飞扬头像
南顾北衫
帮助1

1. tomcat网络整体架构

学新通

来自 https://www.cnblogs.com/cuzzz/p/17499364.html

上图是tomcat整个网络请求模型

  1. Acceptor线程作为监听线程,会通过通过 accept 方法 获取连接,该线程没有使用selector进行多路复用,使用了阻塞式的accept
  2. 有请求连接后,就会把该连接设置为非阻塞的socket,并且加入到队列中交给poller线程进行处理
  3. poller线程有selector,可以进行多路复用,从队列中获取 需要注册事件的连接
  4. 对于读事件来说,会先取消关心的读事件(因为JDK 的NIO是水平触发模,如果不进行取消,只要该socket缓冲区的数据没有读取完成,会一直触发读事件发生),然后交给业务线程池进行http协议解析,以及servlet业务执行

2. Tomcat读Socket数据原理

tomcat 连接建立时,监听read事件,然后使用fill(boolean block) 方法进行socket数据读取。参数block可以是阻塞模式,也可以是非阻塞模式

  • 阻塞模式:如果socket.read()返回0,再次注册读事件,第二次读,如果有数据返回,没有数据,object.wait,如果超时会抛出超时异常
  • 非阻塞模式:不管是否读取到数据,直接返回

http1.1协议数据解析

  • 解析请求行 :非阻塞模式读取

  • 解析请求头 :非阻塞模式读取

  • 请求行和请求头使用非阻塞模式读,会带来的问题,比如请求头只发送了一半,请求行只有一半,然后非阻塞读的时候没有读取到新数据,tomcat的处理是直接让该连接变成长连接,并重新注册读事件监听,这样就可以不占用线程资源

  • 请求体(HttpServletRequest中请求体Stream流方式读取):阻塞,没有读完就注册读事件,直到读到数据,会占用线程资源(因为会使得该线程阻塞)

怎么判断请求体是否读取完整?
Http1.1协议中使用content-length 或者 transfer-encoding字段进行判断

  • content-length:整个请求内容字节数
  • transfer-encoding:比如:transfer-encoding:chunk,格式是chunk长度\r\n ,结束标志是0\r\n\r\n

3. 实现简易的Tomcat NIO网络模型

 public static void main(String[] args) throws IOException {
        MyTomcat tomcat= new MyTomcat ();
        tomcat.start();
    }


    void start() {
        Poller poller = new Poller();
        Acceptor acceptor = new Acceptor();
        acceptor.poller = poller;
        acceptor.start();
        poller.start();

        Thread thread = new Thread(acceptor);
        thread.setName("acceptor");
        thread.start();

        Thread pollerThread = new Thread(poller);
        pollerThread.setName("poller");
        pollerThread.start();

    }

    class Acceptor implements Runnable {
        ServerSocketChannel serverSocketChannel = null;

        Poller poller = null;

        public void start() {
            try {
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(true);
                serverSocketChannel.bind(new InetSocketAddress("0.0.0.0", 9898));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            while (true) {
                try {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    socketChannel.configureBlocking(false);

                    SocketWrapper socketWrapper = new SocketWrapper();
                    socketWrapper.socketChannel = socketChannel;
                    socketWrapper.poller = poller;
                    poller.register(socketWrapper);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    class Poller implements Runnable {
        Selector selector = null;

        Executor executor = Executors.newFixedThreadPool(50);

        LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>();

        public void register(SocketWrapper socketWrapper) {
            Event event = new Event();
            event.socketWrapper = socketWrapper;
            addEvent(event);
        }

        public void addEvent(Event event) {
            queue.add(event);
            selector.wakeup();
        }

        public void start() {
            try {
                selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void run() {
            int keyCount = 0;
            while (true) {
                try {
                    keyCount = selector.select();
                    Event event = null;
                    while ((event = queue.poll()) != null) {
                        SocketWrapper socketWrapper = event.socketWrapper;
                        SocketChannel socketChannel = socketWrapper.socketChannel;
                        if (event.ops == event.REGISTER) {
                            socketChannel.register(selector, SelectionKey.OP_READ, socketWrapper);
                        } else if (socketWrapper.readBlocking) {
                            synchronized (socketWrapper.object) {
                                socketWrapper.readBlocking = false;
                                socketWrapper.object.notify();
                            }
                        } else {
                            SelectionKey selectionKey = socketChannel.keyFor(selector);
                            if (selectionKey == null) {
                                socketChannel.close();
                            } else {
                                SocketWrapper attachment = ((SocketWrapper) selectionKey.attachment());
                                if (attachment != null) {
                                    try {
                                        int ops = event.ops | selectionKey.interestOps();
                                        selectionKey.interestOps(ops);
                                    } catch (CancelledKeyException ckx) {
                                        selectionKey.cancel();
                                    }
                                } else {
                                    selectionKey.cancel();
                                }
                            }
                        }
                    }
                    if (keyCount > 0) {
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        System.out.println(keyCount);
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            if (selectionKey.isReadable()) {
                                final SocketWrapper socketWrapper = (SocketWrapper) selectionKey.attachment();

                                selectionKey.interestOps(selectionKey.interestOps() & (~selectionKey.readyOps()));
                                executor.execute(() -> {
                                    Http11Processor http11Processor = new Http11Processor();
                                    http11Processor.process(socketWrapper);
                                });
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    }

    class SocketWrapper {
        private SocketChannel socketChannel;
        private Poller poller;
        private final Object object = new Object();
        private volatile boolean readBlocking = false;
        private long timeout = TimeUnit.SECONDS.toMillis(20);


        private int read(boolean isBlock, ByteBuffer byteBuffer) {
            int len = 0;
            try {
                if (isBlock) {
                    long startNanos = 0;
                    do {
                        if (startNanos > 0) {
                            long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                            if (elapsedMillis == 0) {
                                elapsedMillis = 1;
                            }
                            timeout -= elapsedMillis;
                            if (timeout <= 0) {
                                throw new RuntimeException("读取超时");
                            }
                        }

                        len = socketChannel.read(byteBuffer);
                        if (len == -1) {
                            throw new RuntimeException("客户端断开连接");
                        } else if (len == 0) {
                            if (!readBlocking) {
                                readBlocking = true;
                                poller.register(this);
                            }
                            synchronized (object) {
                                if (readBlocking) {
                                    if (timeout > 0) {
                                        startNanos = System.nanoTime();
                                        object.wait(timeout);
                                    } else {
                                        object.wait();
                                    }
                                }
                            }
                        }
                    } while (len == 0);
                } else {
                    len = socketChannel.read(byteBuffer);
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
            return len;
        }

    }

    class Event {
        private final int REGISTER = 0x100;
        private int ops = REGISTER;
        private SocketWrapper socketWrapper;
    }

    class Http11Processor {
        void process(SocketWrapper socketWrapper) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024);

            Request request = new Request();

            //解析请求行
            parseRequestLine(request, socketWrapper, byteBuffer);

            //解析请求头
            parseRequestHeaders(request, socketWrapper, byteBuffer);
            System.out.println(request.method);
            System.out.println(request.uri);
            System.out.println(request.protocol);
            System.out.println(request.headers);

            //写
            Response response = new Response();
            response.socketWrapper = socketWrapper;

            response.write(("<h1>hello world</h1>").getBytes(StandardCharsets.UTF_8));
        }

        private void parseRequestHeaders(Request request, SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
            boolean isParseHeaders = false;
            StringBuilder stringBuilder = new StringBuilder();
            String headerName = null;
            while (!isParseHeaders) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                byte b = byteBuffer.get();
                if (headerName == null && b == ':') {
                    headerName = stringBuilder.toString();
                    stringBuilder = new StringBuilder();
                } else if (b == '\r') {
                } else if (b == '\n' && headerName == null) {
                    isParseHeaders = true;
                } else if (b == '\n') {
                    request.headers.put(headerName, stringBuilder.toString());
                    headerName = null;
                    stringBuilder = new StringBuilder();
                } else {
                    stringBuilder.append((char) b);
                }
            }
        }

        private void parseRequestLine(Request request, SocketWrapper socketWrapper, ByteBuffer byteBuffer) {
            //读 解析
            socketWrapper.read(true, byteBuffer);
            byteBuffer.flip();
            StringBuilder stringBuilder = new StringBuilder();
            boolean space = false;
            while (!space) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                char c = (char) byteBuffer.get();
                if (c == ' ') {
                    request.method = stringBuilder.toString();
                    space = true;
                } else if (c == '\r') {
                } else if (c == '\n') {
                    space = true;
                } else {
                    stringBuilder.append(c);
                }
            }
            space = false;
            stringBuilder = new StringBuilder();
            while (!space) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                char c = (char) byteBuffer.get();
                if (c == ' ') {
                    request.uri = stringBuilder.toString();
                    space = true;
                } else if (c == '\r') {
                } else if (c == '\n') {
                    space = true;
                } else {
                    stringBuilder.append(c);
                }
            }

            space = false;
            stringBuilder = new StringBuilder();
            while (!space) {
                if (byteBuffer.position() >= byteBuffer.limit()) {
                    byteBuffer.clear();
                    socketWrapper.read(true, byteBuffer);
                }
                char c = (char) byteBuffer.get();
                if (c == '\r') {
                } else if (c == '\n') {
                    request.protocol = stringBuilder.toString();
                    space = true;
                } else {
                    stringBuilder.append(c);
                }
            }
        }
    }

    class Request {
        private String method;
        private String uri;
        private String protocol;
        private int contentLength;
        private final Map<String, Object> headers = new HashMap<>();

    }

    class Response {
        SocketWrapper socketWrapper = null;
        private final int maxHeaderSize = 1024 * 2;
        private ByteBuffer byteBuffer = null;

        public void write(byte[] bytes) {
            System.out.println("write enter");
            byteBuffer = ByteBuffer.allocate(maxHeaderSize);
            //写入响应行
            byteBuffer.put("HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8));
            //写入响应头
            byteBuffer.put("Content-Type: text/html;charset=utf-8\r\n".getBytes(StandardCharsets.UTF_8));
            //content-length
            byteBuffer.put(("Content-Length: "   bytes.length   "\r\n").getBytes(StandardCharsets.UTF_8));

            //写入响应空行
            byteBuffer.put("\r\n".getBytes(StandardCharsets.UTF_8));
            //写入响应体
            try {
                byteBuffer.flip();
                SocketChannel socketChannel = socketWrapper.socketChannel;

                int write = socketChannel.write(byteBuffer);
                System.out.println("http header write:"   write);

                ByteBuffer picByteBuffer = ByteBuffer.wrap(bytes);
                while (picByteBuffer.hasRemaining()) {
                    int write1 = socketChannel.write(picByteBuffer);
                    System.out.println("picByteBuffer position:"   picByteBuffer.position()   " limit:"   picByteBuffer.limit()   " leave:"   (picByteBuffer.limit() - picByteBuffer.position())   " http body write:"   write1   " content-length:"   bytes.length);
                }


                Event event = new Event();
                event.ops = SelectionKey.OP_READ;
                event.socketWrapper = socketWrapper;
                socketWrapper.poller.addEvent(event);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfhhacc
系列文章
更多 icon
同类精品
更多 icon
继续加载