在多个内网系统之上,增加一个网关服务,统一对第三方应用进行鉴权与认证,通过后方可访问内部资源服务。网关服务主要负责鉴权认证和请求转发;请求转发主要借助 Servlet 3.0 的异步特性实现,并结合 Spring Boot 进行开发。
同步请求会将整个请求链路的发起、解析、响应放在同一个同步逻辑中进行。
采用异步化处理可以将请求中的耗时操作交给线程池做异步处理;在高并发场景下,通过调用一个非 Web 服务线程处理耗时逻辑,提高系统并发性。
由于线程池是隔离的,可以对线程池做业务隔离分组,进行请求分级、监控等。
之前有几篇文章介绍了认证和鉴权的实现思路,可参考《系统鉴权流程及签名生成规则》、《公网API安全--OAuth认证》、《互联网通用架构技术----公网API安全规范》。
转发的思路主要是希望可以将客户端请求直接转发到业务系统:网关系统对于请求 API,通过识别入参的条件路由到不同的业务系统,对请求 API 不做干扰,直接转发。
通过业务线程池接收请求,将任务提交到线程池。
/**
 * Handles {@code /book} requests.
 *
 * <p>The book lookup is not executed here; it is wrapped in a task and passed to
 * {@code oneLevelAsyncContext.submitFuture}, together with the incoming request.
 *
 * @param request the incoming servlet request
 * @param skuId   value of the {@code skuId} request parameter
 * @param cat1    value of the {@code cat1} request parameter
 * @param cat2    value of the {@code cat2} request parameter
 * @throws Exception declared by the handler signature
 */
@RequestMapping("/book")
public void getBook(
        HttpServletRequest request,
        @RequestParam(value = "skuId") final Long skuId,
        @RequestParam(value = "cat1") final Integer cat1,
        @RequestParam(value = "cat2") final Integer cat2) throws Exception {
    oneLevelAsyncContext.submitFuture(request, () -> {
        return bookService.getBook(skuId, cat1, cat2);
    });
}
业务线程池封装。
public void submitFuture( final HttpServletRequest req, final Callable<Object> task) { final String uri = req.getRequestURI(); final Map<String, String[]> params = req.getParameterMap(); final AsyncContext asyncContext = req.startAsync(); //开启异步上下文 asyncContext.getRequest().setAttribute("uri", uri); asyncContext.getRequest().setAttribute("params", params); asyncContext.setTimeout(asyncTimeoutInSeconds * 1000); if(asyncListener != null) { asyncContext.addListener(asyncListener); } executor.submit(new CanceledCallable(asyncContext) { //提交任务给业务线程池 @Override public Object call() throws Exception { Object o = task.call(); //业务处理调用 if(o == null) { callBack(asyncContext, o, uri, params); //业务完成后,响应处理 } if(o instanceof CompletableFuture) { CompletableFuture<Object> future = (CompletableFuture<Object>)o; future.thenAccept(resultObject -> callBack(asyncContext, resultObject, uri, params)) .exceptionally(e -> { callBack(asyncContext, "", uri, params); return null; }); } else if(o instanceof String) { callBack(asyncContext, o, uri, params); } return null; } }); } private void callBack( AsyncContext asyncContext, Object result, String uri, Map<String, String[]> params) { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); try { if(result instanceof String) { write(resp, (String)result); } else { write(resp, JSONUtils.toJSON(result)); } } catch (Throwable e) { resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); //程序内部错误 try { LOG.error("get info error, uri : {}, params : {}", uri, JSONUtils.toJSON(params), e); } catch (Exception ex) { } } finally { asyncContext.complete(); } }
线程池初始化。
@Override public void afterPropertiesSet() throws Exception { String[] poolSizes = poolSize.split("-"); //初始线程池大小 int corePoolSize = Integer.valueOf(poolSizes[0]); //最大线程池大小 int maximumPoolSize = Integer.valueOf(poolSizes[1]); queue = new LinkedBlockingDeque<Runnable>(queueCapacity); executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTimeInSeconds, TimeUnit.SECONDS, queue); executor.allowCoreThreadTimeOut(true); executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if(r instanceof CanceledCallable) { CanceledCallable cc = ((CanceledCallable) r); AsyncContext asyncContext = cc.asyncContext; if(asyncContext != null) { try { String uri = (String) asyncContext.getRequest().getAttribute("uri"); Map params = (Map) asyncContext.getRequest().getAttribute("params"); LOG.error("async request rejected, uri : {}, params : {}", uri, JSONUtils.toJSON(params)); } catch (Exception e) {} try { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { asyncContext.complete(); } } } } }); if(asyncListener == null) { asyncListener = new AsyncListener() { @Override public void onComplete(AsyncEvent event) throws IOException { } @Override public void onTimeout(AsyncEvent event) throws IOException { AsyncContext asyncContext = event.getAsyncContext(); try { String uri = (String) asyncContext.getRequest().getAttribute("uri"); Map params = (Map) asyncContext.getRequest().getAttribute("params"); LOG.error("async request timeout, uri : {}, params : {}", uri, JSONUtils.toJSON(params)); } catch (Exception e) {} try { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { asyncContext.complete(); } } @Override public void onError(AsyncEvent event) throws IOException { AsyncContext 
asyncContext = event.getAsyncContext(); try { String uri = (String) asyncContext.getRequest().getAttribute("uri"); Map params = (Map) asyncContext.getRequest().getAttribute("params"); LOG.error("async request error, uri : {}, params : {}", uri, JSONUtils.toJSON(params)); } catch (Exception e) {} try { HttpServletResponse resp = (HttpServletResponse) asyncContext.getResponse(); resp.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { asyncContext.complete(); } } @Override public void onStartAsync(AsyncEvent event) throws IOException { } }; } }
/**
 * Servlet context listener that creates a {@link ThreadPoolExecutor} when the web application
 * starts, publishes it as the {@code "executor"} servlet-context attribute, and shuts it down
 * when the application stops.
 *
 * @author Mr. Chun.
 */
@WebListener
public class AppContextListener implements ServletContextListener {

    /** Name of the servlet-context attribute holding the pool. */
    private static final String EXECUTOR_ATTRIBUTE = "executor";

    private static final int CORE_POOL_SIZE = 100;
    private static final int MAX_POOL_SIZE = 200;
    private static final long KEEP_ALIVE_MILLIS = 50000L;
    private static final int QUEUE_CAPACITY = 100;

    /** How long shutdown waits for in-flight tasks before interrupting them. */
    private static final long SHUTDOWN_WAIT_SECONDS = 10L;

    /**
     * Initializes the thread pool and stores it in the servlet context.
     *
     * @param servletContextEvent the context-initialized event
     */
    @Override
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_MILLIS,
                TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY));
        servletContextEvent.getServletContext().setAttribute(EXECUTOR_ATTRIBUTE, executor);
    }

    /**
     * Shuts the thread pool down: stops accepting work, waits briefly for running tasks, and
     * interrupts whatever is still running after that. Does nothing if the attribute is absent.
     *
     * @param servletContextEvent the context-destroyed event
     */
    @Override
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        Object attribute =
                servletContextEvent.getServletContext().getAttribute(EXECUTOR_ATTRIBUTE);
        if (!(attribute instanceof ThreadPoolExecutor)) {
            return; // initialization never happened or the attribute was replaced
        }
        ThreadPoolExecutor executor = (ThreadPoolExecutor) attribute;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(SHUTDOWN_WAIT_SECONDS, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the container
        } finally {
            servletContextEvent.getServletContext().removeAttribute(EXECUTOR_ATTRIBUTE);
        }
    }
}
/** * Description * ... * @author Mr. Chun. */ @WebServlet(urlPatterns = "/qbs/route", asyncSupported = true) public class AsyncLongRunningServlet extends HttpServlet { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory.getLogger(AsyncLongRunningServlet.class); @Autowired private RestTemplate restTemplate; @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { doGet(req, resp); } @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { logger.info("==== 进入Servlet的时间:" + new Date() + " ===="); long startTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet Start::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId()); req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true); //在子线程中执行业务调用,并由其负责输出响应,主线程退出 AsyncContext ctx = req.startAsync(); ctx.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onComplete"); } @Override public void onTimeout(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onTimeout"); ServletResponse response = asyncEvent.getAsyncContext().getResponse(); response.setCharacterEncoding("UTF-8"); response.setContentType(MediaType.APPLICATION_JSON.toString()); PrintWriter out = null; try { out = response.getWriter(); out.print(ResponseBuilder.buildJsonString("请求超时")); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } } } @Override public void onError(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onError"); ServletResponse response = asyncEvent.getAsyncContext().getResponse(); response.setCharacterEncoding("UTF-8"); response.setContentType(MediaType.APPLICATION_JSON.toString()); PrintWriter out = null; try { out = 
response.getWriter(); out.print(ResponseBuilder.buildJsonString("请求异常")); } catch (IOException e) { e.printStackTrace(); } finally { if (out != null) { out.close(); } } } @Override public void onStartAsync(AsyncEvent asyncEvent) throws IOException { System.out.println("AppAsyncListener onStartAsync"); } }); ctx.setTimeout(9000); ThreadPoolExecutor executor = (ThreadPoolExecutor) req.getServletContext().getAttribute("executor"); executor.execute(new AsyncRequestProcessor(restTemplate, ctx, req.getMethod(), req.getParameter("api"))); // 任务提交线程池 long endTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet End::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId() + "::Time Taken=" + (endTime - startTime) + " ms."); logger.info("==== 结束Servlet的时间:" + new Date() + " ===="); } }
/** * Description * ... * * @author Mr. Chun. */ public class AsyncRequestProcessor implements Runnable { private static final Logger logger = LoggerFactory.getLogger(AsyncRequestProcessor.class); private String url = "http://localhost:8080/"; private RestTemplate restTemplate; private AsyncContext ctx = null; private String requestMethod = ""; public AsyncRequestProcessor(RestTemplate restTemplate, AsyncContext ctx, String requestMethod, String api) { this.restTemplate = restTemplate; this.ctx = ctx; this.requestMethod = requestMethod; url = url + api; } // 业务请求转发在这里处理 public void run() { try { long startTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet Start::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId()); String url = this.api.replace("/qbs/api/", ""); String key = url; String param = ""; if (url.contains("/")) { key = url.substring(0, url.indexOf("/")); param = url.substring(url.indexOf("/"), url.length()); } this.api = routeService.getRoute(key) + param; if (!StringUtils.isEmpty(this.api)) { String json = ""; logger.info("======"); // 请求入参 MultiValueMap<String, String> paramMap = ResponseBuilder.getUsefulParam(ctx.getRequest().getParameterMap()); String requestMethod = request.getMethod(); String contentType = request.getContentType(); if ("GET".equals(requestMethod)) { // GET 请求 if (paramMap.size() > 0) { api = ResponseBuilder.buildGetUrl(api, paramMap); } logger.info("PARAM url: {} param: {}", api, paramMap); json = restTemplate.getForObject(api, String.class, paramMap); } else if ("POST".equals(requestMethod)) { // POST 请求 logger.info("PARAM url: {} param: {}", api, paramMap); HttpHeaders headers = new HttpHeaders(); headers.setContentType(contentType.equals("application/json") ? 
MediaType.APPLICATION_JSON : MediaType.APPLICATION_FORM_URLENCODED); if (contentType.equals("application/json")) { // json 提交 StringBuffer sb = new StringBuffer(""); String temp; BufferedReader br = new BufferedReader(new InputStreamReader(request.getInputStream(), "utf-8")); while ((temp = br.readLine()) != null) { sb.append(temp); } br.close(); String body = sb.toString(); HttpEntity<String> formEntity = new HttpEntity<>(body, headers); json = restTemplate.postForObject(api, formEntity, String.class); } else { // form 表单提交 HttpEntity<MultiValueMap<String, String>> request = new HttpEntity<>(paramMap, headers); ResponseEntity<String> response = restTemplate.postForEntity(api, request, String.class); json = response.getBody(); } } logger.info("======"); logger.info("RESULT json: {}", json); ResponseBuilder.responseWrite((HttpServletResponse) ctx.getResponse(), json); } else { logger.info("key: {}", key); ResponseBuilder.responseWrite((HttpServletResponse) ctx.getResponse(), ResponseBuilder.buildJsonString(400, "key无效,key: " + key)); } ctx.complete(); // 通知容器,异步处理完成 logger.info("======"); long endTime = System.currentTimeMillis(); logger.info("AsyncLongRunningServlet End::Name=" + Thread.currentThread().getName() + "::ID=" + Thread.currentThread().getId() + "::Time Taken=" + (endTime - startTime) + " ms."); } catch (Exception e) { logger.error("AsyncExecutor e: " + e.getMessage()); } } }
可以发现,请求进入之后将业务放到线程池中异步执行,请求线程随即退出,业务处理完成之后再进行响应,转发和响应都异步化了。 异步化之后吞吐量提升了,但是响应时间变长了;也就是说,异步化并不会缩短响应时间,但是会增加吞吐量,并增加我们需要的灵活性。