这篇笔记主要是基于文档展开一下 core.async 在 ClojureScript 当中的基本用法.
具体的内容能够看原文章, 已经比较详细了, 不少在 API 文档的 demo 当中.
关于基础知识跟 cljs 跟 clj 的区别, 这篇文章就不涉及了.html
以前用到 core.async , 发现本身中间不少理解缺失了, 趁有时间赶忙看一下.
从 API 文档能够看到 core.async 的函数语义是比较丰富的, 几十个函数,
我顺着看了一圈, 整理下来大体是几个功能, 大体分红几块.node
多对一git
Promise.all
多选一github
Promise.race
一拆二/过滤数组
一对多promise
原本个人触发点是想看 core.async 是都能对应到 Promise 经常使用的功能,
这样看下来, core.async 功能是过于丰富了, 反而有些用不上.
因为我对 Go 熟悉度有限, 很差跟 Go Channel 作对比了.async
后面逐个看一下示例. 为了方便演示, 我增长了两个辅助函数,函数
fake-task-chan
, 生成一个随机定时任务, 返回一个 channeldisplay-all
, 打印一个 channel 全部返回的非 nil 数据, nil 表示结束.Promise.all
(defn demo-all [] (go ; 首先 tasks 获得向量, 内部函数多个 channel (let [tasks (->> (range 10) (map (fn [x] (fake-task-chan (str "rand task " x) 10 x))))] (println "result" ; loop 函数逐个取 tasks 的值, 从头取, 一次次 recur, 直到取完, 结束 (loop [acc [], xs tasks] (if (empty? xs) acc ; <! 表示从 channel 取数据, 在 go block 内阻塞逻辑 (recur (conj acc (<! (first xs))) (rest xs))))))))
因为任务在 loop 以前已经开始了, 相似 Promise.all
的效果.
一个个等待结果, 最终就是所有的值, 耗时就是最长的那个等待的时间.oop
=>> node target/server.js rand task 0 will take 0 secs rand task 1 will take 1 secs rand task 2 will take 2 secs rand task 3 will take 3 secs rand task 4 will take 9 secs rand task 5 will take 6 secs rand task 6 will take 7 secs rand task 7 will take 1 secs rand task 8 will take 2 secs rand task 9 will take 9 secs rand task 0 finished rand task 1 finished rand task 7 finished rand task 2 finished rand task 8 finished rand task 3 finished rand task 5 finished rand task 6 finished rand task 4 finished rand task 9 finished result [0 1 2 3 4 5 6 7 8 9]
能够看到最终以数组形式返回了每一个 channel 返回的数据了..net
我其实不大清楚这个 map 用在什么样的场景, 就是取两个 channel 计算获得新的数字.
(defn demo-map [] (let [<c1 (to-chan! (range 10)) <c2 (to-chan! (range 100 120)) <c3 (async/map + [<c1 <c2])] (display-all <c3)))
因此就是 0 + 100
1 + 101
... 获得 10 个数据
=>> node target/server.js 100 102 104 106 108 110 112 114 116 118 nil
整体上仍是多个 channel 合并成一个了.
merge 就是把多个 channel 的数据合并到一个, 字面意义的意思.
从获得的新的 channel, 能够获取到原来 channel 的数据.
(defn demo-merge [] (let [<c1 (chan), <c2 (chan), <c3 (async/merge [<c1 <c2])] (go (>! <c1 "a") (>! <c2 "b")) (display-all <c3)))
因此从 c3 就能拿到写到原来的两个 channel 的数据了,
=>> node target/server.js a b
mix 跟 merge 很类似, 区别是中间多了一个控制层, 定义成 mix-out
,
经过 admix
unmix
两个函数能够调整 mix-out
上的关系,
这个例子当中
(defn demo-mix [] (let [<c0 (chan) <c1 (async/to-chan! (range 40)) <c2 (async/to-chan! (range 100 140)) mix-out (async/mix <c0)] ; mix 过来两个 channel (async/admix mix-out <c1) (async/admix mix-out <c2) (go ; 先取 20 个数据打印 (doseq [x (range 20)] (println "loop1" (<! <c0))) (println "removing c2") ; 去掉那个数字特别大的 channel (async/unmix mix-out <c2) ; 再取 20 个数据打印 (doseq [x (range 20)] (println "loop2" (<! <c0))))))
获得结果,
=>> node target/server.js loop1 0 loop1 100 loop1 1 loop1 101 loop1 2 loop1 102 loop1 3 loop1 103 loop1 104 loop1 4 loop1 105 loop1 5 loop1 106 loop1 6 loop1 107 loop1 108 loop1 109 loop1 110 loop1 7 loop1 8 removing c2 loop2 111 loop2 9 loop2 10 loop2 11 loop2 12 loop2 13 loop2 14 loop2 15 loop2 16 loop2 17 loop2 18 loop2 19 loop2 20 loop2 21 loop2 22 loop2 23 loop2 24 loop2 25 loop2 26 loop2 27
能够看到刚开始的时候, 从返回的 channel 能够获取到两个来源 channel 的数据,
进行一次 unmix 以后, 大数的来源不见了, 后面基本上是小的数字.
这个顺序看上去是有一些随机性的, 甚至 unmix 还有一次大数的打印, 后面稳定了.
注意 mix-out
只是用于控制, 获取数据在代码里仍是要经过 c0
获取的.
Promise.race
这个比较清晰的
(defn demo-alts [] (go (let [<search (fake-task-chan "searching" 20 "searched x") <cache (fake-task-chan "looking cache" 15 "cached y") <wait (fake-task-chan "timeout" 15 nil) ; 数组里边三个可选的 channel [v ch] (alts! [<cache <search <wait])] (if (= ch <wait ) (println "final: timeout") (println "get result:" v)))))
就是随机的时间, 取返回最快的结果. 我多跑几回
=>> node target/server.js searching will take 3 secs looking cache will take 14 secs timeout will take 9 secs searching finished get result: searched x ^C =>> node target/server.js searching will take 10 secs looking cache will take 1 secs timeout will take 4 secs looking cache finished get result: cached y timeout finished searching finished ^C =>> node target/server.js searching will take 19 secs looking cache will take 4 secs timeout will take 1 secs timeout finished final: timeout looking cache finished ^C =>> node target/server.js searching will take 0 secs looking cache will take 6 secs timeout will take 1 secs searching finished get result: searched x timeout finished looking cache finished ^C
能够看到打印的结果都是最短期结束的任务对应的返回值.
timeout 是这种状况当中比较经常使用的一个定时器, 控制超时.
alt! 跟 alts! 就是相似了, 主要是语法比较丰富一点,
(defn demo-alt-syntax [] (let [<search1 (fake-task-chan "search1" 10 "search1 found x1") <search2 (fake-task-chan "search2" 10 "search2 found x2") <log (chan) <wait (fake-task-chan "timeout" 10 nil)] (go (loop [] (let [t (rand-int 10)] (println "read log waits" t) (<! (timeout (* 1000 t))) (println "got log" (<! <log)) (recur)))) (go (println "result" (async/alt! ; 匹配单个 channel 的写法 <wait :timeout ; 这个是往 channel 发送消息的写法, 发送也是等待对方读取, 也受时间影响 ; 这个两层数组是挺邪乎的写法... [[<log :message]] :sent-log ; 这个匹配向量包裹的多个 channel, 后面经过 ch 能够区分命中的 channel [<search1 <search2] ([v ch] (do (println "got" v "from" ch) :hit-search)))))))
直接多跑几回了, 效果跟上边一个差很少的,
=>> node target/server.js search1 will take 3 secs search2 will take 7 secs timeout will take 3 secs read log waits 8 search1 finished timeout finished got search1 found x1 from #object[cljs.core.async.impl.channels.ManyToManyChannel] result :hit-search search2 finished ^C =>> node target/server.js search1 will take 2 secs search2 will take 0 secs timeout will take 4 secs read log waits 2 search2 finished got search2 found x2 from #object[cljs.core.async.impl.channels.ManyToManyChannel] result :hit-search search1 finished timeout finished ^C =>> node target/server.js search1 will take 9 secs search2 will take 0 secs timeout will take 9 secs read log waits 6 search2 finished got search2 found x2 from #object[cljs.core.async.impl.channels.ManyToManyChannel] result :hit-search search1 finished timeout finished ^C =>> node target/server.js search1 will take 2 secs search2 will take 2 secs timeout will take 2 secs read log waits 9 search1 finished search2 finished timeout finished got search1 found x1 from #object[cljs.core.async.impl.channels.ManyToManyChannel] result :hit-search ^C =>> node target/server.js search1 will take 6 secs search2 will take 3 secs timeout will take 1 secs read log waits 6 timeout finished result :timeout search2 finished
看文档好像就是直接这样拆成两个的, 对应 true/false,
(defn demo-split [] (let [<c0 (to-chan! (range 20))] (let [[<c1 <c2] (async/split odd? <c0)] (go (display-all <c2 "from c2")) (go (display-all <c1 "from c1")))))
而后获得数据就是分别从不一样的 channel 才能获得了, 奇书和偶数,
=>> node target/server.js from c2 0 from c1 1 from c1 3 from c2 2 from c2 4 from c2 6 from c1 5 from c1 7 from c1 9 from c2 8 from c2 10 from c2 12 from c1 11 from c1 13 from c1 15 from c2 14 from c2 16 from c2 18 from c1 17 from c1 19 from c1 nil from c2 nil
这个 pipeline 就是中间插入一个函数, 例子里是 filter, 直接进行过滤.
(defn demo-pipeline-filter [] (let [<c1 (to-chan! (range 20)), <c2 (chan)] (async/pipeline 1 <c2 (filter even?) <c1) (display-all <c2)))
效果就是从 c2 取数据时, 只剩下偶数的值了,
=>> node target/server.js 0 2 4 6 8 10 12 14 16 18 nil
transducer 比较高级一点, 用到高阶函数跟比较复杂的抽象,
可是简单的功能写出来, 主要发挥做用的函数那个 (filter even?)
,
柯理化的用法, 返回函数, 而后被 comp 拿去组合,
(defn demo-transduce-filter [] (let [<c1 (to-chan! (range 20)), <c2 (chan 1 (comp (filter even?)))] (async/pipe <c1 <c2) (display-all <c2)))
获得的结果跟上边是同样的, 都是过滤出偶数,
=>> node target/server.js 0 2 4 6 8 10 12 14 16 18 nil
就是把一个数据变成多份, 供多个 channel 过来取数据,
(defn demo-mult [] (let [<c0 (async/to-chan! (range 10)), <c1 (chan), <c2 (chan), ; mult-in 也是一个控制, 而不是一个 channel mult-in (async/mult <c0)] (async/tap mult-in <c1) (async/tap mult-in <c2) (display-all <c1 "from c1") (comment "need to take from c2, otherwise c0 is blocked") (display-all <c2 "from c2")))
能够看到运行之后就是 c1 c2 分别拿到一份同样的数据了,
=>> node target/server.js from c1 0 from c2 0 from c1 1 from c1 2 from c2 1 from c2 2 from c1 3 from c1 4 from c2 3 from c2 4 from c1 5 from c1 6 from c2 5 from c2 6 from c1 7 from c1 8 from c2 7 from c2 8 from c1 9 from c1 nil from c2 9 from c2 nil
大概的场景应该就是一个数据发布到多个 channel 去吧.
不过这个跟监听还有点不同, 监听广播时发送者是非阻塞的, 这边是 channel 是阻塞的.
代码后续会同步到 github 去 https://github.com/worktools/... .
这边主要仍是 API 的用法, 业务场景当中使用 core.async 会复杂一些,
好比 debounce 的逻辑用 timeout 搭配 loop 处理就比较顺,
具体代码参考, https://zhuanlan.zhihu.com/p/...
但通常都是会搅尾递归在里边, 进行逻辑控制和状态的传递.
网上别人的例子加上业务逻辑还会复杂不少不少...
但总的说, Clojure 提供的 API, 还有抽象能力, 应该是能够用来应对不少场景的.