@(Blog Post)[storm|big data]

To be added.
The complete default configuration file, defaults.yaml, is shown below; to change a setting, override it in storm.yaml. The important parameters are as follows:
1. storm.zookeeper.servers: specifies which ZooKeeper cluster to use.
```yaml
storm.zookeeper.servers:
    - "gdc-nn01-test"
    - "gdc-dn01-test"
    - "gdc-dn02-test"
```
2. nimbus.host: specifies which machine Nimbus runs on.
```yaml
nimbus.host: "gdc-nn01-test"
```
3. supervisor.slots.ports: specifies the ports on which a supervisor runs workers. Each port can run one worker, so the number of configured ports is the number of slots (i.e. the number of workers) each supervisor has.
```yaml
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
storm.local.dir: "/home/hadoop/storm/data"
```
4. JVM settings
```yaml
# jvm settings
nimbus.childopts: "-Xmx4096m"
supervisor.childopts: "-Xmx4096m"
worker.childopts: "-Xmx3072m"
```
In addition, there are also ui.childopts and logviewer.childopts.
The complete default configuration file, defaults.yaml, is attached below:
```yaml
########### These all have default values as shown
########### Additional configuration goes into storm.yaml

java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib"

### storm.* configs are general configurations
# the local dir is where jars are kept
storm.local.dir: "storm-local"
storm.zookeeper.servers:
    - "localhost"
storm.zookeeper.port: 2181
storm.zookeeper.root: "/storm"
storm.zookeeper.session.timeout: 20000
storm.zookeeper.connection.timeout: 15000
storm.zookeeper.retry.times: 5
storm.zookeeper.retry.interval: 1000
storm.zookeeper.retry.intervalceiling.millis: 30000
storm.cluster.mode: "distributed" # can be distributed or local
storm.local.mode.zmq: false
storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"

### nimbus.* configs are for the master
nimbus.host: "localhost"
nimbus.thrift.port: 6627
nimbus.thrift.max_buffer_size: 1048576
nimbus.childopts: "-Xmx1024m"
nimbus.task.timeout.secs: 30
nimbus.supervisor.timeout.secs: 60
nimbus.monitor.freq.secs: 10
nimbus.cleanup.inbox.freq.secs: 600
nimbus.inbox.jar.expiration.secs: 3600
nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"

### ui.* configs are for the master
ui.port: 8080
ui.childopts: "-Xmx768m"

logviewer.port: 8000
logviewer.childopts: "-Xmx128m"
logviewer.appender.name: "A1"

drpc.port: 3772
drpc.worker.threads: 64
drpc.queue.size: 128
drpc.invocations.port: 3773
drpc.request.timeout.secs: 600
drpc.childopts: "-Xmx768m"

transactional.zookeeper.root: "/transactional"
transactional.zookeeper.servers: null
transactional.zookeeper.port: null

### supervisor.* configs are for node supervisors
# Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
supervisor.childopts: "-Xmx256m"
#how long supervisor will wait to ensure that a worker process is started
supervisor.worker.start.timeout.secs: 120
#how long between heartbeats until supervisor considers that worker dead and tries to restart it
supervisor.worker.timeout.secs: 30
#how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary
supervisor.monitor.frequency.secs: 3
#how frequently the supervisor heartbeats to the cluster state (for nimbus)
supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true

### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
worker.heartbeat.frequency.secs: 1

# control how many worker receiver threads we need per worker
topology.worker.receiver.thread.count: 1

task.heartbeat.frequency.secs: 3
task.refresh.poll.secs: 10

zmq.threads: 1
zmq.linger.millis: 5000
zmq.hwm: 0

storm.messaging.netty.server_worker_threads: 1
storm.messaging.netty.client_worker_threads: 1
storm.messaging.netty.buffer_size: 5242880 #5MB buffer
# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead.
storm.messaging.netty.max_retries: 30
storm.messaging.netty.max_wait_ms: 1000
storm.messaging.netty.min_wait_ms: 100

# If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency.
storm.messaging.netty.transfer.batch.size: 262144

# We check with this interval that whether the Netty channel is writable and try to write pending messages if it is.
storm.messaging.netty.flush.check.interval.ms: 10

### topology.* configs are for specific executing storms
topology.enable.message.timeouts: true
topology.debug: false
topology.workers: 1
topology.acker.executors: null
topology.tasks: null
# maximum amount of time a message has to complete before it's considered failed
topology.message.timeout.secs: 30
topology.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
topology.skip.missing.kryo.registrations: false
topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched
topology.executor.send.buffer.size: 1024 #individual messages
topology.receiver.buffer.size: 8 # setting it too high causes a lot of problems (heartbeat thread gets starved, throughput plummets)
topology.transfer.buffer.size: 1024 # batched
topology.tick.tuple.freq.secs: null
topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
topology.classpath: null
topology.environment: null

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
```
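Most of the topology.* settings above can also be overridden per topology in code when it is submitted. The snippet below is only a sketch of that pattern using backtype.storm.Config; the specific values are arbitrary examples, not recommendations:

```java
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;

TopologyBuilder builder = new TopologyBuilder();
// ... setSpout()/setBolt() calls omitted ...

Config conf = new Config();
conf.setNumWorkers(3);            // overrides topology.workers
conf.setMessageTimeoutSecs(60);   // overrides topology.message.timeout.secs
conf.setMaxSpoutPending(1000);    // overrides topology.max.spout.pending
conf.setDebug(false);             // overrides topology.debug

StormSubmitter.submitTopology("example-topology", conf, builder.createTopology());
```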
1. node (machine): the number of supervisor machines in the Storm cluster.
2. worker (JVM process): the total number of worker processes in the topology; they are distributed evenly at random across the nodes.
3. executor (thread): the total number of threads for a given spout or bolt; these threads are distributed evenly at random across the workers.
4. task (spout/bolt instance): a task is an instance of a spout or bolt; its nextTuple() and execute() methods are called by executor threads. Unless explicitly configured, Storm assigns one task per executor. If more tasks are configured, a single thread holds multiple spout/bolt instances.
Note: the values configured above are all totals, which are distributed evenly across their hosts; they do not specify how many processes/threads each host runs. See the example below.
Each of these is set as follows:

1. node: buy more machines and add them to the cluster...
2. worker: Config#setNumWorkers(), or the configuration item TOPOLOGY_WORKERS
3. executor: the parallelism hint passed to TopologyBuilder#setSpout()/#setBolt()
4. task: ComponentConfigurationDeclarer#setNumTasks()
For example:

```java
// Build the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);   // 5 executors
builder.setBolt("filter-bolt", new FilterBolt(), 3)
       .shuffleGrouping("kafka-reader");                          // 3 executors
builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
       .shuffleGrouping("filter-bolt");                           // 3 executors
builder.setBolt("hdfs-bolt", hdfsBolt, 2)
       .shuffleGrouping("log-splitter");                          // 2 executors

// Submit the topology
Config conf = new Config();
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.setNumWorkers(3);                                            // 3 worker processes
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
```
In this example:

1. conf.setNumWorkers(3) sets the number of worker processes to 3. Assuming the cluster has 3 nodes, each node runs one worker.
2. The executor counts are:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
In total there are 13 executors, which are distributed at random among the workers.
Note: this code reads its input from Kafka, and the topic has 5 partitions in Kafka, so the spout's executor count is set to 5 here. (A sketch of how spoutConf might be built appears after this example.)
3. This example does not set the number of tasks separately, so the default of one task per executor is used. If you need to set it, you can use:
builder.setBolt("log-splitter", new LogSplitterBolt(), 3) .shuffleGrouping("filter-bolt").setNumTasks(5);
These 5 tasks will then be distributed among the 3 executors.
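The example above does not show how spoutConf is created. As a rough, hedged sketch only: with the storm-kafka module (package storm.kafka), a SpoutConfig for the 5-partition topic might be built something like this, where the ZooKeeper address, topic name, offset root, and client id are made-up placeholders:

```java
import backtype.storm.spout.SchemeAsMultiScheme;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

// Placeholder values, for illustration only
ZkHosts brokerHosts = new ZkHosts("gdc-nn01-test:2181");          // ZooKeeper used by the Kafka brokers
SpoutConfig spoutConf = new SpoutConfig(brokerHosts,
        "log-topic",             // hypothetical topic with 5 partitions
        "/kafka-spout",          // ZooKeeper root where the spout stores offsets
        "log-reader");           // spout client id
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());   // emit messages as strings

// The spout parallelism (5) matches the number of Kafka partitions:
// builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
```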
(4) Dynamically adjusting parallelism
There are two ways to adjust the parallelism of a Storm topology:
1. Kill the topology -> modify the code -> recompile -> resubmit the topology
2. Adjust it dynamically
The first method is far too inconvenient: sometimes a topology simply cannot be killed on demand, and if you add a few machines, are you really going to kill every topology and modify its code?
Therefore Storm provides dynamic adjustment, which can be done in two ways:
1. Via the UI: open the topology's page and click rebalance; the topology's status changes to rebalancing. This method only redistributes the existing processes and threads across the machines, so it suits adding or removing machines; it cannot change the number of workers or executors.
2. Via the CLI: storm rebalance
For example:
```
storm rebalance toponame -n 7 -e filter-bolt=6 -e hdfs-bolt=8
```
This sets the topology's worker count to 7, and sets the executor counts of filter-bolt and hdfs-bolt to 6 and 8 respectively.
While this runs, the topology's status shows as rebalancing; once the adjustment completes, the worker counts on the 3 machines are 3, 2, and 2 respectively.
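The same adjustment can also be triggered from Java through Nimbus's Thrift interface. The following is only a sketch, assuming the Storm 0.9.x client classes NimbusClient and RebalanceOptions; the topology and component names are the example ones used above:

```java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.generated.Nimbus;
import backtype.storm.generated.RebalanceOptions;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;

public class RebalanceExample {
    public static void main(String[] args) throws Exception {
        // Merged configuration: defaults.yaml overridden by storm.yaml
        Map conf = Utils.readStormConfig();
        Nimbus.Client client = NimbusClient.getConfiguredClient(conf).getClient();

        RebalanceOptions options = new RebalanceOptions();
        options.set_num_workers(7);                        // same effect as "-n 7"
        Map<String, Integer> executors = new HashMap<String, Integer>();
        executors.put("filter-bolt", 6);                   // same effect as "-e filter-bolt=6"
        executors.put("hdfs-bolt", 8);                     // same effect as "-e hdfs-bolt=8"
        options.set_num_executors(executors);

        client.rebalance("toponame", options);
    }
}
```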
Storm uses stream groupings to control how data flows, specifying which stream each bolt consumes and how it consumes it.
Storm has 7 built-in groupings and also provides CustomStreamGrouping for defining custom ones; a sketch showing how groupings are declared in code follows the list below.
1. Shuffle grouping: shuffleGrouping
Tuples are distributed randomly across the bolt's tasks, and each task receives the same number of tuples.
2. Fields grouping: fieldsGrouping
Tuples are grouped by the specified field: tuples with the same value for that field go to the same task, while tuples with different values may go to different tasks.
3. All grouping (also called broadcast grouping): allGrouping
Every tuple is sent to all of the tasks; use this with care.
4. Global grouping: globalGrouping
All tuples are sent to a single task, the one with the lowest task ID. With this grouping, setting the parallelism is meaningless, and it can easily become a bottleneck.
5. None grouping: noneGrouping
Reserved for future use; currently it behaves the same as shuffle grouping.
6. Direct grouping: directGrouping
The emitting component calls emitDirect() to decide which task should receive each tuple; it can only be used on streams that have been declared as direct streams.
7. Local-or-shuffle grouping: localOrShuffleGrouping
If the receiving bolt has one or more tasks in the same worker process, tuples are sent to those tasks first; otherwise it behaves like shuffle grouping. Compared with shuffle grouping, this reduces network transfer and therefore improves performance.
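As a rough illustration of where each grouping is declared, here is a sketch using TopologyBuilder; the spout and bolt classes (WordSpout, CountBolt, and so on) are made-up placeholders:

```java
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

// WordSpout, CountBolt, MetricsBolt, ReportBolt and WriterBolt are hypothetical components
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new WordSpout(), 2);

builder.setBolt("count", new CountBolt(), 4)
       .fieldsGrouping("words", new Fields("word"));   // same "word" value -> same task
builder.setBolt("metrics", new MetricsBolt(), 2)
       .allGrouping("words");                          // every task receives every tuple
builder.setBolt("report", new ReportBolt(), 1)
       .globalGrouping("count");                       // all tuples go to the lowest task id
builder.setBolt("writer", new WriterBolt(), 4)
       .localOrShuffleGrouping("count");               // prefer tasks in the same worker process
```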
Reliability: a message emitted by a spout must be acked by every node in its tuple tree; otherwise it will keep being re-emitted.
There are two causes of re-emission:
(1) fail() is called
(2) No response before the timeout.
For complete reliability examples, see the chapter 1 v4 code of Storm Blueprints (or page 22 of that book), or the example on page 102 of 《从零开始学Storm》.
The key steps are as follows:
1. Create a map that records the IDs and contents of emitted tuples; this is the list of tuples awaiting acknowledgment.
```java
private ConcurrentHashMap<UUID, Values> pending;
```
2. When emitting a tuple, pass an extra argument that identifies the tuple (its message ID), and put the tuple into the map so it waits for acknowledgment.
```java
UUID msgId = UUID.randomUUID();
this.pending.put(msgId, values);
this.collector.emit(values, msgId);
```
3. Implement the ack and fail methods.
The ack method removes the tuple from the map:

```java
this.pending.remove(msgId);
```
The fail method re-emits the tuple:

```java
this.collector.emit(this.pending.get(msgId), msgId);
```
Tuples that receive no response are re-emitted after the timeout.
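Putting the three steps above together, a reliable spout might look roughly like the following sketch; nextSentence() is a made-up placeholder for wherever the data actually comes from:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class ReliableSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // pending tuples: msgId -> emitted values, waiting to be acked
    private ConcurrentHashMap<UUID, Values> pending;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
    }

    @Override
    public void nextTuple() {
        String sentence = nextSentence();   // hypothetical data source
        if (sentence == null) {
            return;
        }
        Values values = new Values(sentence);
        UUID msgId = UUID.randomUUID();
        this.pending.put(msgId, values);    // remember it until it is acked
        this.collector.emit(values, msgId); // emit with a message id so Storm can track it
    }

    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);         // fully processed, forget it
    }

    @Override
    public void fail(Object msgId) {
        // re-emit the failed tuple with the same message id
        this.collector.emit(this.pending.get(msgId), msgId);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }

    private String nextSentence() {
        // placeholder for reading from the real source (e.g. a queue or a file)
        return null;
    }
}
```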
Every bolt that processes the tuple also needs to do the following:

1. When emitting, pass the input tuple as an additional anchor argument, identifying the corresponding tuple:

```java
collector.emit(tuple, new Values(word));
```

2. Confirm that the received tuple has been processed:

```java
this.collector.ack(tuple);
```
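Correspondingly, a downstream bolt that anchors and acks might look like this sketch; the word-splitting logic is just an example:

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        for (String word : sentence.split(" ")) {
            // anchor the emitted tuple to the input tuple so it joins the tuple tree
            this.collector.emit(tuple, new Values(word));
        }
        // tell Storm this input tuple has been fully handled by this bolt
        this.collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```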