Flink Source Code Walkthrough (Part 4)
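This part picks up where the previous one left off.
Earlier we created a factory class that wraps three other factories, and then called its create method to create and start the core components.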
    clusterComponent = dispatcherResourceManagerComponentFactory
        .create(configuration, ioExecutor, commonRpcService, haServices, blobServer,
            heartbeatServices, metricRegistry, archivedExecutionGraphStore,
            new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
            this);
Let's step into this create method.
We will go through the components one at a time, starting with WebMonitorEndpoint.
WebMonitorEndpoint
    WebMonitorEndpoint<?> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    DispatcherRunner dispatcherRunner = null;
    // --------------------------------------------------------
    webMonitorEndpoint = restEndpointFactory
        .createRestEndpoint(configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever,
            blobServer, executor, metricFetcher,
            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
            fatalErrorHandler);
    log.debug("Starting Dispatcher REST endpoint.");
    webMonitorEndpoint.start();
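Here the objects to be created are declared first, and then the factory instance creates the endpoint and starts it.
We won't step in just yet. Instead, let's start from cluster startup and work forward to see what happens on the way to creating and starting webMonitorEndpoint.
initializeServices (initialization)
Let's go back to the initialization in step 7.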
    initializeServices(configuration, pluginManager);

    configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
    configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
    // -------------------------------
    protected void initializeServices(Configuration configuration, PluginManager pluginManager) throws Exception {
        LOG.info("Initializing cluster services.");
        synchronized(lock) {

            commonRpcService = AkkaRpcServiceUtils
                .createRemoteRpcService(configuration,
                    configuration.getString(JobManagerOptions.ADDRESS),
                    getRPCPortRange(configuration),
                    configuration.getString(JobManagerOptions.BIND_HOST),
                    configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));
First, look at createRemoteRpcService.
    final AkkaRpcServiceBuilder akkaRpcServiceBuilder = AkkaRpcServiceUtils
        .remoteServiceBuilder(configuration, externalAddress, externalPortRange);
    // -----------------------------------------------
    public static AkkaRpcServiceBuilder remoteServiceBuilder(Configuration configuration,
            @Nullable String externalAddress, String externalPortRange) {

        return new AkkaRpcServiceBuilder(configuration, LOG, externalAddress, externalPortRange);
    }
First, a builder is created and initialized.
Then:
    if(bindAddress != null) {
        akkaRpcServiceBuilder.withBindAddress(bindAddress);
    }
    bindPort.ifPresent(akkaRpcServiceBuilder::withBindPort);

    return akkaRpcServiceBuilder.createAndStart();
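The bind address and bind port are set on the builder when they are provided, and then createAndStart() builds and starts the RPC service.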
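The optional-setter flow above can be sketched in plain Java without any Flink or Akka dependency. This is only an illustration under assumptions: `RpcServiceBuilderSketch`, its `0.0.0.0` default and the string it returns are hypothetical stand-ins, not Flink's `AkkaRpcServiceBuilder`.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for the builder in the walkthrough; not a Flink class.
final class RpcServiceBuilderSketch {
    private final String externalAddress;
    private final String externalPortRange;
    private String bindAddress = "0.0.0.0"; // assumed default, for the sketch only
    private Integer bindPort;               // null means "fall back to the port range"

    RpcServiceBuilderSketch(String externalAddress, String externalPortRange) {
        this.externalAddress = Objects.requireNonNull(externalAddress);
        this.externalPortRange = Objects.requireNonNull(externalPortRange);
    }

    RpcServiceBuilderSketch withBindAddress(String bindAddress) {
        this.bindAddress = Objects.requireNonNull(bindAddress);
        return this;
    }

    RpcServiceBuilderSketch withBindPort(int bindPort) {
        this.bindPort = bindPort;
        return this;
    }

    // Returns a description instead of starting a real actor system.
    String createAndStart() {
        String port = bindPort != null ? String.valueOf(bindPort) : externalPortRange;
        return externalAddress + " bound at " + bindAddress + ":" + port;
    }

    // Mirrors the shape of the snippet: optional settings are applied only if present.
    static String create(String externalAddress, String portRange,
                         String bindAddress, Optional<Integer> bindPort) {
        RpcServiceBuilderSketch builder = new RpcServiceBuilderSketch(externalAddress, portRange);
        if (bindAddress != null) {
            builder.withBindAddress(bindAddress);
        }
        bindPort.ifPresent(builder::withBindPort);
        return builder.createAndStart();
    }
}
```

The point of the pattern is that required values go through the constructor while optional ones are applied conditionally, so the caller never has to pass placeholder values.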
Let's step into createAndStart:
    public AkkaRpcService createAndStart() throws Exception {

        if(actorSystemExecutorConfiguration == null) {
            actorSystemExecutorConfiguration = BootstrapTools.ForkJoinExecutorConfiguration
                .fromConfiguration(configuration);
        }

        final ActorSystem actorSystem;
        if(externalAddress == null) {
            actorSystem = BootstrapTools
                .startLocalActorSystem(configuration, actorSystemName, logger,
                    actorSystemExecutorConfiguration, customConfig);
        } else {
            actorSystem = BootstrapTools.startRemoteActorSystem(configuration, actorSystemName,
                externalAddress, externalPortRange, bindAddress, Optional.ofNullable(bindPort),
                logger, actorSystemExecutorConfiguration, customConfig);
        }

        return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
    }

    // ------------------------------------------
    public static ActorSystem startLocalActorSystem(Configuration configuration, String actorSystemName,
            Logger logger, ActorSystemExecutorConfiguration actorSystemExecutorConfiguration,
            Config customConfig) throws Exception {

        logger.info("Trying to start local actor system");

        try {

            Config akkaConfig = AkkaUtils
                .getAkkaConfig(configuration, scala.Option.empty(), scala.Option.empty(),
                    actorSystemExecutorConfiguration.getAkkaConfig());

            if(customConfig != null) {
                akkaConfig = customConfig.withFallback(akkaConfig);
            }

            return startActorSystem(akkaConfig, actorSystemName, logger);
        } catch(Throwable t) {
            throw new Exception("Could not create actor system", t);
        }
    }

    // ---------------------------------
    private static ActorSystem startActorSystem(Config akkaConfig, String actorSystemName, Logger logger) {
        logger.debug("Using akka configuration\n {}", akkaConfig);

        ActorSystem actorSystem = AkkaUtils.createActorSystem(actorSystemName, akkaConfig);

        logger.info("Actor system started at {}", AkkaUtils.getAddress(actorSystem));
        return actorSystem;
    }
The first thing that happens here is the creation of the ActorSystem.
After that comes new AkkaRpcService, so let's look at that constructor next.
    public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {

        this.actorSystem = checkNotNull(actorSystem, "actor system");
        this.configuration = checkNotNull(configuration, "akka rpc service configuration");

        Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
        if(actorSystemAddress.host().isDefined()) {
            address = actorSystemAddress.host().get();
        } else {
            address = "";
        }

        if(actorSystemAddress.port().isDefined()) {
            port = (Integer) actorSystemAddress.port().get();
        } else {
            port = -1;
        }

        captureAskCallstacks = configuration.captureAskCallStack();
        internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);

        terminationFuture = new CompletableFuture<>();

        stopped = false;

        supervisor = startSupervisorActor();
    }
The first part is null checks and resolving the address and port. The last step starts a supervisor, so let's look at that.
    private Supervisor startSupervisorActor() {

        final ExecutorService terminationFutureExecutor = Executors
            .newSingleThreadExecutor(new ExecutorThreadFactory("AkkaRpcService-Supervisor-Termination-Future-Executor"));

        final ActorRef actorRef = SupervisorActor.startSupervisorActor(actorSystem, terminationFutureExecutor);

        return Supervisor.create(actorRef, terminationFutureExecutor);
    }
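Here we find the familiar way of creating an actor.
This is the Akka material covered in the first article of this series: a supervisorActor is created under the actorSystem we just created.
Now let's go back from step 14 and look at step 15.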
    ioExecutor = Executors.newFixedThreadPool(ClusterEntrypointUtils.getPoolSize(configuration),
        new ExecutorThreadFactory("cluster-io"));

    // ------------------------------------------------------------------
    public static int getPoolSize(Configuration config) {

        final int poolSize = config.getInteger(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE,
            4 * Hardware.getNumberCPUCores());
        Preconditions.checkArgument(poolSize > 0,
            String.format("Illegal pool size (%s) of io-executor, please re-configure '%s'.",
                poolSize, ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE.key()));
        return poolSize;
    }
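This creates the I/O thread pool. Its size comes from ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE and defaults to 4 times the number of CPU cores; a size of zero or less is rejected.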
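The sizing rule can be tried out in isolation with only the JDK. The sketch below is an approximation under assumptions: `IoPoolSketch` is a hypothetical class, a `null` argument stands in for "option not configured", and the thread naming scheme is made up for the example rather than taken from Flink's `ExecutorThreadFactory`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper illustrating the "configured value or 4 x cores" rule.
final class IoPoolSketch {

    // configured == null models an unset option, so the default applies.
    static int resolvePoolSize(Integer configured, int cpuCores) {
        int poolSize = configured != null ? configured : 4 * cpuCores;
        if (poolSize <= 0) {
            throw new IllegalArgumentException(
                "Illegal pool size (" + poolSize + ") of io-executor, please re-configure it.");
        }
        return poolSize;
    }

    // Names threads "<prefix>-thread-<n>" so they are easy to spot in thread dumps
    // (naming scheme assumed for this sketch).
    static ThreadFactory namedThreadFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger(1);
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        };
    }

    static ExecutorService createIoExecutor(Integer configured) {
        int cores = Runtime.getRuntime().availableProcessors();
        return Executors.newFixedThreadPool(
            resolvePoolSize(configured, cores), namedThreadFactory("cluster-io"));
    }
}
```

Oversubscribing the cores is a common choice for pools that mostly wait on I/O, since blocked threads do not compete for CPU.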
    haServices = createHaServices(configuration, ioExecutor);

    // --------------------------------------------------
    protected HighAvailabilityServices createHaServices(Configuration configuration, Executor executor) throws Exception {

        return HighAvailabilityServicesUtils
            .createHighAvailabilityServices(configuration, executor,
                HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
    }

    // --------------------------------------------------------
    public static HighAvailabilityServices createHighAvailabilityServices(Configuration configuration,
            Executor executor, AddressResolution addressResolution) throws Exception {

        HighAvailabilityMode highAvailabilityMode = HighAvailabilityMode.fromConfig(configuration);

        switch(highAvailabilityMode) {

            case NONE:
                final Tuple2<String, Integer> hostnamePort = getJobManagerAddress(configuration);

                final String resourceManagerRpcUrl = AkkaRpcServiceUtils
                    .getRpcUrl(hostnamePort.f0, hostnamePort.f1,
                        AkkaRpcServiceUtils.createWildcardName(ResourceManager.RESOURCE_MANAGER_NAME),
                        addressResolution, configuration);
                final String dispatcherRpcUrl = AkkaRpcServiceUtils
                    .getRpcUrl(hostnamePort.f0, hostnamePort.f1,
                        AkkaRpcServiceUtils.createWildcardName(Dispatcher.DISPATCHER_NAME),
                        addressResolution, configuration);
                final String webMonitorAddress = getWebMonitorAddress(configuration, addressResolution);

                return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);

            case ZOOKEEPER:

                BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

                return new ZooKeeperHaServices(ZooKeeperUtils.startCuratorFramework(configuration),
                    executor, configuration, blobStoreService);
            case FACTORY_CLASS:
                return createCustomHAServices(configuration, executor);
            default:
                throw new Exception("Recovery mode " + highAvailabilityMode + " is not supported.");
        }
    }
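The main work here is creating the HA services and, for the ZooKeeper mode, initializing a service for transferring large files.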
    case NONE:
        return new StandaloneHaServices(resourceManagerRpcUrl, dispatcherRpcUrl, webMonitorAddress);
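The mode is read from the configuration. If high availability is not enabled (NONE), StandaloneHaServices is used.
If the mode is ZOOKEEPER, the ZooKeeper-based HA mechanism is used, and it first needs a blob store.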
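The three-way dispatch can be illustrated with a small enum switch. This is a sketch under assumptions: `HaModeSketch` is hypothetical, the parsing rule (unset means NONE, an unrecognized value is treated as a custom factory class name) is a simplification chosen for the example rather than a copy of `HighAvailabilityMode.fromConfig`, and the returned strings are descriptions, not real service objects.

```java
import java.util.Locale;

// Hypothetical illustration of choosing HA services by configured mode.
final class HaModeSketch {

    enum HaMode { NONE, ZOOKEEPER, FACTORY_CLASS }

    // Assumed rule for the sketch: unset -> NONE, known name -> that mode,
    // anything else -> treated as the name of a custom factory class.
    static HaMode fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return HaMode.NONE;
        }
        try {
            return HaMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return HaMode.FACTORY_CLASS;
        }
    }

    static String describeServices(String value) {
        switch (fromConfig(value)) {
            case NONE:
                return "standalone services with fixed RPC URLs";
            case ZOOKEEPER:
                return "ZooKeeper services with a file-system blob store";
            case FACTORY_CLASS:
                return "custom services created by " + value;
            default:
                throw new IllegalStateException("Unsupported mode");
        }
    }
}
```

Keeping the parsing in one place means the switch only ever sees a closed set of modes, which is why the original code can throw on the default branch.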
    BlobStoreService blobStoreService = BlobUtils.createBlobStoreFromConfig(configuration);

    public static BlobStoreService createBlobStoreFromConfig(Configuration config) throws IOException {

        if(HighAvailabilityMode.isHighAvailabilityModeActivated(config)) {

            return createFileSystemBlobStore(config);

        } else {
            return new VoidBlobStore();
        }
    }

    // ---------------------------------------------------
    private static BlobStoreService createFileSystemBlobStore(Configuration configuration) throws IOException {

        final Path clusterStoragePath = HighAvailabilityServicesUtils.getClusterHighAvailableStoragePath(configuration);

        final FileSystem fileSystem;
        try {

            fileSystem = clusterStoragePath.getFileSystem();
        } catch(Exception e) {
            throw new IOException(String.format("Could not create FileSystem for highly available storage path (%s)",
                clusterStoragePath), e);
        }

        return new FileSystemBlobStore(fileSystem, clusterStoragePath.toUri().toString());
    }
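The main point here is the creation of a FileSystemBlobStore, which initializes a file transfer mechanism.
RPC is not suited to transferring large files, so this store is set up in advance to manage them.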
    heartbeatServices = createHeartbeatServices(configuration);

    metricRegistry = createMetricRegistry(configuration, pluginManager);
    final RpcService metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration,
        commonRpcService.getAddress());
    metricRegistry.startQueryService(metricQueryServiceRpcService, null);
    final String hostname = RpcUtils.getHostname(commonRpcService);
    processMetricGroup = MetricUtils
        .instantiateProcessMetricGroup(metricRegistry, hostname,
            ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

    archivedExecutionGraphStore = createSerializableExecutionGraphStore(configuration,
        commonRpcService.getScheduledExecutor());
webMonitorEndpoint(createRestEndpoint)
Let's return to step 10 and look at the create method that builds webMonitorEndpoint.
    webMonitorEndpoint = restEndpointFactory
        .createRestEndpoint(configuration, dispatcherGatewayRetriever, resourceManagerGatewayRetriever,
            blobServer, executor, metricFetcher,
            highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
            fatalErrorHandler);
    log.debug("Starting Dispatcher REST endpoint.");

Let's step into SessionRestEndpointFactory.
    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService, ScheduledExecutorService executor,
            MetricFetcher metricFetcher, LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler) throws Exception {

        final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(configuration);

        return new DispatcherRestEndpoint(RestServerEndpointConfiguration.fromConfiguration(configuration),
            dispatcherGatewayRetriever, configuration, restHandlerConfiguration,
            resourceManagerGatewayRetriever, transientBlobService, executor, metricFetcher,
            leaderElectionService, RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
            fatalErrorHandler);
    }

    // ----------------------------------------------------------
    public DispatcherRestEndpoint(RestServerEndpointConfiguration endpointConfiguration,
            GatewayRetriever<DispatcherGateway> leaderRetriever, Configuration clusterConfiguration,
            RestHandlerConfiguration restConfiguration,
            GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
            TransientBlobService transientBlobService, ScheduledExecutorService executor,
            MetricFetcher metricFetcher, LeaderElectionService leaderElectionService,
            ExecutionGraphCache executionGraphCache, FatalErrorHandler fatalErrorHandler) throws IOException {
        super(endpointConfiguration, leaderRetriever, clusterConfiguration, restConfiguration,
            resourceManagerRetriever, transientBlobService, executor, metricFetcher,
            leaderElectionService, executionGraphCache, fatalErrorHandler);

        webSubmissionExtension = WebMonitorExtension.empty();
    }

    // ----------------------------------------------------------
    public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndpoint
            implements LeaderContender, JsonArchivist {

        protected final GatewayRetriever<? extends T> leaderRetriever;
        protected final Configuration clusterConfiguration;
        protected final RestHandlerConfiguration restConfiguration;
        private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
        private final TransientBlobService transientBlobService;
        protected final ScheduledExecutorService executor;

        private final ExecutionGraphCache executionGraphCache;
        private final CheckpointStatsCache checkpointStatsCache;

        private final MetricFetcher metricFetcher;

        private final LeaderElectionService leaderElectionService;

        private final FatalErrorHandler fatalErrorHandler;

        private boolean hasWebUI = false;

        private final Collection<JsonArchivist> archivingHandlers = new ArrayList<>(16);

        @Nullable
        private ScheduledFuture<?> executionGraphCleanupTask;
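DispatcherRestEndpoint delegates to its parent class WebMonitorEndpoint, which is where the many REST handlers are defined.
Now let's go back to the factory class and see what happens next.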
start
    webMonitorEndpoint.start();
This starts the service. Let's look inside start.
    handlers = initializeHandlers(restAddressFuture);
    // -----------------------------------------------------

    @Override
    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
            final CompletableFuture<String> localAddressFuture) {

        List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
            super.initializeHandlers(localAddressFuture);
    // --------------------------------------------------
    @Override
    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
            final CompletableFuture<String> localAddressFuture) {

        ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(30);

        final Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> webSubmissionHandlers =
            initializeWebSubmissionHandlers(localAddressFuture);
    // ----------------------------------------------------------------
This is where the handlers are initialized.
start() also calls one more method, startInternal:
    @Override
    public void startInternal() throws Exception {

        leaderElectionService.start(this);

        startExecutionGraphCacheCleanupTask();

        if(hasWebUI) {
            log.info("Web frontend listening at {}.", getRestBaseUrl());
        }
    }
    leaderElectionService.start(this);
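The first call passes in this, that is, the WebMonitorEndpoint itself, which implements LeaderContender.
To see what happens to it, we need to look at ZooKeeperLeaderElectionService.
Here is its start method: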
    @Override
    public void start(LeaderContender contender) throws Exception {
        Preconditions.checkNotNull(contender, "Contender must not be null.");
        Preconditions.checkState(leaderContender == null, "Contender was already set.");

        LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);

        synchronized(lock) {

            client.getUnhandledErrorListenable().addListener(this);

            leaderContender = contender;

            leaderLatch.addListener(this);
            leaderLatch.start();

            cache.getListenable().addListener(this);
            cache.start();

            client.getConnectionStateListenable().addListener(listener);

            running = true;
        }
    }
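The service registers itself as a listener on the leaderLatch, so when this contender wins the election its isLeader method is called automatically, and notLeader is called when it loses leadership.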
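The callback wiring can be sketched without ZooKeeper or Curator. Everything below is hypothetical and simplified: `ElectionService`, `Contender` and the `onGranted`/`onRevoked` callbacks are names invented for the example, and the test code plays the role of the latch by calling `isLeader()` and `notLeader()` by hand.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free sketch of "register a contender, then get callbacks".
final class LeaderElectionSketch {

    interface Contender {
        void onGranted();
        void onRevoked();
    }

    static final class ElectionService {
        private Contender contender;
        private boolean running;

        // Same guards as the snippet: non-null contender, set only once.
        void start(Contender contender) {
            if (contender == null) {
                throw new NullPointerException("Contender must not be null.");
            }
            if (this.contender != null) {
                throw new IllegalStateException("Contender was already set.");
            }
            this.contender = contender;
            this.running = true;
        }

        // In the real service a latch listener triggers these; here they are called manually.
        void isLeader() {
            if (running) {
                contender.onGranted();
            }
        }

        void notLeader() {
            if (running) {
                contender.onRevoked();
            }
        }
    }

    // Records the callbacks it receives so the order can be inspected.
    static final class RecordingContender implements Contender {
        final List<String> events = new ArrayList<>();

        @Override
        public void onGranted() {
            events.add("granted");
        }

        @Override
        public void onRevoked() {
            events.add("revoked");
        }
    }
}
```

The contender never polls for leadership; it only reacts to callbacks, which is why start can return immediately.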
Once the election service has been started, startInternal moves on to the following:
    startExecutionGraphCacheCleanupTask();

    // -----------------------------------------------
    private void startExecutionGraphCacheCleanupTask() {
        final long cleanupInterval = 2 * restConfiguration.getRefreshInterval();

        executionGraphCleanupTask = executor
            .scheduleWithFixedDelay(executionGraphCache::cleanup, cleanupInterval, cleanupInterval,
                TimeUnit.MILLISECONDS);
    }
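This starts a scheduled task that periodically cleans up the ExecutionGraph cache, at an interval of twice the REST refresh interval.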
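The scheduling call itself is plain JDK and can be exercised on its own. In this sketch `CacheCleanupSketch` and `firesRepeatedly` are hypothetical names, and a latch countdown stands in for the real cache cleanup.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a periodic cleanup task at twice the refresh interval.
final class CacheCleanupSketch {

    static long cleanupIntervalMillis(long refreshIntervalMillis) {
        return 2 * refreshIntervalMillis;
    }

    // The same interval is used as initial delay and as delay between runs.
    static ScheduledFuture<?> startCleanupTask(ScheduledExecutorService executor,
                                               Runnable cleanup,
                                               long refreshIntervalMillis) {
        long interval = cleanupIntervalMillis(refreshIntervalMillis);
        return executor.scheduleWithFixedDelay(cleanup, interval, interval, TimeUnit.MILLISECONDS);
    }

    // Returns true if the task fired `runs` times before the timeout expired.
    static boolean firesRepeatedly(int runs, long refreshIntervalMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        ScheduledFuture<?> task = startCleanupTask(executor, latch::countDown, refreshIntervalMillis);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            task.cancel(false);
            executor.shutdownNow();
        }
    }
}
```

With scheduleWithFixedDelay the delay is measured from the end of one run to the start of the next, so a slow cleanup never causes runs to pile up.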
That covers the creation and startup of WebMonitorEndpoint, the first of the components built by our three factories.