源文档是
core.async
仓库的一个代码文件, 包含大量的教程性质的注释
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
中间不肯定的两句留了原文, 有读懂的同窗请回复帮我纠正git
这份攻略介绍 core.async
核心的一些概念github
clojure.core.async
namespace 包含了公开的 API.异步
(require '[clojure.core.async :as async :refer :all])
数据经过相似队列的 Channel 来传输, Channel 默认不进行 buffer(长度为 0)
须要生产者和消费者进行约定从而在 Channel 当中传送数据socket
用 chan
能够建立一个不进行 buffer 的 Channel:async
(chan)
传一个数字以建立限定了 buffer 大小的 Channel:ui
(chan 10)
close!
用来关闭 Channel 终结接受消息传入, 已存在的数据依然能够取出
取尽的 Channel 在取值时返回 nil
, nil
是不能直接经过 Channel 发送的!spa
(let [c (chan)] (close! c))
对在通常的 Thread 中, 使用 >!!
(阻塞的 put) 和 <!!
(阻塞的 take)
与 Channel 进行通讯线程
(let [c (chan 10)] (>!! c "hello") (assert (= "hello" (<!! c))) (close! c))
因为是这些调用是阻塞的, 若是尝试把数据放进没有 buffer 的 Channel, 那么整个 Thread 都会被卡住.
因此须要 thread
(比如 future
) 在线程池当中执行代码主体, 而且经过 Channel 传回数据
例子中启动了一个后台任务把 "hello"
放进 Channel, 而后在主线程读取数据code
(let [c (chan)] (thread (>!! c "hello")) (assert (= "hello" (<!! c))) (close! c))
go
代码块和反转控制(IoC) threadgo
是一个宏, 能把它的 body 在特殊的线程池里异步执行
不一样的是原本会阻塞的 Channel 操做会暂停, 不会有线程被阻塞
这套机制封装了事件/回调系统当中须要外部代码的反转控制
在 go
block 内部, 咱们使用 >!
(put
) 和 <!
(take
)orm
这里把前面 Channel 的例子转化成 go
block:
(let [c (chan)] (go (>! c "hello")) (assert (= "hello" (<!! (go (<! c))))) (close! c))
这里使用了 go
block 来模拟生产者, 而不是直接用 Thread 和阻塞调用
消费者用 go
block 进行获取, 返回 Channel 做为结果, 对这个 Channel 作阻塞的读取
(原文: The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.)
alts
)Channel 对比队列一个啥手机应用是可以同时等待多个 Channel(像是 socket select)
经过 alts!!
(通常 thread)或者 alts!
(用于 go
block)
能够经过 alts
建立后台线程讲两个任意的 Channel 结合到一块儿alts!!
获取集合中某个操做的来执行
或者是能够 take
的 Channel, 或者是能够 put
[channel value]
的 Channel
并返回包含具体的值(对于 put
返回 nil
)以及获取成功的 Channel:
(原文: alts!!
takes either a set of operations to perform either a channel to take from a [channel value] to put and returns the value (nil for put) and channel that succeeded:)
(let [c1 (chan) c2 (chan)] (thread (while true (let [[v ch] (alts!! [c1 c2])] (println "Read" v "from" ch)))) (>!! c1 "hi") (>!! c2 "there"))
打印内容(在 stdout, 可能你的 REPL 当中看不到):
从 #<ManyToManyChannel ...>
读取 hi
从 #<ManyToManyChannel ...>
读取 there
使用 alts!
来作和 go
block 同样的事情:
(let [c1 (chan) c2 (chan)] (go (while true (let [[v ch] (alts! [c1 c2])] (println "Read" v "from" ch)))) (go (>! c1 "hi")) (go (>! c2 "there")))
由于 go
block 是轻量级的进程而而不是限于 thread, 能够同时有大量的实例
这里建立 1000 个 go
block 在 1000 个 Channel 里同时发送 hi
它们稳当时用 alts!!
来读取
(let [n 1000 cs (repeatedly n chan) begin (System/currentTimeMillis)] (doseq [c cs] (go (>! c "hi"))) (dotimes [i n] (let [[v c] (alts!! cs)] (assert (= "hi" v)))) (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))
timeout
建立 Channel 并等待设定的毫秒时间, 而后关闭:
(let [t (timeout 100) begin (System/currentTimeMillis)] (<!! t) (println "Waited" (- (System/currentTimeMillis) begin)))
能够结合 timeout
和 alts
来作有时限的 Channel 等待
这里是花 100ms 等待数据到达 Channel, 没有的话放弃:
(let [c (chan) begin (System/currentTimeMillis)] (alts!! [c (timeout 100)]) (println "Gave up after" (- (System/currentTimeMillis) begin)))
Channel 能够定制不一样的策略来处理 Buffer 填满的状况
这套 API 中提供了两个实用的例子.
使用 dropping-buffer
控制当 buffer 填满时丢弃最新鲜的值:
(chan (dropping-buffer 10))
使用 sliding-buffer
控制当 buffer 填满时丢弃最久远的值:
(chan (sliding-buffer 10))