Contents of this section: Lect 2 RPC and Threads
Threads: Threads allow one program to (logically) execute many things at once.
The threads share memory. However, each thread includes some per-thread state: program counter, registers, stack.
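In Go, the unit of concurrency is the goroutine. Below is a minimal sketch of the point above, using a hypothetical helper `sumShared`. Every goroutine adds into the same shared variable `total`, which is protected by a `sync.Mutex`. Meanwhile, each goroutine's variable `local` is private to that goroutine.

```go
package main

import (
	"fmt"
	"sync"
)

// sumShared (hypothetical helper) starts n goroutines. Each computes a value
// in a local variable that belongs to it alone, then adds it to a variable
// shared by all goroutines. The shared variable is guarded by a mutex.
func sumShared(n int) int {
	var mu sync.Mutex
	var wg sync.WaitGroup
	total := 0 // shared: every goroutine's closure refers to this one variable

	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			local := id * id // per-goroutine: not visible to other goroutines
			mu.Lock()
			total += local
			mu.Unlock()
		}(i)
	}
	wg.Wait() // wait until every goroutine has added its value
	return total
}

func main() {
	fmt.Println(sumShared(10)) // 385 = 1^2 + 2^2 + ... + 10^2
}
```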
Below, a web crawler written in Go is used as an example to introduce threads:
Go example: crawler.go
```go
package main

import (
	"fmt"
	"sync"
)

// Several solutions to the crawler exercise from the Go tutorial:
// https://tour.golang.org/concurrency/10

type fakeResult struct {
	body string
	urls []string
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"http://golang.org/": &fakeResult{
		"Title: The Go Programming Language",
		[]string{
			"http://golang.org/pkg/",
			"http://golang.org/cmd/",
		},
	},
	"http://golang.org/pkg/": &fakeResult{
		"Title: Packages",
		[]string{
			"http://golang.org/",
			"http://golang.org/cmd/",
			"http://golang.org/pkg/fmt/",
			"http://golang.org/pkg/os/",
		},
	},
	"http://golang.org/pkg/fmt/": &fakeResult{
		"Title: Package fmt",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
	"http://golang.org/pkg/os/": &fakeResult{
		"Title: Package os",
		[]string{
			"http://golang.org/",
			"http://golang.org/pkg/",
		},
	},
}

type Fetcher interface {
	// Fetch(urlstring) method returns a slice of URLs found on the page.
	Fetch(urlstring string) (urllist []string, err error)
}

func (f fakeFetcher) Fetch(urlstring string) ([]string, error) {
	if res, ok := f[urlstring]; ok { // https://tour.golang.org/flowcontrol/6
		fmt.Printf("found: %s\n", urlstring)
		return res.urls, nil
	}
	fmt.Printf("missing: %s\n", urlstring)
	return nil, fmt.Errorf("not found: %s", urlstring)
}

// ###### Serial crawler ######

func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
	if fetched[url] {
		return
	}
	fetched[url] = true
	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	for _, u := range urls {
		Serial(u, fetcher, fetched)
	}
	return
}

// ###### Concurrent crawler with shared state and Mutex ######

func makeState() *fetchState {
	f := &fetchState{}
	f.fetched = make(map[string]bool)
	return f
}

type fetchState struct {
	mu      sync.Mutex
	fetched map[string]bool
}

func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
	f.mu.Lock()
	if f.fetched[url] {
		f.mu.Unlock()
		return
	}
	f.fetched[url] = true
	f.mu.Unlock()

	urls, err := fetcher.Fetch(url)
	if err != nil {
		return
	}
	var done sync.WaitGroup
	for _, u := range urls {
		done.Add(1)
		go func(u string) {
			defer done.Done()
			ConcurrentMutex(u, fetcher, f)
		}(u)
	}
	done.Wait()
	return
}

// ###### Concurrent crawler with channels ######

func worker(url string, ch chan []string, fetcher Fetcher) {
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{}
	} else {
		ch <- urls
	}
}

func master(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher)
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

func ConcurrentChannel(url string, fetcher Fetcher) {
	ch := make(chan []string)
	go func() {
		ch <- []string{url}
	}()
	master(ch, fetcher)
}

// ###### main ######

func main() {
	fmt.Printf("=== Serial===\n")
	Serial("http://golang.org/", fetcher, make(map[string]bool)) // Serial version of crawler

	fmt.Printf("=== ConcurrentMutex ===\n")
	ConcurrentMutex("http://golang.org/", fetcher, makeState())

	fmt.Printf("=== ConcurrentChannel ===\n")
	ConcurrentChannel("http://golang.org/", fetcher)
}
```
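For simplicity, this is only a fake crawler: it does no network access at all. `fetcher` is a hash table of type string -> *fakeResult that records the list of links on each page, and the crawler functions read from it. To demonstrate concurrency in Go, the code implements three functions: Serial, ConcurrentMutex and ConcurrentChannel.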
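The code first defines an interface Fetcher (interfaces in Go are similar in concept to Java's). It has one method, Fetch, which returns the list of links that `fetcher` stores for urlstring.

Unlike Java, Go treats methods and functions as different things:
- A method is the object-oriented concept. What distinguishes it is a receiver (the `f fakeFetcher` parameter of Fetch()). The receiver means the call invokes the Fetch() method of object f (written as some_obj_f.Fetch(url)), so same-named methods on different types are matched automatically.
- A function is the procedural concept. It has only input and output parameters and is not tied to any object.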
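Below is a small sketch of the method/function distinction. The types `mapFetcher` and `emptyFetcher` and the function `countLinks` are hypothetical; only the `Fetcher` interface is copied from crawler.go. Neither type declares that it implements `Fetcher`. In Go, a type satisfies an interface simply by having the required methods.

```go
package main

import "fmt"

// Fetcher is copied from crawler.go.
type Fetcher interface {
	Fetch(urlstring string) (urllist []string, err error)
}

// mapFetcher is a hypothetical type: page URL -> links on that page.
type mapFetcher map[string][]string

// Fetch is a method: (f mapFetcher) is its receiver.
func (f mapFetcher) Fetch(urlstring string) ([]string, error) {
	if urls, ok := f[urlstring]; ok {
		return urls, nil
	}
	return nil, fmt.Errorf("not found: %s", urlstring)
}

// emptyFetcher is another hypothetical type with a same-named method.
type emptyFetcher struct{}

func (emptyFetcher) Fetch(urlstring string) ([]string, error) {
	return nil, nil // every page has no links
}

// countLinks is a plain function (no receiver). It accepts any value whose
// type has a matching Fetch method and returns -1 on error.
func countLinks(f Fetcher, url string) int {
	urls, err := f.Fetch(url)
	if err != nil {
		return -1
	}
	return len(urls)
}

func main() {
	m := mapFetcher{"a": {"b", "c"}}
	fmt.Println(countLinks(m, "a"))              // 2
	fmt.Println(countLinks(m, "x"))              // -1
	fmt.Println(countLinks(emptyFetcher{}, "a")) // 0
}
```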
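The `if res, ok := f[urlstring]; ok` in fakeFetcher's Fetch method uses a neat form: a short statement runs before the condition, combined with the "comma ok" map lookup. See https://tour.golang.org/flowcontrol/6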
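The same form in isolation, with a hypothetical helper `lookup`. The statement before the semicolon runs first. `v` and `ok` are scoped to the if (and any else) blocks. `ok` reports whether the key was actually present, which a plain index cannot tell you.

```go
package main

import "fmt"

// lookup (hypothetical) uses the form from Fetch: a short statement
// (v, ok := m[k]) runs before the condition (ok).
func lookup(m map[string]int, k string) string {
	if v, ok := m[k]; ok {
		return fmt.Sprintf("found %s=%d", k, v)
	}
	// v and ok are out of scope here.
	return "missing " + k
}

func main() {
	m := map[string]int{"zero": 0}
	fmt.Println(lookup(m, "zero")) // found zero=0
	fmt.Println(lookup(m, "none")) // missing none
	// A plain index returns the zero value in both cases, so only ok
	// distinguishes "stored 0" from "absent".
	fmt.Println(m["zero"], m["none"]) // 0 0
}
```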
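First, the Serial version. Its parameters are:
- the root url;
- `fetcher` (the hash table mentioned above);
- `fetched`, a map[string]bool recording which URLs have already been visited.

Note the `make(map[string]bool)` call in main that creates the initial fetched map; see https://www.jianshu.com/p/f01841004810. Serial itself is simple, so I won't go into much detail. It skips url if it is already in fetched, otherwise marks it, fetches the links on that page (stored in its fakeResult), and recursively crawls each of them.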
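`fetched` has to be created with `make` (or a map literal) before Serial writes to it. A map declared with `var` is nil: reading from it works, but writing to it panics. The sketch below shows this with hypothetical helpers `countNew` (the same check-then-mark step Serial performs) and `writeToNil`.

```go
package main

import "fmt"

// countNew (hypothetical) marks each key in seen and returns how many were
// new. Reading a missing key yields false, the zero value for bool.
func countNew(seen map[string]bool, keys []string) int {
	n := 0
	for _, k := range keys {
		if !seen[k] {
			seen[k] = true
			n++
		}
	}
	return n
}

// writeToNil (hypothetical) reports whether writing to a nil map panics.
func writeToNil() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	var m map[string]bool // declared but never made: m is nil
	m["x"] = true         // panics: assignment to entry in nil map
	return false
}

func main() {
	var nilMap map[string]bool
	fmt.Println(nilMap["x"], len(nilMap)) // false 0 (reads are fine)
	fmt.Println(writeToNil())             // true

	seen := make(map[string]bool) // allocated and empty, so writable
	n := countNew(seen, []string{"a", "b", "a"})
	fmt.Println(n, len(seen)) // 2 2
}
```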
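The second version is ConcurrentMutex. All goroutines share the fetched map in fetchState, protected by its sync.Mutex. The check `f.fetched[url]` and the assignment `f.fetched[url] = true` happen under a single Lock/Unlock, so no URL is fetched twice. It starts one goroutine per link found and waits for all of them with a sync.WaitGroup before returning.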
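Below is a sketch of the check-and-mark pattern on its own. It uses hypothetical names (`claimSet`, `tryClaim`, `countWinners`) rather than the crawler's types. 100 goroutines race to claim the same URL, and because the check and the write sit in one critical section, exactly one should win. If the check and the write were under separate Lock/Unlock pairs, two goroutines could both see `false` and both proceed.

```go
package main

import (
	"fmt"
	"sync"
)

// claimSet (hypothetical) mirrors fetchState: a map guarded by a mutex.
type claimSet struct {
	mu      sync.Mutex
	claimed map[string]bool
}

// tryClaim checks and marks url inside one critical section, like
// ConcurrentMutex does with f.fetched. It returns true for the first caller only.
func (c *claimSet) tryClaim(url string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.claimed[url] {
		return false
	}
	c.claimed[url] = true
	return true
}

// countWinners starts n goroutines that all try to claim url, waits for
// them with a WaitGroup, and returns how many succeeded.
func countWinners(n int, url string) int {
	c := &claimSet{claimed: make(map[string]bool)}
	var wg sync.WaitGroup
	var mu sync.Mutex // guards winners
	winners := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if c.tryClaim(url) {
				mu.Lock()
				winners++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(countWinners(100, "http://golang.org/")) // 1
}
```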
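The third version is ConcurrentChannel, which uses a Go channel instead of shared state. Each worker goroutine sends the link list it fetched on ch. Only master reads from ch and updates its private fetched map, using n to count how many results are still expected. For this part, see http://www.javashuo.com/article/p-rucmpbsr-gp.html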
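The same master/worker structure can be tried on a hypothetical in-memory graph. `graph`, `visit` and `explore` are illustrative names, not from crawler.go. As in master, only the master goroutine touches the visited map, so no mutex is needed. Because ch is never closed, `range ch` would never end by itself, so the loop stops when the count n of outstanding messages reaches zero.

```go
package main

import (
	"fmt"
	"sort"
)

// graph (hypothetical) stands in for fakeFetcher: node -> neighbours.
type graph map[string][]string

// visit is the worker: it "fetches" one node and sends exactly one message,
// even for unknown nodes (g[node] is then a nil slice).
func visit(g graph, node string, ch chan []string) {
	ch <- g[node]
}

// explore is the master. n counts messages still expected on ch.
func explore(g graph, start string) []string {
	ch := make(chan []string)
	visited := map[string]bool{}
	go func() { ch <- []string{start} }()
	n := 1
	for urls := range ch {
		for _, u := range urls {
			if !visited[u] {
				visited[u] = true
				n++ // one more worker will send one more message
				go visit(g, u, ch)
			}
		}
		n-- // this message has been handled
		if n == 0 {
			break
		}
	}
	out := make([]string, 0, len(visited))
	for u := range visited {
		out = append(out, u)
	}
	sort.Strings(out) // map order is random; sort for stable output
	return out
}

func main() {
	g := graph{"a": {"b", "c"}, "b": {"a", "d"}, "c": {"a"}}
	fmt.Println(explore(g, "a")) // [a b c d]
}
```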
When to use sharing and locks, versus channels?
RPC
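The basic concepts were already covered in 5105, so here let's look at how RPC is done in Go.
The 5105 course covered the concept of Reliable RPC: what to do when communication between server and client fails.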
17_reliable_comm
1. Reliable RPC: client-server
1.1 Server failure (the client does not know when the server crashed: before or after the operation was executed)
Sol: three kinds of operation semantics:
/ Exactly once (the operation is executed exactly once): impossible to achieve
/ At least once (executed one or more times): retry
/ At most once (executed 0 or 1 times): send request only once
1.2 Client failure (the client has already crashed, so the server has no need to keep executing; doing so wastes resources)
Sol:
/ Extermination: log at client stub and explicitly kill orphans
/ Reincarnation: Divide time into epochs between failures and delete computations from old epochs.
/ Expiration: give each RPC a fixed quantum T. Explicitly request extensions.
At least once suits the following cases: if it's OK to repeat operations (e.g. read-only op), or if the application has its own plan for coping w/ duplicates (which you will need for Lab 1).
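Below is a sketch of at-least-once on the client side. It assumes a hypothetical helper `callAtLeastOnce`, a hypothetical error `errLost`, and a local closure that stands in for the network plus server. The closure executes the operation on every attempt but "loses" the first two replies. The client cannot tell a lost reply from a lost request, so one logical request runs three times. That is harmless only for operations like those described above.

```go
package main

import (
	"errors"
	"fmt"
)

// errLost (hypothetical) stands for "no reply received".
var errLost = errors.New("no reply")

// callAtLeastOnce (hypothetical) retries call until it gets a reply or has
// used maxTries attempts, returning the last error in the latter case.
func callAtLeastOnce(call func() (string, error), maxTries int) (string, error) {
	err := errLost
	for i := 0; i < maxTries; i++ {
		r, e := call()
		if e == nil {
			return r, nil
		}
		err = e
	}
	return "", err
}

func main() {
	executions := 0
	// Simulated server: runs the operation every time, but the first two
	// replies are lost on the way back.
	flaky := func() (string, error) {
		executions++
		if executions <= 2 {
			return "", errLost
		}
		return "ok", nil
	}
	r, err := callAtLeastOnce(flaky, 5)
	fmt.Println(r, err, executions) // ok <nil> 3
}
```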
The idea behind at most once: the server RPC code detects duplicate requests and returns the previous reply instead of re-running the handler (the RPC function). This approach is used in Lab 2.
Q: how to detect a duplicate request?
A: client includes unique ID (XID) when sending each request, and uses the same XID for re-send
server:
if seen[xid]:
    r = old[xid]
else:
    r = handler()
    old[xid] = r
    seen[xid] = true
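The seen/old pseudocode above can be written in Go roughly as follows. This is a sketch, not the Lab 2 code. `dedupServer`, `newDedupServer` and the counter-incrementing `handler` are hypothetical, and the tables live only in memory. The handler is deliberately non-idempotent, so a re-run would be visible.

```go
package main

import (
	"fmt"
	"sync"
)

// dedupServer (hypothetical) keeps the seen/old tables from the pseudocode.
// xid is chosen by the client and reused when it re-sends a request.
type dedupServer struct {
	mu      sync.Mutex
	seen    map[int64]bool
	old     map[int64]string
	counter int // state changed by every real execution of handler
}

func newDedupServer() *dedupServer {
	return &dedupServer{seen: map[int64]bool{}, old: map[int64]string{}}
}

// handler is a hypothetical non-idempotent operation.
func (s *dedupServer) handler() string {
	s.counter++
	return fmt.Sprintf("counter=%d", s.counter)
}

// Handle runs handler at most once per xid; duplicates get the saved reply.
func (s *dedupServer) Handle(xid int64) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[xid] {
		return s.old[xid]
	}
	r := s.handler()
	s.old[xid] = r
	s.seen[xid] = true
	return r
}

func main() {
	s := newDedupServer()
	fmt.Println(s.Handle(1)) // counter=1
	fmt.Println(s.Handle(1)) // counter=1 (duplicate: saved reply, no re-run)
	fmt.Println(s.Handle(2)) // counter=2
}
```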
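But at most once also has a problem: if the server crashes and loses seen[], it no longer knows which xids it has already received.
Exactly once requires a fault-tolerance protocol on top of at most once. This will be used in Lab 3.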
Go RPC is "at-most-once"
STEP1 open TCP connection
STEP2 write request to TCP connection
STEP3 TCP may retransmit, but server's TCP will filter out duplicates
There is no retry in Go code (i.e. will NOT create 2nd TCP connection)
Go RPC code returns an error if it doesn't get a reply:
perhaps after a timeout (from TCP)
perhaps server didn't see request
perhaps server processed request but server/net failed before reply came back
Below is a simple key-value store written in Go as an example:
Go example: kv.go
```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"sync"
)

// RPC request/reply definitions

const (
	OK       = "OK"
	ErrNoKey = "ErrNoKey"
)

type Err string

type PutArgs struct {
	Key   string
	Value string
}

type PutReply struct {
	Err Err
}

type GetArgs struct {
	Key string
}

type GetReply struct {
	Err   Err
	Value string
}

// Client -------------------------------------------------------

// connect opens a connection to the server.
func connect() *rpc.Client {
	client, err := rpc.Dial("tcp", "127.0.0.1:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}
	return client
}

func get(key string) string {
	client := connect()
	args := GetArgs{key} // send the caller's key
	reply := GetReply{}
	err := client.Call("KV.Get", &args, &reply) // invoke KV.Get on the server via RPC
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close() // close the connection
	return reply.Value
}

func put(key string, val string) {
	client := connect()
	args := PutArgs{key, val} // send the caller's key and value
	reply := PutReply{}
	err := client.Call("KV.Put", &args, &reply)
	if err != nil {
		log.Fatal("error:", err)
	}
	client.Close()
}

// Server -------------------------------------------------------

type KV struct {
	mu   sync.Mutex // protects data; RPC handlers may run concurrently
	data map[string]string
}

// server starts the RPC server in the background.
func server() {
	kv := new(KV)
	kv.data = map[string]string{}
	rpcs := rpc.NewServer()
	rpcs.Register(kv)
	l, e := net.Listen("tcp", ":1234")
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go func() {
		for {
			conn, err := l.Accept()
			if err == nil {
				go rpcs.ServeConn(conn)
			} else {
				break
			}
		}
		l.Close()
	}()
}

func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	val, ok := kv.data[args.Key]
	if ok {
		reply.Err = OK
		reply.Value = val
	} else {
		reply.Err = ErrNoKey
		reply.Value = ""
	}
	return nil
}

func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
	kv.mu.Lock()
	defer kv.mu.Unlock()

	kv.data[args.Key] = args.Value
	reply.Err = OK
	return nil
}

// main -------------------------------------------------------

func main() {
	server()

	put("subject", "6.824")
	fmt.Printf("Put(subject, 6.824) done\n")
	fmt.Printf("get(subject) -> %s\n", get("subject"))
}
```
The logic is fairly simple, and much more concise than Java Thrift.
Ref:
https://golang.org/doc/effective_go.html
https://golang.org/pkg/net/rpc/
https://tour.golang.org/concurrency/10
https://www.cnblogs.com/pdev/p/10936485.html