您当前的位置: 首页 数码 >

环球快播:Tomcat长轮询原理与源码解析

2023-04-16 19:40:44 来源:博客园

系列文章目录和关于我


(相关资料图)

零丶长轮询的引入

最近在看工作使用到的diamond配置中心原理,发现大多数配置中心在推和拉模型上做的选择出奇的一致选择了基于长轮询的拉模型

基于拉模型的客户端轮询的方案客户端通过轮询方式发现服务端的配置变更事件。轮询的频率决定了动态配置获取的实时性。

优点:简单、可靠。缺点:应用增多时,较高的轮询频率给整个配置中心服务带来巨大的压力。

另外,从配置中心的应用场景上来看,是一种写少读多的系统,客户端大多数轮询请求都是没有意义的,因此这种方案不够高效。

基于推模型的客户端长轮询的方案

基于Http长轮询模型,实现了让客户端在没有发生动态配置变更的时候减少轮询。这样减少了无意义的轮询请求量,提高了轮询的效率;也降低了系统负载,提升了整个系统的资源利用率。

一丶何为长轮询

长轮询本质上是原始轮询技术的一种更有效的形式。

它的出现是为了解决:向服务器发送重复请求会浪费资源,因为必须为每个新传入的请求建立连接,必须解析请求的 HTTP 头部,必须执行对新数据的查询,并且必须生成和交付响应(通常不提供新数据)然后必须关闭连接并清除所有资源。

从tomcat服务器的角度就是客户端不停请求,每次都得解析报文封装成Request,Response对象,并且占用线程池中的一个线程。并且每次轮询都要进行tcp握手,挥手,网卡发起中断,操作系统处理中断从内核空间拷贝数据到用户空间,一通忙活服务端返回 配置未修改(配置中心没有修改配置,客户端缓存的配置和配置中心一致,所以是白忙活)

长轮询是一种服务器选择尽可能长的时间保持和客户端连接打开的技术仅在数据变得可用或达到超时阙值后才提供响应而不是在给到客户端的新数据可用之前,让每个客户端多次发起重复的请求

简而言之,就是服务端并不是立马写回响应,而是hold住一段时间,如果这段时间有数据需要写回(例如配置的修改,新配置需要写回)再写回,然后浏览器再发送一个新请求,从而实现及时性,节省网络开销的作用。

二丶使用等待唤醒机制写一个简单的“长轮询”(脱裤子放屁)
package com.cuzzz.springbootlearn.longpull;import org.springframework.beans.factory.InitializingBean;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.servlet.ServletOutputStream;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.nio.charset.StandardCharsets;import java.util.concurrent.*;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;@RestController@RequestMapping("long-pull")public class MyController implements InitializingBean {    /**     * 处理任务的线程     */    private ThreadPoolExecutor processExecutor;    /**     * 等待唤醒的锁     */    private static final ReentrantLock lock = new ReentrantLock();    /**     * 当请求获取配置的时候,在此condition上等待一定时间     * 当修改配置的时候通过这个condition 通知其他获取配置的线程     */    private static final Condition condition = lock.newCondition();    @GetMapping    public void get(HttpServletRequest request, HttpServletResponse response) throws ExecutionException, InterruptedException {        //组转成任务        Task task = new Task(request, response,                () -> "拿配置" + System.currentTimeMillis());        //提交到线程池        Future submit = processExecutor.submit(task);        //tomcat线程阻塞于此        submit.get();    }    /**     * 模拟修改配置     *     * 唤醒其他获取配置的线程     */    @PostMapping    public String post(HttpServletRequest request, HttpServletResponse response) {        lock.lock();        try {            condition.signalAll();        }finally {            lock.unlock();        }        return "OK";    }    static class Task implements Runnable {        private HttpServletResponse response;        /**         * 等待时长         */        private final long timeout;        private Callable task;        public Task(HttpServletRequest request, HttpServletResponse response, Callable task) {            this.response = response;            String time = request.getHeader("time-out");            if (time == null){                //默认等待10秒                this.timeout = 10;            }else {                this.timeout = Long.parseLong(time);            }            this.task = task;        }        @Override        public void run() {            lock.lock();            try {                //超市等待                boolean await = condition.await(timeout, TimeUnit.SECONDS);                //超时                if (!await) {                    throw new TimeoutException();                }                //获取配置                T call = task.call();                //写回                ServletOutputStream outputStream = response.getOutputStream();                outputStream.write(("没超时拿当前配置:" + call).getBytes(StandardCharsets.UTF_8));            } catch (TimeoutException | InterruptedException exception) {                //超时或者线程被中断                try {                    ServletOutputStream outputStream = response.getOutputStream();                    T call = task.call();                    outputStream.write(("超时or中断拿配置:" + call).getBytes(StandardCharsets.UTF_8));                } catch (Exception ex) {                    throw new RuntimeException(ex);                }            } catch (Exception e) {                throw new RuntimeException(e);            } finally {                lock.unlock();            }        }    }    @Override    public void afterPropertiesSet() {        int cpuNums = Runtime.getRuntime().availableProcessors();        processExecutor                = new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());    }}

使用get方法反问的请求回被提交到线程池进行await等待,使用post方法的请求回唤醒这些线程。

但是这个写法有点脱裤子放屁

为什么会出现这种情况,直接提交到线程池异步执行不可以么,加入我们删除上面submit.get方法会发现其实什么结果都不会,这是因为异步提交到线程池后,tomcat已经结束了这次请求,并没有维护这个连接,所以没有办法写回结果。

如果不删除这一行,tomcat线程阻塞住我们可以写回结果,但是其实没有达到配置使用长轮询的初衷——"解放tomcat线程,让配置中心服务端可以处理更多请求"。

所以我们现在陷入一个尴尬的境地,怎么解决昵?看下去

三丶Tomcat Servlet 3.0长轮询原理1.AsyncContext实现长轮询
package com.cuzzz.springbootlearn.longpull;import org.springframework.stereotype.Controller;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import javax.servlet.AsyncContext;import javax.servlet.AsyncEvent;import javax.servlet.AsyncListener;import javax.servlet.ServletResponse;import javax.servlet.http.HttpServletRequest;import javax.servlet.http.HttpServletResponse;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;@RestController@RequestMapping("long-pull3")public class MyController2 {    private static final ScheduledExecutorService procesExecutor            = Executors.newSingleThreadScheduledExecutor();    /**     * 记录配置改变的map     */    private static final ConcurrentHashMap configCache            = new ConcurrentHashMap<>();    /**     * 记录长轮询的任务     */    private static final ConcurrentLinkedDeque interestQueue            = new ConcurrentLinkedDeque<>();    static {        //每2秒看一下释放配置变更,或者任务超时        procesExecutor.scheduleWithFixedDelay(() -> {            ListneedRemove  = new ArrayList<>();            for (AsyncTask asyncTask : interestQueue) {                if (asyncTask.timeout()) {                    asyncTask.run();                    needRemove.add(asyncTask);                    continue;                }                if (configCache.containsKey(asyncTask.configId)) {                    needRemove.add(asyncTask);                    asyncTask.run();                }            }            interestQueue.removeAll(needRemove);        }, 1, 2, TimeUnit.SECONDS);    }    static class AsyncTask implements Runnable {        private final AsyncContext asyncContext;        private final long timeout;        private static long startTime;        private String configId;        AsyncTask(AsyncContext asyncContext) {            this.asyncContext = asyncContext;            HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();            String timeStr = request.getHeader("time-out");            if (timeStr == null) {                timeout = 10;            } else {                timeout = Long.parseLong(timeStr);            }        //关注的配置key,应该getParameter的,无所谓            this.configId = request.getHeader("config-id");            if (this.configId == null) {                this.configId = "default";            }                        //开始时间            startTime = System.currentTimeMillis();        }        //是否超时        public boolean timeout() {            return (System.currentTimeMillis() - startTime) / 1000 > timeout;        }        @Override        public void run() {            String result = "开始于" + System.currentTimeMillis() + "--";            try {                if (timeout()) {                    result = "超时: " + result;                } else {                    result += configCache.get(this.configId);                }                result += "--结束于:" + System.currentTimeMillis();                ServletResponse response = asyncContext.getResponse();                response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));                                //后续将交给tomcat线程池处理,将给客户端响应                asyncContext.complete();            } catch (IOException e) {                throw new RuntimeException(e);            }        }    }    @GetMapping    public void get(HttpServletRequest request, HttpServletResponse response) {        //打印处理的tomcate线程id        System.out.println("线程id" + Thread.currentThread().getId());        //添加一个获取配置的异步任务        interestQueue.add(new AsyncTask(asyncContext));        //开启异步        AsyncContext asyncContext = request.startAsync();        asyncContext.setTimeout(0);        //监听器打印最后回调的tomcat线程id        asyncContext.addListener(new AsyncListener() {            @Override            public void onComplete(AsyncEvent event) throws IOException {                System.out.println("线程id" + Thread.currentThread().getId());            }            //...剩余其他方法        });                //立马就会释放tomcat线程池资源        System.out.println("tomcat主线程释放");    }    @PostMapping    public void post(HttpServletRequest request) {        String c = String.valueOf(request.getParameter("config-id"));        if (c.equals("null")){            c = "default";        }        String v = String.valueOf(request.getParameter("value"));        configCache.put(c, v);    }}

上面演示利用AsyncContexttomcat是如何实现长轮询

这种方式的优势在于:解放了tomcat线程,其实tomcat的线程只是运行了get方法中的代码,然后立马可以去其他请求,真正获取配置更改的是我们的单线程定时2秒去轮询。

2.实现原理2.1 tomcat处理一个请求的流程

Connector是客户端连接到Tomcat容器的服务点,它提供协议服务来将引擎与客户端各种协议隔离开来

在Connector组件中创建了Http11NioProtocol组件,Http11NioProtocol默认持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,并且启动的时候会启动一个线程运行Acceptor

Acceptor服务器端监听客户端的连接,会启动线程一直执行

每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。,每接收一个客户端连接就轮询一个Poller组件,添加到Poller组件的事件队列中。

Poller组件持有多路复用器selector,poller组件不停从自身的事件队列中将事件取出注册到自身的多路复用器上,同时多路复用器会不停的轮询检查是否有通道准备就绪,准备就绪的通道就可以扔给tomcat线程池处理了。

tomcat线程池处理请求

这里会根据协议创建不同的Processor处理,这里创建的是Http11Processor,Http11Processor会使用CoyoteAdapter去解析报文随后交给Container去处理请求

CoyoteAdapter解析报文随后交给Container去处理请求

Container会将Filter和Servlet组装成FilterChain依次调用

FilterChain会依次调用Filter#doFilter,然后调用Servlet#service方法

至此会调用到Servlete#service方法,SpringMVC中的Dispatcher会反射调用我们controller的方法

2.2 AsyncContext 如何实现异步2.2.1 request.startAsync() 修改异步状态机状态为Starting

AsycContext内部持有一个AsyncStateMachine来管理异步请求的状态(有点状态模式的意思)

状态机的初始状态是AsyncState.DISPATCHED,通过setStarted将状态机的状态更新成STARTING

2.2.2 AbstractProtocol启动定时任务处理超时异步请求

Connector启动的时候触发ProtocolHandler的start方法,如下

其中startAsyncTimeout方法会遍历waitingProcessors中每一个Processor的timeoutAsync方法,这里的Processor就是Http11Processor

那么waitProcessors中的Http11Processor是谁塞进去的昵?

tomcat线程在执行完我们的Servlet代码后,Http11NioProtocol会判断请求状态,如果为Long那么会塞到waitProcessors集合中。

如果发现请求超时,那么会调用Http11Processor#doTimeoutAsycn然后由封装的socket通道socketWrapper以TIMEOUT的事件类型重新提交到tomcat线程池中。

2.2.3 AsyncContext#complete触发OPEN_READ事件

可以看到其实和超时一样,只不过超时是由定时任务线程轮询来判断,而AsyncContext#complete则是我们业务线程触发processSocketEvent将后续处理提交到tomcat线程池中。

四丶长轮询的优点和缺点

本文学习了长轮询和tomcat长轮询的原理,可以看到这种方式的优点

浏览器长轮询的过程中,请求并没有理解响应,而是等到超时或者有需要返回的数据(比如配置中心在这个超时事件内发送配置的变更)才返回,解决了短轮询频繁进行请求网络开销的问题,减少了读多写少业务情景下无意义请求。真是通过这种方式,减少了无意义的请求,而且释放了tomcat线程池中的线程,使得我们服务端可以支持更多的客户端(因为业务逻辑是放在其他的线程池执行的,而且对于配置中心来说,可以让多个客户端的长轮询请求由一个线程去处理,原本是一个请求一个tomcat线程处理,从而可以支持更多的请求)

当然这种方式也是有缺点的

hold住请求也是会消耗资源的,如果1w个请求同时到来,我们都需要hold住(封装成任务塞到队列)这写任务也是会占用内存的,而短轮询则会立马返回,从而时间资源的释放

请求先后顺序无法保证,比如轮询第五个客户端的请求的时候,出现了配置的变更,这时候第五个请求会被提交到tomcat线程池中,从而早于前面四个请求得到响应,这对于需要严格有序的业务场景是有影响的

多台实例监听配置中心实例,出现不一致的情况

比如配置中心四台实例监听配置变更,前三台可能响应了得到V1的配置,但是轮询到第四台实例的请求的时候又发生了变更可能就得到了v2的配置,这时候这四台配置不一致了。需要保证这种一致性需要我们采取其他的策略,比如配置中心服务端主动udp推,或者加上版本号保证这四台配置一致。

标签:
x 广告
x 广告

Copyright ©  2015-2022 起点科学网版权所有  备案号:皖ICP备2022009963号-12   联系邮箱: 39 60 29 14 2@qq.com