Kafka是一个持久化消息发布订阅系统,经常使用于消息队列、日志通道等场景java
1 Producer: 特指消息的生产者node
2 Consumer :特指消息的消费者web
3 Consumer Group :消费者组,能够并行消费Topic中partition的消息redis
4 Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为 broker。spring
5 Topic:特指 Kafka 处理的消息源(feeds of messages)的不一样分类。apache
6 Partition:Topic 物理上的分组,一个 topic 能够分为多个 partition,每一个 partition 是一个有序的队列。partition 中的每条消息都会被分配一个有序的 id(offset)json
7 Message:消息,是通讯的基本单位,每一个 producer 能够向一个 topic(主题)发布一些消息bootstrap
8 稀疏索引:采用稀疏索引的方式,利用二分查找,定位消息。windows
@EnableBinding注解,绑定消息通道。该注解用来指定一个或者多个定义了@Input或@Output注解的接口。缓存
@EnableBinding(Sink.class),绑定了Sink接口,Sink接口是Spring Cloud 中默认绑定输入通道,除此以外,还有绑定输出通道Source,还有绑定输入输出通道的Processor通道。除了Spring Cloud定义的接口外,咱们也能够自定义。
@StreamListener注解是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名
zookeeper安装
进入Zookeeper设置目录,笔者D:\Java\Tool\zookeeper-3.4.6\conf
将“zoo_sample.cfg”重命名为“zoo.cfg”
在任意文本编辑器(如notepad)中打开zoo.cfg
找到并编辑dataDir=D:\\Java\\Tool\\zookeeper-3.4.6\\tmp
与Java中的作法相似,咱们在系统环境变量中添加:
a. 在系统变量中添加ZOOKEEPER_HOME = D:\Java\Tool\zookeeper-3.4.6
b. 编辑path系统变量,添加为路径%ZOOKEEPER_HOME%\bin;
在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)
启动zookeeper
启动kafka(重要:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行)
.\bin\windows\kafka-server-start.bat .\config\server.properties
--------------- 注册中心server1启动类
package org.eureka.server;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
@EnableConfigServer
public class EurekaServerStarter {
public static void main(String[] args) {
new SpringApplicationBuilder(EurekaServerStarter.class).run(args);
}
}
--------------------------application.yml
server:
port: 8761
# 是否要开启基本的鉴权
security:
basic:
enabled: false
user:
name: admin
password: 123456
management:
security:
enabled: false
spring:
profiles:
active: peer2,native
cloud:
config:
server:
native:
search-locations: file:///D:/Users/xuzhi268/zhongchou/config_profiles
eureka:
instance:
hostname: peer1
lease-renewal-interval-in-seconds: 30 #指定续约更新频率,默认是 30s
environment: dev
client:
register-with-eureka: false # 禁用eureka做为客户端注册本身
fetch-registry: false # 表示是否从eureka server获取注册信息,若是是单一节点,不须要同步其余eureka server节点,则能够设置为false,但此处为集群,应该设置为true,默认为true,可不设置
serviceUrl:
defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
#http://admin:123456@peer1:8761/eureka/,http://admin:123456@peer2:8766/eureka/ #多个用逗号隔开
-------------------------- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.eureka.server</groupId>
<artifactId>eureka_server</artifactId>
<version>1.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- 开启基本的鉴权 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
--------------------- 注册中心server2
启动类
package org.eureka.server;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
@SpringBootApplication
@EnableEurekaServer
@EnableConfigServer
public class EurekaServerStartRunner {
public static void main(String[] args) {
new SpringApplicationBuilder(EurekaServerStartRunner.class).run(args);
}
}
--------------------- application.yml
server:
port: 8766
security:
basic:
enabled: false
user:
name: admin
password: 123456
management:
security:
enabled: false
spring:
application:
name: eureka
profiles:
active: peer1,native
cloud:
config:
server:
native:
search-locations: file:///D:/Users/xuzhi268/zhongchou/config_profiles
eureka:
instance:
hostname: peer2
lease-renewal-interval-in-seconds: 30
environment: dev
client:
register-with-eureka: false
fetch-registry: false
serviceUrl:
defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
#http://admin:123456@peer1:8761/eureka/,http://admin:123456@peer2:8766/eureka/
-------------------------pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.eureka.server</groupId>
<artifactId>eureka_server1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
</project>
--------------- 服务提供者代码
package com.cloud.eureka.client.ucenter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient
public class UcenterApplicationRunner {
public static void main(String[] args) {
new SpringApplicationBuilder(UcenterApplicationRunner.class).properties("server.port=" + 8765).run(args);
}
}
------------------------------------------------------
package com.cloud.eureka.client.ucenter.service;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.cloud.eureka.client.ucenter.biz.UserInfoService;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
@RestController
@RefreshScope
public class UcenterInfoService {
private final Logger logger = LoggerFactory.getLogger(UcenterInfoService.class);
@Autowired
private DiscoveryClient discoveryClient;
@Value(value = "${redis.host}")
private String redisHost;
@Value(value = "${redis.port}")
private String redisPort;
@Value("${server.port}")
private String port;
@RequestMapping(value = "/hello", method = RequestMethod.GET)
public String index() {
logger.info("server.port : " + port);
logger.info("redis host " + redisHost + ":" + redisPort);
ServiceInstance instance = discoveryClient.getLocalServiceInstance();
logger.info("<=-=-=-= ucenter server access index() " + this.getClass().getSimpleName() + " " + Thread.currentThread().getName());
logger.info("/hello, host:" + instance.getHost() + ", service_id:" + instance.getServiceId());
return "server.port : " + port;
}
@Autowired
private UserInfoService userInfoService;
@RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
public UserDto queryUserInfo (@RequestParam("userId") Long userId) {
logger.info("server.port : " + port);
logger.info("redis host " + redisHost + ":" + redisPort);
logger.info("<=-=-=-= ucenter server access queryUserInfo " + this.getClass().getSimpleName()
+ " " + Thread.currentThread().getName());
logger.info("<<=-=-=-=>>恭喜用户 " + userId + " 你链接成功<<=-=-=-=>>");
return userInfoService.queryUserInfo();
}
@RequestMapping(value = "queryUserList", method = RequestMethod.POST)
public List<UserDto> queryUserList() {
logger.info("server.port : " + port);
logger.info("redis host " + redisHost + ":" + redisPort);
logger.info("<=-=-=-= ucenter server access queryUserList " + this.getClass().getSimpleName()
+ " " + Thread.currentThread().getName());
List<UserDto> ulist = userInfoService.queryUserList();
return ulist;
}
@RequestMapping(value = "/queryUserById", method = RequestMethod.POST)
public QueryUserByIdResult<UserDto> queryUserUserId(QueryUserByIdRequest request) {
logger.info("server.port : " + port);
logger.info("redis host " + redisHost + ":" + redisPort);
logger.info("<=-=-=-= ucenter server access queryUserInfo " + this.getClass().getSimpleName()
+ " " + Thread.currentThread().getName());
QueryUserByIdResult<UserDto> result = new QueryUserByIdResult<UserDto>();
UserDto dto = userInfoService.queryUserInfo();
result.setModel(dto);
result.setRespCode("000");
result.setRespMsg("查询成功 server.port : " + port);
return result;
}
}
---------------------------------消息消费者-------------
package com.cloud.eureka.client.ucenter.service.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import com.alibaba.fastjson.JSON;
import com.cloud.eureka.client.ucenter.biz.UserInfoService;
import com.cloud.eureka.client.ucenter.domain.MessageBody;
import com.cloud.eureka.client.ucenter.domain.UserDto;
@EnableBinding(Sink.class)
public class MsgSink {
@Autowired
private UserInfoService userInfoService;
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
System.out.println("==== 消费者消费消息开始 : " + message.getPayload());
String message2 = String.valueOf(message.getPayload());
MessageBody body = JSON.parseObject(message2, MessageBody.class);
System.out.println("json 2 message body " + body);
if (null != body) {
UserDto userDto = userInfoService.queryUserInfo();
System.out.println("查询用户[" + body.getUserName() + "]信息 : " + JSON.toJSONString(userDto));
System.out.println("发放指定产品【" + body.getProductId() + "】代金券数量为:" + body.getCouponNum());
}
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("==== 消费者消费消息应答,Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
----------------------
package com.cloud.eureka.client.ucenter.facade;
import java.util.List;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
@FeignClient("ucenter")
public interface UcenterCloudFacade {
@RequestMapping(value = "/hello", method = RequestMethod.GET)
public String index();
@RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
public UserDto queryUserInfo (@RequestParam("userId") Long userId);
@RequestMapping(value = "/queryUserList", method = RequestMethod.POST)
public List<UserDto> queryUserList();
@RequestMapping(value = "/queryUserById", method = RequestMethod.POST)
public QueryUserByIdResult<UserDto> queryUserById(QueryUserByIdRequest request);
}
------------------------------
package com.cloud.eureka.client.ucenter.domain;
public class MessageBody {
private Long userId;
private Long productId;
private Integer couponNum;
private String userName;
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public Long getProductId() {
return productId;
}
public void setProductId(Long productId) {
this.productId = productId;
}
public Integer getCouponNum() {
return couponNum;
}
public void setCouponNum(Integer couponNum) {
this.couponNum = couponNum;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
}
---------------------------
package com.cloud.eureka.client.ucenter.domain;
import java.io.Serializable;
public class UserDto implements Serializable {
/*** */
private static final long serialVersionUID = 8541673794025166248L;
private Long id;
private String userName;
private String address;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
}
------------------------------
package com.cloud.eureka.client.ucenter.biz;
import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Service;
import com.cloud.eureka.client.ucenter.domain.UserDto;
@Service
public class UserInfoService {
public UserDto queryUserInfo() {
UserDto dto = new UserDto();
dto.setId(1l);
dto.setUserName("李世民");
dto.setAddress("陕西省咸阳市紫禁城");
System.out.println("<<=-=-=-=>>恭喜,恭喜你链接成功<<=-=-=-=>>");
return dto;
}
public List<UserDto> queryUserList() {
UserDto dto = new UserDto();
dto.setId(1203586l);
dto.setUserName("李世民");
dto.setAddress("陕西省咸阳市紫禁城");
UserDto dto1 = new UserDto();
dto1.setId(1022589l);
dto1.setUserName("朱江明");
dto1.setAddress("江苏省南京市");
UserDto dto2 = new UserDto();
dto2.setId(1022575l);
dto2.setUserName("刘如海");
dto2.setAddress("湖北省省武汉市市");
List<UserDto> ulist = new ArrayList<>();
ulist.add(dto);
ulist.add(dto1);
ulist.add(dto2);
return ulist;
}
}
------------------- application.yml
#server:
#当前服务端口号
# port: 8762
spring:
application:
#当前应用名称
name: ucenter
cloud:
instance-count: 1
instance-index: 0
stream:
binder: kafka
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
auto-add-partitions: false
auto-create-topics: true
min-partition-count: 1
bindings:
input:
destination: event_demo
group: s1
consumer:
concurrency: 1
partitioned: false
eureka:
client:
serviceUrl:
#注册中心的地址
defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
--------------------- bootstrap.yml
#禁用配置中心权限验证
management:
security:
enabled: false
spring:
cloud:
config:
uri: http://localhost:8761/
feign:
httpclient:
enabled: true
max-connections: 200 # 默认值
max-connections-per-route: 50 # 默认值
---------------------------- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloud.eureka.client</groupId>
<artifactId>ucenter</artifactId>
<packaging>war</packaging>
<version>1.0.1-SNAPSHOT</version>
<name>ucenter Maven Webapp</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency> -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
<build>
<finalName>ucenter</finalName>
</build>
</project>
-------------------------- 客户端代码
package com.cloud.profile.third;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;
/**
* @EnableDiscoveryClient :启用服务注册与发现
* @EnableFeignClients:启用feign进行远程调用
* Feign是一个声明式Web Service客户端。使用Feign能让编写Web Service客户端更加简单,
* 它的使用方法是定义一个接口,而后在上面添加注解,同时也支持JAX-RS标准的注解。Feign也支持可拔插式的编码器和解码器。
* Spring Cloud对Feign进行了封装,使其支持了Spring MVC标准注解和HttpMessageConverters。
* Feign能够与Eureka和Ribbon组合使用以支持负载均衡。
*/
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@ComponentScan(basePackages = {"com.cloud.profile.controller", "com.cloud.profile.third.kafka"})
public class ComsumerAppliactionRunner {
public static void main(String[] args) {
SpringApplication.run(ComsumerAppliactionRunner.class, args);
}
@Autowired
private RestTemplateBuilder builder;
@Bean
@LoadBalanced
public RestTemplate restTemplate() {
return builder.build();
}
}
----------------------- 消息发送端
package com.cloud.profile.third.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.cloud.profile.dto.MessageBody;
@RestController
public class ProducerController {
@Autowired
private SendService service;
@RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
public void send(@PathVariable("msg") String msg){
MessageBody body = new MessageBody();
body.setCouponNum(5);
body.setProductId(10023l);
body.setUserId(13809825l);
body.setUserName("赵敏");
System.out.println("==== 生产者, 开始发送消息:" + JSON.toJSONString(body));
service.sendMessage(JSON.toJSONString(body));
System.out.println("==== 生产者,发送消息结束。");
}
}
package com.cloud.profile.third.kafka;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
@EnableBinding(Source.class)
public class SendService {
@Autowired
private Source source;
public void sendMessage(final String msg) {
try {
System.out.println("==== 开始发送消息:" + msg);
Message<String> message = new Message<String>() {
@Override
public String getPayload() {
return msg;
}
@Override
public MessageHeaders getHeaders() {
Map<String, Object> headers = new HashMap<>();
headers.put(KafkaHeaders.ACKNOWLEDGMENT, "yes");
MessageHeaders header = new MessageHeaders(headers);
return header;
}
};
boolean ret = source.output().send(message);
//boolean ret = source.output().send(MessageBuilder.withPayload(msg).build());
System.out.println("==== 发送消息结束。" + (ret ? "发送成功":"发送失败"));
} catch (Exception e) {
e.printStackTrace();
}
}
}
------------------------------------
package com.cloud.profile.factory;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestTemplate;
import com.alibaba.fastjson.JSON;
public class CloudServiceFactory {
private static final Logger logger = LoggerFactory.getLogger(CloudServiceFactory.class);
private static Map<String, Object> claxxCash = new ConcurrentHashMap<>();
public static <T> T createGetServerFactory (RestTemplate restTemplate,
String serviceName, String serverUrl, Class<T> resultObject, Map<String, Object> paramMap) {
T returnObject = null;
try {
StringBuffer serverRequestUri = new StringBuffer("http://" + serviceName);
serverRequestUri.append("/" + serverUrl);
if (null != paramMap && paramMap.entrySet().size() > 0) {
serverRequestUri.append("?");
for (Map.Entry<String, Object> reqParam : paramMap.entrySet()) {
serverRequestUri.append(reqParam.getKey() + "=" + reqParam.getValue() + "&");
}
serverRequestUri.deleteCharAt(serverRequestUri.length() - 1);
}
logger.info("create cloud server request Uri :" + serverRequestUri.toString());
returnObject = restTemplate.getForObject(serverRequestUri.toString(), resultObject);
logger.info("create cloud server result :" + JSON.toJSONString(returnObject));
} catch (Exception e) {
logger.error("create cloud server instance exception " + e.getMessage());
}
return returnObject;
}
@SuppressWarnings("unchecked")
public static <T> T createGetFactory (RestTemplate restTemplate, String serviceName, String serverUrl, Class<T> resultObject) {
T returnObject = null;
try {
if (null != claxxCash) {
Object object = claxxCash.get(resultObject.getSimpleName());
if (null != object) {
return (T) object;
}
}
returnObject = resultObject.newInstance();
StringBuffer serverRequestUri = new StringBuffer("http://" + serviceName);
returnObject = restTemplate.getForObject(serverRequestUri.toString(), resultObject);
} catch (Exception e) {
logger.error("create cloud server instance exception " + e.getMessage());
}
if (null != returnObject) {
claxxCash.put(resultObject.getSimpleName(), resultObject);
}
return returnObject;
}
@SuppressWarnings("unchecked")
public static <T> T createPostServerFactory (RestTemplate restTemplate, String serviceName,
String serverUrl, Object request, Class<T> resultObject, Object... params) {
T returnObject = null;
try {
if (null != claxxCash) {
Object object = claxxCash.get(resultObject.getSimpleName());
if (null != object) {
return (T) object;
}
}
returnObject = resultObject.newInstance();
StringBuffer serverRequestUri = new StringBuffer("http://" + serviceName).append("/" + serverUrl);
returnObject = restTemplate.postForEntity(serverRequestUri.toString(), request, resultObject, params).getBody();
} catch (Exception e) {
logger.error("create cloud server instance exception " + e.getMessage());
}
if (null != returnObject) {
claxxCash.put(resultObject.getSimpleName(), resultObject);
}
return returnObject;
}
}
package com.cloud.profile.controller;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import com.alibaba.fastjson.JSON;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
import com.cloud.profile.factory.CloudServiceFactory;
import com.cloud.profile.third.UcenterThirdService;
@RestController
@RefreshScope
public class SystemController {
private static Logger logger = LoggerFactory.getLogger(SystemController.class);
@Autowired
private RestTemplate restTemplate;
@Autowired
private LoadBalancerClient loadBalancerClient;
@RequestMapping(value = "/queryUser", method = RequestMethod.GET)
public String queryUser(@RequestParam("userId") Long userId) {
CloudServiceFactory.createGetFactory(restTemplate, "", "", QueryUserByIdResult.class);
ServiceInstance serviceInstance = this.loadBalancerClient.choose("ucenter");
System.out.println("===" + ":" + serviceInstance.getServiceId() + ":" + serviceInstance.getHost() + ":"
+ serviceInstance.getPort());// 打印当前调用服务的信息
String ret = this.restTemplate.getForObject("http://ucenter/hello", String.class);
//("http://ucenter/queryUserById?userId=" +userId, QueryUserByIdResult.class);
QueryUserByIdRequest queryRequest = new QueryUserByIdRequest();
queryRequest.setUserId(userId);
Map<String, Object> paramMap = new HashMap<>();
paramMap.put("userId", userId);
logger.info("<== 根据userId查询用户 queryRequest : " + JSON.toJSONString(queryRequest));
@SuppressWarnings("unchecked")
QueryUserByIdResult<UserDto> rest = CloudServiceFactory.createGetServerFactory(restTemplate,
"ucenter", "queryUserById", QueryUserByIdResult.class, paramMap);
logger.info("query Result : " + JSON.toJSONString(rest));
return JSON.toJSONString(rest);
}
@Autowired
private UcenterThirdService ucenterThirdService;
@RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
public String queryUserInfo (@RequestParam("userId") Long userId) {
logger.info("profile response " + Thread.currentThread().getName() + " " +
this.getClass().getSimpleName() + JSON.toJSONString(ucenterThirdService.queryUserInfo(userId)));
logger.info(ucenterThirdService.index());
logger.info("profile response " + Thread.currentThread().getName() + " " + this.getClass().getSimpleName() + "<<=-=-=-=>>恭喜用户 " + userId + " 你链接成功<<=-=-=-=>>");
logger.info("profile response " + Thread.currentThread().getName() + " " +
this.getClass().getSimpleName() + JSON.toJSONString(ucenterThirdService.queryUserList()));
QueryUserByIdRequest queryRequest = new QueryUserByIdRequest();
queryRequest.setUserId(userId);
logger.info("<== 根据userId查询用户 queryRequest : " + JSON.toJSONString(queryRequest));
QueryUserByIdResult<UserDto> result = ucenterThirdService.queryUserById(queryRequest);
logger.info("<== 根据userId查询用户 result : " + JSON.toJSONString(result));
if (null != result && "000".equals(result.getRespCode())) {
logger.info("根据userId查询用户信息成功");
return JSON.toJSONString(result);
}
return "hello success";
}
@RequestMapping(value = "/queryUserById.do", method = RequestMethod.GET)
public String queryUserById(@RequestParam("userId") Long userId) {
QueryUserByIdRequest queryRequest = new QueryUserByIdRequest();
queryRequest.setUserId(userId);
logger.info("<== 根据userId查询用户 queryRequest : " + JSON.toJSONString(queryRequest));
QueryUserByIdResult<UserDto> result = ucenterThirdService.queryUserById(queryRequest);
logger.info("<== 根据userId查询用户 result : " + JSON.toJSONString(result));
if (null != result && "000".equals(result.getRespCode())) {
logger.info("根据userId查询用户信息成功");
return JSON.toJSONString(result);
}
return "error page";
}
}
-------------- application.porperties
server.port=8673
spring.application.name=profile
#spring.cloud.stream.instance-count=1
#spring.cloud.stream.instance-index=0
eureka.client.fetchRegistry=true
eureka.client.serviceUrl.defaultZone: http://localhost:8761/eureka/,http://localhost:8766/eureka/
eureka.client.registry-fetch-interval-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=45
profile.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule
------------------bootstrap.yml
spring:
cloud:
instance-count: 1
instance-index: 0
stream:
binder: kafka
kafka:
binder:
brokers: localhost:9092
zk-nodes: localhost:2181
min-partition-count: 1
auto-create-topics: true
auto-add-partitions: false
bindings:
output:
destination: event_demo
content-type: application/json #text/plain;charset=UTF-8
producer:
partition-count: 1
#zi ding yi binder can shu
#spring.cloud.stream.bindings.<channelName>.binder=<binderName>
#spring.cloud.stream.binders.<binderName>.type=kafka
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.brokers=10.79.96.52:9092
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.zk-nodes=10.79.96.52:2182
----------------------- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloud.profile</groupId>
<artifactId>profile</artifactId>
<packaging>war</packaging>
<version>1.0.1-SNAPSHOT</version>
<name>profile Maven Webapp</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency> -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
<build>
<finalName>profile</finalName>
</build>
</project>
#方式1
#spring:
# cloud:
# instance-count: 1
# instance-index: 0
# stream:
# binder: kafka
# kafka:
# binder:
# brokers: localhost:9092
# zk-nodes: localhost:2181
# min-partition-count: 1
# auto-create-topics: true
# auto-add-partitions: false
# bindings:
# output: # sourceA: 这里指的是通道名字 ; output 是 Source 默认的通道名字
# destination: mess_data # 目标主题
# content-type: application/json # text/plain;charset=UTF-8
# producer:
# partition-count: 1
#zi ding yi binder can shu
#spring.cloud.stream.bindings.<channelName>.binder=<binderName>
#spring.cloud.stream.binders.<binderName>.type=kafka
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.zk-nodes=localhost:2182
spring.cloud.stream.bindings.sourceA.binder=messA
spring.cloud.stream.bindings.sourceA.destination=messA_data
spring.cloud.stream.bindings.sourceA.producer.partition-count=1
spring.cloud.stream.bindings.sourceA.producer.partitioned=false
spring.cloud.stream.binders.messA.type=kafka
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.auto-add-partitions=false
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.min-partition-count: 1
自定义渠道
package com.cloud.profile.third.kafka;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
public interface SourceOutput { String OUT_PUT = "sourceA"; @Output(SourceOutput.OUT_PUT) MessageChannel output(); }