美文网首页
Tomcat源码分析 -- 6

Tomcat源码分析 -- 6

作者: sschrodinger | 来源:发表于2019-03-08 09:44 被阅读0次

Tomcat源码分析 -- 6

sschrodinger

2019/01/09


参考


  • 《深入剖析 Tomcat》 - 基于Tomcat 4.x
  • 《Tomcat 架构解析》刘光瑞 著
  • 《大话设计模式》程杰 著
  • Tomcat 8.5.x 源码

tomcat 核心组件


tomcat 使用两个核心部件处理到来的 socket 连接,分别是 ConnectorContainer。其中connector主要是处理外来的连接,并将它发往Container进行处理,最后通过connector重新发回客户端。

核心组件

一个 service 组件管理多个 Connector 和一个 Container。


Connector (连接器)


Connector 在新的 Socket 到来之后,对 Socket 的内容进行解析,解析出 Http request,并创建 request 对象和 respnse。

Connector都遵循以下的结构处理 socket 数据。

coyote结构

其中,ProtocolHandler 和 Adaptor 都是 Connector 直接持有的对象。

在 Tomcat 8 中,Connector 是继承了 LifecycleMBeanBase 的一个类,源码截取如下:

public class Connector extends LifecycleMBeanBase  {

    public Connector() {
        this(null);
    }

    public Connector(String protocol) {
        setProtocol(protocol);
        // Instantiate protocol handler
        ProtocolHandler p = null;
        try {
            Class<?> clazz = Class.forName(protocolHandlerClassName);
            p = (ProtocolHandler) clazz.getConstructor().newInstance();
        } catch (Exception e) {
            log.error(sm.getString(
                    "coyoteConnector.protocolHandlerInstantiationFailed"), e);
        } finally {
            this.protocolHandler = p;
        }

        if (Globals.STRICT_SERVLET_COMPLIANCE) {
            uriCharset = StandardCharsets.ISO_8859_1;
        } else {
            uriCharset = StandardCharsets.UTF_8;
        }
    }
    
    protected String protocolHandlerClassName =
        "org.apache.coyote.http11.Http11NioProtocol";
    
    protected Service service = null;
    
    protected final ProtocolHandler protocolHandler;
    
    protected Adapter adapter = null;
    
    public Service getService() {
        return this.service;
    }
    
    @Override
    protected void initInternal() throws LifecycleException {

        super.initInternal();

        // Initialize adapter
        adapter = new CoyoteAdapter(this);
        protocolHandler.setAdapter(adapter);

        // Make sure parseBodyMethodsSet has a default
        if (null == parseBodyMethodsSet) {
            setParseBodyMethods(getParseBodyMethods());
        }

        if (protocolHandler.isAprRequired() && !AprLifecycleListener.isAprAvailable()) {
            throw new LifecycleException(sm.getString("coyoteConnector.protocolHandlerNoApr",
                    getProtocolHandlerClassName()));
        }
        if (AprLifecycleListener.isAprAvailable() && AprLifecycleListener.getUseOpenSSL() &&
                protocolHandler instanceof AbstractHttp11JsseProtocol) {
            AbstractHttp11JsseProtocol<?> jsseProtocolHandler =
                    (AbstractHttp11JsseProtocol<?>) protocolHandler;
            if (jsseProtocolHandler.isSSLEnabled() &&
                    jsseProtocolHandler.getSslImplementationName() == null) {
                // OpenSSL is compatible with the JSSE configuration, so use it if APR is available
                jsseProtocolHandler.setSslImplementationName(OpenSSLImplementation.class.getName());
            }
        }

        try {
            protocolHandler.init();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerInitializationFailed"), e);
        }
    }


    /**
     * Begin processing requests via this Connector.
     *
     * @exception LifecycleException if a fatal startup error occurs
     */
    @Override
    protected void startInternal() throws LifecycleException {

        // Validate settings before starting
        if (getPort() < 0) {
            throw new LifecycleException(sm.getString(
                    "coyoteConnector.invalidPort", Integer.valueOf(getPort())));
        }

        setState(LifecycleState.STARTING);

        try {
            protocolHandler.start();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStartFailed"), e);
        }
    }


    /**
     * Terminate processing requests via this Connector.
     *
     * @exception LifecycleException if a fatal shutdown error occurs
     */
    @Override
    protected void stopInternal() throws LifecycleException {

        setState(LifecycleState.STOPPING);

        try {
            protocolHandler.stop();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerStopFailed"), e);
        }
    }


    @Override
    protected void destroyInternal() throws LifecycleException {
        try {
            protocolHandler.destroy();
        } catch (Exception e) {
            throw new LifecycleException(
                    sm.getString("coyoteConnector.protocolHandlerDestroyFailed"), e);
        }

        if (getService() != null) {
            getService().removeConnector(this);
        }

        super.destroyInternal();
    }
    

在 server.xml 未指定的情况下,Connector 新建org.apache.coyote.http11.Http11NioProtocol处理器,并新建org.apache.catalina.connector.CoyoteAdapter适配器。适配器负责和容器沟通,处理器根据协议处理来源 socket 连接。

ProtocolHandler 接口如下所示:

public interface ProtocolHandler {

    public void setAdapter(Adapter adapter);
    public Adapter getAdapter();

    public Executor getExecutor();

    public void init() throws Exception;

    public void start() throws Exception;

    public void pause() throws Exception;

    public void resume() throws Exception;

    public void stop() throws Exception;

    public void destroy() throws Exception;

    public void closeServerSocketGraceful();

    public boolean isAprRequired();

    public boolean isSendfileSupported();

    public void addSslHostConfig(SSLHostConfig sslHostConfig);
    public SSLHostConfig[] findSslHostConfigs();

    public void addUpgradeProtocol(UpgradeProtocol upgradeProtocol);
    public UpgradeProtocol[] findUpgradeProtocols();
}

ProtocolHandler 持有一个 Adapter 对象,用于与容器进行交流。

根据应用层协议和传输层协议不同,如应用层使用 HTTP、AJP 或者 HTTP2,传输层使用 NIO、NIO2 或者 APR 进行通信,ProtocolHandler 分别实现了Http11NioProtocolHttp2NioProtocolAJPNioProtocol等 ProtocolHandler 实例。

ProtocolHandler 的直接实现,AbstractProtocol,通过持有 AbstractEndPoint 和 Handler ,分别处理底层的 socket 业务(socket 监听)和应用层业务(应用协议解析)。AbstractProtocol的部分代码如下:

public abstract class AbstractProtocol<S> implements ProtocolHandler,
        MBeanRegistration {
    
    private final AbstractEndpoint<S> endpoint;
    private Handler<S> handler;
    
    public AbstractProtocol(AbstractEndpoint<S> endpoint) {
        this.endpoint = endpoint;
        setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
        setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
    }
    
    protected Adapter adapter;
    @Override
    public void setAdapter(Adapter adapter) { this.adapter = adapter; }
    @Override
    public Adapter getAdapter() { return adapter; }
    
    protected abstract Processor createProcessor();
    
    @Override
    public void init() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.init", getName()));
        }

        if (oname == null) {
            // Component not pre-registered so register it
            oname = createObjectName();
            if (oname != null) {
                Registry.getRegistry(null, null).registerComponent(this, oname, null);
            }
        }

        if (this.domain != null) {
            rgOname = new ObjectName(domain + ":type=GlobalRequestProcessor,name=" + getName());
            Registry.getRegistry(null, null).registerComponent(
                    getHandler().getGlobal(), rgOname, null);
        }

        String endpointName = getName();
        endpoint.setName(endpointName.substring(1, endpointName.length()-1));
        endpoint.setDomain(domain);

        endpoint.init();
    }


    @Override
    public void start() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.start", getName()));
        }

        endpoint.start();

        // Start async timeout thread
        asyncTimeout = new AsyncTimeout();
        Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout");
        int priority = endpoint.getThreadPriority();
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            priority = Thread.NORM_PRIORITY;
        }
        timeoutThread.setPriority(priority);
        timeoutThread.setDaemon(true);
        timeoutThread.start();
    }


    @Override
    public void pause() throws Exception {
        if (getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.pause", getName()));
        }

        endpoint.pause();
    }


    @Override
    public void resume() throws Exception {
        if(getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.resume", getName()));
        }

        endpoint.resume();
    }


    @Override
    public void stop() throws Exception {
        if(getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.stop", getName()));
        }

        if (asyncTimeout != null) {
            asyncTimeout.stop();
        }

        endpoint.stop();
    }


    @Override
    public void destroy() throws Exception {
        if(getLog().isInfoEnabled()) {
            getLog().info(sm.getString("abstractProtocolHandler.destroy", getName()));
        }

        try {
            endpoint.destroy();
        } finally {
            if (oname != null) {
                if (mserver == null) {
                    Registry.getRegistry(null, null).unregisterComponent(oname);
                } else {
                    // Possibly registered with a different MBeanServer
                    try {
                        mserver.unregisterMBean(oname);
                    } catch (MBeanRegistrationException | InstanceNotFoundException e) {
                        getLog().info(sm.getString("abstractProtocol.mbeanDeregistrationFailed",
                                oname, mserver));
                    }
                }
            }

            if (rgOname != null) {
                Registry.getRegistry(null, null).unregisterComponent(rgOname);
            }
        }
    }

}

当 AbstractProtocol 启动时,会启动 EndPoint 对 socket 进行操作。具体操作流程如下:

image

Endpoint 并行运行多个线程,每个线程运行一个Abstract.Accept实例。在AbstractEndpoint.Acceptor实例中监听端口通信,只要Endpoint处于运行状态,则始终监听循环。

与Endpoint启动和Acceptor相关的代码如下:

public abstract class AbstractEndpoint<S> {

    //...

    protected Acceptor[] acceptors;
    
    public abstract void bind() throws Exception;
    public abstract void startInternal() throws Exception;
    
    public void init() throws Exception {
        if (bindOnInit) {
            bind();
            bindState = BindState.BOUND_ON_INIT;
        }
        //...
    }
    
    public final void start() throws Exception {
        if (bindState == BindState.UNBOUND) {
            bind();
            bindState = BindState.BOUND_ON_START;
        }
        startInternal();
    }
    
    protected final void startAcceptorThreads() {
        int count = getAcceptorThreadCount();
        acceptors = new Acceptor[count];

        for (int i = 0; i < count; i++) {
            acceptors[i] = createAcceptor();
            String threadName = getName() + "-Acceptor-" + i;
            acceptors[i].setThreadName(threadName);
            Thread t = new Thread(acceptors[i], threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }
    
    protected abstract Acceptor createAcceptor();
    
    public abstract static class Acceptor implements Runnable {
        public enum AcceptorState {
            NEW, RUNNING, PAUSED, ENDED
        }

        protected volatile AcceptorState state = AcceptorState.NEW;
        public final AcceptorState getState() {
            return state;
        }

        private String threadName;
        protected final void setThreadName(final String threadName) {
            this.threadName = threadName;
        }
        protected final String getThreadName() {
            return threadName;
        }
    }
}

public class NioEndpoint extends AbstractJsseEndpoint<NioChannel> {

    //...
    
    private volatile ServerSocketChannel serverSock = null;
    
     @Override
    public void bind() throws Exception {

        if (!getUseInheritedChannel()) {
            serverSock = ServerSocketChannel.open();
            socketProperties.setProperties(serverSock.socket());
            InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
            serverSock.socket().bind(addr,getAcceptCount());
        } else {
            Channel ic = System.inheritedChannel();
            if (ic instanceof ServerSocketChannel) {
                serverSock = (ServerSocketChannel) ic;
            }
            if (serverSock == null) {
                throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited"));
            }
        }
        serverSock.configureBlocking(true); //mimic APR behavior

        // Initialize thread count defaults for acceptor, poller
        if (acceptorThreadCount == 0) {
            // FIXME: Doesn't seem to work that well with multiple accept threads
            acceptorThreadCount = 1;
        }
        if (pollerThreadCount <= 0) {
            //minimum one poller thread
            pollerThreadCount = 1;
        }
        setStopLatch(new CountDownLatch(pollerThreadCount));

        // Initialize SSL if needed
        initialiseSsl();

        selectorPool.open();
    }
    
    @Override
    public void startInternal() throws Exception {

        if (!running) {
            running = true;
            paused = false;

            processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getProcessorCache());
            eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
            nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
                    socketProperties.getBufferPool());

            // Create worker collection
            if ( getExecutor() == null ) {
                createExecutor();
            }

            initializeConnectionLatch();

            // Start poller threads
            pollers = new Poller[getPollerThreadCount()];
            for (int i=0; i<pollers.length; i++) {
                pollers[i] = new Poller();
                Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
            }

            startAcceptorThreads();
        }
    }
    
    protected class Acceptor extends AbstractEndpoint.Acceptor {

        @Override
        public void run() {

            int errorDelay = 0;

            while (running) {

                // Loop if endpoint is paused
                //...
                state = AcceptorState.RUNNING;

                try {
                    //if we have reached max connections, wait
                    countUpOrAwaitConnection();

                    SocketChannel socket = null;
                    try {
                        // Accept the next incoming connection from the server
                        // socket
                        socket = serverSock.accept();
                    } catch (IOException ioe) {
                        // We didn't get a socket
                        countDownConnection();
                        //...必要的处理
                    }
                    // Successful accept, reset the error delay
                    errorDelay = 0;

                    // Configure the socket
                    if (running && !paused) {
                        // setSocketOptions() will hand the socket off to
                        // an appropriate processor if successful
                        if (!setSocketOptions(socket)) {
                            closeSocket(socket);
                        }
                    } else {
                        closeSocket(socket);
                    }
                } catch (Throwable t) {
                    ExceptionUtils.handleThrowable(t);
                    log.error(sm.getString("endpoint.accept.fail"), t);
                }
            }
            state = AcceptorState.ENDED;
        }
        //...
    }

    
}

如上代码,调用函数的顺序是

graph LR
AbstractEndpoint.start-->NioEndpoint.startInternal
NioEndpoint.startInternal-->NioEndpoint.Acceptor.run

在 NioEndPoint 中,bind 函数负责将 serverChanel 绑定在特定端口上,并以阻塞方式运行,详见NioEndpoint.bind()函数。所有的线程都阻塞在 ServerSock.accept() 之中

NioEndpoint.Acceptor.run() 函数中,除了最必要的处理,最重要的就是setSocketOptions(socket)函数,除了设置 socket 信息,还将 socket 发送给对应的 handler 进行处理。

setSocketOptions 代码如下:

rotected boolean setSocketOptions(SocketChannel socket) {
    // Process the connection
    try {
        //disable blocking, APR style, we are gonna be polling it
        socket.configureBlocking(false);
        Socket sock = socket.socket();
        socketProperties.setProperties(sock);

        NioChannel channel = nioChannels.pop();
        if (channel == null) {
        SocketBufferHandler bufhandler = new SocketBufferHandler(
                socketProperties.getAppReadBufSize(),
                socketProperties.getAppWriteBufSize(),
                socketProperties.getDirectBuffer());
            if (isSSLEnabled()) {
                channel = new SecureNioChannel(socket,bufhandler, selectorPool, this);
            } else {
                channel = new NioChannel(socket, bufhandler);
            }
        } else {
            channel.setIOChannel(socket);
            channel.reset();
        }
        getPoller0().register(channel);
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        try {
            log.error("",t);
        } catch (Throwable tt) {
            ExceptionUtils.handleThrowable(tt);
        }
        // Tell to close the socket
        return false;
    }
    return true;
}

NioEndpoint 使用 Poller 管理 socket,Poller 是 NioEndpoint 新建时产生的类,一个 NioEndpoint 对象持有一个 Poller 数组,每个 Poller 有自己独立的 Selector 对象,setSocketOptions 函数通过 getPoller0 函数实现对 Poller 对象的轮询选择,并将 socket 注册到 Poller 对象中。

Poller 部分代码如下:

public class Poller implements Runnable {

    private Selector selector;
    private final SynchronizedQueue<PollerEvent> events =
                new SynchronizedQueue<>();

    private volatile boolean close = false;

    public Poller() throws IOException {
        this.selector = Selector.open();
    }

    public Selector getSelector() { return selector;}


    /*
     * 将新连接的 Socket 注册到 poller 中
     */
    public void register(final NioChannel socket) {
        socket.setPoller(this);
        NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
        socket.setSocketWrapper(ka);
        ka.setPoller(this);
        ka.setReadTimeout(getSocketProperties().getSoTimeout());
        ka.setWriteTimeout(getSocketProperties().getSoTimeout());
        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
        ka.setSecure(isSSLEnabled());
        ka.setReadTimeout(getConnectionTimeout());
        ka.setWriteTimeout(getConnectionTimeout());
        PollerEvent r = eventCache.pop();
        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
        else r.reset(socket,ka,OP_REGISTER);
        addEvent(r);
    }

    @Override
    public void run() {
        // Loop until destroy() is called
        while (true) {

            boolean hasEvents = false;
            try {
                if (!close) {
                    hasEvents = events();
                    if (wakeupCounter.getAndSet(-1) > 0) {
                        //if we are here, means we have other stuff to do
                        //do a non blocking select
                        keyCount = selector.selectNow();
                    } else {
                        keyCount = selector.select(selectorTimeout);
                    }
                    wakeupCounter.set(0);
                }
                //...
            } catch (Throwable x) {
                //...
            }
            //either we timed out or we woke up, //...
            while (iterator != null && iterator.hasNext()) {
                SelectionKey sk = iterator.next();
                NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
                // Attachment may be null if another thread has called
                // cancelledKey()
                if (attachment == null) {
                    iterator.remove();
                } else {
                    iterator.remove();
                    processKey(sk, attachment);
                }
            }//while

            //process timeouts
            timeout(keyCount,hasEvents);
        }//while

        getStopLatch().countDown();
    }

    protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
        try {
            if ( close ) {
                cancelledKey(sk);
            } else if ( sk.isValid() && attachment != null ) {
                if (sk.isReadable() || sk.isWritable() ) {
                    if ( attachment.getSendfileData() != null ) {
                        processSendfile(sk,attachment, false);
                    } else {
                        unreg(sk, attachment, sk.readyOps());
                        boolean closeSocket = false;
                        // Read goes before write
                        if (sk.isReadable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
                                closeSocket = true;
                            }
                        }
                        if (!closeSocket && sk.isWritable()) {
                            if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
                                closeSocket = true;
                            }
                        }
                        if (closeSocket) {
                            cancelledKey(sk);
                        }
                    }
                }
            } else {
                //invalid key
                cancelledKey(sk);
            }
        } catch ( CancelledKeyException ckx ) {
            //...
        } catch (Throwable t) {
            //...
        }
    }
}

核心代码为 register 和 run。register 函数将 socket 注册到 poller 中,并提供 OP_REGISTER 的事件,run 函数通过事件运行,并对每一个连接执行processKey(sk, attachment)方法。

当 processKey 执行到 isReadable 分支时,即如下代码段:

if (sk.isReadable()) {
    if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
        closeSocket = true;
    }
}

在 processSocket 函数之中,会调用 SocketProcessorBase 类的 doRun 方法,处理,最终 doRun 方法会调用 Endpoint 类的 Handler 方法进行处理。具体流程如下

graph LR
NioEndpoint.Poller.processKey-->SocketProcessorBase.run
SocketProcessorBase.run-->SocketProcessor.doRun
SocketProcessor.doRun-->AbstractEndpoint.Handler

以上是处理 socket 监听的全过程。

实际上,NioEndPoint 调用的是 AbstractProtocol 的Handler实现。因为在 AbstractProtocol 的子类构造函数中,将该 Handler 赋值给了 NioEndpoint。如下代码段:

public AbstractHttp11Protocol(AbstractEndpoint<S> endpoint) {
    super(endpoint);
    setConnectionTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
    ConnectionHandler<S> cHandler = new ConnectionHandler<>(this);
    setHandler(cHandler);
    getEndpoint().setHandler(cHandler);
}

Handler 调用 Processor 的 process 方法,最终调用 Http11Processor 的 service 方法对 socket 请求等进行解码,并组成 Request 和 Response。

service 解码的过程如下:

@Override
public SocketState service(SocketWrapperBase<?> socketWrapper)
    throws IOException {
    //...

    while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null &&
        try {
            //第一阶段
            if (!inputBuffer.parseRequestLine(keptAlive)) {
                if (inputBuffer.getParsingRequestLinePhase() == -1) {
                    return SocketState.UPGRADING;
                } else if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (endpoint.isPaused()) {
                // 503 - Service unavailable
                //...
            } else {
                keptAlive = true;
                request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                //第二阶段
                if (!inputBuffer.parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!disableUploadTimeout) {
                    socketWrapper.setReadTimeout(connectionUploadTimeout);
                }
            }
        } catch (IOException e) {
            //...
            break;
        } catch (Throwable t) {
            //...
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
            getAdapter().log(request, response, 0);
        }

        // Has an upgrade been requested?
        //...

        //第三阶段
        if (!getErrorState().isError()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                //...
                // 500 - Internal Server Error
                response.setStatus(500);
                //...
            }
        }

        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        //第四阶段
        if (!getErrorState().isError()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response);
                if(keepAlive && !getErrorState().isError() && !isAsync() &&
                        statusDropsConnection(response.getStatus())) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
            } catch (InterruptedIOException e) {
            setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e);
            } catch (HeadersTooLargeException e) {
                log.error(sm.getString("http11processor.request.process"), e);
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }
    }
}

Processor 对 Socket 的解析分成了三个阶段:
包括解析 http 请求行,解析 http 头,准备 request 信息,第四阶段调用 Adapter 的 service 方法,将解析的数据交由容器处理。

默认实现的 CoyoteAdapter 实现了 Adapter 接口,并将 response 和 request 交由容器处理(invoke)方法,代码片段如下:

@Override
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
            throws Exception {
    //...
    try {
        // Parse and set Catalina and configuration specific
        // request parameters
        postParseSuccess = postParseRequest(req, request, res, response);
        if (postParseSuccess) {
            //check valves if we support async
            request.setAsyncSupported(
                    connector.getService().getContainer().getPipeline().isAsyncSupported());
            // Calling the container
            connector.getService().getContainer().getPipeline().getFirst().invoke(
                   request, response);
        }
        //...
    }//...
}

Container


Container 使用责任链的设计模式处理从 Connector 发送过来的请求,责任链用 Pipeline 维持,每一个容器维持一个Pipeline,每一个Pipeline 维持多个 Value 对象来处理连接(调用 value 的 invoke 方法)。

实例如下:

责任链模式

note

  • 每一个 Container 都会给他自己持有的 Pipeline 增加一些默认 Value,以处理一些必要的信息,比如说,Wrapper 容器持有的 默认 Value,StandardWrapperValve,就实现了过滤机制,以满足servlet 编程的 Filter 的使用。

相关文章

网友评论

      本文标题:Tomcat源码分析 -- 6

      本文链接:https://www.haomeiwen.com/subject/xavcpqtx.html