RxJava2 案例之并发执行任务

RxJava2 案例之并发执行任务

需求

有一批 IP 节点,需要并发去 ping,并且拿到每个 IP 的 ping 耗时。

/** Hosts/IPs to ping. */
private List<String> mPingNodeList = new ArrayList<>();

/** Collected results: each entry pairs a node with the time its ping took. */
private List<PingNodeBean> pingNodeBeanList = new ArrayList<>();

/** Wall-clock timestamp (ms) taken when a ping run starts; used to log total elapsed time. */
private long startMillis;

/**
 * Fills {@code mPingNodeList} with the sample nodes to ping.
 *
 * <p>The list mixes host names with bare numeric strings ("234", "345", "123").
 * Entries are appended, so calling this more than once adds the same nodes again.
 */
private void initData() {
    mPingNodeList.add("www.baidu.com");
    mPingNodeList.add("234");
    mPingNodeList.add("www.qq.com");
    mPingNodeList.add("www.sina.com.cn");
    mPingNodeList.add("345");
    mPingNodeList.add("www.google.com.hk");
    mPingNodeList.add("123");
}
使用线程池
private ExecutorService executorService = Executors.newCachedThreadPool(); private void useExecutors() { pingNodeBeanList.clear(); startMillis = System.currentTimeMillis(); for (final String node : mPingNodeList) { executorService.submit(new Callable<PingNodeBean>() { @Override public PingNodeBean call() throws Exception { double time = ping(node); Log.e(TAG, "ip=" + node + "time=" + time); PingNodeBean pingNodeBean = new PingNodeBean(node, time); pingNodeBeanList.add(pingNodeBean); if (pingNodeBeanList.size() == mPingNodeList.size()) { // 对全部节点进行排序 long payTime = System.currentTimeMillis() - startMillis; Log.e(TAG, "花费的时间是" + payTime); } return pingNodeBean; } }); } } private ExecutorService executorService = Executors.newCachedThreadPool(); private void useExecutors() { pingNodeBeanList.clear(); startMillis = System.currentTimeMillis(); for (final String node : mPingNodeList) { executorService.submit(new Callable<PingNodeBean>() { @Override public PingNodeBean call() throws Exception { double time = ping(node); Log.e(TAG, "ip=" + node + "time=" + time); PingNodeBean pingNodeBean = new PingNodeBean(node, time); pingNodeBeanList.add(pingNodeBean); if (pingNodeBeanList.size() == mPingNodeList.size()) { // 对全部节点进行排序 long payTime = System.currentTimeMillis() - startMillis; Log.e(TAG, "花费的时间是" + payTime); } return pingNodeBean; } }); } }
使用rxjava2
/**
 * Pings every node in {@code mPingNodeList} through an RxJava2 pipeline and,
 * on the Android main thread, logs the total elapsed time followed by each
 * node's individual ping time.
 *
 * <p>Pipeline: {@code fromIterable} emits the nodes, {@code concatMap} maps each
 * node to an inner Observable that performs the ping on a new thread, and
 * {@code toList} gathers all results into a single list.
 *
 * <p>NOTE(review): {@code concatMap} subscribes to the inner Observables one at
 * a time, so the pings run one after another, not in parallel. If concurrent
 * pings are the goal, {@code flatMap} (unordered) or {@code concatMapEager}
 * (ordered) would be the operators to consider.
 */
private void startChooseNode() {
    pingNodeBeanList.clear();
    startMillis = System.currentTimeMillis();

    Observable.fromIterable(mPingNodeList)
            .concatMap(new Function<String, Observable<PingNodeBean>>() {
                @Override
                public Observable<PingNodeBean> apply(String o) throws Exception {
                    return Observable.just(o)
                            // Move the ping off the upstream thread.
                            .observeOn(Schedulers.newThread())
                            .map(new Function<String, PingNodeBean>() {
                                @Override
                                public PingNodeBean apply(String node) throws Exception {
                                    double time = ping(node);
                                    return new PingNodeBean(node, time);
                                }
                            });
                }
            })
            .toList()
            .toObservable()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<List<PingNodeBean>>() {
                @Override
                public void accept(List<PingNodeBean> list) throws Exception {
                    long payTime = System.currentTimeMillis() - startMillis;
                    Log.e(TAG, "花费的时间是" + payTime);
                    for (PingNodeBean pingNodeBean : list) {
                        Log.e(TAG, pingNodeBean.getIp() + "time=" + pingNodeBean.getTime());
                    }
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    // Report through the same logger as the rest of the class.
                    Log.e(TAG, "ping failed", throwable);
                }
            });
}

1. 先使用 fromIterable 遍历需要 ping 的 IP 集合。
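2. 然后使用 concatMap 按照 ping 集合的顺序一个一个执行。在 concatMap 方法里面,我们采用 map 操作符将 String 转变为 PingNodeBean 对象,并且在 map 里面进行 ping 操作,拿到 ping 花费的时间,封装成对象返回。注意:concatMap 会依次订阅每个内部 Observable,因此各个 ping 实际上是串行执行的;若需要真正并发,可改用 flatMap(不保证顺序)或 concatMapEager(保证顺序)。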
3. toList 将全部的返回对象放进一个 list 集合中。