一、TopologyBuilder.class Topology模版
TopologyBuilder提供了Java API的模版,我们通过TopologyBuilder可以建立属于自己的Topology并调用submitTopology方法即可在JStorm集群上启动该Topology。在JStorm源码中,TopologyBuilder.class中有如下注释,为我们提供了建立Topology的模版:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29//集群模式下
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3).fieldsGrouping("1", new Fields("word")).fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount()).globalGrouping("1");
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 4);
StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
//本地模式下
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("1", new TestWordSpout(true), 5);
builder.setSpout("2", new TestWordSpout(true), 3);
builder.setBolt("3", new TestWordCounter(), 3).fieldsGrouping("1", new Fields("word")).fieldsGrouping("2", new Fields("word"));
builder.setBolt("4", new TestGlobalCount()).globalGrouping("1");
Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 4);
conf.put(Config.TOPOLOGY_DEBUG, true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(10000);
cluster.shutdown();
在开发JStorm作业时,我们首先建立自己的Spout和Bolt类,处理相关的逻辑,然后使用TopologyBulider中的SetSpout与SetBolt方法确定各Spout与Bolt之间的数据流,最后调用TopologyBuilder的createTopology方法建立StormTopology,并将该Topology提交到JStorm集群中运行。
二、ComponentCommon.class
1. 在此之前,我们首先介绍ComponentCommon.class。CompoenentCommon对象包含了各个Spout和Bolt的配置信息,正是通过该对象,才能完美描绘出整个Topology的数据流拓扑结构,其中包括每个数据流所包含的Fields(字段)、各个Spout和Bolt之间的Grouping信息等。
2. ComponentCommon主要包含以下域:1
2
3
4private Map<GlobalStreamId,Grouping> inputs; // required
private Map<String,StreamInfo> streams; // required
private int parallelism_hint; // optional
private String json_conf; // optional
Map<GlobalStreamId,Grouping> inputs
中的GlobalSteamId由ComponentId和StreamId共同构成,Grouping包含分组信息。Map<String,StreamInfo> streams
中的String是指StreamId,StreamInfo包含了每个数据流中所包含了Fields(字段),这些字段其实是由我们在各个Spout和Bolt中的declareOutpoutFields
方法中进行定义的。parallelism_hint
指并行度,即每个Spout或Bolt在集群中的Task总数。该字段为可选的,若没有设置该字段,则默认为1。json_conf
为每个Spout和Bolt的Json字符串形式的配置信息。该配置信息可在每个Spout或Bolt中的getComponentConfiguration()
方法中进行配置。
三、TopologyBuilder.class setSpout方法
1 | public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) { |
setSpout方法中,id为Spout的名字,IRichSpout是实现IRichSpout接口的对象实例,parallelism_hint是该Spout的并行度,即JStorm集群中运行该Spout的task总数。每一个task都是worker中的一个线程。
1. setSpout首先调用validateUnusedID(id)方法,检测_bolts、_spouts这两个map中是否已经包含该id,若包含则说明注册了已存在的id,则报重复异常。
2. 调用initCommon方法,建立该Spout对应的ComponentCommon对象,并对其初始化,ComponentCommon对象包含了数据流的分组方式Map<GlobalStreamId,Grouping> inputs
,数据流所包含的output_fields信息Map<String, StreamInfo> stream
, 并行度parallelism_hint
,以及json格式的每个Component所对应的配置conf(该conf是独属于每个Componnet的,有别于Topology的conf)。配置完成后,将其放入全局变量Map<String, ComponentCommon> _commons
中保存。
3. 调用_spouts.put(id,spout)
,将spout对象实例放入全局变量Map<String, IRichSpout> _spouts
中。
四、TopologyBuilder.class setBolt方法
1 | public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) { |
1. 在createTopology()方法中,首先遍历_bolts和_spouts这两个map中所存储的boltId、spoutId,以及Bolt实例和Spout实例,然后调用getComponentCommon方法,该方法从_commons中获得setSpout和setBolt时所设置的ComponnetCommon对象,并调用Spout实例和Bolt实例中的declareOutputFields方法,初始化ComponentCommon对象的Map<String, StreamInfo> streams域,streams保存了每个数据流所包含的各个字段。然后将Bolt和Spout实例进行Java序列化生成ComponentObject对象(这里需要注意的是,因为creatTopology方法需要将我们自己定义的Bolt和Spout类对象进行序列化,所以我们在定义Bolt和Spout不要在其构造器中新建不能被序列化的对象,即使需要新建不能序列化的对象,那也应该放在prepare方法中进行。作者在一开始开发JStorm作业时就曽碰到过这种问题,就是因为在类的构造器中新建了不能被序列化的对象,修改为在prepare方法中新建该对象,即可解决此类异),与之前得到的ComponentCommon对象一起存入Map<String, Bolt> boltSpecs和Map<String, SpoutSpec> spoutSpecs中。
2. 最后调用StormTopology的构造器,用每个Bolt和Spout的Java序列化对象和包含数据流各种信息的ComponentCommon对象,共同生成StormTopology对象。
1 | private ComponentCommon getComponentCommon(String id, IComponent component) { |