今年,相信不少人都会在各个商场或者是电影院中能够看到各类娃娃机、幸运盒子、口红挑战等等相似机器。在抖音上《口红挑战》这款机子也是火的一塌糊涂,你只要花 10 块钱就有可能赢走一个 YSL 口红,想一想就以为颇有诱惑力。或许这些机器被程序员一瞧就知道其中的猫腻,可是这个机器瞄准的是那些容易冲动消费的消费者,好比情侣、女生、带小孩的大人;像程序员这么奇葩的生物,通常都是直接被无视的哈哈。css
而后呢,咱们公司就是为这些设备的正常运行提供解决方案的。所以才有我今天的爬坑总结,哈哈哈哈哈.....html
咱们提供的解决方案是这样的,在一个门店里面会包含以下的设备:娃娃机、口红挑战、排行榜、中控,固然其中还有咱们的后台服务。那么首先我会先介绍一下整个系统的架构以及各个设备的职责:java
服务后台android
中控(本地服务:不链接外网):这个中控也是也 Android 设备,它的功能有两个:git
娃娃机:这一块其实包含了两个部分,一个是 Android 设备,另一个硬件设备。程序员
口红挑战:关于口红挑战这个设备能够划分为 3 个模块,分别为见缝插针游戏、常规程序模块、硬件模块github
rocket:这个程序有点特殊,由于用户是看不到它的,在出厂的时候,这个程序就被写进去了,那么它负责的工做以下:web
关于资源同步的,首先咱们先理一下咱们须要同步的资源有哪些,这些资源分别为: apk 安装包、图片、h5 相关的 index 资源。
关于更新的方式,这里其实就有一个比较坑的地方了,一开始的时候咱们选择的资源更新方式比较傻,直接使用 websocket 进行资源更新的,一开始的时候只有一个设备进行链接,问题却是不大,可是后来发现多台设备链接同时更新资源的时候问题特别大,链接常常断开,致使资源更新失败。那么这里是我遇到的第一个坑。发现这个坑以后呢,个人选择资源更新的方式就更改成:NanoHttpd。NanoHttpd 是一个开源库,是用 Java 实现的,它能够在 Android 设备上创建一个轻量级的 web server。其实在 Android 设备上建立一个轻量级的 web server 才是咱们一开始就应该要选择的方向。为何呢?首先 NanoHttpd 的使用是比较简单的,所以咱们只须要几行代码就能够实现一个 web server 了;其次呢,NanoHttpd 是比较稳定的,相对于咱们手动使用 websocket 去实现一个资源分发要稳定太多了。算法
那么在咱们选择了资源的更新方式以后,有另一个问题浮出水面了,关于服务器的 IP 地址。咱们都知道,关于 Android 设备链接上移动互联网或者 WiFi 的时候都会被自动分配一个 IP 地址,所以这个 IP 地址是会变化的,咱们的设备在天天晚上都会关机,而后在次日开启重启的时候又会被分配到一个新的 IP 地址,所以服务器的 IP 地址是一直在变化的,因此这里咱们须要作的是想办法把某个设备的 IP 地址给固定下来。那么接下来就来说讲关于 NanoHttpd 建立轻量级的 web server 和如何解决 IP 变化的问题。shell
NanoHttpd 项目地址
implementation 'org.nanohttpd:nanohttpd-webserver:2.3.1'
File resourceDir = new File(Environment.getExternalStorageDirectory(), "myRootDir"); SimpleWebServer httpServer = new SimpleWebServer(null, 18103, resourceDir, true, "*"); httpServer.start(NanoHTTPD.SOCKET_READ_TIMEOUT, true);
SimpleWebServer 构造函数的参数
访问方式
在 Android 设备中,它的一个 IP 地址是会变化的,并且每一个门店都会有一个本身的内部中控机,那么咱们是必需要处理 IP 地址变化的这个问题的。咱们的解决方案有以下两个步骤:
关于资源更新的,咱们首先须要明确咱们须要更新的资源有哪些以及咱们须要更新的方式。
关于咱们资源跟新的全部数据都是保存在 Resource.json 这个文件夹里面的,那么咱们每隔 5min 就从中控服务端(局域网内)获取 Resource.json,而后每一个类型的资源就根据写在 Resource.json 中的数据进行判断。那么写入 Resource.json 文件中的实现及具体内容以下:
public class ResListModel { // 娃娃机 banner 的 h5 资源(index.html等文件) public HashMap<String, String> bannerFiles = new HashMap(); // 门店中全部娃娃机都会显示的轮播图 // key 为 图片的 hash 值 // value 为图片的在服务器中的相对路径 public HashMap<String, String> PublicFiles = new HashMap(); // 门店中特定娃娃机的私有显示轮播图 // key 为设备的 id // value 为图片图片的 hash 及路径信息(对应 PublicFiles) public HashMap<String, HashMap<String, String>> PrivateFiles = new HashMap(); // 更新的 apk 路径 public String UpdateApk; // 更新的 apk 包名 public String UpdateApkPackageName; // 更新的 apk 版本名 public String UpdateApkVersion; // 更新的 apk 版本号 public int UpdateApkVersionCode; }
ResListModel res = new ResListModel(); // 略过添加数据的过程 ...; File resourceFile = new File(baseDir, "Resource.json"); RandomAccessFile out = new RandomAccessFile(resourceFile, "rw"); byte[] json = JsonStream.serialize(res).getBytes("utf-8"); out.setLength(json.length); out.write(json); out.close();
{ "PrivateFiles":{}, "PublicFiles": { "1A7D3394A6F10D3668FB29D8CCA1CA8B":"Public/timg.jpg" }, "UpdateApk":null, "UpdateApkPackageName":null, "UpdateApkVersion":null, "UpdateApkVersionCode":0, "bannerFiles": { "C609D70832710E3DCF0FB88918113B18":"banner/Resource.json", "FC1CF2C83E898357E1AD60CEF87BE6EB":"banner/app.8113390c.js", "27FBF214DF1E66D0307B7F78FEB8266F":"banner/manifest.json", "A192A95BFF57FF326185543A27058DE5":"banner/index.html", "61469B10DBD17FDEEB14C35C730E03C7":"banner/app.8113390c.css" } }
try { // banner 资源文件 String fileName = fileFilter.getAbsolutePath().substring(baseDirLength); RandomAccessFile randomAccessFile = new RandomAccessFile(fileFilter,"r"); byte[] buf = new byte[(int) randomAccessFile.length()]; randomAccessFile.read(buf); randomAccessFile.close(); MessageDigest md5 = MessageDigest.getInstance("md5"); byte[] hash = md5.digest(buf); String hashStr = ByteToHex(hash,0,hash.length); res.bannerFiles.put(hashStr,fileName); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
// 字节转换为 16 进制 public static String ByteToHex(byte[] bt, int offset, int len) { StringBuffer sb = new StringBuffer(); for (int i = offset; i < offset + len; i++) { int tmp = bt[i] & 0xff; String tmpStr = Integer.toHexString(tmp); if (tmpStr.length() < 2) sb.append("0"); sb.append(tmpStr); } return sb.toString().toUpperCase(); }
public static Observable<Boolean> updateBannerRes(ResListBean resListBean) throws IOException, NoSuchAlgorithmException { // 获取远程 banner 的文件 HashMap<File, String> remoteFiles = new HashMap(); for (HashMap.Entry<String, String> entry : resListBean.bannerFiles.entrySet()) { remoteFiles.put(new File(entry.getValue()), entry.getKey()); } FileUtils.GetFilesInDir(bannerDir,localBannerList,null); int baseDirLength = resDir.getAbsolutePath().length()+1; // step1:删除本地文件(远程 banner 中没有的文件) for (File localFile : localBannerList) { File chileFile = new File(localFile.getAbsolutePath().substring(baseDirLength)); if (!remoteFiles.containsKey(chileFile)) { MainActivity.appendAndScrollLog(String.format("删除 banner 资源文件 %s\n", localFile.getAbsolutePath())); localFile.delete(); } } // 下载本地没有的文件 ArrayList<Observable<File>> taskList = new ArrayList(); for (Map.Entry<File, String> fileEntry : remoteFiles.entrySet()) { File file = new File(resDir,fileEntry.getKey().getAbsolutePath()); // step2:本地中存在和远程相同的文件名 if (localBannerList.contains(file)) { // step3:根据 hash 值判断是否为同一文件 String hashStr = FileUtils.getFileHashStr(file); if (TextUtils.equals(hashStr,fileEntry.getValue())){ MainActivity.appendAndScrollLog(String.format("保留 banner 文件 %s\n", file.getAbsolutePath())); taskList.add(Observable.just(file)); continue; } } // step4:下载本地没有的文件 String url = new URL("http", Config.instance.centralServerAddress, Config.instance.httpPort, new File(BuildConfig.APPLICATION_ID, fileEntry.getKey().getAbsolutePath()).getAbsolutePath()).toString(); // step5:加入文件下载列表 taskList.add(DownLoadUtils.getDownLoadFile(url,file)); } return Observable.concat(taskList) .toFlowable(BackpressureStrategy.MISSING) .parallel() .runOn(Schedulers.io()) .sequential() .toList() .observeOn(Schedulers.computation()) .map(new Function<List<File>, ArrayList<File>>() { @Override public ArrayList<File> apply(List<File> files) throws Exception { ArrayList<File> list = new ArrayList(); for (File file : files) { if (!file.getAbsolutePath().isEmpty()) { list.add(file); } } if (list.size() > 0) { if (!Utils.EqualCollection(list, localBannerList)) { Collections.sort(list); } else { list.clear(); } } return list; } }) .observeOn(AndroidSchedulers.mainThread()) .map(new Function<ArrayList<File>, Boolean>() { @Override public Boolean apply(ArrayList<File> list) throws Exception { if (list.size() > 0) { localBannerList = list; webViewHasLoad = false; loadH5(); } return true; } }) .observeOn(Schedulers.io()) .map(new Function<Boolean, Boolean>() { @Override public Boolean apply(Boolean aBoolean) throws Exception { FileUtils.DelEmptyDir(resDir); return true; } }) .toObservable(); }
关于程序的升级,相比较于图片资源的更新要简单许多。
咱们的实现版本更新的步骤以下:
public static Observable<Boolean> updateGame(ResListBean res) throws IOException, InterruptedException { ArrayList<File> apkList = new ArrayList(); FileUtils.GetFilesInDir(resDir, apkList, new String[]{ ".apk", }); // 删除本地存在的 apk 包 for (File file : apkList) { file.delete(); } do { if (res.UpdateApk == null || res.UpdateApkVersion == null) { break; } // 判断是否须要升级 if (BuildConfig.VERSION_CODE >= res.UpdateApkVersionCode) { break; } // apk 的 URL final String url = new URL("http", Config.instance.centralServerAddress, Config.instance.httpPort, new File(BuildConfig.APPLICATION_ID, res.UpdateApk).getAbsolutePath()).toString(); MainActivity.appendAndScrollLog(String.format("下载升级文件 %s\n", url)); // 下载 apk 文件 return DownLoadUtils.getDownLoadFile(url,resDir.getAbsolutePath(),res.UpdateApk) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .flatMap(new Function<File, ObservableSource<String>>() { @Override public ObservableSource<String> apply(File file) throws Exception { String path = file.getAbsolutePath(); MainActivity.appendAndScrollLog(String.format("升级文件下载完成 %s %s\n", path, url)); PackageManager pm = MainActivity.instance.getPackageManager(); PackageInfo pi = pm.getPackageArchiveInfo(path, 0); if (pi == null) { MainActivity.appendAndScrollLog(String.format("升级文件打开失败 %s\n", path)); return Observable.just(""); } MainActivity.appendAndScrollLog(String.format("升级文件对比:Native(%s %s)/Remote(%s %s)\n", BuildConfig.APPLICATION_ID, BuildConfig.VERSION_NAME, pi.packageName, pi.versionName)); if (!BuildConfig.APPLICATION_ID.equals(pi.packageName) || BuildConfig.VERSION_CODE >= pi.versionCode) { return Observable.just(""); } return Observable.just(path); } }) .flatMap(new Function<String, Observable<Boolean>>() { @Override public Observable<Boolean> apply(String updateApk) throws Exception { if (!updateApk.isEmpty()) { Log.e(TAG, "等待游戏结束后安装升级文件..."); MainActivity.appendAndScrollLog("等待游戏结束后安装升级文件...\n"); synchronized (GamePlay.class) {//防止在游戏运行时更新版本 Log.e(TAG, "发布广播"); Intent intent = new Intent(); intent.setAction(Config.updateBroadcast); intent.putExtra("apk", updateApk); MainActivity.instance.sendBroadcast(intent); System.exit(0); } } return Observable.just(true); } }); } while (false); return Observable.just(true); }
关于资源文件的下载,我是选择 okdownload。okdownload 是一个支持多线程,多任务,断点续传,可靠,灵活,高性能以及强大的下载引擎。详情能够去看 okdownload GitHub 地址
implementation 'com.liulishuo.okdownload:okdownload:1.0.5' implementation 'com.liulishuo.okdownload:okhttp:1.0.5'
单文件下载
DownloadTask task = new DownloadTask.Builder(url, parentFile) .setFilename(filename) // the minimal interval millisecond for callback progress .setMinIntervalMillisCallbackProcess(30) // do re-download even if the task has already been completed in the past. .setPassIfAlreadyCompleted(false) .build(); task.enqueue(listener); // cancel task.cancel(); // execute task synchronized task.execute(listener);
多文件下载
final DownloadTask[] tasks = new DownloadTask[2]; tasks[0] = new DownloadTask.Builder("url1", "path", "filename1").build(); tasks[1] = new DownloadTask.Builder("url2", "path", "filename1").build(); DownloadTask.enqueue(tasks, listener);
public class DownLoadUtils { /** * 从中控下载文件到本地 * @param url * @param parentPath 保存到本地文件的父文件路径 * @param downloadFileName 保存到本地的文件名 * @return */ public static Observable<File> getDownLoadFile(String url,String parentPath,String downloadFileName){ // 下载本地没有的文件 MainActivity.appendAndScrollLog(String.format("开始下载资源文件 %s\n", url)); final DownloadTask task = new DownloadTask.Builder(url, parentPath, downloadFileName).build(); return Observable.create(new ObservableOnSubscribe<File>() { @Override public void subscribe(final ObservableEmitter<File> emitter) throws Exception { task.enqueue(new DownloadListener2() { @Override public void taskStart(DownloadTask task) { } @Override public void taskEnd(DownloadTask task, EndCause cause, Exception realCause) { if (cause != EndCause.COMPLETED) { MainActivity.appendAndScrollLog(String.format("资源文件下载失败 %s %s\n", cause.toString(), task.getUrl())); emitter.onNext(new File("")); emitter.onComplete(); return; } File file = task.getFile(); MainActivity.appendAndScrollLog(String.format("资源文件下载完成 %s\n", file.getAbsolutePath())); emitter.onNext(file); emitter.onComplete(); } }); } }).retry(); } /** * 从中控下载文件到本地 * @param url * @param saveFile 保存到本地的文件 * @return */ public static Observable<File> getDownLoadFile(String url, File saveFile){ return getDownLoadFile(url,saveFile.getParentFile().getAbsolutePath(),saveFile.getName()); } }
像娃娃机和格子机这些设备都是在线下直接面向用户的,所以咱们不能将咱们的 Android 设备所有都展示给咱们的用户,咱们须要对用户的行为作些限制,例如禁止用户经过导航栏或者下拉菜单退出当前程序,防止他们作出一些危险的操做。个人解决方案是把当前的 rocket 程序设置为默认启动和桌面应用程序,并将 Android 设备中自带的 launcher 程序 和 systemui 程序给禁用掉,那么设备一开始启动的时候就会启动咱们的 rocket 应用,并成功的禁止了用户使用导航栏和下拉菜单来作非法的操做。
查找 Android 设备中自带的 launcher 程序 和 systemui 程序的对应包名
LW-PC0920@lw1002022 MINGW64 ~/Desktop $ adb shell pm list packages | grep launcher package:com.android.launcher3
LW-PC0920@lw1002022 MINGW64 ~/Desktop $ adb shell pm list packages | grep systemui package:com.android.systemui
禁止 Android 设备中自带的 launcher 程序 和 systemui 程序的使用
adb shell pm disable com.android.launcher3
- 禁止 systemui 程序的使用 ``` adb shell pm disable com.android.systemui ```
public static void enableLauncher(Boolean enabled) { List<PackageInfo> piList = MainActivity.instance.packageManager.getInstalledPackages(0); ArrayList<String> packages = new ArrayList(); for (PackageInfo pi : piList) { String name = pi.packageName; if (name.contains("systemui") || name.contains("launcher")) { packages.add(name); } } for (String packageName : packages) { su(String.format("pm %s %s\n", enabled ? "enable" : "disable", packageName)); } } /** * 执行 adb 指令 * */ public static int su(String cmd) { try { Process p = Runtime.getRuntime().exec("su"); DataOutputStream os = new DataOutputStream(p.getOutputStream()); os.writeBytes(cmd); os.writeBytes("exit\n"); os.flush(); os.close(); return p.waitFor(); } catch (Exception ex) { return -1; } }
关于 IoT 的实现,咱们这边使用的是阿里的《微消息队列 for IoT》服务,关于《微消息队列 for IoT》服务,阿里的解释以下:
微消息队列 for IoT 是消息队列(MQ)的子产品。针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,消息队列(MQ) 经过推出微消息队列 for IoT 开放了对 MQTT 协议的完整支持
MQTT 协议?
MQTT 的特色
关键名词的解释
名词 | 解释 |
---|---|
Parent Topic | MQTT 协议基于 Pub/Sub 模型,所以任何消息都属于一个 Topic。根据 MQTT 协议,Topic 存在多级,定义第一级 Topic 为父 Topic(Parent Topic),使用 MQTT 前,该 Parent Topic 须要先在 MQ 控制台建立。 |
Subtopic | MQTT 的二级 Topic,甚至三级 Topic 都是父 Topic 下的子类。使用时,直接在代码里设置,无需建立。须要注意的是 MQTT 限制 Parent Topic 和 Subtopic 的总长度为64个字符,若是超出长度限制将会致使客户端异常。 |
Client ID | MQTT 的 Client ID 是每一个客户端的惟一标识,要求全局惟一,使用相同的 Client ID 链接 MQTT 服务会被拒绝 |
关于显示 iot 链接的实现过程是这样的:首先咱们将设备的三元组从管理后台中批量生成,文件名的格式为 deviceName.json(例如:00001.json),里面是关于每一个设备的三元组信息;接着咱们将装有三元组文件的 U 盘插入到 Android 设备中(娃娃机或者口红挑战);rocket 程序会自动监测到 U 盘的插入并将文件剪切到 Android 设备的制定目录下;再接着 Android 设备能够去读取指定文件中三元组信息;最后使用此三元组进行链接 mqtt。
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
关于三元组
属性|用处 --|-- productKey|对应程序的 key,相似于 appid deviceName|对应上述的 Client ID,用来惟一识别一台 Android 设备的 deviceSecret|使用 HmacSHA1 算法计算签名字符串,并将签名字符串设置到 Password 参数中用于鉴权
关于订阅的 topic
代码实现 iot 链接
/** * 剪切配置文件(三元组) * @param packageName */ public static void moveConfig(String packageName) { File usbConfigDir = new File(UsbStorage.usbPath, Config.wejoyConfigDirInUsb); File extProjectDir = new File(Environment.getExternalStorageDirectory(), Config.resourceDirName); File extConfigFile = new File(extProjectDir, Config.wejoyConfigFileInSdcard); if (!usbConfigDir.exists() || extConfigFile.exists()) { return; } extProjectDir.mkdirs(); File[] configFiles = usbConfigDir.listFiles(); if (configFiles.length > 0) { Arrays.sort(configFiles); moveFile(configFiles[0], extConfigFile); } } public static void moveFile(File src, File dst) { su(String.format("mv -f %s %s\n", src.getAbsolutePath(), dst.getAbsolutePath())); }
public static File configFile = new File(new File(Environment.getExternalStorageDirectory(), "WejoyRes"), "Config.json"); static void read() throws IOException { if (configFile.exists()) { RandomAccessFile in = new RandomAccessFile(configFile, "r"); byte[] buf = new byte[(int) configFile.length()]; in.read(buf); in.close(); instance = JsonIterator.deserialize(new String(buf, "utf-8"), Config.class); } else { instance = new Config(); } mqttRequestTopic = String.format("/sys/%s/%s/rrpc/request/", instance.productKey, instance.deviceName); mqttResponseTopic = String.format("/sys/%s/%s/rrpc/response/", instance.productKey, instance.deviceName); mqttPublishTopic = String.format("/%s/%s/update", instance.productKey, instance.deviceName); }
- **链接 mqtt** ``` static void init() { instance = new IoT(); DeviceInfo deviceInfo = new DeviceInfo(); deviceInfo.productKey = Config.instance.productKey; deviceInfo.deviceName = Config.instance.deviceName; deviceInfo.deviceSecret = Config.instance.deviceSecret; final LinkKitInitParams params = new LinkKitInitParams(); params.deviceInfo = deviceInfo; params.connectConfig = new IoTApiClientConfig(); LinkKit.getInstance().registerOnPushListener(instance); initDisposable = Observable.interval(0, Config.instance.mqttConnectIntervalSeconds, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .map(new Function<Long, Boolean>() { @Override public Boolean apply(Long aLong) throws Exception { if (!initialized) { LinkKit.getInstance().init(MainActivity.instance, params, instance); } return initialized; } }) .subscribe(new Consumer<Boolean>() { @Override public void accept(Boolean aBoolean) throws Exception { if (aBoolean) { initDisposable.dispose(); } } }); } ``` - **发送消息:** 发送消息的时候,咱们须要指定 topic,不然服务器没法接收到咱们的消息。 ``` static void publish(String json) { Log.e(TAG, "publish: "+json ); MqttPublishRequest res = new MqttPublishRequest(); res.isRPC = false; res.topic = Config.mqttPublishTopic; res.payloadObj = json; LinkKit.getInstance().publish(res, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { } @Override public void onFailure(ARequest aRequest, AError aError) { } }); } ``` - **接收消息:** 接收消息的时候,咱们也须要判断是来自哪一个 topic 中的,除了咱们指定的 topic,其余的 topic 咱们都不作处理;当咱们接收到服务器中发送来的消息的时候,咱们是先判断消息的类型,而后根据相对应的类型作出不一样的反应。例如咱们收到后台请求给娃娃机的上分的指令,那么咱们就向设备中的硬件模块发送上分的指令,并等待设备反应并给后台发送一条响应信息。这条响应的消息是须要在指定的时间内完成,不然认为超时。 ``` @Override public void onNotify(String s, final String topic, final AMessage aMessage) { if (!topic.startsWith(Config.mqttRequestTopic)) { return; } Observable.create(new ObservableOnSubscribe<MqttMessage>() { @Override public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception { MqttMessage msg = JsonIterator.deserialize(new String((byte[]) aMessage.data, "utf-8"), MqttMessage.class); if (msg == null) { return; } emitter.onNext(msg); emitter.onComplete(); } }) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.io()) .flatMap(new Function<MqttMessage, ObservableSource<MqttMessage>>() { @Override public ObservableSource<MqttMessage> apply(MqttMessage msg) throws Exception { Log.e(TAG, "收到消息 key:"+msg.key+" msg:"+msg.body.m); switch (msg.key) { case "h": {// SetHeartBeatDownstream setHeartBeatDownstream = msg.body.m.as(SetHeartBeatDownstream.class); // 和设备进行通讯,并等待设备的响应 return Device.setHeartBeat(setHeartBeatDownstream); } case "b": {// AddCoinsDownstream addCoinsDownstream = msg.body.m.as(AddCoinsDownstream.class); // 和设备进行通讯,并等待设备的响应 return Device.addCoins(addCoinsDownstream); } case "g": {// // 和设备进行通讯,并等待设备的响应 return Device.getParam(); } case "s": {// SetParamDownstream setParamDownstream = msg.body.m.as(SetParamDownstream.class); // 和设备进行通讯,并等待设备的响应 return Device.setParam(setParamDownstream); } } return Observable.never(); } }) .observeOn(Schedulers.io()) .map(new Function<MqttMessage, Boolean>() { @Override public Boolean apply(MqttMessage msg) throws Exception { MqttPublishRequest res = new MqttPublishRequest(); res.isRPC = false; res.topic = topic.replace("request", "response"); //res.msgId = topic.split("/")[6]; res.payloadObj = JsonStream.serialize(msg); LinkKit.getInstance().publish(res, new IConnectSendListener() { @Override public void onResponse(ARequest aRequest, AResponse aResponse) { } @Override public void onFailure(ARequest aRequest, AError aError) { } }); return true; } }) .subscribe(); } ```
在娃娃机和口红挑战的这两个设备中,咱们都须要和设备进行通讯,例如:娃娃机投币、娃娃机出礼反馈、按下选中口红的格子等等这些都是须要和硬件模块进行通讯的。在关于串口通讯的框架选择方面,咱们主要是选择 Google 的 android-serialport-api 来实现。项目原地址
依赖方式
allprojects { repositories { ... maven { url 'https://jitpack.io' } } }
dependencies { implementation 'com.github.licheedev.Android-SerialPort-API:serialport:1.0.1' }
// su默认路径为 "/system/bin/su" // 可经过此方法修改 SerialPort.setSuPath("/system/xbin/su");
链接串口的时候须要指定串口号以及波特率,以后定时处理机器发送的指令。
static void init() throws IOException { SerialPort.setSuPath("/system/xbin/su"); // 设置串口号及波特率 serialPort = new SerialPort(Config.serialPort, Config.baudrate); // 接收指令流 inputStream = serialPort.getInputStream(); // 发送指令流 outputStream = serialPort.getOutputStream(); // 每隔 100ms 处理机器信息 Observable.interval(100, TimeUnit.MILLISECONDS) .observeOn(serialScheduler) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { // 处理机器发送的指令 handleRecv(); } }); }
向机器发送指令的时候是结合 Rxjava 来实现的。除此以外,向机器发送指令是须要有规定格式的(内部制定的通讯协议),咱们发送及接收数据都是一个字节数组,所以咱们格式是须要严格按照咱们制定的协议进行的,以下是娃娃机投币的简单示例:
static ObservableSource<MqttMessage> addCoins(final AddCoinsDownstream msg) { return Observable.create(new ObservableOnSubscribe<MqttMessage>() { @Override public void subscribe(ObservableEmitter<MqttMessage> emitter) throws Exception { currentUser = msg.u; currentHeadUrl = msg.h; currentNickname = msg.nk; byte[] buf = new byte[]{0x11, addCoinsCmd, msg.num, msg.c, 0, 0x00, 0x00}; byte[] ret = sign(buf); try { outputStream.write(ret); } catch (IOException e) { e.printStackTrace(); } penddingCmd = addCoinsCmd; penddingEmitter = emitter; } }) .subscribeOn(serialScheduler); }
关于接受机器消息这一块是每隔 100ms 进行的,在处理机器指令的时候,首先须要过滤到无效的字节,以后再按照咱们制定的协议来处理消息,判断是娃娃机上分,仍是游戏结果等信息,最后并对机器的数据返回进行 CRC16 校验。
static void handleRecv() { try { for (; ; ) { int len = inputStream.available(); if (len <= 0) { break; } len = inputStream.read(buf, bufReadOffset, buf.length - bufReadOffset); //Log.d("serialPort", String.format("read: %s", byteToHex(buf, bufReadOffset, len))); bufReadOffset += len; for (; ; ) { if (bufParseEnd == -1) { for (; bufParseStart < bufReadOffset; bufParseStart++) { if (buf[bufParseStart] == (byte) 0xAA) { bufParseEnd = bufParseStart + 1; break; } } } if (bufParseEnd != -1) { for (; bufParseEnd < bufReadOffset; bufParseEnd++) { if (buf[bufParseEnd] == (byte) 0xAA) { bufParseStart = bufParseEnd; bufParseEnd += 1; continue; } if (buf[bufParseEnd] == (byte) 0xDD) { if (bufParseEnd - bufParseStart >= 5) { bufParseEnd += 1; byte size = buf[bufParseStart + 1]; byte index = buf[bufParseStart + 2]; byte cmd = buf[bufParseStart + 3]; byte check = (byte) (size ^ index ^ cmd); for (int i = bufParseStart + 4; i < bufParseEnd - 2; i++) { check ^= buf[i]; } if (check == buf[bufParseEnd - 2]) { //Log.d("serialPort", String.format("protocol: %s, size: %d, index: %d, cmd: %d, check: %d, data: %s", byteToHex(buf, bufParseStart, bufParseEnd - bufParseStart), size, index, cmd, check, byteToHex(buf, bufParseStart + 4, size - 3))); switch (cmd) { // 心跳 case heartBeatCmd: { } break; // 上分 case addCoinsCmd: { } break; // 游戏结果 case gameResultCmd: { boolean gift = buf[bufParseStart + 7] != 0; IoT.sendGameResult(gift); if (gift) { // 发送用户信息到中控,进行排行榜显示 WSSender.getInstance().sendUserInfo(currentUser, currentHeadUrl, currentNickname); } } break; default: break; } } } bufParseStart = bufParseEnd; bufParseEnd = -1; break; } } } if (bufParseStart >= bufReadOffset || bufParseEnd >= bufReadOffset) { break; } } if (bufReadOffset == buf.length) { System.arraycopy(buf, bufParseStart, buf, 0, bufReadOffset - bufParseStart); if (bufParseEnd != -1) { bufParseEnd -= bufParseStart; bufReadOffset = bufParseEnd; } else { bufReadOffset = 0; } bufParseStart = 0; } } } catch (IOException e) { e.printStackTrace(); } }
在中控和娃娃机进行通讯的方式咱们是选择 websocket 进行的。中控端是 server,而后娃娃机是 client。
class WSServer extends WebSocketServer { private MainActivity mainActivity; public void setMainActivity(MainActivity mainActivity) { this.mainActivity = mainActivity; } WSServer(InetSocketAddress address) { super(address); } @Override public void onOpen(WebSocket conn, ClientHandshake handshake) { mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已链接\n"); } @Override public void onClose(WebSocket conn, int code, String reason, boolean remote) { mainActivity.appendAndScrollLog("客户端:" + conn.getRemoteSocketAddress() + " 已断开\n"); } @Override public void onMessage(WebSocket conn, final String message) { Observable.create(new ObservableOnSubscribe<SocketMessage>() { @Override public void subscribe(ObservableEmitter<SocketMessage> emitter) throws Exception { final SocketMessage socketMessage = JsonIterator.deserialize(message, SocketMessage.class); emitter.onNext(socketMessage); emitter.onComplete(); } }) .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<SocketMessage>() { @Override public void accept(SocketMessage socketMessage) throws Exception { if (socketMessage.getCode() == SocketMessage.TYPE_USER) { // 夹到娃娃 } else if (socketMessage.getCode() == SocketMessage.TYPE_SAY_HELLO) { // 链接招呼语 } } }); } @Override public void onError(WebSocket conn, Exception ex) { } @Override public void onStart() { } }
appendAndScrollLog("初始化WebSocket服务...\n"); WSServer wsServer = new WSServer(18104); wsServer.setMainActivity(MainActivity.this); wsServer.setConnectionLostTimeout(5); wsServer.setReuseAddr(true); wsServer.start(); appendAndScrollLog("初始化WebSocket服务完成\n");
在 client 端,目前须要作的人物有断开重连以及数据发送的操做。断开重连的时候须要在新的子线程中进行,不然会报以下错误:
You cannot initialize a reconnect out of the websocket thread. Use reconnect in another thread to insure a successful cleanup
所以,咱们每次断开从新的时候是须要在新的子线程中进行的。除此以外,在发送数据的时候,若是恰好 socket 没有链接上,那么发送数据是会报异常的,所以咱们有数据要发送的时候若是 socket 没有链接,那么就先缓存到本地,等到 socket 链接上以后再把滞留的数据一次性发送出去。
implementation 'org.java-websocket:Java-WebSocket:1.3.9'
class WSClient extends WebSocketClient { private static final String TAG = "WSClient"; private static WSClient instance; private static URI sUri; private WSReceiver mWSReceiver; private Disposable mReconnectDisposable; private ConnectCallback mConnectCallback; /** * step 1:须要先调用,设置 url * @param uri */ public static void setUri(URI uri){ sUri = uri; } /** * step 1: * 须要先调用,设置服务端的 url * @param ipAddress * @param port */ public static void setUri(String ipAddress,int port){ try { sUri = new URI(String.format("ws://%s:%d", ipAddress, port)); } catch (URISyntaxException e) { e.printStackTrace(); } } public static WSClient getInstance(){ if (instance == null) { synchronized (WSClient.class){ if (instance == null) { instance = new WSClient(sUri); } } } return instance; } /** * step 2:链接 websocket */ public void onConnect(){ setConnectionLostTimeout(Config.instance.webSocketTimeoutSeconds); setReuseAddr(true); connect(); } private WSClient(URI server) { super(server); // 初始化消息发送者 WSSender.getInstance().setWSClient(this); // 初始化消息接收者 mWSReceiver = new WSReceiver(); mWSReceiver.setWSClient(this); mWSReceiver.setWSSender(WSSender.getInstance()); } @Override public void onOpen(ServerHandshake handshakedata) { Log.d(TAG, "onOpen: "); MainActivity.appendAndScrollLog("websocket 已链接\n"); Observable.just("") .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { if (mConnectCallback != null) { mConnectCallback.onWebsocketConnected(); } } }); // 清除滞留的全部消息 WSSender.getInstance().clearAllMessage(); } @Override public void onMessage(String message) { Log.d(TAG, "onMessage: "); mWSReceiver.handlerMessage(message); } @Override public void onClose(int code, String reason, boolean remote) { Log.d(TAG, "onClose: "); MainActivity.appendAndScrollLog(String.format("websocket 已断开,断开缘由:%s\n",reason)); Observable.just("") .subscribeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<Object>() { @Override public void accept(Object o) throws Exception { if (mConnectCallback != null) { mConnectCallback.onWebsocketClosed(); } } }); onReconnect(); } @Override public void onError(Exception ex) { if (ex != null) { Log.d(TAG, "onError: "+ex.getMessage()); MainActivity.appendAndScrollLog(String.format("websocket 出现错误,错误缘由:%s\n",ex.getMessage())); } onReconnect(); } public void onReconnect() { if (mReconnectDisposable != null && !mReconnectDisposable.isDisposed()){ return; } mReconnectDisposable = Observable.timer(1, TimeUnit.SECONDS) .subscribeOn(Schedulers.io()) .subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { Log.d(TAG, "websocket reconnect"); WSClient.this.reconnect(); mReconnectDisposable.dispose(); } }); } public void setConnectCallback(ConnectCallback mConnectCallback) { this.mConnectCallback = mConnectCallback; } public interface ConnectCallback{ void onWebsocketConnected(); void onWebsocketClosed(); } }
/** * Created by runla on 2018/10/26. * 文件描述:Websocket 的消息发送者 */ public class WSSender { private static final String TAG = "WSSender"; public static final int MAX_MESSAGE_COUNT = 128; private static WSSender instance; private WSClient mWSClientManager; // 消息队列 private LinkedList<String> mMessageList = new LinkedList<>(); private WSSender() { } public static WSSender getInstance() { if (instance == null) { synchronized (WSSender.class) { if (instance == null) { instance = new WSSender(); } } } return instance; } public void setWSClient(WSClient wsClientManager) { this.mWSClientManager = wsClientManager; } /** * 发送全部滞留的消息 */ public void clearAllMessage() { if (mWSClientManager == null) { return; } while (mMessageList.size() > 0 && mMessageList.getFirst() != null) { Log.d(TAG, "sendMessage: " + mMessageList.size()); mWSClientManager.send(mMessageList.getFirst()); mMessageList.removeFirst(); } } /** * 发送消息,若是消息发送不出去,那么就等到链接成功后再次尝试发送 * * @param msg * @return */ public boolean sendMessage(String msg) { if (mWSClientManager == null) { throw new NullPointerException("websocket client is null"); } if (TextUtils.isEmpty(msg)) { return false; } // 将须要发送的数据添加到队列的尾部 mMessageList.addLast(msg); while (mMessageList.size() > 0 && mMessageList.getFirst() != null) { Log.d(TAG, "sendMessage: " + mMessageList.size()); if (!mWSClientManager.isOpen()) { // 尝试重连 mWSClientManager.onReconnect(); break; } else { mWSClientManager.send(mMessageList.getFirst()); mMessageList.removeFirst(); } } // 若是消息队列中超过咱们设置的最大容量,那么移除最早添加进去的消息 if (mMessageList.size() >= MAX_MESSAGE_COUNT) { mMessageList.removeFirst(); } return false; } }
/** * Created by runla on 2018/10/26. * 文件描述:Websocket 的消息接收者 */ public class WSReceiver { private WSClient mWSClientManager; private WSSender mWSSender; private OnMessageCallback onMessageCallback; public WSReceiver() { } public void setWSClient(WSClient mWSClientManager) { this.mWSClientManager = mWSClientManager; } public void setWSSender(WSSender mWSSender) { this.mWSSender = mWSSender; } /** * 处理接收消息 * @param message */ public void handlerMessage(String message){ if (onMessageCallback != null){ onMessageCallback.onHandlerMessage(message); } } public void setOnMessageCallback(OnMessageCallback onMessageCallback) { this.onMessageCallback = onMessageCallback; } public interface OnMessageCallback{ void onHandlerMessage(String message); } }
appendAndScrollLog("初始化WebSocket客户端...\n"); WSClient.setUri( Config.instance.centralServerAddress, Config.instance.webSocketPort); WSClient.getInstance().onConnect(); WSClient.getInstance().setConnectCallback(MainActivity.this); appendAndScrollLog("初始化WebSocket客户端完成\n");
// 清除滞留的全部消息 WSSender.getInstance().clearAllMessage(); // 发送消息 WSSender.getInstance().sendMessage(msg);
在中控端,咱们须要显示排行版,用来显示夹中娃娃机的用户在本月及本周夹中娃娃的排行,所以咱们须要再中控端保存用户的夹中娃娃数量以及我的的其余信息,GreenDAO 是一款开源的面向 Android 的轻便、快捷的 ORM 框架,将 Java 对象映射到 SQLite 数据库中,咱们操做数据库的时候,不在须要编写复杂的 SQL语句, 在性能方面,GreenDAO 针对 Android 进行了高度优化, 最小的内存开销 、依赖体积小,同时仍是支持数据库加密。关于 GreenDAO 的用法我就不在这里作,具体的用法能够参考官网 GreenDAO。
关于整个系统的架构搭建过程当中遇到了好多坑,以上是我为这个项目提供的部分解决方案,当前所有的是不可能都放写出来的,此项目目前已经在西安和成都等地都有门店点了,据反馈,利润极大,不过这种类型的项目红利期不会太长,估计也是 2~3 年左右吧。若是有须要咱们为 口红机开发 或者是 娃娃机开发 提供解决方案的,能够联系咱们,目前咱们在这个方面已经有相对较为成熟的解决方案了。