一、StormSubmitter.class submitTopology方法
在完成Spout类开发->Bolt类开发->通过setSpout和setBolt方法定义Topology结构并createTopology这几个步骤后,最后一步即是调用submitTopology方法,将我们定义的Topology作业提交到JStorm集群中运行。我们代码中调用的submitTopology方法并非在JStom源码中,而是JStorm提供的jstorm-core.jar包中backtype.storm.StormSubmitter.class类的方法。submitTopology方法用于向JStorm集群中提交Topology作业,Topology作业一旦提交成功则会一直运行下去,直到显示的kill掉。
1. submitTopology方法中首先会验证参数stormConf是否能进行json序列化,若不能则报异常。
2. 然后会调用Utils.readCommandLineOpts()
和Utils.readStormConfig()
方法,从storm.option、default.yaml和storm.yaml中读取Storm的相关配置,并将配置中%JSTORM_HOME%参数替换为实际真实路径,然后与我们传递的参数stormConf(包含用户在storm整体的个人配置),共同构成Storm的conf,最后将存有所有相关配置的Map conf转化为Json字符串serConf。
3. 通过localNimbus对象来判定Topology是本地模式运行还是集群模式运行,若为本地模式则调用本地相关的submitTopology方法。若为集群模式,则首先根据conf,调用NimbusClient.getConfiguredClient(conf)
,新建NimbusClient对象。
4. 得到NimbusClient对象后,首先调用topologyNameExists(client, conf, name)
通过client与集群中的Nimbus通信,判断集群中是否已经存在该name的Topology作业,若存在则不能正常提交我们新的作业,因此报topology already exists异常。然后,调用submitJar(client, conf)
向集群中提交我们的Topology jar包。
5. 最后调用client.getClient().submitTopology(name, path, serConf, topology)
方法,通过Thrift与Nimbus通信,使用sendBase("submitTopology", args)
发送给JStorm集群。其中参数args为submitTopology_args类对象,该对象包含了Topolgoy名字name、jar的upload路径uploadedJarLocation、jsonConf和StormTopology对象,参数”submitTopology”为methodName,集群通过Thrift收到请求,并根据methodName调用相应方法。
二、StormSubmitter.class submitJar方法
1. submitJar方法用来向JStorm的Nimbus提交我们的Toplogy jar包,首先得到我们的本地Topology jar包名localJar,然后使用NimbusClient与Nimbus通信,获得jar包的上传路径;
2. 若我们设置了TOPOLOGY_LIB_NAME和TOPOLOGY_LIB_PATH,则需要上传Topology依赖的lib,若没有则跳过。
3. 最后调用ubmitJar(conf, localJar, uploadLocation, client)
方法,使用client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit))
将jar包上传至Nimbus中。