memcached分布式主要实现的功能为:每份数据即有主服务器又有备份服务器,全部的数据会按照必定的策略保存到不一样的服务器,从而下降外部请求对服务器的压力。要实现这部分功能须要keepalived和repcached这连个插件。html
一、每一个数据经过客户端xmemcached,将数据按照哈希一致性分发到不一样的主服务器。java
二、keepalived的功能主要是用来将外部请求按照必定路由规则分发到主服务器或者备份服务器。node
三、主服务器和备份服务器之间的数据同步时经过repcached实现。python
如今主要分为连部分进行介绍memcached的分布式,基础设备介绍和分布式配置。linux
XMemcached是一个新java memcached client。经过Xmemcached与memcached进行数据交互。算法
Memcached的分布只能经过客户端来实现,XMemcached实现了此功能,而且提供了一致性哈希(consistent hash)算法的实现。编程
XMemcached容许经过设置节点的权重来调节memcached的负载,设置的权重越高,该memcached节点存储的数据将越多,所承受的负载越大。后端
XMemcached容许经过JMX或者代码编程实现节点的动态添加或者移除,方便用户扩展和替换节点等。数组
刚才已经提到java nio一般对一个memcached节点使用一个链接,而XMemcached一样提供了设置链接池的功能,对同一个memcached能够建立N个链接组成链接池来提升客户端在高并发环境下的表现,而这一切对使用者来讲倒是透明的。启用链接池的前提条件是保证数据之间的独立性或者数据更新的同步,对同一个节点的各个链接之间是没有作更新同步的,所以应用须要保证数据之间是相互独立的或者所有采用CAS更新来保证原子性。缓存
XMemcached一样支持客户端的分布策略,默认分布的策略是按照key的哈希值模以链接数获得的余数,对应的链接就是将要存储的节点。若是使用默认的分布策略,你不须要作任何配置或者编程。
XMemcached一样支持一致性哈希(consistent hash),经过编程设置:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddressMap(“192.168.121.16:11211”));
builder.setSessionLocator(new KetamaMemcachedSessionLocator());//设置哈希一致策略
MemcachedClient client=builder.build();
Memcached是经过cas协议实现原子更新,所谓原子更新就是compare and set,原理相似乐观锁,每次请求存储某个数据同时要附带一个cas值,memcached比对这个cas值与当前存储数据的cas值是否相等,若是相等就让新的数据覆盖老的数据,若是不相等就认为更新失败,这在并发环境下特别有用。XMemcached提供了对CAS协议的支持(不管是文本协议仍是二进制协议),CAS协议实际上是分为两个步骤:获取CAS值和尝试更新。先经过gets方法获取一个GetsResponse,此对象包装了存储的数据和cas值,而后经过cas方法尝试原子更新,若是失败打印”cas error”。显然,这样的方式很繁琐,而且若是你想尝试多少次原子更新就须要一个循环来包装这一段代码,所以XMemcached提供了一个*CASOperation*接口包装了这部分操做,容许你尝试N次去原子更新某个key存储的数据,无需显式地调用gets获取cas值。所以一个典型的使用场景以下:
GetsResponse<Integer> result = client.gets("a"); long cas = result.getCas(); //尝试将a的值更新为2 if (!client.cas("a", 0, 2, cas)) { System.err.println("cas error"); }
Memcached提供了统计协议用于查看统计信息,getStats方法返回一个map,其中存储了全部已经链接而且有效的memcached节点返回的统计信息:
Map<InetSocketAddress,Map<String,String>> result=client.getStats();
你也能够统计具体的项目,如统计items项目:
Map<InetSocketAddress,Map<String,String>> result=client.getStatsByItem("items");
Memcached 1.4.3开始支持SASL验证客户端,在服务器配置启用SASL以后,客户端须要经过受权验证才能够跟memcached继续交互,不然将被拒绝请求。XMemcached 1.2.5开始支持这个特性。假设memcached设置了SASL验证,典型地使用CRAM-MD5或者PLAIN的文本用户名和密码的验证机制,假设用户名为cacheuser,密码为123456,那么编程的方式以下:
MemcachedClientBuilder builder = new XMemcachedClientBuilder( AddrUtil.getAddresses("192.168.121.16:11211")); builder.addAuthInfo(AddrUtil.getOneAddress("192.168.121.16:11211"), AuthInfo.typical("cacheuser", "123456"));
传入一个int数组,里面的元素就是节点对应的权重值,好比这里设置"192.168.121.16:11211"节点的权重为3。注意,xmemcached的权重是经过复制链接的多个引用来实现的,好比权重为3,那么就复制3个同一个链接的引用放在集合中让MemcachedSessionLocator查找。
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("192.168.121.16:11211"),new int[]{3}); MemcachedClient memcachedClient = builder.build();
MemcachedClient client=new XMemcachedClient(AddrUtil.getAddresses("192.168.121.16:11211")); //Add two new memcached nodes client.addServer("192.168.121.17:11211"); //Remove memcached servers client.removeServer("192.168.121.16:11211");
Xmemcached是基于java nio的client实现,默认对一个memcached节点只有一个链接,这在一般状况下已经有很是优异的表现。可是在典型的高并发环境下,nio的单链接也会遇到性能瓶颈。所以XMemcached支持设置nio的链接池,容许创建多个链接到同一个memcached节点,可是请注意,这些链接之间是不一样步的,所以你的应用须要本身保证数据更新的同步,启用链接池能够经过下面代码:
MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddresses("192.168.121.16:11211")); builder.setConnectionPoolSize(5);
从1.3版本开始,xmemcached支持failure模式。所谓failure模式是指,当一个memcached节点down掉的时候,发往这个节点的请求将直接失败,而不是发送给下一个有效的memcached节点。具体能够看memcached的文档。默认不启用failure模式,启用failure模式能够经过下列代码:
MemcachedClientBuilder builder=…… builder.setFailureMode(true);
不只如此,xmemcached还支持主辅模式,你能够设置一个memcached的节点的备份节点,当主节点down掉的状况下,会将原本应该发往主节点的请求转发给standby备份节点。使用备份节点的前提是启用failure模式。备份节点设置以下:
MemcachedClient builder=new XmemcachedClientBuilder(AddrUtil.getAddressMap("localhost:11211,localhost:11212 host2:11211,host2:11212"));
上面的例子,将localhost:11211的备份节点设置为localhost:11212,而将host2:11211的备份节点设置为host2:11212
memcached存储大数据的效率是比较低的,当数据比较大的时候xmemcached会帮你压缩在存储,取出来的时候自动解压并反序列化,这个大小阀值默认是16K,能够经过Transcoder接口的setCompressionThreshold(1.2.1引入)方法修改阀值,好比设置为1K:
memcachedClient.getTranscoder()).setCompressionThreshold(1024);
这个方法是在1.2.1引入到Transcoder接口,在此以前,你须要经过强制转换来设置:
((SerializingTranscoder)memcachedClient.getTranscoder()).setCompressionThreshold(1024);
Memcached的序列化转换器在序列化数值类型的时候有个特殊处理,若是前面N个字节都是0,那么将会去除这些0,缩减后的数据将更小,例如数字3序列化是0x0003,那么前面3个0将去除掉成一个字节0x3。反序列化的时候将自动在前面根据数值类型补0。这一特性默认是开启的,若是考虑到与其余client兼容的话须要关闭此特性能够经过:
memcachedClient.getTranscoder()).setPackZeros(false);
在官方客户端有提供一个sanitizeKeys选项,当选择用URL当key的时候,MemcachedClient会自动将URL encode再存储。默认是关闭的,想启用能够经过:
memcachedClient.setSanitizeKeys(true);
keepalived是以VRRP协议为实现基础的,VRRP全称Virtual Router Redundancy Protocol,即虚拟路由冗余协议。虚拟路由冗余协议,能够认为是实现路由器高可用的协议,即将N台提供相同功能的路由器组成一个路由器组,这个组里面有一个master和多个backup,master上面有一个对外提供服务的vip(该路由器所在局域网内其余机器的默认路由为该vip),master会发组播,当backup收不到vrrp包时就认为master宕掉了,这时就须要根据VRRP的优先级来选举一个backup当master。这样的话就能够保证路由器的高可用了。keepalived主要有三个模块,分别是core、check和vrrp。core模块为keepalived的核心,负责主进程的启动、维护以及全局配置文件的加载和解析。check负责健康检查,包括常见的各类检查方式。vrrp模块是来实现VRRP协议的。原理图为下图:
keepalived只有一个配置文件keepalived.conf,里面主要包括如下几个配置区域,分别是global_defs、static_ipaddress、static_routes、vrrp_script、vrrp_instance和virtual_server。
global_defs { notification_email { a@abc.com b@abc.com ... } notification_email_from alert@abc.com smtp_server smtp.abc.com smtp_connect_timeout 30 enable_traps router_id host163 }
notification_email 故障发生时给谁发邮件通知。
notification_email_from 通知邮件从哪一个地址发出。
smpt_server 通知邮件的smtp地址。
smtp_connect_timeout 链接smtp服务器的超时时间。
enable_traps 开启SNMP陷阱(Simple Network Management Protocol)。
router_id 标识本节点的字条串,一般为hostname,但不必定非得是hostname。故障发生时,邮件通知会用到。
static_ipaddress { 10.210.214.163/24 brd 10.210.214.255 dev eth0 ... } static_routes { 10.0.0.0/8 via 10.210.214.1 dev eth0 ... }
static_ipaddress和static_routes区域配置的是是本节点的IP和路由信息。若是你的机器上已经配置了IP和路由,那么这两个区域能够不用配置。其实,通常状况下你的机器都会有IP地址和路由信息的,所以不必再在这两个区域配置。
vrrp_script chk_http_port { script "</dev/tcp/127.0.0.1/80" interval 1 weight -10 }
用来作健康检查的,当时检查失败时会将vrrp_instance
的priority
减小相应的值。以上意思是若是script
中的指令执行失败,那么相应的vrrp_instance
的优先级会减小10个点。
virtual_server IP Port { delay_loop <INT> lb_algo rr|wrr|lc|wlc|lblc|sh|dh lb_kind NAT|DR|TUN persistence_timeout <INT> persistence_granularity <NETMASK> protocol TCP ha_suspend virtualhost <STRING> alpha omega quorum <INT> hysteresis <INT> quorum_up <STRING>|<QUOTED-STRING> quorum_down <STRING>|<QUOTED-STRING> sorry_server <IPADDR> <PORT> real_server <IPADDR> <PORT> { weight <INT> inhibit_on_failure notify_up <STRING>|<QUOTED-STRING> notify_down <STRING>|<QUOTED-STRING> # HTTP_GET|SSL_GET|TCP_CHECK|SMTP_CHECK|MISC_CHECK HTTP_GET|SSL_GET { url { path <STRING> # Digest computed with genhash digest <STRING> status_code <INT> } connect_port <PORT> connect_timeout <INT> nb_get_retry <INT> delay_before_retry <INT> } } }
delay_loop 延迟轮询时间(单位秒)。
lb_algo 后端调试算法(load balancing algorithm)。
virtualhost 用来给HTTP_GET和SSL_GET配置请求header的。
sorry_server 当全部real server宕掉时,sorry server顶替。
real_server 真正提供服务的服务器。
weight 权重。
notify_up/down 当real server宕掉或启动时执行的脚本。
健康检查的方式,N多种方式。
path 请求real serserver上的路径。
digest/status_code 分别表示用genhash算出的结果和http状态码。
connect_port 健康检查,若是端口通则认为服务器正常。
connect_timeout,nb_get_retry,delay_before_retry分别表示超时时长、重试次数,下次重试的时间延迟。
vrrp_sync_group VG_1 { group { inside_network # name of vrrp_instance (below) outside_network # One for each moveable IP. ... } notify_master /path/to_master.sh notify_backup /path/to_backup.sh notify_fault "/path/fault.sh VG_1" notify /path/notify.sh smtp_alert } vrrp_instance VI_1 { state MASTER interface eth0 use_vmac <VMAC_INTERFACE> dont_track_primary track_interface { eth0 eth1 } mcast_src_ip <IPADDR> lvs_sync_daemon_interface eth1 garp_master_delay 10 virtual_router_id 1 priority 100 advert_int 1 authentication { auth_type PASS auth_pass 12345678 } virtual_ipaddress { 10.210.214.253/24 brd 10.210.214.255 dev eth0 192.168.1.11/24 brd 192.168.1.255 dev eth1 } virtual_routes { 172.16.0.0/12 via 10.210.214.1 192.168.1.0/24 via 192.168.1.1 dev eth1 default via 202.102.152.1 } track_script { chk_http_port } nopreempt preempt_delay 300 debug notify_master <STRING>|<QUOTED-STRING> notify_backup <STRING>|<QUOTED-STRING> notify_fault <STRING>|<QUOTED-STRING> notify <STRING>|<QUOTED-STRING> smtp_alert }
vrrp_instance用来定义对外提供服务的VIP区域及其相关属性。
vrrp_rsync_group用来定义vrrp_intance组,使得这个组内成员动做一致。举个例子来讲明一下其功能:
两个vrrp_instance同属于一个vrrp_rsync_group,那么其中一个vrrp_instance发生故障切换时,另外一个vrrp_instance也会跟着切换(即便这个instance没有发生故障)。
notify_master/backup/fault 分别表示切换为主/备/出错时所执行的脚本。
notify 表示任何一状态切换时都会调用该脚本,而且该脚本在以上三个脚本执行完成以后进行调用,keepalived会自动传递三个参数($1 = "GROUP"|"INSTANCE",$2 = name of group or instance,$3 = target state of transition(MASTER/BACKUP/FAULT))。
smtp_alert 表示是否开启邮件通知(用全局区域的邮件设置来发通知)。
state 能够是MASTER或BACKUP,不过当其余节点keepalived启动时会将priority比较大的节点选举为MASTER,所以该项其实没有实质用途。
interface 节点固有IP(非VIP)的网卡,用来发VRRP包。
use_vmac 是否使用VRRP的虚拟MAC地址。
dont_track_primary 忽略VRRP网卡错误。(默认未设置)
track_interface 监控如下网卡,若是任何一个不通就会切换到FALT状态。(可选项)
mcast_src_ip 修改vrrp组播包的源地址,默认源地址为master的IP。(因为是组播,所以即便修改了源地址,该master仍是能收到回应的)
lvs_sync_daemon_interface 绑定lvs syncd的网卡。
garp_master_delay 当切为主状态后多久更新ARP缓存,默认5秒。
repcached的全称 replication cached是由日本人发明的memcached的高可用性技术,简称复制缓冲区技术。它是一个单master单 slave的方案,但它的 master/slave都是可读写的,并且能够相互同步,若是 master 宕机, slave侦测到链接断了,它会自动 listen而成为 master;而若是 slave坏掉, master也会侦测到链接断,它就会从新 listen等待新的 slave加入。
如今有一台主服务器IP为192.168.121.12,从服务器:192.168.121.14,虚拟ip:192.168.121.16。分别在12和14服务器上配置keepalived和memcached-repcached。
安装
cd /usr/local/softnew wget http://www.keepalived.org/software/keepalived-1.4.1.tar.gz tar -zxvf keepalived-1.4.1.tar.gz cd keepalived-1.4.1 ./configure --prefix=/usr/local/keepalived make make --prefix=/usr/local/keepalived install
配置
tree -l /usr/local/keepalived/etc -- keepalived | |-- keepalived.conf | `-- samples | |-- keepalived.conf.status_code | |-- keepalived.conf.track_interface | |-- keepalived.conf.vrrp | |-- 。。。 |-- rc.d | `-- init.d | `-- keepalived `-- sysconfig `-- keepalived
若是没有找到init.d目录,能够到keepalived-1.4.1/keepalived/etc/init.d/目录下找keepalivew。
将配置文件拷贝到系统对应的目录下:
cp /usr/local/keepalived/etc/keepalived.conf /etc/keepalived/keepalived.conf cp /usr/local/keepalived/etc/rc.d/init.d/keepalived /etc/rc.d/init.d/keepalived cp /usr/local/keepalived/etc/sysconfig/keepalived /etc/sysconfig/keepalived
配置文件修改:
! Configuration File for keepalived global_defs { notification_email { acassen@firewall.loc failover@firewall.loc sysadmin@firewall.loc } notification_email_from Alexandre.Cassen@firewall.loc # smtp_server 192.168.200.1 # smtp_connect_timeout 30 router_id LVS_DEVEL vrrp_skip_check_adv_addr # vrrp_strict vrrp_garp_interval 0 vrrp_gna_interval 0 } vrrp_instance VI_1 { state BACKUP interface ens33 virtual_router_id 51 priority 100 advert_int 1 authentication { auth_type PASS auth_pass 1111 } virtual_ipaddress { 192.168.121.16 } } virtual_server 192.168.121.16 11211 { delay_loop 6 lb_algo rr lb_kind NAT persistence_timeout 7200 protocol TCP nat_mask 255.255.255.0 # sorry_server 192.168.200.200 1358 real_server 192.168.121.12 11211 { weight 1 TCP_CHECK { connect_timeout 3 retry 3 delay_before_retry 3 connect_port 11211 } } real_server 192.168.121.14 11211 { weight 1 TCP_CHECK { connect_timeout 3 retry 3 delay_before_retry 3 connect_port 11211 } } }
启动与中止
service keepalived start #启动服务 service keepalived stop #中止服务 service keepalived restart #重启服务
repcached最新下载地址:http://sourceforge.net/projects/repcached/files/repcached有两种方式,一种经过打补丁形式,另外一种直接安装融合版的memcached。这里选择第二种。
安装
一、安装libevent环境。
二、安装memcached
启动
启动master:
./memcached -v -d -p 11211 -l 192.168.121.12 -u root
启动slave:
./memcached -v -d -p 11211 -l 192.168.121.14 -u root -x 192.168.121.12
package memcached; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import net.rubyeye.xmemcached.MemcachedClient; import net.rubyeye.xmemcached.MemcachedClientBuilder; import net.rubyeye.xmemcached.XMemcachedClientBuilder; import net.rubyeye.xmemcached.auth.AuthInfo; import net.rubyeye.xmemcached.command.BinaryCommandFactory; import net.rubyeye.xmemcached.exception.MemcachedException; import net.rubyeye.xmemcached.impl.KetamaMemcachedSessionLocator; import net.rubyeye.xmemcached.transcoders.CompressionMode; import net.rubyeye.xmemcached.transcoders.SerializingTranscoder; import net.rubyeye.xmemcached.utils.AddrUtil; public class TestCluster { private static Map<Integer, MemcachedClient> clientMap = new HashMap<Integer, MemcachedClient>();//client的集合 private static int maxClient = 3; private static int expireTime = 900;//900s(默认的缓存过时时间) private static int maxConnectionPoolSize = 1;//每一个客户端池子的链接数 private static long op_time = 2000L;//操做超时时间 private static String servers = "192.168.121.16:11211"; private static final String KEY_SPLIT = "-";//用于隔开缓存前缀与缓存键值 /** * 构建MemcachedClient的map集合 */ static{ MemcachedClientBuilder builder = new XMemcachedClientBuilder(AddrUtil.getAddressMap(servers)); builder.setConnectionPoolSize(1);//这个默认也是1 builder.setSessionLocator(new KetamaMemcachedSessionLocator(true));//使用一致性hash算法 // builder.addAuthInfo(AddrUtil.getOneAddress("localhost:11211"), AuthInfo // .typical("cacheuser", "123456"));//对服务进行受权认证 SerializingTranscoder transcoder = new SerializingTranscoder(1024*1024);//序列化转换器,指定最大的数据大小1M transcoder.setCharset("UTF-8");//默认为UTF-8,这里可去掉 transcoder.setCompressionThreshold(1024*1024);//单位:字节,压缩边界值,任何一个大于该边界值(这里是:1M)的数据都要进行压缩 transcoder.setCompressionMode(CompressionMode.GZIP);//压缩算法 builder.setFailureMode(true);//在此模式下,某个节点挂掉的状况下,往这个节点的请求都将直接抛出MemcachedException的异常。 builder.setTranscoder(transcoder); builder.setCommandFactory(new BinaryCommandFactory());//命令工厂 //构建10个MemcachedCient,并放入clientMap for(int i=0;i<maxClient;i++){ try { MemcachedClient client = builder.build(); client.setOpTimeout(op_time);//设置操做超时时间,默认为1s clientMap.put(i, client); } catch (IOException e) { e.printStackTrace(); } } } public static MemcachedClient getMemcachedClient(){ /* * Math.random():产生[0,1)之间的小数 * Math.random()*maxClient:[0~maxClient)之间的小数 * (int)(Math.random()*maxClient):[0~maxClient)之间的整数 */ return clientMap.get((int)(Math.random()*maxClient)); } public static boolean setCache(String key,Object value,int exp){ boolean setCacheSuccess = false; try { MemcachedClient client = getMemcachedClient(); setCacheSuccess = client.add(key, exp, value); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } return setCacheSuccess; } /** * 获取缓存 **/ public static Object getCache(String key){ Object value = null; try { MemcachedClient client = getMemcachedClient(); value = client.get(key); } catch (TimeoutException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } catch (MemcachedException e) { e.printStackTrace(); } return value; } public static void main(String[] args) throws Exception { /*for(int i=0;i<100;i++){ System.out.println(Math.random()); }*/ for(int i=0;i<1000;i++){ System.out.println(TestCluster.setCache("hello"+i, "world"+i,0)); System.out.println(TestCluster.getCache("hello"+i)); Thread.sleep(10000); } /*System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2")); System.out.println(MemcachedUtil.getCache("hello2"));*/ } }