上一节编写了一个使用REST服务进行对象存取得单机程序git
本节接续对其进行扩展,为了知足加入新的节点就能够自由扩展服务器集群的需求,咱们须要将单机版的接口与数据存储进行解耦.github
让接口与数据存储成为互相独立的服务节点,二者互相合做提供对象存储服务。这样节点就能够按照须要添加,而且分布在不一样的服务器上。api
架构图服务器
如图 接口层服务提供对外的REST接口,数据服务层则提供数据的存储功能.架构
接口服务和数据服务之间有两种接口:1 REST服务接口 2 消息队里,采用RabbitMQ(也能够采用其余消息队列甚至是其余方式)框架
每种接口有格子的特色和用处 REST适合大数量的数据传输 消息队列则能够知足1:1 1:n n:1 n:n的消息传输ide
心跳消息测试
dataServer每隔5秒 经过消息队列想全部 apiServer发送心跳包spa
apiServer接收心跳包 而且每隔10秒移除未及时更新心跳包的dataServer设计
1 dataServer hearbeat 2 func StartHeartbeat() { 3 q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) 4 defer q.Close() 5 for { 6 q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS")) 7 time.Sleep(5 * time.Second) 8 } 9 } 10 11 apiServer hearbeat 12 func ListenHeartbeat() { 13 q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) 14 defer q.Close() 15 q.Bind("apiServers") 16 c := q.Consume() 17 go removeExpiredDataServer() 18 for msg := range c { 19 dataServer, e := strconv.Unquote(string(msg.Body)) 20 if e != nil { 21 panic(e) 22 } 23 mutex.Lock() 24 dataServers[dataServer] = time.Now() 25 mutex.Unlock() 26 } 27 } 28 29 func removeExpiredDataServer() { 30 for { 31 time.Sleep(5 * time.Second) 32 mutex.Lock() 33 for s, t := range dataServers { 34 if t.Add(10 * time.Second).Before(time.Now()) { 35 delete(dataServers, s) 36 } 37 } 38 mutex.Unlock() 39 } 40 }
定位消息
apiServer locate func Locate(name string) string { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) q.Publish("dataServers", name) c := q.Consume() //广播消息 等待1秒内的回复信息 go func() { time.Sleep(time.Second) q.Close() }() msg := <-c s, _ := strconv.Unquote(string(msg.Body)) return s } dataServer locate func Locate(name string) bool { _, err := os.Stat(name) return !os.IsNotExist(err) } func StartLocate() { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) defer q.Close() q.Bind("dataServers") c := q.Consume() //绑定消息队列 等待询问定位信息 而后在本地查找 若找到则返回监听地址 for msg := range c { object, e := strconv.Unquote(string(msg.Body)) if e != nil { panic(e) } if Locate(os.Getenv("STORAGE_ROOT") + "/objects/" + object) { q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS")) } } }
put get操做与上节相似
代码地址 https://github.com/stuarthu/go-implement-your-object-storage/tree/master/chapter2
由于测试机器有限 须要在一台机器上开启6个apiServer 2个dataServer 和 开启RabbitMQ
因此咱们须要在同一台机器上绑定多个地址
执行命令以下:
安装配置rabbitmq 这里就再也不介绍了
而后建立各个apiServer须要的存储文件夹 开启6个apiServer 2个dataServer 对应模拟不用的ip地址
咱们上传一个test2对象
下面进行测试
咱们上传一个叫作test2的对象 再locate定位他在哪一个数据服务节点 而后换另外一个数据服务节点查询 看看是否成功
OK 所有成功
下一节 解决这样一个问题 若是咱们屡次重复上传同一个对象 那么这个对象可能在全部服务器上都有存储 甚至多是不同的数据(存储是随机选择数据节点服务器)
咱们添加版本控制来解决这个问题 记录版本等数据的重要信息 也就是元数据