一、NimbusServer.class main方法
运行$JSTORM_HOME/bin/jstorm nimbus即可运行Nimbus启动程序,JStorm会
调用com.alibaba.jstorm.daemon.nimbus包中的NimbusServer.class的main方法。
1. 调用Utils.readStormConfig()方法,从default.yaml、storm.yaml和storm.options中读取JStorm的相关配置,返回(Map)conf,后面的许多操作都要用到conf。
2. 新建DefaultInimbus实例,DefaultInimbus类实现了INimbus接口,实现了allSlotsAvailableForScheduling方法,在该方法中可以得到整个可以供Nimbus调度的Collection1
2
3
4
5
6
7
8
9
10
11
public Collection<WorkerSlot> allSlotsAvailableForScheduling(Collection<SupervisorDetails> existingSupervisors, Topologies topologies,
Set<String> topologiesMissingAssignments) {
// TODO Auto-generated method stub
Collection<WorkerSlot> result = new HashSet<WorkerSlot>();
for (SupervisorDetails detail : existingSupervisors) {
for (Integer port : detail.getAllPorts())
result.add(new WorkerSlot(detail.getId(), port));
}
return result;
}
3. 新建NimbusServer实例instance,并调用instance.launchServer方法。
二、NimbusServer.class lauchServer方法
1. 调用StormConfig.validate_distributed_mode(conf)验证是否为distributed模式。
2. createPid(conf)在storm.local.dir/nimbus中创建nimbus对应pid的文件。
3. 调用initShutdownHook()方法—>NimbusServer.cleanup()。
4. createNimbusData()方法,建立NimbusData对象,NimbusData类包含了Nimbus需要用到的各种数据,包括配置信息conf,与Zookeeper通信的StormZkClusterState,tasksHeartbeat,JStormMetricCache等。在NimbusData的构造器中,会完成对各种域的初始化工作。
5. 调用initFollowerThread方法,新建并启动FollowerRunnable后台服务线程(daemon线程)。
6. 调用ConfigExtension.getNimbusDeamonHttpserverPort(conf)方法,得到NimbusHttpserver的port,初始化Httpserver,并启动Httpserver。
7. 调用initContainerHBThread(conf),如果JStorm运行在Apsara/Yarn集群上则初始化container,若没有则返回null。
8. 循环检测Nimbus是否为leader,若不是则休眠5s,直到Nimbus是leader为止。
9. 调用init(conf)方法。
三、FollowerRunnable.class
1. 该类的构造器中首先得到nimbus的网络地址和端口,并通过StormZkClusterState与Zookeeper通信,调用StormZkClusterState的update_nimbus_slave和update_nimbus_detail方法,向Zookeeper注册host和detail。然后调用tryToBeLeader方法,通过StormZkClusterState的try_to_be_leader方法使Nimbus成为leader。
2. run()方法是在后台周期性循环执行的。在该类的run()方法中,首先从NimbusData中得到StormZkClusterState,然后每隔sleepTime进行周期性休眠,线程唤醒后会循环调用StormZkClusterState的接口,监测Nimbus是否为leader,如果不是则尝试将Nimbus设置为leader,直到成功为止。接下来会调用check()方法,并向Zookeeper更新Nimbus的信息。
3. Zookeeper在整个JStorm集群中有着非常重要的作用。
四、NimbusServer.class的init方法
1. 调用NimbusUtils.cleanupCorruptTopologies(data)方法,清除在Zookeeper中存在,在本地目录却不存在的Topology(Nimbus刚启动时,要先清除掉Zookeeper中无用的Topology)。
2. 调用initTopologyAssign方法,初始化Topology Assign后台服务线程,该daemon线程通过使用DefaultTopologyScheduler,用来进行Topology发布。
3. 调用initTopologyStatus方法,更新Zookeeper中的Topology状态。
4. 调用initCleaner方法,启动一个线程,每隔600s,清理storm.local.dir/supervisor/inbox
目录,该目录是uploading topology的目录。
5. 调用ServiceHandler方法,新建ServiceHandler,该类是一个thrift server callback,是所有命令的入口,包括submitTopology、killTopology等众多命令。
6. 调用initMonitor()方法,初始化监控线程。
7. 调用initThrift()方法,初始化Nimbus的ThriftServer,并启动它,从而完成整个的Nimbus启动流程。