一、Supervisor.class main方法
JStorm源码版本:2.1.0
在Linux中输入$JSTORM_HOME/bin/jstorm supervisor
命令,
即可运行supervisor启动程序,系统会调用com.alibaba.jstorm.daemon.supervisor包中Supervisor.class的main方法。在main方法中,首先构造Supervisor实例,并调用该实例的run方法。1
2
3
4
5public static void main(String[] args) {
JStormServerUtils.startTaobaoJvmMonitor();
Supervisor instance = new Supervisor();
instance.run();
}
二、Supervisor.class run方法
1. 调用Utils.readStormConfig()方法,从default.yaml、storm.yaml和storm.options中读取JStorm的相关配置,返回(Map)conf,后面的许多操作都要用到conf。
2. StormConfig.distributed_mode(conf)验证JStorm模式是否为分布式模式,否则报出错异常。本文主要围绕distributed模式,暂不讨论local模式。
3. createPid(conf)在storm.local.dir/supervisor中创建supervisor对应pid的文件。
4. 调用mkSupervisor(conf,null)方法,创建并启动一个supervisor。
5. initShutdownHook(supervisorManager),注册一个JVM ShutDown Hook方法。
6. 循环检测supervisorManager.shutdown方法是否被调用,若没有则阻塞1s并继续循环,否则打印“Shutdown supervisor”。
三、Supervisor.class mkSupervisor方法
1. 删除storm.local.dir/supervisor/tmp目录下的文件。
2. 创建与Zookeeper通信/操作的实例StormZkClusterState,在JStorm中,集群与Zookeeper通信都是通过StormZkClusterState类来实现的,Supervisor需要从Zookeeper获得分配到该节点的topology代码以及对应的tasks,同时把节点上的worker状态发送给Zookeeper,所以需要一个与
Zookeeper通信的实例。
3. 创建数据库LocalState实例,LocalState以Key-Value形式将相应信息读写到本地磁盘保存,Supervisor会把supervisorID等配置信息以及节点中运行的task也会把自身状态周期性的写入LocalState,LocalState会把这些信息放入本地磁盘保存。
4. 创建一个异步循环线程集合Vector
5. 创建一个Heartbeat线程,并将该线程放入threads中,每隔几秒,调用Heartbeat的update方法,将SupervisorInfo信息,同步至Zookeeper。SupervisorInfo包括supervisor的hostname,supervisorid, 同步时间,与上次同步的间隔时间,workerPorts的Set,以及availableWorkerPorts的Set。通过将SupervisorInfo信息发送至Zookeeper,Nimbus可以实时监控supervisor状态以及worker的数量。
6. 创建管理worker的SyncProcessEvent线程,在该线程中,Supervisor实现启动新worker,关闭无用worker的功能。
7. 创建SyncSupervisorEvent线程,该线程每隔几秒运行一次,从Zookeeper读取Supervisor的相关信息,包括topolgoy的代码、tasks等,并调用SyncProcessEvent的run方法,来启动worker。
8. 启动Httpserver。
四、Assignment类
Assignment是Supervisor从Zookeeper得到的Topology的相关信息,包括
Topology对应Supervisors的主机名、workers以及task的启动时间等信息。
五、LocalAssignment类
LocalAssignment是Supervisor在本地存储的Topology信息,包括
topologyId、topologyName、taskIds(set)、mem、cpu、jvm等。LocalAssignment是Supervisor从Zookeeper读取Assignment并转换得到的。Supervisor周期性的从Zookeeper读取Assignment,得到Nimbus分配到该节点的Topology信息,并与节点本地存储的LocalAssignment对比,若有不同则对worker进行管理调度,启动新的worker,关闭无用的worker,使节点与Zookeeper中存储的信息相匹配。
六、SyncSupervisorEvent.class run方法
该方法是Supervisor中非常重要的方法,完成整个Topology的同步工作,根据
Nimbus的调度,实现对节点中Topology的管理:
1. 从Zookeeper中读取topologyid对应的信息Map<String, Assignment> assignments。
2. 从storm.local.dir/supervisor/stormdist/本地目录中读取topologyid列表。
3. 从Zookeeper中获取zkAssignment,从localstate中获取localAssignment。
4. 对比zkAssignment和localAssignment,获取需要更新和重新下载的topologyid。
5. 从Zookeeper中获取对应topology在Nimbus存放jar包的目录,并调用Nimbus Client,从Nimbus的storm-dir/storm-dist/topologyid/storm.jar中下载代码,将jar包下载到本地目录。该部分通过层层方法调用,最终使用downloadFromMaster方法来实现的:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public static void downloadFromMaster(Map conf, String file, String localFile) throws IOException, TException {
WritableByteChannel out = null;
NimbusClient client = null;
try {
client = NimbusClient.getConfiguredClient(conf, 10 * 1000);
String id = client.getClient().beginFileDownload(file);
out = Channels.newChannel(new FileOutputStream(localFile));
while (true) {
ByteBuffer chunk = client.getClient().downloadChunk(id);
int written = out.write(chunk);
if (written == 0) {
client.getClient().finishFileDownload(id);
break;
}
}
} finally {
if (out != null)
out.close();
if (client != null)
client.close();
}
}
其中的beginFileDownload、downloadChunk方法使用Thrift与Nimbus通信。
6. 运行syncProcessesEvent。
7. 触发heartbeat更新,启动心跳包发送线程,将supervisorInfo发送中Zookeeper。
七、SyncProcessEvent.class run方法
该方法在SyncSupervisorEvent被调用,通过该方法,Supervisor实现对节点
中worker的管理,启动新的worker,关闭无用的worker:
1. 从LocalAssignment得到已经发布的tasks。
2. 根据localstate和localassignment得到每个worker的state,即节点中所有worker的状态,worker总数量是通过storm.yaml文件进行配置的,与ports对应。
3. 根据每个work的heartbeat来判断,关闭无用的workers。
4. checkNewWorkers(conf);检查新的workers。
5. checkNeedUpdateTopologys(localWorkerStats, localAssignments);检查需要更新的topology
6. 在startNewWorkers方法根据cluster_mode分为local和distributed,调用不同的lauchWorkers方法,启动新的workers。
八、SyncProcessEvent.class lauchWorker(distributed)方法
在SyncSupervisorEvent已经将每个topology的jar包从Zookeeper下载到了本
地storm.local.dir/supervisor/stormdist/topologyId目录下,因此通过StringBuilder构造一个命令行,其中包括jstrom.log.dir、jstorm.home、java.library.path等配置以及com.alibaba.jstorm.daemon.worker.Worker、topologyId、portId等信息,然后调用JStormUtils.launch_process(cmd, environment, true)
方法,即可调用com.alibaba.jstorm.daemon.worker.Worker类中的main方法。
九、Worker.class execute方法
在main方法中会创建Worker实例,并调用execute方法:
1. execute方法从Assignement得到topologyid-supervisorid-port对应的tasks,并create Thread(task)。 Worker每隔几秒将自身状态放入localstate KV database中。Supervisor调度worker时可以从localstate中得到worker状态。
2. worker是通过WorkerData来得到需要执行的tasks,而WorkerData中包含topologyId、supervisorId、port、worker、taskids(set)等信息。
十、总结
回顾整个Supervisor启动流程,Supervisor的作用可以总结为以下几点:
1. 周期性调用SyncSupervisorEvent,从Zookeeper得到相应的Topolgoy的信息Assignment,并将topology信息存入本地,同时将topology的jar包下载到本地,使节点运行的topology与Nimbus分配的topology保持同步。
2. 周期性调用SyncProcessEvent,管理Worker;
3. 周期性向Zookeeper发现心跳包,将SupervisorInfo发送至Zookeeper。
4. Supervisor中的Worker通过Woker.class类实现对task的管理,同事Worker将自身运行状态周期性的写入LocalState中,供Supervisor管理Worker使用。