书接手写MQ框架(二)-服务端实现 ,前面介绍了服务端的实现。可是具体使用框架过程当中,用户确定是以客户端的形式跟服务端打交道的。客户端的好坏直接影响了框架使用的便利性。html
虽然框架目前是经过web的形式提供功能的,可是某的目标实际上是经过socket实现,因此不只须要有客户端,还要包装一下,让用户在使用过程当中不须要关心服务端是如何实现的。java
简单来讲,就是客户端使用必须方便。git
目前客户端的核心功能是HttpUtil这个类,使用httpClient实现的,主要是为了请求服务端。web
具体实现以下:数据库
package com.shuimutong.gmq.client.util; import java.io.IOException; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.utils.URIBuilder; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.shuimutong.gmq.client.bean.HttpResponseBean; import com.shuimutong.gutil.common.GUtilCommonUtil; /** * http请求工具类 * @ClassName: HttpUtil * @Description:(这里用一句话描述这个类的做用) * @author: 水木桶 * @date: 2019年10月29日 下午9:43:54 * @Copyright: 2019 [水木桶] All rights reserved. */ public class HttpUtil { private final static Logger log = LoggerFactory.getLogger(HttpUtil.class); private static CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal(); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { HTTP_CLIENT.close(); } catch (IOException e) { log.error("HTTP_CLIENT-closeException", e); } } }); } /** * get请求 * * @param url * @return * @throws IOException */ public static HttpResponseBean get(String url) throws IOException { HttpResponseBean responseBean = null; HttpGet httpGet = new HttpGet(url); CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet); try { HttpEntity httpEntity = res.getEntity(); String body = EntityUtils.toString(httpEntity); responseBean = new HttpResponseBean(res.getStatusLine(), body); EntityUtils.consume(httpEntity); } finally { res.close(); } return responseBean; } /** * 带参数的get请求 * @param url * @param requsetParams * @return * @throws IOException * @throws URISyntaxException */ public static HttpResponseBean get(String url, Map<String, String> requsetParams) throws IOException { HttpResponseBean responseBean = null; HttpGet httpGet; try { URIBuilder uriBuilder = new URIBuilder(url); if(!GUtilCommonUtil.checkListEmpty(requsetParams)) { List<NameValuePair> nvps = new ArrayList<NameValuePair>(); requsetParams.forEach((k,v) -> { nvps.add(new BasicNameValuePair(k, v)); }); uriBuilder.setParameters(nvps); } httpGet = new HttpGet(uriBuilder.build()); } catch (Exception e) { throw new IOException(e); } CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet); try { HttpEntity httpEntity = res.getEntity(); String body = EntityUtils.toString(httpEntity); responseBean = new HttpResponseBean(res.getStatusLine(), body); EntityUtils.consume(httpEntity); } finally { res.close(); } return responseBean; } /** * post请求 * @param url * @param requsetParams * @return * @throws IOException */ public static HttpResponseBean post(String url, Map<String, String> requsetParams) throws IOException { HttpResponseBean responseBean = null; HttpPost httpPost = new HttpPost(url); if(!GUtilCommonUtil.checkListEmpty(requsetParams)) { List<NameValuePair> nvps = new ArrayList<NameValuePair>(); requsetParams.forEach((k,v) -> { nvps.add(new BasicNameValuePair(k, v)); }); httpPost.setEntity(new UrlEncodedFormEntity(nvps)); } CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost); try { HttpEntity httpEntity = response.getEntity(); String body = EntityUtils.toString(httpEntity); responseBean = new HttpResponseBean(response.getStatusLine(), body); EntityUtils.consume(httpEntity); } finally { response.close(); } return responseBean; } }
封装了get请求和post请求,封装了响应结果。apache
加了一个钩子,在jvm关闭时可以主动关闭建立的资源。缓存
这两部分主要就是调用上面的HttpUtil,而后将结果包装一下。mvc
具体代码请参考前文的git。框架
为了使得用户不须要关心具体实现,因此建了实例管理类。jvm
package com.shuimutong.gmq.client.util; import com.shuimutong.gmq.client.cache.CommonObjCache; import com.shuimutong.gmq.client.cache.impl.CommonObjCacheImpl; import com.shuimutong.gmq.client.consumer.GmqConsumer; import com.shuimutong.gmq.client.producer.GmqProducer; public class GmqInstanceManage { public static GmqProducer getGmqProducer(String gmqServerUrl) { return new GmqProducer(gmqServerUrl); } public static GmqConsumer getGmqConsumer(String gmqServerUrl) { return new GmqConsumer(gmqServerUrl); } public static CommonObjCache getCommonCache(String serverUrl) { return new CommonObjCacheImpl(serverUrl); } }
主要是为了封装变化。由于以后再迭代的话,实例的具体实现确定不是目前这么简单,因此要尽可能让使用者少关心具体实现。
使用时关心的越多,后续项目迭代确定越困难。
@Test public void produceMsg() { GmqProducer producer = GmqInstanceManage.getGmqProducer(gmqServerUrl); for(int i=0; i<5; i++) { String message = "message:" + i; try { SendMqResult res = producer.sendMq(topic, message); System.out.println(res.getRes()); } catch (SendMqException e) { e.printStackTrace(); } } }
二、消费消息
主要思路是:消费消息以前,先查询当前已经消费到了哪条消息。消息消费以后,将消费的编号存入缓存。
典型的主动拉消息,消息是否消费由本身负责的模式。
实现以下:
@Test public void comsumerMsgByCache() { GmqConsumer comsumer = GmqInstanceManage.getGmqConsumer(gmqServerUrl); CommonObjCache commonCache = GmqInstanceManage.getCommonCache(gmqServerUrl); String gmqSign = "gmq_consumer_id"; long consumerId = 0; int size = 2; for(int i=0; i<5; i++) { try { CacheObj cacheId = commonCache.getById(gmqSign); if(cacheId != null) { consumerId = Long.parseLong(cacheId.getContent()); } List<MqContent> res = comsumer.getMq(topic, consumerId, size); for(MqContent mq : res) { System.out.println(JSONObject.toJSONString(mq)); if(mq.getId() > consumerId) { consumerId = mq.getId(); } } commonCache.save(gmqSign, String.valueOf(consumerId)); System.out.println("保存consumerId:" + consumerId); } catch (Exception e) { e.printStackTrace(); } } }
gmq的第一版至今已经完成,固然这只是开始。
后续计划先将gmvc框架替换掉,直接使用netty进行通讯。
而后把消息存到数据库改成存到磁盘上。
而后就是服务的高可用改造。
届时欢迎指导。
第2版设计、开发中……