【Flink】Flink作业调度流程分析
目录
前言
前面介绍了 Spring 容器的概念,其核心可归纳为两个类: BeanFactory 和 ApplicationContext,ApplicationContext 继承自 BeanFactory ,其不仅包含 BeanFactory 所有功能,还扩展了容器功能。之后介绍了在 SSM 时期和 SpringBoot 时期如何启动 ApplicationContext 。在结尾处,我们指出,ApplicationContext 核心其实是 refresh 方法,容器一系列功能都在该方法中实现,如:注册 Bean、注入 Bean 等。
在 refresh 方法中,实现容器核心功能前,先进行了一系列环境准备工作,我们以 SpringBoot 为当前运行环境,深入讨论这部分内容。
注:本篇文章使用的 SpringBoot 版本为 2.0.3.RELEASE,其 Spring 版本为 5.0.7.RELEASE
正文
refresh 方法定义在 ConfigurableApplicationContext 接口中,被 AbstractApplicationContext 抽象类实现,该方法由十几个子方法组成,这些子方法各司其职,但部分子方法被 AbstractApplicationContext 的子类进行扩展,来增强功能。其中,前四个子方法主要进行上下文准备工作。
第一步:prepareRefresh
我们先从 refresh 中的 prepareRefresh 方法开始讨论:
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
// 初始化上下文环境,就是记录下容器的启动时间、活动状态等
prepareRefresh();
...
}
}
该方法被继承 AbstractApplicationContext 抽象类的子类进行扩展,扩展该方法的子类有:
因本次演示的环境是 SpringBoot ,前面我们讲过,SpringBoot 会根据当前 Web 应用类型创建不同的上下文对象 ,如 Servlet Web、Reactive Web 等。这里演示的是 Servlet Web 应用,所以创建的上下文对象是 AnnotationConfigServletWebServerApplicationContext 。该类的 prepareRefresh 方法会被执行:
public class AnnotationConfigServletWebServerApplicationContext
extends ServletWebServerApplicationContext implements AnnotationConfigRegistry {
...
@Override
protected void prepareRefresh() {
// 清除 Class 的元数据缓存。底层用 Map 保存元数据,执行 Map 的 clear 方法
this.scanner.clearCache();
// 调用父类,也就是 AbstractApplicationContext 的 prepareRefresh 方法
super.prepareRefresh();
}
...
}
public abstract class AbstractApplicationContext {
...
private long startupDate;
private final AtomicBoolean active = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private Set<ApplicationEvent> earlyApplicationEvents;
...
protected void prepareRefresh() {
// 记录此上下文开始时的系统时间(以毫秒为单位)
this.startupDate = System.currentTimeMillis();
// 记录此上下文是否已关闭,这里设置为未关闭
this.closed.set(false);
// 记录此上下文是否处于活动状态,这里设置为活动状态
this.active.set(true);
if (logger.isInfoEnabled()) {
logger.info("Refreshing " + this);
}
// 这也是交由子类扩展的方法。具体子类为 GenericWebApplicationContext,主要是初始化属性源,
// 将 ServletContext 和 ServletConfig 属性配置添加到 Environment 环境上下文中
initPropertySources();
// 校验 Environment 中那些必备的属性配置是否存在,不存在则抛异常。
getEnvironment().validateRequiredProperties();
// 创建 ApplicationEvent 事件集合
this.earlyApplicationEvents = new LinkedHashSet<>();
}
}
refresh 中的 prepareRefresh 方法执行结束,主要是记录容器的启动时间、活动状态、检查必备属性是否存在。其中,关于 Environment 环境配置,在前面 SpringBoot 系列文章中有详细讨论,感兴趣的同学可自行翻阅。
【自制操作系统03】读取硬盘中的数据
第二步:obtainFreshBeanFactory
接着进入 refresh 中的 obtainFreshBeanFactory 方法
public abstract class AbstractApplicationContext {
...
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
...
ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
...
}
}
...
protected ConfigurableListableBeanFactory obtainFreshBeanFactory() {
// 该方法也是由子类扩展,其子类有 AbstractRefreshableApplicationContext 和 GenericApplicationContext,
// 因当前是 Servlet Web 应用,所以执行的是 GenericApplicationContext 中的 refreshBeanFactory 方法。
// 该方法主要设置 BeanFactory 的 serializationId 属性值,也就是序列化id
refreshBeanFactory();
// 通过 getBeanFactory 返回 BeanFactory 对象。同样也是由子类扩展,调用的是 GenericApplicationContext 类中的 getBeanFactory 方法。
// 返回的是 DefaultListableBeanFactory 。
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (logger.isDebugEnabled()) {
logger.debug("Bean factory for " + getDisplayName() + ": " + beanFactory);
}
return beanFactory;
}
...
}
obtainFreshBeanFactory 方法很简单,但如果当前是非 Servlet Web 应用,执行的就是 AbstractRefreshableApplicationContext 中的 refreshBeanFactory 方法,那可就复杂多了,这里就不展开讨论。之后,该方法还返回了 BeanFactory 对象,从这也可以看出 ApplicationContext 底层是以 BeanFactory 为基础,逐步扩展 Spring 容器功能。
第三步:prepareBeanFactory
接着进入 refresh 中的 prepareBeanFactory 方法。prepareBeanFactory 方法主要是对 BeanFactory 做一些配置,包含各种类加载器、需要忽略的依赖以及后置处理器、解析器等,
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
...
prepareBeanFactory(beanFactory);
...
}
...
}
protected void prepareBeanFactory(ConfigurableListableBeanFactory beanFactory) {
// 设置类加载器
beanFactory.setBeanClassLoader(getClassLoader());
// 设置表达式解析器,主要用来解析 EL 表达式; Bean 初始化完成后填充属性时会用到
beanFactory.setBeanExpressionResolver(new StandardBeanExpressionResolver(beanFactory.getBeanClassLoader()));
// 设置属性注册解析器,主要用来解析 Bean 中的各种属性类型,如 String、int 等
beanFactory.addPropertyEditorRegistrar(new ResourceEditorRegistrar(this, getEnvironment()));
// 添加一个后置处理器:ApplicationContextAwareProcessor。
// 该后置处理器用于向实现了 Aware 系列接口的 bean 设置相应属性。
// (后置处理器和 Aware 接口也是比较核心的概念,后面会有文章详细讨论)
beanFactory.addBeanPostProcessor(new ApplicationContextAwareProcessor(this));
// 以下接口,在自动注入时会被忽略,其都是 Aware 系列接口
beanFactory.ignoreDependencyInterface(EnvironmentAware.class);
beanFactory.ignoreDependencyInterface(EmbeddedValueResolverAware.class);
beanFactory.ignoreDependencyInterface(ResourceLoaderAware.class);
beanFactory.ignoreDependencyInterface(ApplicationEventPublisherAware.class);
beanFactory.ignoreDependencyInterface(MessageSourceAware.class);
beanFactory.ignoreDependencyInterface(ApplicationContextAware.class);
// 当以下特殊的 Bean 需自动注入时,指定其注入的类型 。
// 如:注入 BeanFactory 时,注入的类型对象为 ConfigurableListableBeanFactory 。
beanFactory.registerResolvableDependency(BeanFactory.class, beanFactory);
beanFactory.registerResolvableDependency(ResourceLoader.class, this);
beanFactory.registerResolvableDependency(ApplicationEventPublisher.class, this);
beanFactory.registerResolvableDependency(ApplicationContext.class, this);
// 添加 ApplicationListenerDetector 后置处理器。
// 该后置处理器用来检测那些实现了 ApplicationListener 接口的 bean,并将其添加到应用上下文的事件广播器上。
beanFactory.addBeanPostProcessor(new ApplicationListenerDetector(this));
// 判断容器中是否存在 loadTimeWeaver Bean,如果存在则上下文使用临时的 ClassLoader 进行类型匹配。
// 集成 AspectJ 时会用到 loadTimeWeaver 对象。
if (beanFactory.containsBean(LOAD_TIME_WEAVER_BEAN_NAME)) {
beanFactory.addBeanPostProcessor(new LoadTimeWeaverAwareProcessor(beanFactory));
beanFactory.setTempClassLoader(new ContextTypeMatchClassLoader(beanFactory.getBeanClassLoader()));
}
// 注册和环境相关的 Bean,如 environment、systemProperties、systemEnvironment
if (!beanFactory.containsLocalBean(ENVIRONMENT_BEAN_NAME)) {
beanFactory.registerSingleton(ENVIRONMENT_BEAN_NAME, getEnvironment());
}
if (!beanFactory.containsLocalBean(SYSTEM_PROPERTIES_BEAN_NAME)) {
beanFactory.registerSingleton(SYSTEM_PROPERTIES_BEAN_NAME, getEnvironment().getSystemProperties());
}
if (!beanFactory.containsLocalBean(SYSTEM_ENVIRONMENT_BEAN_NAME)) {
beanFactory.registerSingleton(SYSTEM_ENVIRONMENT_BEAN_NAME, getEnvironment().getSystemEnvironment());
}
}
在 prepareBeanFactory 方法中,主要对 BeanFactory 添加了一系列属性项,如添加忽略自动注入的接口、添加 BeanPostProcessor 后置处理器、手动注册部分特殊的 Bean及环境相关的 Bean 。
第四步:postProcessBeanFactory
postProcessBeanFactory 方法是上下文准备的最后一步,主要用来注册 Web 请求相关的处理器、Bean及配置。
public void refresh() throws BeansException, IllegalStateException {
synchronized (this.startupShutdownMonitor) {
...
postProcessBeanFactory(beanFactory);
...
}
}
该方法也是由子类进行扩展,实现该方法的子类有:
前面也说过,当前是 Servlet Web 应用,所以创建的 ApplicationContext 上下文是 AnnotationConfigServletWebServerApplicationContext,执行该类的 postProcessBeanFactory 方法。
public class AnnotationConfigServletWebServerApplicationContext
extends ServletWebServerApplicationContext implements AnnotationConfigRegistry {
private final AnnotatedBeanDefinitionReader reader;
private final ClassPathBeanDefinitionScanner scanner;
private final Set<Class<?>> annotatedClasses = new LinkedHashSet<>();
private String[] basePackages;
...
@Override
protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
// 先执行父类 ServletWebServerApplicationContext 的 postProcessBeanFactory 方法。
// 跳转到 1 查看父类实现
super.postProcessBeanFactory(beanFactory);
// basePackages 存储的是类路径。先判断是否为 null,不为 null 则通过 ClassPathBeanDefinitionScanner 的 scan 方法
// 扫描该路径下符合条件的 Class,并将 Class 信息包装成 BeanDefinition 注册到容器中,
// 当然,这里没有指定扫描路径,所以不会进入这个 if。
// (BeanDefinition 概念会在后面章节详细讨论)
if (this.basePackages != null && this.basePackages.length > 0) {
this.scanner.scan(this.basePackages);
}
// annotatedClasses 存储的 Class 集合。先判断该集合是否为空,不为空则通过
// AnnotatedBeanDefinitionReader 的 register 方法将 Class 信息包装成 BeanDefinition 注册到容器中,
// 这里同样没有设置 Class 集合内容,所以不会进入这个 if。
if (!this.annotatedClasses.isEmpty()) {
this.reader.register(ClassUtils.toClassArray(this.annotatedClasses));
}
}
}
1、
public class ServletWebServerApplicationContext extends GenericWebApplicationContext
implements ConfigurableWebServerApplicationContext {
...
@Override
protected void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
// 添加 BeanPostProcessor 后置处理器:WebApplicationContextServletContextAwareProcessor,
// 该后置处理器主要是从 ConfigurableWebApplicationContext 上下文中获取 ServletContext 和 ServletConfig 对象
beanFactory.addBeanPostProcessor(new WebApplicationContextServletContextAwareProcessor(this));
// 添加一个 忽略自动注入的接口
beanFactory.ignoreDependencyInterface(ServletContextAware.class);
}
...
}
postProcessBeanFactory 方法执行的操作和前面类似,也是添加了后置处理器和忽略自动注入的接口。
总结
ApplicationContext 上下文准备工作基本结束,主要还是在 BeanFactory 中添加一系列后置处理器、注册特殊的 Bean 及设置忽略自动注入的接口。其中还提到了 Spring 容器的三个核心部分:Aware 系列接口、BeanPostProcessor 后置处理器、BeanDefinition ,这部分在后面的文章会逐步讨论。接下来将对 Spring 容器的核心功能展开讨论。
以上就是本章内容,如果文章中有错误或者需要补充的请及时提出,本人感激不尽。
高通量计算框架HTCondor(五)——分布计算