storm集群任务提交流程

storm集群任务提交流程

问题:
1、集群如何启动,任务如何执行?
java -server nimbus ,supervisor
client —> createTopology(序列化)—> 提交jar到nimbus的inbox文件夹—>nimbus分配任务(task总数/worker总数 )—>写到zk上。
zk<---watch---supervisor--->识别自己的任务—>启动worker
zk<---worker--->TaskInfo—>启动Spout/Bolt

一个worker=一个进程=一个jvm=一个端口号=一个槽

集群架构中的各个模块如何启动?
nimbus:用户启动
supervisor:用户启动
worker:supervisor启动
Task:worker启动

任务如何分配,如何执行?
看图说话

2、集群如何通信?
集群架构中的各个模块是如何通信的?外部通信
拓扑程序中的各个Task是如何通信的?内部通信

3、如何保证消息不丢失?
ack-fail机制如何实现的?

4、【手动练习】尝试自己实现一个而类似storm数据执行的框架

1 、 集群如何启动,任务如何执行?

storm启动流程分析

-——————————

程序员client

1、客户端运行storm nimbus时,会调用storm的脚本,生成一条java命令,命令格式如下:java -server xxxx.ClassName -args

1
2
numbus--->Running:/usr/local/jdk1.8.0_181/bin/java -server 一段参数 org.apache.storm.daemon.nimbus
supervisor--->Running:/usr/local/jdk1.8.0_181/bin/java -server 一些参数 org.apache.storm.daemon.supervisor.Supervisor

-——————————-

nimbus

2、nimbus启动之后,接受客户端提交任务
命令格式:storm jar xxx.jar xxx驱动类 (参数)
Running: /usr/local/jdk1.8.0_181/bin/java -client -Dstorm.jar=storm.jar cn.itcast.storm.WordCountTopologyMain

该命令会执行storm.jar中的cn.itcast.storm.WordCountTopologyMain类中main方法,main方法中会执行一下代码:StormSubmitter.submitTopology("mywordcount", config , topologyBuilder.createTopology());,topologyBuilder.createTopology()会将程序员编写的spout对象和bolt对象序列化
会将用户的jar上传到nimbus物理节点的/home/hadoop/app/storm/localdir/nimbus/inbox文件夹下,并且改名,改名的规则时添加一个UUID字符串
在即将运行的worker物理节点的 /home/hadoop/app/storm/localdir/supervisor/stormdist目录下生成一个文件夹,里面保存着正在运行的topology的jar包和配置文件,序列化对象文件

3、nimbus接受到任务之后,会将任务进行分配,分配会产生一个assignment对象,该对象会保存到zk中,目录时/storm/assignments中,该目录只保存正在运行的topology任务

-——————————–

supervisor

4supervisor通过watch机制,感知到nimbus在zk上的任务信息,从zk上拉取任务信息,分辨出属于自己的任务。(查看nimbus.log)

1
2
executor->node+port {[8 8] ["eb2ebcc7-9bf9-4b86-9150-b72e3bfff75e" 6701], [2 2] ["eb2ebcc7-9bf9-4b86-9150-b72e3bfff75e" 6701]等等
worker->resources {["feafc90f-afe4-4893-8148-2ce416a9de00" 6700] [0.0 0.0 0.0], ["1656a4db-3f89-4986-a496-51cd7f0d75ea" 6700] [0.0 0.0 0.0]}, :owner "hadoop"}

5根据自己的任务信息,启动自己的worker,并分配一个端口(查看日志文件supervisor.log)

1
2
'/usr/local/jdk1.8.0_181/bin/java' '-cp' '/home/hadoop/app/storm/localdir/supervisor/stormdist/mywordcount-3-1541871500/stormjar.jar' '-Xmx64m'
'org.apache.storm.daemon.worker' 'mywordcount-3-1541871500' '1656a4db-3f89-4986-a496-51cd7f0d75ea' '6700' 'aa39d565-6dd8-4631-a9cb-48a188415632'

-——————————–

worker

6worker启动之后,连接zk

1
2
executor->node+port {[8 8] ["eb2ebcc7-9bf9-4b86-9150-b72e3bfff75e" 6701], [2 2] ["eb2ebcc7-9bf9-4b86-9150-b72e3bfff75e" 6701]等等
worker->resources {["feafc90f-afe4-4893-8148-2ce416a9de00" 6700] [0.0 0.0 0.0], ["1656a4db-3f89-4986-a496-51cd7f0d75ea" 6700] [0.0 0.0 0.0]}, :owner "hadoop"}

假设任务信息为:
1—>spout—type : spout
2—>bolt—type : bolt
3—>acker—type :bolt

得到对象有几种方式?
new ClassName创建对象、class.forName反射对象、clone 克隆对象、序列化反序列化

worker通过反序列化,得到自定义的spout和bolt对象

7、worker根据任务类型,分别执行spout任务或者bolte任务
spout的生命周期:open、nextTuple、declareOutputFields
bolt的生命周期:prepare、execute(Tuple)、declareOutputFields