Flink Source Code Analysis (Part 5)
2024-12-18 10:18:14 # Flink # Source Code Analysis # JobManager Startup Process # 3. ResourceManager Factory: Creation and Startup


The second factory: the ResourceManager implementation

The previous part covered the WebMonitorEndpoint implementation. This part looks at the ResourceManager.

Go back to ClusterEntrypoint and follow the calls from its create method.

Following the calls down into DefaultDispatcherResourceManagerComponentFactory, find step 11, which creates the ResourceManager:

/**
* Step 11:
* create the StandaloneResourceManager instance
* StandaloneResourceManager resourceManager
* StandaloneResourceManagerFactory resourceManagerFactory
*/
resourceManager = resourceManagerFactory
.createResourceManager(configuration, ResourceID.generate(), rpcService, highAvailabilityServices, heartbeatServices,
fatalErrorHandler, new ClusterInformation(hostname, blobServer.getPort()), webMonitorEndpoint.getRestBaseUrl(), metricRegistry,
hostname);

---------------------------------------------------------
public ResourceManager<T> createResourceManager(Configuration configuration, ResourceID resourceId, RpcService rpcService,
HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, MetricRegistry metricRegistry, String hostname) throws Exception {

final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);

/**
* Create the ResourceManagerRuntimeServices (SlotManager, JobLeaderIdService)
*/
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(configuration, rpcService,
highAvailabilityServices, slotManagerMetricGroup);

/**
* Create the ResourceManager instance (implemented by the concrete factory subclass)
*/
return createResourceManager(configuration, resourceId, rpcService, highAvailabilityServices, heartbeatServices, fatalErrorHandler,
clusterInformation, webInterfaceUrl, resourceManagerMetricGroup, resourceManagerRuntimeServices);
}

Step into the createResourceManager called in the final return statement:

@Override
protected ResourceManager<ResourceID> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup,
ResourceManagerRuntimeServices resourceManagerRuntimeServices) {

final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.getStandaloneClusterStartupPeriodTime(configuration);

/**
* Returns a StandaloneResourceManager instance.
* Its class hierarchy shows that it inherits from RpcEndpoint,
* so onStart() is called once the endpoint has been started
*/
return new StandaloneResourceManager(
rpcService,
resourceId,
highAvailabilityServices,
heartbeatServices,
resourceManagerRuntimeServices.getSlotManager(),
ResourceManagerPartitionTrackerImpl::new,
resourceManagerRuntimeServices.getJobLeaderIdService(),
clusterInformation,
fatalErrorHandler,
resourceManagerMetricGroup,
standaloneClusterStartupPeriodTime,
AkkaUtils.getTimeoutAsTime(configuration));
}

This returns a StandaloneResourceManager, whose constructor receives a whole set of services, including the RpcService.
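The two createResourceManager methods form a template method: the public one in the factory prepares the shared pieces (metric groups, runtime services) and then calls a protected overload that StandaloneResourceManagerFactory overrides (note the @Override). A minimal sketch of that shape, assuming heavily simplified types; RmFactory, StandaloneRmFactory, RuntimeServices and Rm are hypothetical names, not Flink classes:

```java
class TemplateFactoryDemo {
    public static void main(String[] args) {
        RmFactory factory = new StandaloneRmFactory();
        Rm rm = factory.create("conf");
        System.out.println(rm.describe()); // StandaloneRm using SlotManager[conf]
    }
}

// Hypothetical stand-in for ResourceManagerRuntimeServices.
class RuntimeServices {
    final String slotManager;

    RuntimeServices(String slotManager) {
        this.slotManager = slotManager;
    }
}

// Hypothetical stand-in for the created ResourceManager.
class Rm {
    final String kind;
    final RuntimeServices services;

    Rm(String kind, RuntimeServices services) {
        this.kind = kind;
        this.services = services;
    }

    String describe() {
        return kind + " using " + services.slotManager;
    }
}

abstract class RmFactory {
    // Template method: shared steps first, then the mode-specific hook.
    final Rm create(String configuration) {
        RuntimeServices services = createRuntimeServices(configuration);
        return createResourceManager(configuration, services);
    }

    private RuntimeServices createRuntimeServices(String configuration) {
        return new RuntimeServices("SlotManager[" + configuration + "]");
    }

    // Hook that each deployment mode implements.
    protected abstract Rm createResourceManager(String configuration, RuntimeServices services);
}

class StandaloneRmFactory extends RmFactory {
    @Override
    protected Rm createResourceManager(String configuration, RuntimeServices services) {
        return new Rm("StandaloneRm", services);
    }
}
```

The design keeps mode-independent setup in one place; a YARN or Kubernetes factory would only supply its own hook.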

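Next, let's look at its class hierarchy. Here is how to view a class hierarchy in IntelliJ IDEA.

Click the class, then right-click to open the context menu and choose the diagram option.

This displays the class hierarchy as a diagram.

In the diagram, green nodes are interfaces and blue nodes are classes.

Here we mainly care about the blue nodes, i.e. the inheritance relationships between classes.

The chain runs from StandaloneResourceManager all the way up to RpcEndpoint, which defines the onStart() hook. Start from the bottom of the chain, StandaloneResourceManager: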

public StandaloneResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup, Time startupPeriodTime, Time rpcTimeout) {

/**
* Call the parent class constructor (ResourceManager)
*/
super(rpcService, resourceId, highAvailabilityServices, heartbeatServices, slotManager, clusterPartitionTrackerFactory, jobLeaderIdService,
clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, rpcTimeout);

this.startupPeriodTime = Preconditions.checkNotNull(startupPeriodTime);
}

//todo super(...) means part of the initialization happens in the parent class; step into the parent constructor



---------------------------------------------------------------
private CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

public ResourceManager(RpcService rpcService, ResourceID resourceId, HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices, SlotManager slotManager, ResourceManagerPartitionTrackerFactory clusterPartitionTrackerFactory,
JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler,
ResourceManagerMetricGroup resourceManagerMetricGroup, Time rpcTimeout) {
/***
* Note: onStart() is not called directly when this constructor finishes; it runs after start() is invoked on the endpoint
*/
super(rpcService, AkkaRpcServiceUtils.createRandomName(RESOURCE_MANAGER_NAME), null);

this.resourceId = checkNotNull(resourceId);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.slotManager = checkNotNull(slotManager);
this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
this.clusterInformation = checkNotNull(clusterInformation);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.resourceManagerMetricGroup = checkNotNull(resourceManagerMetricGroup);

this.jobManagerRegistrations = new HashMap<>(4);
this.jmResourceIdRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
this.taskExecutorGatewayFutures = new HashMap<>(8);

this.jobManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();

this.clusterPartitionTracker = checkNotNull(clusterPartitionTrackerFactory).get(
(taskExecutorResourceId, dataSetIds) -> taskExecutors.get(taskExecutorResourceId).getTaskExecutorGateway()
.releaseClusterPartitions(dataSetIds, rpcTimeout).exceptionally(throwable -> {
log.debug("Request for release of cluster partitions belonging to data sets {} was not successful.", dataSetIds, throwable);
throw new CompletionException(throwable);
}));
}




//todo again, part of the initialization happens in the parent class (FencedRpcEndpoint)
-----------------------------------------------------------------
protected FencedRpcEndpoint(RpcService rpcService, String endpointId, @Nullable F fencingToken) {

/**
* Initialize rpcService and endpointId (in RpcEndpoint)
*/
super(rpcService, endpointId);

Preconditions.checkArgument(rpcServer instanceof FencedMainThreadExecutable, "The rpcServer must be of type %s.",
FencedMainThreadExecutable.class.getSimpleName());

// no fencing token == no leadership
this.fencingToken = fencingToken;
this.unfencedMainThreadExecutor = new UnfencedMainThreadExecutor((FencedMainThreadExecutable) rpcServer);
this.fencedMainThreadExecutor = new MainThreadExecutor(getRpcService().fenceRpcServer(rpcServer, fencingToken),
this::validateRunsInMainThread);
}


//todo FencedRpcEndpoint also delegates to its parent; keep stepping down into RpcEndpoint
-------------------------------------------------------------------
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
/**
* Start the ResourceManager's RpcServer, through which it communicates
* with TaskManagers (heartbeats, resource management).
* startServer returns an RpcServer that wraps an actor
*/
this.rpcServer = rpcService.startServer(this);

/**
* Create the MainThreadExecutor, which runs tasks in the endpoint's main thread
*/
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
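Because every constructor in this chain starts with super(...), the constructor bodies run from the top of the hierarchy down: RpcEndpoint first (where startServer is called), then FencedRpcEndpoint, ResourceManager, and finally StandaloneResourceManager. A minimal sketch of that ordering; the four classes are hypothetical stand-ins that only record when their constructor body runs:

```java
import java.util.ArrayList;
import java.util.List;

class ConstructorChainDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        new StandaloneRmLike(log);
        System.out.println(log); // [BaseEndpoint, FencedEndpoint, RmLike, StandaloneRmLike]
    }
}

// Stands in for RpcEndpoint: in Flink this is where rpcService.startServer(this) happens.
class BaseEndpoint {
    BaseEndpoint(List<String> log) {
        log.add("BaseEndpoint");
    }
}

// Stands in for FencedRpcEndpoint.
class FencedEndpoint extends BaseEndpoint {
    FencedEndpoint(List<String> log) {
        super(log); // the parent constructor body runs before the next line
        log.add("FencedEndpoint");
    }
}

// Stands in for ResourceManager.
class RmLike extends FencedEndpoint {
    RmLike(List<String> log) {
        super(log);
        log.add("RmLike");
    }
}

// Stands in for StandaloneResourceManager.
class StandaloneRmLike extends RmLike {
    StandaloneRmLike(List<String> log) {
        super(log);
        log.add("StandaloneRmLike");
    }
}
```

One consequence: RpcEndpoint hands this to startServer before the subclass fields are assigned, so it matters that the endpoint only starts handling messages once start() is called.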


Step into startServer (in AkkaRpcService):

@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
checkNotNull(rpcEndpoint, "rpc endpoint");

final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
//reference to the actor that backs this endpoint
final ActorRef actorRef = actorRegistration.getActorRef();

final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();

LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());

//Akka address of the actor
final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
//hostname of the actor (localhost if none is set)
final String hostname;
Option<String> host = actorRef.path().address().host();
if(host.isEmpty()) {
hostname = "localhost";
} else {
hostname = host.get();
}

Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));

implementedRpcGateways.add(RpcServer.class);
implementedRpcGateways.add(AkkaBasedEndpoint.class);

final InvocationHandler akkaInvocationHandler;

/**
* Create the InvocationHandler that wraps the ActorRef; calls on the proxy are forwarded through it
*/
if(rpcEndpoint instanceof FencedRpcEndpoint) {
// a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
akkaInvocationHandler = new FencedAkkaInvocationHandler<>(akkaAddress, hostname, actorRef, configuration.getTimeout(),
configuration.getMaximumFramesize(), actorTerminationFuture, ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
captureAskCallstacks);

implementedRpcGateways.add(FencedMainThreadExecutable.class);
} else {

/**
* InvocationHandler
*/
akkaInvocationHandler = new AkkaInvocationHandler(akkaAddress, hostname, actorRef, configuration.getTimeout(),
configuration.getMaximumFramesize(), actorTerminationFuture, captureAskCallstacks);
}

// Rather than using the System ClassLoader directly, we derive the ClassLoader
// from this class . That works better in cases where Flink runs embedded and all Flink
// code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
ClassLoader classLoader = getClass().getClassLoader();

/**
* Dynamic proxy:
* create the RpcServer object backed by akkaInvocationHandler
*/
@SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy
.newProxyInstance(classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler);
return server;
}

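First, it registers an actor for the endpoint and gets its ActorRef.

It then derives the actor's address and hostname.

From the address, hostname, and ActorRef it builds an akkaInvocationHandler (a FencedAkkaInvocationHandler, since ResourceManager is a FencedRpcEndpoint).

Finally, it passes that akkaInvocationHandler to Proxy.newProxyInstance to create the RpcServer.

The returned RpcServer therefore wraps the ActorRef.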

/**
* Dynamic proxy:
* create the RpcServer object backed by akkaInvocationHandler
*/
@SuppressWarnings("unchecked") RpcServer server = (RpcServer) Proxy
.newProxyInstance(classLoader, implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]), akkaInvocationHandler);

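The snippet above uses Java's dynamic proxy API, Proxy.newProxyInstance. Every method call on the returned RpcServer proxy is dispatched to akkaInvocationHandler's invoke(...), which either handles it locally or turns it into a message for the actor.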

This part deserves a closer look; I am not yet very familiar with Java dynamic proxies.
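To make the dynamic proxy concrete: Proxy.newProxyInstance creates, at runtime, an object implementing the given interfaces, and every call on it lands in the handler's invoke(proxy, method, args). A minimal sketch, assuming a hypothetical Gateway interface and a RecordingHandler that records calls as strings where AkkaInvocationHandler would send messages to an actor:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DynamicProxyDemo {
    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Gateway gateway = createGateway(handler);
        System.out.println(gateway.sendHeartbeat("tm-1")); // sent:sendHeartbeat[tm-1]
        System.out.println(handler.messages);              // [sendHeartbeat[tm-1]]
    }

    static Gateway createGateway(InvocationHandler handler) {
        // Same call shape as in startServer: class loader, interfaces, handler.
        ClassLoader classLoader = DynamicProxyDemo.class.getClassLoader();
        return (Gateway) Proxy.newProxyInstance(classLoader, new Class<?>[] {Gateway.class}, handler);
    }
}

// Hypothetical RPC gateway interface; no class implements it directly.
interface Gateway {
    String sendHeartbeat(String targetId);
}

// Stand-in for AkkaInvocationHandler: turns each call into a "message".
class RecordingHandler implements InvocationHandler {
    final List<String> messages = new ArrayList<>();

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        String message = method.getName() + Arrays.toString(args);
        messages.add(message);
        return "sent:" + message;
    }
}
```

The caller only sees the Gateway interface, which is why Flink can hand out typed gateways while the actual transport (local or remote Akka messages) stays hidden in the handler.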

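Going back up to the StandaloneResourceManager constructor, construction ends here.

The comment in the ResourceManager constructor says onStart() is triggered when construction finishes. More precisely, onStart() runs after the endpoint is started with start() (covered at the end of this post): the actor handles the resulting START message and only then calls onStart().

In RpcEndpoint, onStart() is an empty default implementation.

FencedRpcEndpoint, the next class down, does not override it either.

The override that does the real work is in ResourceManager: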

/**
* Called once the ResourceManager endpoint has been started: runs onStart()
*/
@Override
public void onStart() throws Exception {
try {
/**
* Start the ResourceManager services
*/
startResourceManagerServices();

} catch(Exception e) {
final ResourceManagerException exception = new ResourceManagerException(
String.format("Could not start the ResourceManager %s", getAddress()), e);
onFatalError(exception);
throw exception;
}
}

---------------------------------------------------

private void startResourceManagerServices() throws Exception {
try {
/**
* With ZooKeeper HA this is a ZooKeeper-based election service;
* without HA it is StandaloneLeaderElectionService (the case followed below)
*/
leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();

//no-op in Standalone mode
initialize();

/**
* this = ResourceManager (the LeaderContender)
* Start leader election (step into start); once leadership is granted,
* grantLeadership() is called back on the ResourceManager
* */
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());

/**
* Register metrics about the registered TaskExecutors
* */
registerTaskExecutorMetrics();
} catch(Exception e) {
handleStartResourceManagerServicesException(e);
}
}

Stepping into initialize() shows that it does nothing in Standalone mode. The next step is leader election:

/**
* this = ResourceManager (the LeaderContender)
* Start leader election (step into start); once leadership is granted,
* grantLeadership() is called back on the ResourceManager
* */
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());


-----------------------------------------------------------
//Step into StandaloneLeaderElectionService.start
@Override
public void start(LeaderContender newContender) throws Exception {
if (contender != null) {
// Service was already started
throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
}

contender = Preconditions.checkNotNull(newContender);

// directly grant leadership to the given contender
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
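In Standalone mode there is no real election: start() stores the contender, refuses to be started twice, and grants leadership straight away with a fixed session ID. A minimal sketch of that behavior; Contender, RecordingContender and SimpleStandaloneElection are hypothetical stand-ins for LeaderContender and StandaloneLeaderElectionService, and the all-zero UUID is an assumption standing in for HighAvailabilityServices.DEFAULT_LEADER_ID:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;

class StandaloneElectionDemo {
    public static void main(String[] args) {
        RecordingContender contender = new RecordingContender();
        SimpleStandaloneElection election = new SimpleStandaloneElection();
        election.start(contender);
        System.out.println(contender.grants); // [00000000-0000-0000-0000-000000000000]
        try {
            election.start(contender);
        } catch (IllegalArgumentException e) {
            System.out.println("second start rejected");
        }
    }
}

// Hypothetical mirror of LeaderContender.
interface Contender {
    void grantLeadership(UUID leaderSessionId);
}

class RecordingContender implements Contender {
    final List<UUID> grants = new ArrayList<>();

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        grants.add(leaderSessionId);
    }
}

class SimpleStandaloneElection {
    // Assumption: a fixed all-zero UUID plays the role of DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);
    private Contender contender;

    void start(Contender newContender) {
        if (contender != null) {
            throw new IllegalArgumentException("Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // No competition: the only contender becomes leader immediately.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }
}
```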

Step into grantLeadership (implemented by ResourceManager):

@Override
public void grantLeadership(final UUID newLeaderSessionID) {
/**
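* Once clearStateFuture completes, run tryAcceptLeadership asynchronously on the unfenced main-thread executor.
* tryAcceptLeadership does the work needed before leadership is confirmed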
*/
final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
.thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor());
// then confirm leadership (on the RPC service executor)
final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync((acceptLeadership) -> {
if(acceptLeadership) {
// runs after the previous stage completes; confirm leadership
leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
}
}, getRpcService().getExecutor());

// if any stage failed, report it as a fatal error
confirmationFuture.whenComplete((Void ignored, Throwable throwable) -> {
if(throwable != null) {
onFatalError(ExceptionUtils.stripCompletionException(throwable));
}
});
}


-----------------------------------------------------------

/**
* Work done before leadership is confirmed
* */
private CompletableFuture<Boolean> tryAcceptLeadership(final UUID newLeaderSessionID) {

// check that this contender still holds leadership for the given session ID
if(leaderElectionService.hasLeadership(newLeaderSessionID)) {

//derive the ResourceManagerId (the fencing token) from the session ID
final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

// if this ResourceManager was leader before, clear the previous state
if(getFencingToken() != null) {
clearStateInternal();
}
setFencingToken(newResourceManagerId);

/**
* Start the heartbeat services (two heartbeat managers)
* Start the SlotManager service
* (which schedules two periodic tasks)
*/
startServicesOnLeadership();

return prepareLeadershipAsync().thenApply(ignored -> true);
} else {
return CompletableFuture.completedFuture(false);
}
}

----------------------------------------------
protected void startServicesOnLeadership() {
/**
* Start the heartbeat services
*/
startHeartbeatServices();

/**
* Start SlotManagerImpl
* which schedules two periodic tasks
*/
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
}
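tryAcceptLeadership turns the leader session ID into a ResourceManagerId and installs it with setFencingToken. The point of a fencing token is that messages addressed to an older leadership session are rejected once the token changes. A minimal sketch of that check; FencedReceiver is a hypothetical class, not Flink's FencedAkkaRpcActor, and it simply compares the token carried by a message with the current one:

```java
import java.util.UUID;

class FencingDemo {
    public static void main(String[] args) {
        FencedReceiver receiver = new FencedReceiver();
        UUID first = UUID.randomUUID();
        receiver.setFencingToken(first);
        System.out.println(receiver.handle(first, "registerTaskExecutor")); // accepted: registerTaskExecutor

        UUID second = UUID.randomUUID();
        receiver.setFencingToken(second); // leadership changed
        System.out.println(receiver.handle(first, "registerTaskExecutor")); // rejected: stale fencing token
    }
}

class FencedReceiver {
    private UUID fencingToken; // null means "no leadership", as in FencedRpcEndpoint

    void setFencingToken(UUID newToken) {
        this.fencingToken = newToken;
    }

    String handle(UUID messageToken, String payload) {
        if (fencingToken == null) {
            return "rejected: not leader";
        }
        if (!fencingToken.equals(messageToken)) {
            return "rejected: stale fencing token";
        }
        return "accepted: " + payload;
    }
}
```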


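The first part of grantLeadership is a chain of asynchronous CompletableFuture stages (worth studying on its own; I am not very familiar with it yet). tryAcceptLeadership then calls startServicesOnLeadership; let's see what that starts.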
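The chain in grantLeadership has three stages: thenComposeAsync runs tryAcceptLeadership (which itself returns a future) after clearStateFuture completes, thenAcceptAsync confirms leadership only if that result is true, and whenComplete reports failures. A minimal sketch of the same chain; the two single-thread executors stand in for getUnfencedMainThreadExecutor() and getRpcService().getExecutor(), and the event strings are illustrative only:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LeadershipChainDemo {
    static final List<String> events = Collections.synchronizedList(new ArrayList<>());

    // Stand-in for tryAcceptLeadership: start services only if still leader.
    static CompletableFuture<Boolean> tryAcceptLeadership(boolean stillLeader) {
        if (stillLeader) {
            events.add("services started");
            return CompletableFuture.completedFuture(true);
        }
        return CompletableFuture.completedFuture(false);
    }

    static CompletableFuture<Void> grantLeadership(boolean stillLeader,
            ExecutorService mainThread, ExecutorService rpcExecutor) {
        CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);
        CompletableFuture<Boolean> accepted = clearStateFuture
                .thenComposeAsync(ignored -> tryAcceptLeadership(stillLeader), mainThread);
        CompletableFuture<Void> confirmed = accepted.thenAcceptAsync(accept -> {
            if (accept) {
                events.add("leadership confirmed");
            }
        }, rpcExecutor);
        // Returned so callers can wait; the Flink code does not use this future.
        return confirmed.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                events.add("fatal: " + throwable);
            }
        });
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        ExecutorService rpcExecutor = Executors.newSingleThreadExecutor();
        try {
            grantLeadership(true, mainThread, rpcExecutor).join();
            System.out.println(events); // [services started, leadership confirmed]
        } finally {
            mainThread.shutdown();
            rpcExecutor.shutdown();
        }
    }
}
```

thenComposeAsync is used rather than thenApplyAsync because tryAcceptLeadership already returns a future; composing flattens it instead of producing a nested future.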

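startServicesOnLeadership does two things: it starts the heartbeat services, and it starts the SlotManager, which schedules two periodic tasks. First, the heartbeat services: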

/**
* The ResourceManager starts two heartbeat managers:
* 1. taskManagerHeartbeatManager monitors the TaskManagers
* 2. jobManagerHeartbeatManager monitors the JobManagers
* The ResourceManager sends heartbeat requests; JobManagers and TaskManagers answer them
*/
private void startHeartbeatServices() {
/**
* Heartbeats with the TaskManagers
*/
taskManagerHeartbeatManager = heartbeatServices
.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);

/**
* Heartbeats with the JobManagers
*/
jobManagerHeartbeatManager = heartbeatServices
.createHeartbeatManagerSender(resourceId, new JobManagerHeartbeatListener(), getMainThreadExecutor(), log);
}




------------------------------------------------------------------
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(ResourceID resourceId, HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor, Logger log) {

/**
* Create the heartbeat manager instance HeartbeatManagerSenderImpl
*/
return new HeartbeatManagerSenderImpl<>(heartbeatInterval, heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}


-----------------------------------------------------------------

Step into HeartbeatManagerSenderImpl:

HeartbeatManagerSenderImpl(long heartbeatPeriod, long heartbeatTimeout, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor, Logger log) {

this(heartbeatPeriod, heartbeatTimeout, ownResourceID, heartbeatListener, mainThreadExecutor, log, new HeartbeatMonitorImpl.Factory<>());
}

HeartbeatManagerSenderImpl(long heartbeatPeriod, long heartbeatTimeout, ResourceID ownResourceID, HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor, Logger log, HeartbeatMonitor.Factory<O> heartbeatMonitorFactory) {
super(heartbeatTimeout, ownResourceID, heartbeatListener, mainThreadExecutor, log, heartbeatMonitorFactory);
this.heartbeatPeriod = heartbeatPeriod;

/**
* Schedule the heartbeat task
* what actually runs is this object's run() method
*/
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}

@Override
public void run() {

/**
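* In Flink's heartbeat mechanism, unlike in many other clusters:
1. the ResourceManager sends heartbeat requests to the worker TaskManagers
2. the workers respond after receiving them
*/
//keeps running until stopped (see the rescheduling at the end of run())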
if(!stopped) {

/**
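* Iterate over all TaskExecutors and send each one a heartbeat request.
* Each time a TaskExecutor registers with the RM successfully, a
* Target is created for it; the Target is wrapped in a Monitor,
* and each TaskExecutor's unique Monitor is stored in the heartbeatTargets map.
* When the RM starts, it has already started the TaskManagerHeartbeatManager,
* which internally creates a HeartbeatManagerSenderImpl object.
* Net effect:
* once the RM is up, it sends a heartbeat request to all registered TaskExecutors every 10 s (default).
* If a TaskExecutor's last heartbeat is more than 50 s old (default),
* that TaskExecutor is considered dead and the RM unregisters it.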
*/
log.debug("Trigger heartbeat request.");
for(HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {

//the ResourceManager sends a heartbeat request to the target (TaskManager or JobManager)
requestHeartbeat(heartbeatMonitor);
}

/**
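* Reschedule this task so heartbeats are sent periodically
* 1. heartbeat interval: 10 s (heartbeat.interval default)
* 2. heartbeat timeout: 50 s (heartbeat.timeout default)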
*/
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}

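Constructing a HeartbeatManagerSenderImpl effectively starts its run() method, because the constructor calls mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS). schedule() accepts a Runnable task, here this (the current object); for a class that implements Runnable, run() is the task's entry point. The delay is 0 ms, so the task is scheduled for immediate execution. Because the executor is the ResourceManager's main-thread executor, run() executes as soon as the main thread is free, i.e. after the constructor and the current main-thread work have finished.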
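The periodic behavior does not come from scheduleAtFixedRate: the task schedules itself once with a 0 ms delay and, at the end of every run(), schedules itself again after heartbeatPeriod until stopped is set. A minimal sketch of that self-rescheduling pattern on a plain ScheduledExecutorService; the class name, the latch, and the run counter are illustrative assumptions:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SelfReschedulingSender implements Runnable {
    private final ScheduledExecutorService executor;
    private final long periodMillis;
    private final CountDownLatch runsSeen;
    final AtomicInteger runs = new AtomicInteger();
    private volatile boolean stopped;

    SelfReschedulingSender(ScheduledExecutorService executor, long periodMillis, CountDownLatch runsSeen) {
        this.executor = executor;
        this.periodMillis = periodMillis;
        this.runsSeen = runsSeen;
        // Like the Flink constructor: hand "this" to the executor with zero delay.
        executor.schedule(this, 0L, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        if (!stopped) {
            runs.incrementAndGet(); // stands in for sending heartbeat requests to all targets
            runsSeen.countDown();
            // Reschedule ourselves: this is what makes the heartbeat periodic.
            executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    void stop() {
        stopped = true;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        SelfReschedulingSender sender = new SelfReschedulingSender(executor, 10L, threeRuns);
        boolean reached = threeRuns.await(5, TimeUnit.SECONDS);
        sender.stop();
        executor.shutdownNow();
        System.out.println("ran at least 3 times: " + reached);
    }
}
```

A self-rescheduling task measures the period from the end of one run to the start of the next, so a slow run never causes overlapping executions.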

Now look at run(), starting with requestHeartbeat:

private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {

// ask the listener for the payload to send to this target
O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
//the target being monitored
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

/**
* send the heartbeat request to the target
*/
heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
}

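For each monitored target, requestHeartbeat asks the listener for a payload, gets the target from the monitor with getHeartbeatTarget(), and sends it a heartbeat request. Because run() reschedules itself, this repeats every heartbeat interval (10 s by default). On the receiving side (for example a TaskExecutor), the request is handled by requestHeartbeat: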

/**
* Handling of an incoming heartbeat request (receiving side)
* */
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
if(!stopped) {
log.debug("Received heartbeat request from {}.", requestOrigin);
/**
* record the heartbeat received from requestOrigin
*/
final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

if(heartbeatTarget != null) {
if(heartbeatPayload != null) {
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}

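The receiver records the heartbeat via reportHeartbeat(requestOrigin), hands any payload to its listener with heartbeatListener.reportPayload(requestOrigin, heartbeatPayload), and replies through heartbeatTarget.receiveHeartbeat(...), which carries its own payload back to the requester.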
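The bookkeeping behind the 10 s / 50 s behavior can be sketched like this: for every target, the sender remembers when it last heard back, and a target silent for longer than the heartbeat timeout counts as lost. HeartbeatBook and its methods are hypothetical, and the sketch checks timestamps on demand with an explicit clock value so it can be tested without waiting; Flink's HeartbeatMonitorImpl instead re-arms a timeout timer whenever a heartbeat arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class HeartbeatTimeoutDemo {
    public static void main(String[] args) {
        HeartbeatBook book = new HeartbeatBook(50_000L); // heartbeat.timeout default: 50 s
        book.register("tm-1", 0L);
        book.register("tm-2", 0L);
        book.onHeartbeatResponse("tm-1", 40_000L);  // tm-1 answers, tm-2 stays silent
        System.out.println(book.timedOut(60_000L)); // [tm-2]
    }
}

class HeartbeatBook {
    private final long heartbeatTimeoutMillis;
    private final Map<String, Long> lastHeartbeat = new TreeMap<>();

    HeartbeatBook(long heartbeatTimeoutMillis) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
    }

    // Called when a target registers (Flink creates a monitor at this point).
    void register(String targetId, long nowMillis) {
        lastHeartbeat.put(targetId, nowMillis);
    }

    // Called when a target answers a heartbeat request.
    void onHeartbeatResponse(String targetId, long nowMillis) {
        lastHeartbeat.computeIfPresent(targetId, (id, previous) -> nowMillis);
    }

    // Targets whose last heartbeat is older than the timeout.
    List<String> timedOut(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastHeartbeat.entrySet()) {
            if (nowMillis - entry.getValue() > heartbeatTimeoutMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```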

Back in ResourceManager, the next step is:

/**
* Start SlotManagerImpl
* which schedules two periodic tasks
*/
slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());



-----------------------------------------------------
@Override
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
LOG.info("Starting the SlotManager.");

this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
resourceActions = Preconditions.checkNotNull(newResourceActions);

started = true;

/**
* First periodic task:
* checkTaskManagerTimeouts, releases TaskManagers that have been idle for too long
* taskManagerTimeout = resourcemanager.taskmanager-timeout = 30000 (ms)
* runs every 30 s
*/
taskManagerTimeoutCheck = scheduledExecutor
.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(() -> checkTaskManagerTimeouts()), 0L, taskManagerTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);

/**
* Second periodic task: checkSlotRequestTimeouts, slot requests time out after 5 minutes
* slotRequestTimeout = slot.request.timeout = 5L * 60L * 1000L
*/
slotRequestTimeoutCheck = scheduledExecutor
.scheduleWithFixedDelay(() -> mainThreadExecutor.execute(() -> checkSlotRequestTimeouts()), 0L, slotRequestTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);

/**
* register the taskSlotsAvailable metric
* register the taskSlotsTotal metric
* */
registerSlotManagerMetrics();
}



---------------------------------------------------------

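The first task periodically looks for TaskManagers that have been idle (all slots free) for longer than resourcemanager.taskmanager-timeout and releases them.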

The second task fails pending slot requests that have not been fulfilled within slot.request.timeout (5 minutes by default).
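Both checks are driven by scheduleWithFixedDelay with an initial delay of 0, and in Flink each run hops onto the main thread through mainThreadExecutor.execute(...). A minimal sketch of the two checks, assuming hypothetical names (SlotManagerChecks, taskManagerIdleSince, pendingSlotRequest) and recording ids in lists where Flink would release TaskManagers or fail allocations; synchronized methods stand in for the single main thread:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SlotManagerChecksDemo {
    public static void main(String[] args) throws InterruptedException {
        // Defaults from above: taskmanager-timeout 30 s, slot.request.timeout 5 min.
        SlotManagerChecks checks = new SlotManagerChecks(30_000L, 5L * 60L * 1000L);
        long now = System.currentTimeMillis();
        checks.taskManagerIdleSince("tm-1", now);
        checks.pendingSlotRequest("req-1", now);

        ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        // As in SlotManagerImpl.start(): initial delay 0, then a fixed delay between runs.
        scheduledExecutor.scheduleWithFixedDelay(
                () -> checks.checkTaskManagerTimeouts(System.currentTimeMillis()), 0L, 30_000L, TimeUnit.MILLISECONDS);
        scheduledExecutor.scheduleWithFixedDelay(
                () -> checks.checkSlotRequestTimeouts(System.currentTimeMillis()), 0L, 5L * 60L * 1000L, TimeUnit.MILLISECONDS);
        Thread.sleep(100L);
        scheduledExecutor.shutdownNow();
        // Nothing has timed out yet, so both lists are still empty.
        System.out.println(checks.released() + " " + checks.failed());
    }
}

class SlotManagerChecks {
    private final long taskManagerTimeoutMillis;
    private final long slotRequestTimeoutMillis;
    private final Map<String, Long> idleSince = new TreeMap<>();
    private final Map<String, Long> requestedAt = new TreeMap<>();
    private final List<String> released = new ArrayList<>();
    private final List<String> failed = new ArrayList<>();

    SlotManagerChecks(long taskManagerTimeoutMillis, long slotRequestTimeoutMillis) {
        this.taskManagerTimeoutMillis = taskManagerTimeoutMillis;
        this.slotRequestTimeoutMillis = slotRequestTimeoutMillis;
    }

    synchronized void taskManagerIdleSince(String taskManagerId, long timestampMillis) {
        idleSince.put(taskManagerId, timestampMillis);
    }

    synchronized void pendingSlotRequest(String requestId, long timestampMillis) {
        requestedAt.put(requestId, timestampMillis);
    }

    // First task: release TaskManagers idle for at least the timeout.
    synchronized void checkTaskManagerTimeouts(long nowMillis) {
        expire(idleSince, nowMillis, taskManagerTimeoutMillis, released);
    }

    // Second task: fail slot requests pending for at least the timeout.
    synchronized void checkSlotRequestTimeouts(long nowMillis) {
        expire(requestedAt, nowMillis, slotRequestTimeoutMillis, failed);
    }

    private static void expire(Map<String, Long> since, long nowMillis, long timeoutMillis, List<String> expired) {
        Iterator<Map.Entry<String, Long>> it = since.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
    }

    synchronized List<String> released() {
        return new ArrayList<>(released);
    }

    synchronized List<String> failed() {
        return new ArrayList<>(failed);
    }
}
```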

/**
* register the taskSlotsAvailable metric
* register the taskSlotsTotal metric
* */
registerSlotManagerMetrics();

------------------------------------------
private void registerSlotManagerMetrics() {
slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_AVAILABLE, () -> (long) getNumberFreeSlots());
slotManagerMetricGroup.gauge(MetricNames.TASK_SLOTS_TOTAL, () -> (long) getNumberRegisteredSlots());
}

Finally, start() registers two metrics:

taskSlotsAvailable: the number of free task slots

taskSlotsTotal: the total number of registered task slots
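gauge(...) registers a lambda rather than a number, so the value is recomputed each time the metric is read and always reflects the current slot counts. A minimal sketch of that idea; GaugeRegistry and SlotCounts are hypothetical, built on java.util.function.Supplier (Flink's own Gauge interface exposes getValue() instead):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

class GaugeDemo {
    public static void main(String[] args) {
        SlotCounts slots = new SlotCounts();
        GaugeRegistry registry = new GaugeRegistry();
        registry.gauge("taskSlotsAvailable", () -> (long) slots.free);
        registry.gauge("taskSlotsTotal", () -> (long) slots.registered);

        slots.registered = 4;
        slots.free = 4;
        System.out.println(registry.snapshot()); // {taskSlotsAvailable=4, taskSlotsTotal=4}
        slots.free = 1; // three slots allocated to a job
        System.out.println(registry.snapshot()); // {taskSlotsAvailable=1, taskSlotsTotal=4}
    }
}

// Hypothetical stand-in for the SlotManager's slot bookkeeping.
class SlotCounts {
    int free;
    int registered;
}

class GaugeRegistry {
    private final Map<String, Supplier<Long>> gauges = new TreeMap<>();

    void gauge(String name, Supplier<Long> valueSupplier) {
        gauges.put(name, valueSupplier);
    }

    // Each read evaluates the lambdas again, like a metrics reporter polling gauges.
    Map<String, Long> snapshot() {
        Map<String, Long> values = new TreeMap<>();
        gauges.forEach((name, supplier) -> values.put(name, supplier.get()));
        return values;
    }
}
```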

That covers the ResourceManager for now. Back in the ClusterEntrypoint flow,

DefaultDispatcherResourceManagerComponentFactory also contains this call:

resourceManager.start();



-------------------------------------------------------
@Override
public void start() {
/**
* Send a START control message to the endpoint's own actor
* */
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}

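resourceManager.start() goes through RpcEndpoint.start() to the RpcServer proxy, whose start() (in AkkaInvocationHandler, shown above) sends a START control message to the endpoint's own actor. When the actor processes that message, it calls onStart(), which for the ResourceManager is the onStart() analyzed above.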
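This explains why constructing the ResourceManager does not run onStart(): start() only enqueues a START message, and onStart() runs later, when the actor processes that message in the endpoint's main thread. A minimal sketch of the hand-off with a single-threaded executor standing in for the actor's mailbox; MiniEndpoint, tell(...), and the rejection of messages that arrive before START are hypothetical simplifications, not Flink's AkkaRpcActor code:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class StartMessageDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        MiniEndpoint endpoint = new MiniEndpoint(mailbox); // no onStart() yet
        endpoint.tell("rpc:registerTaskExecutor");         // arrives before START
        endpoint.start();
        endpoint.tell("rpc:heartbeat");
        mailbox.shutdown(); // already queued messages still get processed
        mailbox.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(endpoint.log);
        // [rejected rpc:registerTaskExecutor (not started), onStart, handled rpc:heartbeat]
    }
}

class MiniEndpoint {
    static final String START = "START";
    final List<String> log = Collections.synchronizedList(new ArrayList<>());
    private final ExecutorService mailbox;
    private boolean started; // only touched on the mailbox thread

    MiniEndpoint(ExecutorService mailbox) {
        this.mailbox = mailbox; // construction does NOT call onStart()
    }

    // Like AkkaInvocationHandler.start(): just send START to ourselves.
    void start() {
        tell(START);
    }

    void tell(String message) {
        mailbox.execute(() -> receive(message));
    }

    // Messages are processed one at a time, in order, on the mailbox thread.
    private void receive(String message) {
        if (START.equals(message)) {
            if (!started) {
                started = true;
                onStart();
            }
        } else if (started) {
            log.add("handled " + message);
        } else {
            log.add("rejected " + message + " (not started)");
        }
    }

    protected void onStart() {
        log.add("onStart");
    }
}
```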