车联网项目架构设计
系统概述
本系统主要基于 Spring Cloud 技术栈,包含以下组件:Gateway(网关)、Nacos(微服务注册与发现)、OpenFeign(HTTP + RESTful 客户端)、Hystrix(断路器)、Ribbon(负载均衡)、Security + OAuth2(安全认证)、Kafka(消息队列)、MyBatis-Plus(对象关系映射)、Redis(缓存数据库)、Netty 等。
系统架构图
组件功能简介
Gateway:Web服务的统一入口,进行消息的转发和限流操作
Nacos:微服务的注册中心和配置中心,提供服务的注册与发现,以及配置数据的统一管理
OpenFeign:HTTP + RESTful 客户端,实现服务之间的调用
Hystrix:断路器,实现服务的熔断和降级
Ribbon:微服务调用的负载均衡
Security+OAuth2:提供微服务调用的认证和授权
Kafka:消息中间件,缓存终端数据,支持大并发
MybatisPlus:对象关系映射,用于访问数据库
Redis:数据缓存服务器,内存数据库,并发量大
Netty:基于 Java NIO 的网络框架,实现终端的 Socket 连接,支持更大的连接数
vue
项目工程结构
- obd车联网项目
- auth 认证服务器
- cloudcore Spring cloud 核心项目
- common 工具类项目
- nettysocket 终端数据接收项目
- obd-feign-api OpenFeign接口项目
- obd-feign-client OpenFeign客户端项目
- obd-gateway 网关
- obd-member-auth app帐户认证中心
- obd-task 定时任务
- obd-terminal-simulator 终端模拟器
- obd-third-park 三方服务项目
- obd-zhb 真惠保项目
- 真惠保APP服务项目
- 真惠保后台管理项目
- portal 车联网后台项目
- protoolanalysis 协议分析器
Auth 认证服务器
- 客户端认证配置
@EnableAuthorizationServer @Configuration public class AuthServerConfig extends AuthorizationServerConfigurerAdapter{ @Autowired AuthenticationManager authenticationManager; @Autowired RedisConnectionFactory connectionFactory; @Autowired private DataSource dataSource; @Override public void configure(ClientDetailsServiceConfigurer clients) throws Exception { clients.jdbc(dataSource); } //配置AuthorizationServer tokenServices @Override public void configure(AuthorizationServerEndpointsConfigurer endpoints)throws Exception { endpoints.tokenStore(redisTokenStore()) .accessTokenConverter(accessTokenConverter()) .authenticationManager(authenticationManager) //禁用刷新令牌 .reuseRefreshTokens(false); } //定义TokenStore 使用redis存储token @Bean public TokenStore redisTokenStore() { RedisTokenStore redisTokenStore = new RedisTokenStore(connectionFactory); //token key生成规则 redisTokenStore.setAuthenticationKeyGenerator(oAuth2Authentication -> UUID.randomUUID().toString()); return redisTokenStore; } //token 封装 @Bean public JwtAccessTokenConverter accessTokenConverter() { JwtAccessTokenConverter converter = new JwtAccessTokenConverter(); converter.setSigningKey("******"); return converter; } //认证请求设置 @Override public void configure(AuthorizationServerSecurityConfigurer security) throws Exception { //容许全部人请求令牌 //以验证的客户端才能请求check_token security.tokenKeyAccess("permitAll()") .checkTokenAccess("isAuthenticated()") .allowFormAuthenticationForClients(); } }
- 登录账户认证设置
@Configuration @EnableWebSecurity public class SecurityConfig extends WebSecurityConfigurerAdapter { @Autowired private UserDetailsService userDetailsService; @Override protected void configure(AuthenticationManagerBuilder auth) throws Exception { //设置服务认证提供者 auth.authenticationProvider(authenticationProvider()); } /** * @return 封装身份认证提供者 */ @Bean public DaoAuthenticationProvider authenticationProvider() { DaoAuthenticationProvider authenticationProvider = new DaoAuthenticationProvider(); //设置用户加载服务类 authenticationProvider.setUserDetailsService(userDetailsService); //设置加密类 authenticationProvider.setPasswordEncoder(passwordEncoder()); return authenticationProvider; } @Bean @Override public AuthenticationManager authenticationManagerBean() throws Exception { //使用父级认证管理器 AuthenticationManager manager = super.authenticationManagerBean(); return manager; } @Override protected void configure(HttpSecurity http) throws Exception { //容许访问/oauth受权接口 http.csrf().disable() //设置会话管理器,不是用HttpSession .sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS) .and() .requestMatchers().anyRequest() .and() .formLogin().permitAll() .and() .authorizeRequests() //调用认证接口 不须要认证 .antMatchers("/oauth/*").permitAll() .and(); } //配置密码解码器 @Bean public BCryptPasswordEncoder passwordEncoder() { return new MyPasswordEncoder(); } }
- 用户加载服务类设置
@Service public class MyUserDetailsService implements UserDetailsService { @Autowired private SUserInfoMapper userInfoMapper; @Override public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException { QueryWrapper<SUserInfo> qw = new QueryWrapper<SUserInfo>(); qw.eq("userName", username); qw.ne("status", 9); SUserInfo sysUser = userInfoMapper.selectOne(qw); if(sysUser == null) { throw new UsernameNotFoundException("用户不存在"); } User user = new User(); user.setSysUser(sysUser); //不提供受权 user.setAuthorities(new ArrayList<>()); return user; } }
后台服务器配置
- 资源服务器设置
@Configuration @EnableResourceServer//开启资源服务器 @EnableGlobalMethodSecurity(prePostEnabled = true)//开启方法级别的校验 https://www.cnblogs.com/felordcn/p/12142497.html public class ResourceServerConfig extends ResourceServerConfigurerAdapter{ @Autowired RestTemplate resourceRestTemplate; //本地受权服务 @Autowired MyLocalUserAuthoritiesService userAuthoritiesService; @Override public void configure(ResourceServerSecurityConfigurer resources) { resources .tokenStore(new JwtTokenStore(accessTokenConverter())) .stateless(true); //配置RemoteTokenServices, 用于向AuthorizationServer验证令牌 MyRemoteTokenServices tokenServices = new MyRemoteTokenServices(userAuthoritiesService); tokenServices.setAccessTokenConverter(accessTokenConverter()); //为restTemplate配置异常处理器,忽略400错误 resourceRestTemplate.setErrorHandler(new DefaultResponseErrorHandler() { @Override //忽略 400 public void handleError(ClientHttpResponse response) throws IOException { if(response.getRawStatusCode() != 400) { super.handleError(response); } } }); tokenServices.setRestTemplate(resourceRestTemplate); //设置认证服务器地址 tokenServices.setCheckTokenEndpointUrl("http://"+NacosServerHostConstant.AUTH_SERVER_NAME+NacosServerHostConstant.AUTH_SERVER_ADDRESS+"/oauth/check_token"); //客户端id tokenServices.setClientId(OAuth2ClientEnum.OBD_PORTAL.getClientId()); //客户端密码 tokenServices.setClientSecret(OAuth2ClientEnum.OBD_PORTAL.getPassword()); //无状态 resources.tokenServices(tokenServices).stateless(true); //设置资源服务id resources.resourceId(OAuth2ClientEnum.OBD_PORTAL.getClientId()); } //token封装 @Bean public JwtAccessTokenConverter accessTokenConverter() { JwtAccessTokenConverter converter = new JwtAccessTokenConverter(); converter.setSigningKey("*****"); return converter; } @LoadBalanced @Bean public RestTemplate resourceRestTemplate() { return new RestTemplate(); } //资源请求路径设置 @Override public void configure(HttpSecurity http) throws Exception { //容许跨域 http.cors(); //配置资源服务器拦截规则 http.sessionManagement().sessionCreationPolicy(SessionCreationPolicy.STATELESS) 
.and() .requestMatchers().anyRequest() .and() .anonymous() .and() .authorizeRequests() //设置不须要认证的请求路径 .antMatchers("/login/loginByPassword.vue","/**/*.vueNologin").permitAll() .anyRequest().authenticated() .and() .exceptionHandling().accessDeniedHandler(new OAuth2AccessDeniedHandler()); } }
- 本地授权服务
@Component public class MyLocalUserAuthoritiesServiceImpl implements MyLocalUserAuthoritiesService { @Autowired private SUserInfoMapper sysUserMapper; @Autowired private SMenuMapper sysMenuMapper; /** * redis客户端 */ @Autowired private RedisClient redisClient; @SuppressWarnings("unchecked") @Override public List<String> loadUserAuthoritiesByUserName(String accessToken ,String userName) { //缓存中获取权限列表 List<String> authorityList = (List<String>) redisClient.getValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken); //缓存中没有 从数据库中获取 if(authorityList == null) { QueryWrapper<SUserInfo> qw = new QueryWrapper<SUserInfo>(); qw.eq("userName", userName); SUserInfo sysUser = sysUserMapper.selectOne(qw); if(sysUser == null) { throw new UsernameNotFoundException("用户不存在"); } authorityList = sysMenuMapper.getPermsListByUserId(sysUser.getUserId()); //获取token对象 AuthAccessToken saveToken = (AuthAccessToken) redisClient.getValue(RedisKeyPreConstant.USER_LOGIN_TOKEN, accessToken); if(saveToken != null) { //有效时间和token的有效时间一直 redisClient.setValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken,authorityList,saveToken.getExpires_in()-(System.currentTimeMillis()-saveToken.getCreate_time())); }else { //设置默认的有效时间 redisClient.setValue(RedisKeyPreConstant.USER_LOGIN_OAUTH, accessToken,authorityList,Constant.AUNTH_MESSAGE_IN_REDIS_TIME); } } return authorityList; } }
- 控制类设置
@RestController @RequestMapping("sys/user") public class UserController extends BaseController{ @Autowired private UserService userService; /** * 用户分页查询 * @param loginUser 登陆用户 * @param userName 用户名 * @param pageModel 分页参数 * @return */ @RequestMapping("getSysUserPageList.vue") //设置求情权限 @PreAuthorize("hasAuthority('user:view')") //设置事物级别为只读 @Transactional(readOnly = true) public ReturnModel getUserPageList(@ModelAttribute("loginUser") LoginUser loginUser,String userName,String useMan,String linkPhone,Date beginTime,Date endTime,PageModel pageModel) { ReturnModel returnModel = new ReturnModel(); returnModel.setData(userService.getPageList(getLoginUserOrganizationFullId(loginUser),userName, useMan, linkPhone, beginTime, endTime, pageModel)); return returnModel; } /** * 新增用户 * @param loginUser 登陆用户 * @param user 添加用户对象 * @return */ @PostMapping("addSysUser.vue") //开启事物 @Transactional //设置请求权限 @PreAuthorize("hasAuthority('user:add')") public ReturnModel addRole(HttpServletRequest request,SUserInfo user,/*自动注入登陆用户对象*/@ModelAttribute("loginUser") LoginUser loginUser) { ReturnModel returnModel = new ReturnModel(); SUserInfo oldUser = userService.getUserByUserName(user.getUserName()); if(oldUser != null) { returnModel.setResultCode(ResponseCodeConstant.EXISTED); returnModel.setResultMessage("用户名名称不能重复"); return returnModel; } oldUser = userService.getUser(user.getUseMan(),user.getOrganizeId()); if(oldUser != null) { returnModel.setResultCode(ResponseCodeConstant.EXISTED); returnModel.setResultMessage("使用人不能重复"); return returnModel; } //正常状态 user.setStatus(1); user.setCreaterId(loginUser.getUser().getUserId()); user.setCreaterName(loginUser.getUser().getUseMan()); user.setOperatorId(loginUser.getUser().getUserId()); user.setOperatorName(loginUser.getUser().getUseMan()); userService.addUser(user); return returnModel; } }
- 服务类设置
@Service public class UserService { @Autowired private SUserInfoMapper userInfoMapper; /** * 添加用户信息 * @param user 用户对象 * @return */ //添加日志注解 查询功能能够不添加 @Log("新增用户") public int addUser(SUserInfo user) { Date now = new Date(); user.setCreateTime(now); user.setModifiedTime(now); return userInfoMapper.insert(user); } }
OpenFeign工程配置
- 接口定义
@RequestMapping("feign/location")
public interface IFeignLocationService {
    /**
     * Returns GPS information for a list of vehicles within a map rectangle.
     *
     * @param vehicleIds vehicle ids, sent as the request body; presumably a
     *                   JSON array string, verify against the server-side
     *                   implementation
     * @param bounds     optional rectangle, formatted as
     *                   "upper-right longitude,upper-right latitude|lower-left longitude,lower-left latitude"
     * @return the result model returned by the remote service
     */
    @RequestMapping("getVehicleInMap")
    ReturnModel getVehicleInMap(@RequestBody String vehicleIds, @RequestParam(required=false) String bounds);
}
- 客户端配置
@FeignClient(/**客户端名称 对应nacos注册中心的服务名称*/name = NacosServerHostConstant.OBD_PORTAL_NAME,/**调用接口异常,快速失败了*/fallback= FeignLocationServiceFallback.class,/**config类*/configuration = OAuth2FeignAutoConfig.class) //实现IFeignLocationService接口 public interface FeignLocationService extends IFeignLocationService{ }
- fallback类
//注册spring bean @Component //重载父类映射路径 避免发生路径冲突 @RequestMapping("feign/locationback") public class FeignLocationServiceFallback implements FeignLocationService { @Override public ReturnModel getVehicleInMap(String vehicleIds,String bounds) { return ReturnModel.feignFail(); } @Override public ReturnModel getVehicleRealTimeStatus(int vehicleId) { return ReturnModel.feignFail(); } @Override public ReturnModel getVehicleLocation(String vehicleIds) { return ReturnModel.feignFail(); } }
- config类
public class OAuth2FeignAutoConfig { //获取Auth2token类 private CloudTokenService cloudTokenService; public OAuth2FeignAutoConfig(CloudTokenService cloudTokenService) { this.cloudTokenService = cloudTokenService; } //feign请求拦截器 @Bean public RequestInterceptor OAuth2FeignRequestInterceptor() { return new OAuth2FeignRequestInterceptor(cloudTokenService); } }
- Feign控制器实现类(服务端)
/**
 * Server-side implementation of {@link IFeignLocationService}.
 */
@RestController
public class FeignLocationController extends ClientBaseController implements IFeignLocationService {

    @Autowired
    private VehicleMonitorService vehicleMonitorService;

    /**
     * Returns information about the given vehicles inside a rectangular area.
     *
     * @param vehicleIds vehicle ids as a JSON array string
     * @param bounds     rectangle, formatted as
     *                   "upper-right longitude,upper-right latitude|lower-left longitude,lower-left latitude"
     * @return a result model whose data is the monitor service's answer
     */
    @Override
    public ReturnModel getVehicleInMap(String vehicleIds, String bounds) {
        JSONArray idArray = JSONArray.parseArray(vehicleIds);
        ReturnModel response = new ReturnModel();
        response.setData(vehicleMonitorService.getVehicleInMap(idArray, bounds));
        return response;
    }
}
网关项目
- 路由配置
server:
  port: 8080 # service port
  tomcat:
    max-http-form-post-size: 20971520
spring:
  profiles:
    active:
      - prod
  application:
    # name under which the microservice is registered
    name: obd-gateway
  cloud:
    gateway:
      # default filters: handle CORS
      default-filters:
        - DedupeResponseHeader=Access-Control-Allow-Origin, RETAIN_UNIQUE
      globalcors:
        cors-configurations:
          '[/**]':
            allowedHeaders: "*"
            allowedOrigins: "*"
            allowedMethods: "*"
      discovery:
        locator:
          enabled: true
      routes:
        # service-discovery based route: portal
        - id: portal
          # lb = load balanced; obd-portal is the name of the target microservice
          uri: lb://obd-portal
          predicates:
            - Path=/obd-portal/**
          filters:
            - StripPrefix=1 # strip the first path segment before routing
            - name: Hystrix # circuit breaking and fallback
              args:
                name: fallbackcmd
                fallbackUri: forward:/hystrixFallback?a=test
            # rate limiting (disabled)
            # - name: RequestRateLimiter
            #   args:
            #     # rate
            #     redis-rate-limiter.replenishRate: 10
            #     # capacity
            #     redis-rate-limiter.burstCapacity: 20
        # service-discovery based route: manage
        # service-discovery based route: zhb
        - id: zhb
          uri: lb://obd-zhb
          predicates:
            - Path=/obd-zhb/**
          filters:
            - StripPrefix=1 # strip the first path segment before routing
            - name: Hystrix # circuit breaking and fallback
              args:
                name: fallbackcmd
                fallbackUri: forward:/hystrixFallback?a=test
        # service-discovery based route: zhbManage
        - id: zhbManage
          uri: lb://obd-zhb-manage
          predicates:
            - Path=/obd-zhb-manage/**
          filters:
            - StripPrefix=1 # strip the first path segment before routing
接收终端数据项目
- 启动socket项目
@Component @Sharable @Slf4j public class OBDServer { @Autowired private MessageHandler messageHandler; @Autowired private TerminalAuthHandler terminalAuthHandler; @Autowired private TerminalCommonResponse terminalCommonResponse; @Autowired private TerminalRegisterHandler terminalRegisterHandler; public void bind(int port) { log.info("OBDNettySocket启动 port="+port); //建立NIO线程组 其实是reactor线程组 //用于接收客户端链接 EventLoopGroup boosGroup = new NioEventLoopGroup(); //用于进行SocketChnnel的网络读写 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //启用nio服务对象 ServerBootstrap b = new ServerBootstrap(); //绑定nio线程 b.group(boosGroup, workerGroup) //设置channel 为NioServerSocketChannel 对应java nio中的ServerSocketChannel .channel(NioServerSocketChannel.class) //设置NioServerSocketChannel的TCP参数 backlog .option(ChannelOption.SO_BACKLOG, 1024) //绑定nio事件处理类 做用相似于reactor模式中的handler .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new OBDNettyMessageDecoder()); ch.pipeline().addLast(new OBDNettyMessageEncoder()); ch.pipeline().addLast(terminalRegisterHandler); ch.pipeline().addLast(terminalAuthHandler); ch.pipeline().addLast(terminalCommonResponse); ch.pipeline().addLast(new HeartBeatHandler()); //必须放到最后 ch.pipeline().addLast(messageHandler); } }); //绑定端口 同步等待成功 ChannelFuture f = b.bind(port).sync(); //等待服务监听端口关闭 f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { log.info("OBDNettySocket关闭 port="+port); //优雅退出 释放线程资源 boosGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
- 终端注册
/**
 * Channel handler that answers terminal registration messages.
 */
@Component
@Sharable
@Slf4j
public class TerminalRegisterHandler extends BaseHandler {

    /** Message id of a terminal registration (256). */
    private final int TERMINAL_REGISTER = 0x0100;

    /** Message id of the registration response (33024). */
    private final int TERMINAL_REGISTER_RESPONSE = 0x8100;

    @Autowired
    private CheckTerminalIdIsCanUseService checkTerminalIdIsCanUseService;

    /**
     * Answers registration messages with status 0 (accepted) or 1 (rejected),
     * then passes every message, registration or not, on to the next handler.
     *
     * <p>NOTE(review): the registration message is forwarded after the response
     * has been written; confirm that later handlers are meant to see it.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        byte[] frame = (byte[]) msg;
        if (OBDUtil.getIntFromODBIntArray(frame, 0, 2) == TERMINAL_REGISTER) {
            String terminalId = OBDUtil.getTerminalNumber(frame);
            boolean accepted = checkTerminalIdIsCanUseService.check(terminalId);
            byte status;
            String authCode;
            if (accepted) {
                status = 0;
                // Auth code for terminal authentication; currently not used.
                authCode = OBDConstant.TERMINAL_REGISTER_AUTH_CODE;
                log.info("注册成功:terminalId="+terminalId);
            } else {
                status = 1;
                authCode = null;
                log.info("注册失败:terminalId="+terminalId);
            }
            // Send the registration result, echoing bytes 10 and 11 of the request.
            byte[] echoed = new byte[] { frame[10], frame[11] };
            ctx.writeAndFlush(getTerminalRegisterResponse(frame, echoed, status, authCode));
        }
        ctx.fireChannelRead(msg);
    }
    // The excerpt ends here; the rest of the class, including its closing brace, is not shown.
- 终端认证
@Component @Sharable @Slf4j public class TerminalAuthHandler extends BaseHandler{ /** * 终端鉴权 258 */ private final static int TERMINAL_AUTH = 0x0102; /** * 鉴权终端是否可用服务类 */ @Autowired private CheckTerminalIdIsCanUseService checkTerminalIdIsCanUseService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { byte[] array = (byte[]) msg; //消息id int messageId = OBDUtil.getIntFromODBIntArray(array, 0, 2); if(messageId == TERMINAL_AUTH) { //终端号 String terminalId = OBDUtil.getTerminalNumber(array); //失败 byte status = 1; //认证成功 if(checkTerminalIdIsCanUseService.check(terminalId)) { status = 0; //没有保存链路信息 if(SocketConfig.getSocketModel(terminalId) == null) { //心跳检查定时任务 HeartBeatTask task = new HeartBeatTask(terminalId); ctx.executor().schedule(task, OBDConstant.HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS); } //保存链路信息 SocketConfig.putSocket(terminalId, new SocketModel(ctx)); log.info("认证成功:terminalId="+terminalId); }else { log.info("认证失败:terminalId="+terminalId); } //返回认证结果 ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array,new byte[] { array[10],array[11]}, status)); }else { ctx.fireChannelRead(msg); } } }
- 消息处理
@Component @Sharable @Slf4j public class MessageHandler extends BaseHandler{ @Autowired private DealMessageService dealMessageService; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { byte[] array = (byte[]) msg; int messageId = OBDUtil.getIntFromODBIntArray(array, 0, 2); String terminalId = OBDUtil.getTerminalNumber(array); //log.info("收到终端消息:terminalId="+terminalId+" messageId="+messageId); SocketModel socketModel = SocketConfig.getSocketModel(terminalId); //有链路信息 说明终端设备认证 if(socketModel != null) { //处理消息 dealMessageService.dealMessage(String.valueOf(messageId),array); socketModel.resetHeartNoRespCount(); ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array, new byte[] { array[10],array[11]}, (byte)0)); }else { System.out.println(messageId); log.info("终端未认证 terminalId="+terminalId); //没有发现终端 ctx.writeAndFlush(OBDCommonResponseUtil.getOBDCommonResponse(array,new byte[] { array[10],array[11]}, (byte)5)); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(ctx); ctx.close(); cause.printStackTrace(); log.error("链路异常关闭"); } }
轨迹解析服务
- 从Kafka中获取数据
@Slf4j @Component public class KafkaConsumer { @Autowired private GpsVehicleTraceService gpsVehicleTraceService; @Autowired private TerminalNumberVehicleService terminalNumberVehicleService; @Autowired private RedisClient redisClient; //处理轨迹上传数据 @KafkaListener(id="gpsLocation1",topics= KafkaTopicConstant.GPS_LOCATION) public void onGPSLocation(ConsumerRecord<String, byte[]> record) { //协议解析 OBDModel obd = ProtocolAnalysis.parse(record.value()); if(obd != null) { String terminalNumber = obd.getTerminalNumber(); //获取绑定车辆信息 Integer vehicleId = terminalNumberVehicleService.getVehicleIdByTerminalNumber(terminalNumber); if(vehicleId != null) { PositionModel positionModel = (PositionModel)obd.getBody(); if(positionModel != null) { //处理轨迹 gpsVehicleTraceService.dealGpsVehicle(vehicleId, positionModel,false); } }else { log.info("未找到绑定信息 terminalNumber ="+terminalNumber); } } } //轨迹补传处理 @KafkaListener(id="gpsLocationBeath1",topics=KafkaTopicConstant.GPS_LOCATION_BEATCH) public void onGPSLocationBeath(ConsumerRecord<String, byte[]> record) { OBDModel obd = ProtocolAnalysis.parse(record.value()); if(obd != null) { String terminalNumber = obd.getTerminalNumber(); Integer vehicleId = terminalNumberVehicleService.getVehicleIdByTerminalNumber(terminalNumber); if(vehicleId != null) { LocationBatchModel positionBatch = (LocationBatchModel)obd.getBody(); if(positionBatch != null) { List<PositionModel> list = positionBatch.getList(); if(list != null && !list.isEmpty()) { for (PositionModel positionModel : list) { gpsVehicleTraceService.dealGpsVehicle(vehicleId, positionModel,true); } } } } } } //终端注册 @KafkaListener(id="terminalRegister",topics=KafkaTopicConstant.TERMINAL_REGISTER) public void onTerminalRegister(ConsumerRecord<String, byte[]> record) { ProtocolAnalysis.parse(record.value()); } //终端注销 @KafkaListener(id="terminalUnRegister",topics=KafkaTopicConstant.TERMINAL_UNREGISTER) public void onTerminalUnRegister(ConsumerRecord<String, byte[]> record) { 
ProtocolAnalysis.parse(record.value()); } /** * 查询终端参数 */ @KafkaListener(id="terminalAnswer",topics=KafkaTopicConstant.TERMINAL_ANSWER) public void getTerminalAnswer(ConsumerRecord<String, byte[]> record){ OBDModel obd = ProtocolAnalysis.parse(record.value()); if(obd != null){ String terminalNumber = obd.getTerminalNumber(); TerminalAnswerData answerData = (TerminalAnswerData)obd.getBody(); List<TerminalAnswerValue> answerValue = (List<TerminalAnswerValue>)answerData.getValue(); redisClient.setValue(RedisKeyPreConstant.TERMINAL_DATA, terminalNumber, answerValue); } } }
- 协议解析类
public class ProtocolAnalysis { //协议解析类map private final static Map<Integer,DealMessageService> dealMessageServiceMap = new HashMap<>(); static { //初始化 dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_AUTH, new TerminalAuthDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.POSSITION, new PositionBaseDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_GENER_RESPONSE, new TerminalGeneralResponseDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.POSSITION_BATCH, new PossitionBatchDealMassageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_REGISTER, new TerminalRegisterDealMessageServiceImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.TERMINAL_ANSWER, new TerminalAnswerDealMessageImpl()); dealMessageServiceMap.put(OBDConstant.MessageId.ALARM_INFO, new AlarmDealMessageServiceImpl()); } // public static void main(String[] args) { // byte[] data = new byte[] {0, 1, 0, 5, 1, 65, 68, 120, 116, -107, 0, 33, 0, 1, -126, 7, 0, 60}; // // OBDModel obdModel =parse(data); // System.out.println(obdModel); // System.out.println(Integer.toHexString(obdModel.getMessageId())); // } public static OBDModel parse(byte[] chars) { //转化为整行 int[] dataArray = new int[chars.length]; for(int i=0;i<dataArray.length;i++) { if(chars[i] < 0 ) { //数据修正 传输数据为无符号数 dataArray[i] = OBDUtil.obdDataCorrection(chars[i]); }else { dataArray[i] = chars[i]; } } OBDModel obdModel = new OBDModel(); //消息 ID 占用两个字节 高八位0 低八位 1 obdModel.setMessageId(OBDUtil.getIntFromODBIntArray(dataArray, 0, 2)); //消息体属性 占用两个字节 高八位2 低八位3 obdModel.setBodyDescribe(getBodyDescribe(OBDUtil.getIntFromODBIntArray(dataArray, 2, 2))); //终端号 占用 4~9位 obdModel.setTerminalNumber(getTerminalNumber(dataArray)); //消息流水号 占用两个字节 高八位10低八位11 obdModel.setSerialNumber(OBDUtil.getIntFromODBIntArray(dataArray, 10, 2)); int bodyIndex = 12; if(obdModel.getBodyDescribe().getIsSubpackage()) { //消息总包数 占用两个字节 高八位12低八位13 
obdModel.setTotalPackage(OBDUtil.getIntFromODBIntArray(dataArray, 12, 2)); //包序号 占用两个字节 高八位14低八位15 obdModel.setPackageNum(OBDUtil.getIntFromODBIntArray(dataArray, 14, 2)); bodyIndex += 4; } int bodyLength = obdModel.getBodyDescribe().getBodyLength(); if(bodyLength>0) { if(OBDConstant.TOP_MIN_LENGTH + bodyLength <= dataArray.length) { int[] bodyIntArray = new int[bodyLength]; System.arraycopy(dataArray, bodyIndex, bodyIntArray, 0, bodyLength); //设置消息体 obdModel.setBody(getBody(obdModel.getMessageId(),bodyIntArray)); } } return obdModel; }; /** * 获取body信息 */ public static Object getBody(int messageid,int[] bodyArray) { //是否支持该消息类型的解析 if(dealMessageServiceMap.containsKey(messageid)) { return dealMessageServiceMap.get(messageid).getBody(bodyArray); }else { return Arrays.toString(bodyArray); } } /** * 获取终端号 BCD[6] 4-9 */ public static String getTerminalNumber(int[] dataArray) { String result = ""; for(int i=4;i<=9;i++) { result += OBDUtil.getBCDStr(dataArray[i]); } return result; } /** * 获取消息体属性 */ public static BodyDescribe getBodyDescribe(int bodyInt) { BodyDescribe bodyDescribe = new BodyDescribe(); //低9为字节表示长度 bodyDescribe.setBodyLength(bodyInt % (1<<10)); //第10-12字节全为0表示消息不加密 bodyDescribe.setIsEncryption((bodyInt&((1<<10)+(1<<11)+(1<<12)))!=0); //第10字节为1表示RSA加密 bodyDescribe.setEncryptionType(OBDUtil.checkIntBitIsOne(bodyInt, 10)?1:null); //第13位为1表示分包传输 bodyDescribe.setIsSubpackage(OBDUtil.checkIntBitIsOne(bodyInt, 13)); return bodyDescribe; } /** * 验证码校验 */ public static boolean check(int[] dataArray) { int result = dataArray[dataArray.length-1]; int calR = dataArray[0] ^ dataArray[1]; for(int i=2;i<dataArray.length-1;i++) { calR = calR^dataArray[i]; } return calR == result; }
- 认证消息类型解析
/**
 * Body decoder for terminal authentication messages.
 */
public class TerminalAuthDealMessageServiceImpl implements DealMessageService {

    /**
     * Turns the body into a string, one character per array element; each
     * element is interpreted as a Unicode code point.
     *
     * @param bodyArray body values
     * @return the resulting string
     */
    @Override
    public Object getBody(int[] bodyArray) {
        final int offset = 0;
        final int count = bodyArray.length;
        String decoded = new String(bodyArray, offset, count);
        return decoded;
    }
}