一、添加maven依赖的pom.xmljava
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <parent> <artifactId>im_home</artifactId> <groupId>com.autohome</groupId> <version>1.0-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>im-statistics</artifactId> <packaging>war</packaging> <name>im-statistics Maven Webapp</name> <url>http://maven.apache.org</url> <properties> <!-- spring版本号 --> <spring.version>4.0.4.RELEASE</spring.version> <!-- mybatis版本号 --> <mybatis.version>3.2.6</mybatis.version> <!-- log4j日志文件管理包版本 --> <slf4j.version>1.7.7</slf4j.version> <log4j.version>1.2.17</log4j.version> </properties> <dependencies> <!-- spring核心包 --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-oxm</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-tx</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jdbc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aop</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> </dependency> <!-- mybatis核心包 --> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>${mybatis.version}</version> </dependency> <!-- mybatis/spring包 --> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis-spring</artifactId> <version>1.2.2</version> </dependency> <!-- 导入Mysql数据库连接jar包 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.30</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> </dependency> <!-- 格式化对象,方便输出日志 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.1.41</version> </dependency> <!--数据库链接池--> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.18</version> </dependency> <!-- 导入java ee jar 包 --> <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>1.0.2.RELEASE</version> </dependency> <dependency> <groupId>org.freemarker</groupId> <artifactId>freemarker</artifactId> <version>2.3.20</version> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.3</version> </dependency> <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz-jobs --> <!--<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz-jobs</artifactId> <version>2.2.3</version> </dependency>--> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>3.12</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency> </dependencies> <build> <finalName>im-statistics</finalName> <!-- <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins>--> <resources> <resource> <directory>src/main/profile/${prop.dir.name}</directory> <targetPath>./</targetPath> </resource> <resource> <directory>src/main/resources</directory> <targetPath>./</targetPath> </resource> </resources> </build> <profiles> <profile> <id>dev</id> <activation> <activeByDefault>true</activeByDefault> </activation> <properties> <prop.dir.name>dev</prop.dir.name> </properties> </profile> <profile> <id>local</id> <properties> <prop.dir.name>local</prop.dir.name> </properties> </profile> <profile> <id>beta</id> <properties> <prop.dir.name>beta</prop.dir.name> </properties> </profile> <profile> <id>online</id> <properties> <prop.dir.name>online</prop.dir.name> </properties> </profile> <profile> <id>ds-production</id> <properties> <prop.dir.name>ds-production</prop.dir.name> </properties> </profile> </profiles> </project>
二、web.xml配置mysql
<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd" > <web-app> <!-- Spring和mybatis的配置文件 --> <context-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring-mybatis.xml</param-value> </context-param> <!-- 编码过滤器 --> <filter> <filter-name>encodingFilter</filter-name> <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class> <init-param> <param-name>encoding</param-name> <param-value>UTF-8</param-value> </init-param> </filter> <filter-mapping> <filter-name>encodingFilter</filter-name> <url-pattern>/*</url-pattern> </filter-mapping> <!-- Spring监听器 --> <listener> <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class> </listener> <!-- 防止Spring内存溢出监听器 --> <listener> <listener-class>org.springframework.web.util.IntrospectorCleanupListener</listener-class> </listener> <!-- Spring MVC servlet --> <servlet> <servlet-name>SpringMVC</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring-mybatis.xml</param-value> </init-param> <load-on-startup>1</load-on-startup> </servlet> <servlet-mapping> <servlet-name>SpringMVC</servlet-name> <!-- 此处能够能够配置成*.do,对应struts的后缀习惯 --> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>
三、spring-rabbmitmq.xmlweb
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
<!-- 引入配置文件 --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:rabbitmq.properties</value> </list> </property> </bean>
<rabbit:connection-factory id="rabbitmqConnectionFactory" username="${rmq.manager.user}" password="${rmq.manager.password}" host="${rmq.ip}" virtual-host="${rmq.vhost}" /> <rabbit:admin connection-factory="rabbitmqConnectionFactory"/> <rabbit:template connection-factory="rabbitmqConnectionFactory" message-converter="jsonMessageConverter" exchange="testExchangeTopic" id="amqpTemplate"/> <rabbit:queue id="lanyangyang123" durable="true" auto-delete="false" exclusive="false" name="lanyangyang123"/> <rabbit:queue id="lanyangyang234" durable="true" auto-delete="false" exclusive="false" name="lanyangyang234"/> <!-- 1 direct exchange 一条消息能够发送到多个queue,只要这些queue的routeKey相同便可--> <rabbit:direct-exchange name="testExchange3" durable="true" id="testExchange3"> <rabbit:bindings> <!--此key就是routekey--> <rabbit:binding queue="lanyangyang123" key="lanyangyang_queue_key" /> <rabbit:binding queue="lanyangyang234" key="lanyangyang_queue_key" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 2 topic exchange--> <rabbit:queue id="lanyangyang.log.test1" durable="true" auto-delete="false" exclusive="false" name="lanyangyang.log.test1"/> <rabbit:queue id="lanyangyang.log.test2" durable="true" auto-delete="false" exclusive="false" name="lanyangyang.log.test2"/> <rabbit:topic-exchange name="testExchangeTopic" durable="true" auto-delete="false" id="testExchangeTopic"> <rabbit:bindings> <rabbit:binding queue="lanyangyang.log.test1" pattern="#.log.#" /> <rabbit:binding queue="lanyangyang.log.test2" pattern="#.log.#" /> </rabbit:bindings> </rabbit:topic-exchange> <!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象--> <rabbit:listener-container connection-factory="rabbitmqConnectionFactory" acknowledge="auto"> <rabbit:listener queues="lanyangyang123" ref="rabbmitmqQueueListener"/> </rabbit:listener-container> <bean id="rabbmitmqQueueListener" class="com.autohome.statistics.mq.listener.RabbmitmqQueueListener" /> <!-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,因为fastjson的速度快于jackson,这里替换为fastjson的一个实现 --> <bean id="jsonMessageConverter" class="com.autohome.statistics.mq.util.FastJsonMessageConverter" /> </beans>
四、rabbitmq.propertiesredis
rmq.ip=192.168.1.20 rmq.producer.num=20 rmq.manager.user=admin rmq.manager.password=admin rmq.vhost=/vhost1
五、FastJsonMessageConverter.javaspring
public class FastJsonMessageConverter extends AbstractMessageConverter { private static Log log = LogFactory.getLog(FastJsonMessageConverter.class); public static final String DEFAULT_CHARSET = "UTF-8"; private volatile String defaultCharset = DEFAULT_CHARSET; public FastJsonMessageConverter() { super(); //init(); } public void setDefaultCharset(String defaultCharset) { this.defaultCharset = (defaultCharset != null) ? defaultCharset : DEFAULT_CHARSET; } public Object fromMessage(Message message) throws MessageConversionException { return null; } public <T> T fromMessage(Message message,T t) { String json = ""; try { json = new String(message.getBody(),defaultCharset); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return (T) JSON.parseObject(json, t.getClass()); } protected Message createMessage(Object objectToConvert, MessageProperties messageProperties) throws MessageConversionException { byte[] bytes = null; try { String jsonString = JSON.toJSONString(objectToConvert); bytes = jsonString.getBytes(this.defaultCharset); } catch (UnsupportedEncodingException e) { throw new MessageConversionException( "Failed to convert Message content", e); } messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setContentEncoding(this.defaultCharset); if (bytes != null) { messageProperties.setContentLength(bytes.length); } return new Message(bytes, messageProperties); } }
六、RabbmitmqQueueListener.java #消费消息sql
public class RabbmitmqQueueListener implements MessageListener { public static final Logger log = Logger.getLogger(RabbmitmqQueueListener.class); public void onMessage(Message message) { try { String msg = new String(message.getBody(), "utf-8"); log.error(msg); Thread.sleep(2000); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
七、RabbitmqController.java #生产消息数据库
@Controller @RequestMapping("rabbitmq") public class RabbitmqController { public static final Logger log = Logger.getLogger(RabbitmqController.class); @Autowired private AmqpTemplate amqpTemplate; @RequestMapping(value = "/testRabbitmq", produces = "application/json;charset=UTF-8") @ResponseBody public String testRabbitmq(HttpServletRequest request, HttpServletResponse response){ String key = request.getParameter("key"); Message msg = new Message(key.getBytes(), new org.springframework.amqp.core.MessageProperties()); // amqpTemplate.send(msg); //1.测试direct exchange // amqpTemplate.convertAndSend("lanyangyang_queue_key", msg); //2.测试topic exchange amqpTemplate.convertAndSend("#.log.#", msg); log.info("消息发送成功"); return "success"; } }
ps:能够经过访问http://192.168.1.20:15672,查看消息的产生和消费状况,以及queue和exchange的状况apache