Flink Source Code Analysis (4)
2024-12-18 10:17:04 # Flink # Source Code Analysis # JobManager Startup Flow # 2. Creating and Starting the WebMonitorEndpoint via Its Factory

This post picks up where the previous one left off.

Earlier we created a factory object that wraps three inner factories, and then called its create method to build and start the core components.
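Conceptually, the composite factory looks roughly like the minimal sketch below. The names and signatures are illustrative and heavily simplified; the real DefaultDispatcherResourceManagerComponentFactory takes far more parameters and returns a component object rather than just starting everything itself.

public class ComponentFactorySketch {

    // illustrative component type
    interface Component { void start(); }

    interface RestEndpointFactory     { Component createRestEndpoint(); }
    interface ResourceManagerFactory  { Component createResourceManager(); }
    interface DispatcherRunnerFactory { Component createDispatcherRunner(); }

    // the composite factory wraps the three inner factories
    static class DispatcherResourceManagerComponentFactory {

        private final DispatcherRunnerFactory dispatcherRunnerFactory;
        private final ResourceManagerFactory resourceManagerFactory;
        private final RestEndpointFactory restEndpointFactory;

        DispatcherResourceManagerComponentFactory(DispatcherRunnerFactory dispatcherRunnerFactory,
                                                  ResourceManagerFactory resourceManagerFactory,
                                                  RestEndpointFactory restEndpointFactory) {
            this.dispatcherRunnerFactory = dispatcherRunnerFactory;
            this.resourceManagerFactory = resourceManagerFactory;
            this.restEndpointFactory = restEndpointFactory;
        }

        // create() builds each core component and starts it, in the order this article follows
        void create() {
            Component webMonitorEndpoint = restEndpointFactory.createRestEndpoint();
            webMonitorEndpoint.start();

            Component resourceManager = resourceManagerFactory.createResourceManager();
            resourceManager.start();

            Component dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner();
            dispatcherRunner.start();
        }
    }
}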

/**
 * dispatcherResourceManagerComponentFactory -> internally wraps three core factories
 * Step 9: use the factory object to create the core components and start them:
 *   1. Dispatcher
 *   2. ResourceManager
 *   3. WebMonitorEndpoint
 * step into the create method
 */
clusterComponent = dispatcherResourceManagerComponentFactory
.create(configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);

Let's step into this create method.

We'll look at the components one by one, starting with the WebMonitorEndpoint.

WebMonitorEndpoint


/**
 * Declare the core objects to be created:
 * webMonitorEndpoint: the REST service the web UI talks to
 * resourceManager: the resource manager
 * dispatcherRunner: the runner that drives the Dispatcher
 */
WebMonitorEndpoint<?> webMonitorEndpoint = null;
ResourceManager<?> resourceManager = null;
DispatcherRunner dispatcherRunner = null;
--------------------------------------------------------

/**
 * Step 10:
 * restEndpointFactory creates the webMonitorEndpoint through the factory instance:
 *   restEndpointFactory -> SessionRestEndpointFactory
 *   webMonitorEndpoint  -> DispatcherRestEndpoint
 */
webMonitorEndpoint = restEndpointFactory
.createRestEndpoint(configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");
// start the service
webMonitorEndpoint.start();

Here we first declare the objects we want to create, then use the factory instance to create and start them.

Rather than stepping straight in, let's start from cluster startup and trace, piece by piece, what happens on the way to creating and starting this webMonitorEndpoint.

initializeServices (initialization)

Let's go back to the initialization in step 7.

initializeServices(configuration, pluginManager);

// write the JobManager address and port into the configuration
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());


-------------------------------
protected void initializeServices(Configuration configuration, PluginManager pluginManager) throws Exception {
LOG.info("Initializing cluster services.");
synchronized(lock) {

/**
 * Step 14:
 * Instantiate commonRpcService: an Akka-based RpcService implementation.
 * commonRpcService wraps an Akka ActorSystem listening on port 6123
 * (jobmanager.rpc.port = 6123 configures the JobManager's RPC port):
 *   1. initialize the ActorSystem
 *   2. start the Actor
 */
commonRpcService = AkkaRpcServiceUtils
.createRemoteRpcService(configuration, configuration.getString(JobManagerOptions.ADDRESS), getRPCPortRange(configuration),
configuration.getString(JobManagerOptions.BIND_HOST), configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

Let's look at createRemoteRpcService first.

/**
 * Note: initialize an AkkaRpcServiceBuilder
 */
final AkkaRpcServiceBuilder akkaRpcServiceBuilder = AkkaRpcServiceUtils
.remoteServiceBuilder(configuration, externalAddress, externalPortRange);


-----------------------------------------------
public static AkkaRpcServiceBuilder remoteServiceBuilder(Configuration configuration, @Nullable String externalAddress,
String externalPortRange) {

/*************************************************
 * Note: call the constructor to build an AkkaRpcServiceBuilder,
 * passing the address and port range; a usable port is picked
 * automatically from that range
 */
return new AkkaRpcServiceBuilder(configuration, LOG, externalAddress, externalPortRange);
}

First, a builder is created and initialized. Then:

/**
 * Note: bind the hostname and port
 */
if(bindAddress != null) {
akkaRpcServiceBuilder.withBindAddress(bindAddress);
}
bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);

/**
 * Note: create and start the AkkaRpcService;
 * step into createAndStart
 */
return akkaRpcServiceBuilder.createAndStart();

The hostname and port are bound, and the builder then creates and starts the RPC service.

Let's step into createAndStart.

/**
 * Create and start the RPC service
 */
public AkkaRpcService createAndStart() throws Exception {

/**
 * Read the thread-pool parallelism settings for Akka:
 * FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR
 * FORK_JOIN_EXECUTOR_PARALLELISM_MIN
 * FORK_JOIN_EXECUTOR_PARALLELISM_MAX
 */
if(actorSystemExecutorConfiguration == null) {
actorSystemExecutorConfiguration = BootstrapTools.ForkJoinExecutorConfiguration.
fromConfiguration(configuration);
}

/**
 * If no external address is configured,
 * create a local ActorSystem
 */
final ActorSystem actorSystem;
if(externalAddress == null) {
// create local actor system
actorSystem = BootstrapTools
.startLocalActorSystem(configuration, actorSystemName, logger, actorSystemExecutorConfiguration, customConfig);
} else {
// otherwise create a remote ActorSystem
actorSystem = BootstrapTools.startRemoteActorSystem(configuration, actorSystemName, externalAddress, externalPortRange, bindAddress,
Optional.ofNullable(bindPort), logger, actorSystemExecutorConfiguration, customConfig);
}

/**
 * Create and return an AkkaRpcService instance;
 * internally it starts a SupervisorActor
 */
return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
}


------------------------------------------
public static ActorSystem startLocalActorSystem(Configuration configuration, String actorSystemName, Logger logger,
ActorSystemExecutorConfiguration actorSystemExecutorConfiguration, Config customConfig) throws Exception {

logger.info("Trying to start local actor system");

try {

/**
 * Note: obtain the Akka Config object
 */
Config akkaConfig = AkkaUtils
.getAkkaConfig(configuration, scala.Option.empty(), scala.Option.empty(), actorSystemExecutorConfiguration.getAkkaConfig());

if(customConfig != null) {
akkaConfig = customConfig.withFallback(akkaConfig);
}

/**
 * Note: start the local ActorSystem
 */
return startActorSystem(akkaConfig, actorSystemName, logger);
} catch(Throwable t) {
throw new Exception("Could not create actor system", t);
}
}


---------------------------------
private static ActorSystem startActorSystem(Config akkaConfig, String actorSystemName, Logger logger) {
logger.debug("Using akka configuration\n {}", akkaConfig);

/*************************************************
 * Note: create the Akka ActorSystem
 */
ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);

logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
return actorSystem;
}

So the first thing that happens here is creating the ActorSystem.

Then comes new AkkaRpcService; let's look at that constructor.

public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {

this.actorSystem = checkNotNull(actorSystem, "actor system");
this.configuration = checkNotNull(configuration, "akka rpc service configuration");

Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
if(actorSystemAddress.host().isDefined()) {
address = actorSystemAddress.host().get();
} else {
address = "";
}

if(actorSystemAddress.port().isDefined()) {
port = (Integer) actorSystemAddress.port().get();
} else {
port = -1;
}

captureAskCallstacks = configuration.captureAskCallStack();
internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

terminationFuture = new CompletableFuture<>();

stopped = false;

/**
 * Start the SupervisorActor
 */
supervisor = startSupervisorActor();
}

The first part is just validation and extracting the address and port; at the end a supervisor is started. Let's take a look.

private Supervisor startSupervisorActor() {

final ExecutorService terminationFutureExecutor = Executors
.newSingleThreadExecutor(new ExecutorThreadFactory("AkkaRpcService-Supervisor-Termination-Future-Executor"));

final ActorRef actorRef = SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor);

/**
 * Wrap the actor into a Supervisor instance
 */
return Supervisor.create(actorRef, terminationFutureExecutor);
}

Here we see the familiar actor-creation pattern: this is the Akka material from the first article in this series. A supervisor actor is created under the ActorSystem we just built.
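For reference, here is a minimal standalone sketch of the same pattern using the classic Akka API. The actor class and names are illustrative, not Flink's:

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

public class SupervisorDemo {

    // a trivial actor standing in for Flink's SupervisorActor
    static class DemoSupervisor extends AbstractActor {
        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(String.class, msg -> System.out.println("received: " + msg))
                    .build();
        }
    }

    public static void main(String[] args) {
        // 1. create the ActorSystem (what startActorSystem does above)
        ActorSystem system = ActorSystem.create("demo-rpc-system");

        // 2. start an actor under it (what SupervisorActor.startSupervisorActor does)
        ActorRef supervisor = system.actorOf(Props.create(DemoSupervisor.class), "demo_supervisor");

        supervisor.tell("hello", ActorRef.noSender());
        system.terminate();
    }
}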

Now let's return to where step 14 left off and look at step 15.

/**
 * Step 15:
 * Initialize ioExecutor, the IO thread pool.
 * Pool size: getPoolSize(configuration), i.e.
 * cluster.io-pool.size, defaulting to 4 * number of CPU cores
 */
ioExecutor = Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration), new ExecutorThreadFactory("cluster-io"));


------------------------------------------------------------------
public static int getPoolSize(Configuration config) {

// cluster.io-pool.size, default: 4 * number of CPU cores
final int poolSize = config.getInteger(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, 4 * Hardware.getNumberCPUCores());
Preconditions.checkArgument(poolSize > 0,
String.format("Illegal pool size (%s) of io-executor, please re-configure '%s'.", poolSize,
ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE.key()));
return poolSize;
}

This creates the IO thread pool; unless cluster.io-pool.size is set explicitly, the pool size defaults to four times the number of CPU cores.
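A minimal sketch of the same sizing and naming logic using plain JDK classes; the thread factory here is a simplified stand-in for Flink's ExecutorThreadFactory:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class IoPoolDemo {
    public static void main(String[] args) {
        // default pool size: 4 threads per core, mirroring 4 * Hardware.getNumberCPUCores()
        int poolSize = 4 * Runtime.getRuntime().availableProcessors();

        // a fixed-size pool with named daemon threads, analogous to
        // Executors.newFixedThreadPool(poolSize, new ExecutorThreadFactory("cluster-io"))
        AtomicInteger counter = new AtomicInteger();
        ExecutorService ioExecutor = Executors.newFixedThreadPool(poolSize, runnable -> {
            Thread t = new Thread(runnable, "cluster-io-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });

        ioExecutor.submit(() -> System.out.println("io task on " + Thread.currentThread().getName()));
        ioExecutor.shutdown();
    }
}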

/**
 * Step 16:
 * The HA service implementation; which one is used depends on the user's configuration.
 * It covers, e.g., leader election for the ResourceManager and for the JobManager.
 * haServices = ZooKeeperHaServices in most deployments (ZooKeeper-based)
 */
haServices = createHaServices(configuration, ioExecutor);

--------------------------------------------------
protected HighAvailabilityServices createHaServices(Configuration configuration, Executor executor) throws Exception {

/**
 * Create the HA services;
 * see createHighAvailabilityServices
 */
return HighAvailabilityServicesUtils
.createHighAvailabilityServices(configuration, executor, HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
}
--------------------------------------------------------
public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor,
AddressResolution addressResolution) throws Exception {

/**
 * Read the configured HA mode:
 * in flink-conf.yaml this is set as high-availability: zookeeper
 * see fromConfig
 */
HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

switch(highAvailabilityMode) {

/**
 * Start the appropriate implementation for the HA mode
 */
case NONE:
final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

final String resourceManagerRpcUrl = AkkaRpcServiceUtils
.getRpcUrl(hostnamePort.f0, hostnamePort.f1, AkkaRpcServiceUtils.createWildcardName(ResourceManager.RESOURCE_MANAGER_NAME),
addressResolution, configuration);
final String dispatcherRpcUrl = AkkaRpcServiceUtils
.getRpcUrl(hostnamePort.f0, hostnamePort.f1, AkkaRpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
addressResolution, configuration);
final String webMonitorAddress = getWebMonitorAddress(configuration, addressResolution);

// without high availability: StandaloneHaServices
return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
/**
 * The HA mode Flink typically runs with: ZooKeeper
 */
case ZOOKEEPER:

BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
/**
 * Create the ZooKeeperHaServices;
 * internally it wraps a ZooKeeper client instance
 */
return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration), executor, configuration, blobStoreService);
case FACTORY_CLASS:
return createCustomHAServices(configuration, executor);
default:
throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
}
}

The main work in this step is creating the HA services; along the way, a service for transferring large files (the BlobStore) is also initialized.

case NONE:
return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);

The mode written in the configuration file is read here; if HA is not enabled, the standalone (NONE) implementation is used.

If the mode is ZooKeeper, ZooKeeper's HA mechanism is used instead.
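A minimal sketch of this mode dispatch; the enum values mirror the switch above, but the config-key handling is a simplified assumption, not Flink's exact parsing:

import java.util.Locale;
import java.util.Properties;

public class HaModeDemo {

    enum HighAvailabilityMode { NONE, ZOOKEEPER, FACTORY_CLASS }

    // simplified stand-in for HighAvailabilityMode.fromConfig(configuration)
    static HighAvailabilityMode fromConfig(Properties conf) {
        String value = conf.getProperty("high-availability", "NONE");
        switch (value.toUpperCase(Locale.ROOT)) {
            case "ZOOKEEPER":
                return HighAvailabilityMode.ZOOKEEPER;
            case "NONE":
                return HighAvailabilityMode.NONE;
            default:
                // any other value is treated as a custom factory class name
                return HighAvailabilityMode.FACTORY_CLASS;
        }
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("high-availability", "zookeeper"); // as in flink-conf.yaml
        System.out.println(fromConfig(conf));               // prints ZOOKEEPER
    }
}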

BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);
public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {

// Note: if high availability is activated (a backing store is required to support it)
if(HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {

/**
 * Note: create the BlobStore
 */
return createFileSystemBlobStore(config);

} else {
return new VoidBlobStore();
}
}

---------------------------------------------------
private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException {

/*************************************************
 * Note: get the HDFS path where HA state data is stored
 */
final Path clusterStoragePath = HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration);

final FileSystem fileSystem;
try {

/*************************************************
 * Note: get the file system type, typically HDFS; the return type is
 * HadoopFileSystem, a wrapper around a Hadoop FileSystem
 */
fileSystem = clusterStoragePath.getFileSystem();
} catch(Exception e) {
throw new IOException(String.format("Could not create FileSystem for highly available storage path (%s)", clusterStoragePath), e);
}

/*************************************************
 * Note: create and return the FileSystemBlobStore
 */
return new FileSystemBlobStore(fileSystem, clusterStoragePath.toUri().toString());
}

The main content here is creating a FileSystemBlobStore, i.e. setting up a file-transfer mechanism.

Because RPC is not suited to transferring large files, this store is created up front to manage them through a shared file system.
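A minimal sketch of the underlying idea: write the file to shared storage and pass only its key over RPC. All names here are illustrative, not Flink's API:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// illustrative blob store backed by a shared directory (e.g. an HDFS/NFS mount)
public class FileBlobStoreDemo {

    private final Path storageRoot;

    public FileBlobStoreDemo(Path storageRoot) {
        this.storageRoot = storageRoot;
    }

    // upload: copy the local file into shared storage and return its key
    public String put(Path localFile, String key) throws IOException {
        Path target = storageRoot.resolve(key);
        Files.createDirectories(target.getParent());
        Files.copy(localFile, target, StandardCopyOption.REPLACE_EXISTING);
        return key; // only this small key travels over RPC
    }

    // download: copy the blob from shared storage to a local file
    public void get(String key, Path localFile) throws IOException {
        Files.copy(storageRoot.resolve(key), localFile, StandardCopyOption.REPLACE_EXISTING);
    }
}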

/**
 * Step 18:
 * Initialize the heartbeat services
 */
heartbeatServices = createHeartbeatServices(configuration);

/**
 * Step 19: metrics (monitoring) related services
 */
metricRegistry = createMetricRegistry(configuration, pluginManager);
final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress());
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
final String hostname = RpcUtils.getHostname(commonRpcService);
processMetricGroup = MetricUtils
.instantiateProcessMetricGroup(metricRegistry, hostname, ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

/**
 * Step 20: ArchivedExecutionGraphStore stores a serializable form of the ExecutionGraph.
 * There are two implementations:
 *   1. MemoryArchivedExecutionGraphStore: in-memory storage
 *   2. FileArchivedExecutionGraphStore: file-system storage (the default)
 */
archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration, commonRpcService.getScheduledExecutor());
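A minimal sketch contrasting the two store flavors; the interface and classes are illustrative stand-ins, not Flink's actual types:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class GraphStoreDemo {

    // illustrative stand-in for a store of serialized ExecutionGraph snapshots
    interface ArchivedGraphStore {
        void put(String jobId, byte[] serializedGraph) throws IOException;
        byte[] get(String jobId) throws IOException;
    }

    // in-memory flavor: fast, but lost when the process exits
    static class MemoryStore implements ArchivedGraphStore {
        private final Map<String, byte[]> graphs = new HashMap<>();
        public void put(String jobId, byte[] graph) { graphs.put(jobId, graph); }
        public byte[] get(String jobId) { return graphs.get(jobId); }
    }

    // file-backed flavor (Flink's default): survives restarts, bounded memory use
    static class FileStore implements ArchivedGraphStore {
        private final Path dir;
        FileStore(Path dir) { this.dir = dir; }
        public void put(String jobId, byte[] graph) throws IOException {
            Files.write(dir.resolve(jobId), graph);
        }
        public byte[] get(String jobId) throws IOException {
            return Files.readAllBytes(dir.resolve(jobId));
        }
    }
}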

webMonitorEndpoint(createRestEndpoint)

Back to step 10: let's look at the create call that builds the webMonitorEndpoint.

/**
 * Step 10:
 * restEndpointFactory creates the webMonitorEndpoint through the factory instance:
 *   restEndpointFactory -> SessionRestEndpointFactory
 *   webMonitorEndpoint  -> DispatcherRestEndpoint
 */
webMonitorEndpoint = restEndpointFactory
.createRestEndpoint(configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
metricFetcher,
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(), fatalErrorHandler);
log.debug("Starting Dispatcher REST endpoint.");

Let's step into SessionRestEndpointFactory.

/**
 * Step 21:
 * createRestEndpoint initializes the member variables and
 * returns a new DispatcherRestEndpoint
 */
@Override
public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(Configuration configuration,
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, TransientBlobService transientBlobService,
ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception {

final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);

return new DispatcherRestEndpoint(RestServerEndpointConfiguration.fromConfiguration(configuration), dispatcherGatewayRetriever, configuration,
restHandlerConfiguration, resourceManagerGatewayRetriever, transientBlobService, executor, metricFetcher, leaderElectionService,
RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration), fatalErrorHandler);
}

----------------------------------------------------------
public DispatcherRestEndpoint(RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever<DispatcherGateway> leaderRetriever,
Configuration clusterConfiguration, RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever, TransientBlobService transientBlobService,
ScheduledExecutorService executor, MetricFetcher metricFetcher, LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache, FatalErrorHandler fatalErrorHandler) throws IOException {
super(endpointConfiguration, leaderRetriever, clusterConfiguration, restConfiguration, resourceManagerRetriever, transientBlobService,
executor, metricFetcher, leaderElectionService, executionGraphCache, fatalErrorHandler);

webSubmissionExtension = WebMonitorExtension.empty();
}
----------------------------------------------------------
// DispatcherRestEndpoint extends WebMonitorEndpoint; let's see what the parent class contains
public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint implements LeaderContender, JsonArchivist {

protected final GatewayRetriever<? extends T> leaderRetriever;
protected final Configuration clusterConfiguration;
protected final RestHandlerConfiguration restConfiguration;
private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
private final TransientBlobService transientBlobService;
protected final ScheduledExecutorService executor;

private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;

private final MetricFetcher metricFetcher;

private final LeaderElectionService leaderElectionService;

private final FatalErrorHandler fatalErrorHandler;

private boolean hasWebUI = false;

private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);

@Nullable
private ScheduledFuture<?> executionGraphCleanupTask;

Looking at this parent class, WebMonitorEndpoint is where the endpoint's many REST handlers live: the fields above are the services those handlers depend on, and the handlers themselves are registered in initializeHandlers, which we will see shortly.

Now let's go back to the factory flow and look at how the endpoint is started.

start

// start the service
webMonitorEndpoint.start();

This starts the service; let's look inside start.

/**
 * Step 22:
 * Initialize the various handlers;
 * see initializeHandlers
 */
handlers = initializeHandlers(restAddressFuture);
-----------------------------------------------------
// the override in the DispatcherRestEndpoint subclass
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {

/**
 * Delegate to the parent class to initialize the handlers
 */
List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(localAddressFuture);
--------------------------------------------------
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(final CompletableFuture<String> localAddressFuture) {

/**
 * Note: initialize the handler container
 */
ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

// Note: WebSubmissionExtension
final Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers = initializeWebSubmissionHandlers(
localAddressFuture);
----------------------------------------------------------------

This is the handler initialization: each REST endpoint is registered into the list as a (RestHandlerSpecification, ChannelInboundHandler) pair.
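A minimal sketch of that registration pattern; the spec and handler types are illustrative stand-ins for Flink's RestHandlerSpecification and Netty's ChannelInboundHandler:

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HandlerRegistryDemo {

    // illustrative stand-in for RestHandlerSpecification
    static class RestSpec {
        final String method;
        final String path;
        RestSpec(String method, String path) { this.method = method; this.path = path; }
    }

    // illustrative stand-in for a request handler
    interface RestHandler {
        String handle(String request);
    }

    public static void main(String[] args) {
        // the container that initializeHandlers fills: a list of (spec, handler) pairs
        List<Map.Entry<RestSpec, RestHandler>> handlers = new ArrayList<>(30);

        RestHandler overviewHandler = request -> "{\"jobs\": []}";
        handlers.add(new SimpleImmutableEntry<>(new RestSpec("GET", "/jobs/overview"), overviewHandler));

        RestHandler configHandler = request -> "{\"config\": {}}";
        handlers.add(new SimpleImmutableEntry<>(new RestSpec("GET", "/cluster/config"), configHandler));

        // the REST server later routes each request to the handler whose spec matches
        for (Map.Entry<RestSpec, RestHandler> entry : handlers) {
            System.out.println(entry.getKey().method + " " + entry.getKey().path);
        }
    }
}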

/**
 * Leader election and cleanup of temporary files
 */
@Override
public void startInternal() throws Exception {
/**
 * 1. run leader election
 *    this = webMonitorEndpoint
 *    ZooKeeperLeaderElectionService.start()
 *    elected leader: isLeader(); otherwise: notLeader()
 */
leaderElectionService.start(this);

/**
 * 2. start a periodic task that deletes temporary files and cached files
 *    once an ExecutionGraph entry has finished
 */
startExecutionGraphCacheCleanupTask();

if(hasWebUI) {
log.info("Web frontend listening at {}.", getRestBaseUrl());
}
}

The startInternal method shown above is also invoked as part of startup; let's walk through it.

/**
 * 1. run leader election
 *    this = webMonitorEndpoint
 *    ZooKeeperLeaderElectionService.start()
 *    elected leader: isLeader(); otherwise: notLeader()
 */
leaderElectionService.start(this);

The endpoint passes itself (this) in as the contender.

To see what happens with it, we need to look at ZooKeeperLeaderElectionService and its start method.

/**
 * Because this class implements LeaderLatchListener, the ZooKeeper client
 * framework automatically calls isLeader() when this component wins the
 * election, and notLeader() otherwise.
 */
@Override
public void start(LeaderContender contender) throws Exception {
Preconditions.checkNotNull(contender, "Contender must not be null.");
Preconditions.checkState(leaderContender == null, "Contender was already set.");

LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

synchronized(lock) {

client.getUnhandledErrorListenable().addListener(this);

// Note: what this contender actually is depends on the caller
leaderContender = contender;

/**
 * Note: like HBase, Flink implements leader election on top of ZooKeeper
 * via the Curator framework:
 * 1. leaderLatch.start() effectively runs the election
 * 2. when the election finishes:
 *    on success: isLeader()
 *    on failure: notLeader()
 */
leaderLatch.addListener(this);
leaderLatch.start();

/**
 * Note: register a listener so that once the election finishes:
 * 1. if this node became the leader, isLeader() is called back
 * 2. if it became a follower, notLeader() is called back
 */
cache.getListenable().addListener(this);
cache.start();

client.getConnectionStateListenable().addListener(listener);

running = true;
}
}

Here, when the election succeeds, isLeader() is invoked automatically; otherwise notLeader() is invoked. This callback contract comes from Curator's LeaderLatch.
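For reference, a minimal standalone Curator LeaderLatch example showing the same callback mechanism; the connection string and latch path are illustrative:

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch latch = new LeaderLatch(client, "/demo/leader-latch");
        latch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("elected leader");
            }

            @Override
            public void notLeader() {
                System.out.println("not the leader / lost leadership");
            }
        });

        latch.start();          // join the election, just like in the Flink code above
        Thread.sleep(10_000);   // keep the process alive to observe callbacks
        latch.close();
        client.close();
    }
}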

Once the election wiring is in place, startInternal moves on to its second step:

/**
 * 2. start a periodic task that deletes temporary files and cached files
 *    once an ExecutionGraph entry has finished
 */
startExecutionGraphCacheCleanupTask();



-----------------------------------------------
private void startExecutionGraphCacheCleanupTask() {
final long cleanupInterval = 2 * restConfiguration.getRefreshInterval();

/**
 * Periodically run the cleanup task:
 * executionGraphCache::cleanup deletes temporary files
 * and cached artifacts from finished tasks
 */
executionGraphCleanupTask = executor
.scheduleWithFixedDelay(executionGraphCache::cleanup, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);
}

So after the election wiring, a scheduled task is started that periodically cleans up the execution-graph cache; the cleanup interval is twice the configured web refresh interval.
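A minimal sketch of that scheduling pattern with the JDK scheduler; the cache class is an illustrative stand-in for ExecutionGraphCache:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CleanupTaskDemo {

    // illustrative stand-in for ExecutionGraphCache
    static class GraphCache {
        void cleanup() {
            System.out.println("evicting expired execution graph entries");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        GraphCache executionGraphCache = new GraphCache();

        long refreshInterval = 3000;                // e.g. the web refresh interval in ms
        long cleanupInterval = 2 * refreshInterval; // as in startExecutionGraphCacheCleanupTask

        // fixed-delay: the next run starts cleanupInterval ms after the previous one finishes
        ScheduledFuture<?> cleanupTask = executor.scheduleWithFixedDelay(
                executionGraphCache::cleanup, cleanupInterval, cleanupInterval, TimeUnit.MILLISECONDS);

        Thread.sleep(15_000);   // observe a few runs
        cleanupTask.cancel(false);
        executor.shutdown();
    }
}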

And that is the creation and startup of the WebMonitorEndpoint, the first of the three components built by our composite factory.