目前经常使用的消息队列组建无非就是MSMQ和ActiveMQ,至于他们的异同,这里不想作过多的比较。简单来讲,MSMQ内置于微软操做系统之中,在部署上包含一个隐性条件:Server须要是微软操做系统。(对于这点我并去调研过MSMQ是否能够部署在非微软系统,好比:Linux,只是拍脑壳想了想,感受上是不能够)。对于ActiveMQ,微软系统和Linux都是能够部署的。从功能方面来讲,通常最经常使用的就是:消息的收/发,感受差别不大。从性能上来讲,通常的说法是ActiveMQ略高。在稳定性上,我的感受MSMQ更好。若是这两种经常使用队列都用过的同窗,应该来讲最大的差别在于:MSMQ若是要访问远程队列(好比机器A上的程序访问机器B上的队列),会比较恶心。在数据量比较大的状况之下,通常来讲队列服务器会专门的一台或者多台(多台的话,用程序去作热备+负载比较方便,也不须要额外的硬件成本。简单来讲作法能够这样:消息发送的时候随机向着多台队列服务器上发送消息;接受的时候开多个线程去分别监听;热备方面,能够维护一个带状态的队列链接池,若是消息收发失败那么将状态置为不可用,而后起一个线程去定时监测坏的链接是否可用,这个过程通常状况下能够不用加锁,为何,你们根据各自须要去取舍吧)。最近搞完了短彩信的网关链接服务,这两种队列我均使用了。大体的过程是这样的:上层应用若是要发端彩信,那么将消息直接发送至ActiveMQ(目前用的就是上面说的多台热备+负载,由于实际中下行量很是大5千万条/天以上),而后端彩信网关链接服务部署多套,每套均依赖本机的MSMQ。为何呢?用ActiveMQ的缘由是:上层应用程序和网关链接服务彼此独立,消息须要跨机访问。用MSMQ的缘由是:ActiveMQ中的数据是一条不分省的大队列,网关链接服务须要按省流控,因此端彩信网关链接服务:首先把消息从ActiveMQ取出来,而后存至本机上的分省MSMQ,这样作另外的一个好处就是:ActiveMQ不至于过多挤压,他的数据会分摊到N台短彩信网关链接服务所部署的机器上的MSMQ之中,也就说MSMQ能够起到分摊数据和缓冲的做用。html
在以前的随笔中,已经介绍过MSMQ,如今先介绍一下ActiveMQ一些配置,目前好像ActiveMQ配置上的介绍还比较少。如下是本身总结一些相关资料,贴出来给你们共享java
官方:node
From 5.3 onwards - we recommend you use KahaDB - which offers improved scalability and recoverability over the AMQ Message Store.apache
The AMQ Message Store which although faster than KahaDB - does not scales as well as KahaDB and recovery times take longer.后端
非官方:缓存
kaha文件系统实际上上是一个文件索引系统,有两部分组成,一个是数据文件系统,由一个个独立的文件组成,缺省文件大小是32M大(可配置),另一个是索引文件系统,记录消息在数据文件中的位置信息以及数据文件中的空闲块信息。数据文件是存储到硬盘上的,索引文件是缓存在内存中的。因此这个存储系统对大消息存储有利,象咱们的memberId之类的文本消息,其实是浪费,索引比消息还大,哈。安全
我方分析:服务器
推荐: Amq持久方式session
理由:虽然官方推荐使用KahaDB持久方式,但其提到的优点:可伸缩性和恢复性较好,对于咱们实际的应用意义不大。从咱们本身的使用经验来看,KahaDB持久方式,Data文件是一个大文件(感受文件过大后,形成队列服务瘫死的可能性会增大),从官网的相关配置(附录1)也找不到哪里能够设置数据的文件的最大Size。)而Amq持久方式能够设置Data文件最大Size,这样能够保证即时消息积压不少,Data文件也不至于过大。并发
解决方法:
在创建链接的Uri中加入: wireFormat.maxInactivityDuration=0
参考资源:
http://jinguo.iteye.com/blog/243153
You can do the following to fix the issues:
1) Append max inactivity duration to your Uri in the format below: wireFormat.maxInactivityDuration=0
2) Use the same Uri at the client side as well as at the server side
Regards,
若是不这样设置,对应的错误会出现:
2008-05-07 09:22:56,343 [org.apache.activemq.ActiveMQConnection]-[WARN] Async exception with no exception listener: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
org.apache.activemq.transport.InactivityIOException: Channel was inactive for too long: localhost/127.0.0.1:61616
ActiveMQ的tcp url:tcp://localhost:61616后面要加入?wireFormat.maxInactivityDuration=0 这样的参数,不然当一段时间没有消息发送时会抛出 "Channel was inactive for too long"异常
解决方法:
1)关闭ActiveMqLog4j
打开:conf/log4j.properties
将:log4j.rootLogger=INFO, console, logfile
修改成:log4j.rootLogger=OFF
2)在创建链接的Uri中加入: maxInactivityDurationInitalDelay=30000
例如北京的测试环境链接Uri:
tcp://192.168.199.80:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true
参考资源:
If you get exception like this,it can mean one of three things:
1. You're connecting to the port not used by ActiveMQ TCP transport
Make sure to check that you're connecting to the appropriate host:port
2. You're using log4j JMS appender and doesn't filter out ActiveMQ log messages
Be sure to read How do I use log4j JMS appender with ActiveMQ and more importantly to never send ActiveMQ log messages to JMS appender
3. Your broker is probably under heavy load (or network connection is unreliable), so connection setup cannot be completed in a reasonable time
If you experience sporadic exceptions like this, the best solution is to use failover transport, so that your clients can try connecting again if the first attempt fails. If you're getting these kind of exceptions more frequently you can also try extending wire format negotiation period (default 10 sec). You can do that by using wireFormat.maxInactivityDurationInitalDelay property on the connection URL in your client.
For example
tcp://localhost:61616?wireFormat.maxInactivityDurationInitalDelay=30000
will use 30 sec timeout.(貌似有问题!!!)
解决方法:
1) 设置Java最大内存限制为合适大小:
Bin/activemq.bat 中ACTIVEMQ_OPTS=-Xmx512M(默认是512)
2)Activemq.xml配置节:systemUsage/ systemUsage配置大小合适,而且特别注意:大于全部durable desitination设置的memoryUsage之和。
备注:
1)尖括号:“>”表明通配符
2)ACTIVEMQ_OPTS的配置〉=memoryUsage中配置〉=全部durable desitination设置之和
3)SystemUsage配置设置了一些系统内存和硬盘容量,当系统消耗超过这些容量设置时,amq会“slow down producer”,仍是很重要的。
参考资料:
http://m.oschina.net/blog/26216
参考-- http://activemq.apache.org/javalangoutofmemory.html
对于MQ的内容实用是可管理和可配置的。首先须要判断的是MQ的哪部分系统因内存不足而致使泄漏,是JVM,broker仍是消费者、生产者?
1、内存管理
JVM内存管理:
1. 用bin/activemq命令在独立JVM中运行broker。用-Xmx和-Xss命令便可(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数便可);
2. 默认状况下,MQ用512M的JVM;
broker内存管理:
1. broker使用的内存并非由JVM的内存决定的。虽然受到JVM的限制,但broker确实独立管理器内存;
2. systemUsage和destination的内存限制与broker内存息息相关;
3. MQ中内存的关系是:JVM->Broker->broker features;
4. 全部destination的内存总量不能超过broker的总内存;
消费者:
1. 因为消息大小能够配置,prefetch limit每每是致使内存溢出的主要缘由;
2. 减小prefetch limit的大小,会减小消费者内存中存储的消息数量;
生产者:
1. 除非消息数量超过了broker资源的限制,不然生产者不会致使内存溢出;
2. 当内存溢出后,生产者会收到broker的阻塞信息提示;
2、其余
将消息缓冲之硬盘:
1. 只有当消息在内存中存储时,才容许消息的快速匹配与分发,而当消费者很慢或者离开时,内存可能会耗尽;
2. 当destination到达它的内存临界值时,broker会用消息游标来缓存非持久化的消息到硬盘。
3. 临界值在broker中经过memoryUsage和systemUsage两个属性配置,请参考activemq.xml;
4. 对于缓慢的消费者,当还没有耗尽内存或者转变为生产者并发控制模式前,这个特性容许生产者继续发送消息到broker;
5. 当有多个destination的时候,默认的内存临界值可能被打破,而这种状况将消息缓存到硬盘就显得颇有意义;
6. precentUsage配置:使用百分比来控制内存使用状况;
多个线程:
1. 默认状况下,MQ每一个destination都对应惟一的线程;
2. -Dorg.apache.activema.UseDedicatedTaskRunner=false(activemq.bat文件中修改ACTIVEMQ_OPTS选项参数便可),用线程池来限制线程的数量,从而减小内存消耗;
大数据传输:
1. destination policies--maxPageSize:控制进入内存中的消息数量;lazyDispatch:增长控制使用当前消费者列表的预取值;
2. 使用blogMessage或者streamsMessage类型来进行大量文件的传输;
泄漏JMS资源:
1. 当session或者producer或者consumer大量存在而没有关闭的时候;
2. 使用PooledConnectionFactory;
解决方法:
不采用failover链接
分析:
采用failover方式链接,若是所要链接的服务器或者Activemq服务宕了,那么程序会一直处于等待状态,不超时,不报错。
property name |
default value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
indexWriteBatchSize |
1000 |
number of indexes written in a batch |
indexCacheSize |
10000 |
number of index pages cached in memory |
enableIndexWriteAsync |
false |
if set, will asynchronously write indexes |
journalMaxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
enableJournalDiskSyncs |
true |
ensure every non transactional journal write is followed by a disk sync (JMS durability requirement) |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
checkpointInterval |
5000 |
time (ms) before checkpointing the journal |
ignoreMissingJournalfiles |
false |
If enabled, will ignore a missing message log file |
checkForCorruptJournalFiles |
false |
If enabled, will check for corrupted Journal files on startup and try and recover them |
checksumJournalFiles |
false |
create a checksum for a journal file - to enable checking for corrupted journals |
Available since version 5.4: |
||
archiveDataLogs |
false |
If enabled, will move a message data log to the archive directory instead of deleting it. |
directoryArchive |
null |
Define the directory to move data logs to when they all the messages they contain have been consumed. |
databaseLockedWaitDelay |
10000 |
time (ms) before trying to get acquire a the database lock (used by shared master/slave) |
maxAsyncJobs |
10000 |
the maximum number of asynchronous messages that will be queued awaiting storage (should be the same as the number of concurrent MessageProducers) |
concurrentStoreAndDispatchTopics |
false |
enable the dispatching of Topic messages to interested clients to happen concurrently with message storage |
concurrentStoreAndDispatchQueues |
true |
enable the dispatching of Queue messages to interested clients to happen concurrently with message storage |
property name |
default value |
Comments |
directory |
activemq-data |
the path to the directory to use to store the message store data and log files |
useNIO |
true |
use NIO to write messages to the data logs |
syncOnWrite |
false |
sync every write to disk |
maxFileLength |
32mb |
a hint to set the maximum size of the message data logs |
persistentIndex |
true |
use a persistent index for the message logs. If this is false, an in-memory structure is maintained |
maxCheckpointMessageAddSize |
4kb |
the maximum number of messages to keep in a transaction before automatically committing |
cleanupInterval |
30000 |
time (ms) before checking for a discarding/moving message data logs that are no longer used |
indexBinSize |
1024 |
default number of bins used by the index. The bigger the bin size - the better the relative performance of the index |
indexKeySize |
96 |
the size of the index key - the key is the message id |
indexPageSize |
16kb |
the size of the index page - the bigger the page - the better the write performance of the index |
directoryArchive |
archive |
the path to the directory to use to store discarded data logs |
archiveDataLogs |
false |
if true data logs are moved to the archive directory instead of being deleted |
property name |
default value |
Comments |
memoryUsage |
20M |
amq使用内存大小,照amq论坛上说,这个值应该大于全部durable desitination设置的 memoryUsage之和,不然会致使硬盘swap,影响性能。 |
storeUsage |
1G |
kaha数据存储大小,若是设置不足,性能会降低到1个1个发 |
tempUsage |
100M |
非persistent的消息存储在temp区域 |
http://activemq.apache.org/nms/configuring.html
Option Name |
Default |
Description |
transport.timeout |
-1 |
Time that a send operation blocks before failing. |
transport.initialReconnectDelay |
10 |
Time in Milliseconds that the transport waits before attempting to reconnect the first time. |
transport.maxReconnectDelay |
30000 |
The max time in Milliseconds that the transport will wait before attempting to reconnect. |
transport.backOffMultiplier |
2 |
The amount by which the reconnect delay will be multiplied by if useExponentialBackOff is enabled. |
transport.useExponentialBackOff |
true |
Should the delay between connection attempt grow on each try up to the max reconnect delay. |
transport.randomize |
true |
Should the Uri to connect to be chosen at random from the list of available Uris. |
transport.maxReconnectAttempts |
0 |
Maximum number of time the transport will attempt to reconnect before failing (0 means infinite retries) |
transport.startupMaxReconnectAttempts |
0 |
Maximum number of time the transport will attempt to reconnect before failing when there has never been a connection made. (0 means infinite retries) (included in NMS.ActiveMQ v1.5.0+) |
transport.reconnectDelay |
10 |
The delay in milliseconds that the transport waits before attempting a reconnection. |
transport.backup |
false |
Should the Failover transport maintain hot backups. |
transport.backupPoolSize |
1 |
If enabled, how many hot backup connections are made. |
transport.trackMessages |
false |
keep a cache of in-flight messages that will flushed to a broker on reconnect |
transport.maxCacheSize |
256 |
Number of messages that are cached if trackMessages is enabled. |
transport.updateURIsSupported |
true |
Update the list of known brokers based on BrokerInfo messages sent to the client. |
Option Name |
Default |
Description |
connection.AsyncSend |
false |
Are message sent Asynchronously. |
connection.AsyncClose |
true |
Should the close command be sent Asynchronously |
connection.AlwaysSyncSend |
false |
Causes all messages a Producer sends to be sent Asynchronously. |
connection.CopyMessageOnSend |
true |
Copies the Message objects a Producer sends so that the client can reuse Message objects without affecting an in-flight message. |
connection.ProducerWindowSize |
0 |
The ProducerWindowSize is the maximum number of bytes in memory that a producer will transmit to a broker before waiting for acknowledgement messages from the broker that it has accepted the previously sent messages. In other words, this how you configure the producer flow control window that is used for async sends where the client is responsible for managing memory usage. The default value of 0 means no flow control at the client. See also Producer Flow Control |
connection.useCompression |
false |
Should message bodies be compressed before being sent. |
connection.sendAcksAsync |
false |
Should message acks be sent asynchronously |
connection.messagePrioritySupported |
true |
Should messages be delivered to the client based on the value of the Message Priority header. |
connection.dispatchAsync |
false |
Should the broker dispatch messages asynchronously to the connection's consumers. |
Option Name |
Default |
Description |
wireFormat.stackTraceEnabled |
false |
Should the stack trace of exception that occur on the broker be sent to the client? Only used by openwire protocol. |
wireFormat.cacheEnabled |
false |
Should commonly repeated values be cached so that less marshalling occurs? Only used by openwire protocol. |
wireFormat.tcpNoDelayEnabled |
false |
Does not affect the wire format, but provides a hint to the peer that TCP nodelay should be enabled on the communications Socket. Only used by openwire protocol. |
wireFormat.sizePrefixDisabled |
false |
Should serialized messages include a payload length prefix? Only used by openwire protocol. |
wireFormat.tightEncodingEnabled |
false |
Should wire size be optimized over CPU usage? Only used by the openwire protocol. |
wireFormat.maxInactivityDuration |
30000 |
The maximum inactivity duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to appear to die, so we allow the broker to kill connections if they are inactive for a period of time. Use by some transports to enable a keep alive heart beat feature. Set to a value <= 0 to disable inactivity monitoring. |
maxInactivityDurationInitalDelay |
10000 |
The initial delay in starting the maximum inactivity checks (and, yes, the word 'Inital' is supposed to be misspelled like that) |
一、控制台安全配置,打开conf/jetty.xml文件,找到
<bean id="securityConstraint" class="org.eclipse.jetty.http.security.Constraint">
<property name="name" value="BASIC" />
<property name="roles" value="admin" />
<property name="authenticate" value="false" />
</bean>
将“false”改成“true”便可。用户名和密码存放在conf/jetty-realm.properties文件中。
二、生产者和消费者链接MQ须要密码
打开conf/activemq.xml文件,在<broker>标签里的<systemUsage>标签前加入:
<plugins>
<simpleAuthenticationPlugin>
<users>
<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
</users>
</simpleAuthenticationPlugin>
</plugins>
注意必须在<systemUsage>标签前,不然启动ActiveMQ会报错。
用户名和密码存放在conf/credentials.properties文件中