利用Curator客户端API,实现分布式事务锁.java
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.5.0</version> </dependency>
import java.util.concurrent.TimeUnit; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class CuratorUtil implements InitializingBean { private static final Logger LOG = LoggerFactory.getLogger(CuratorUtil.class); //zookeeper.connection.url=172.30.0.177:2181,172.30.0.173:2181,172.30.0.171:2181 //zookeeper.iread.lock.path=/iread/source/lock @Value("${zookeeper.connection.url}") private String zookeeperConnectionString; @Value("${zookeeper.lockPath.prefix}") private String lockPathPrefix; private CuratorFramework client; @Override public void afterPropertiesSet() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start(); } /** * 获取锁。返回不为null表示成功获取到锁,用完以后须要调用releaseLock方法释放 * @param relativePath 锁的相对路径,Not start with '/' * @param waitSeconds 等待秒数 * @return 未获取到锁返回null */ public InterProcessMutex getLock(String relativePath, int waitSeconds) { InterProcessMutex lock = new InterProcessMutex(client, lockPathPrefix + relativePath); try { if (lock.acquire(waitSeconds, TimeUnit.SECONDS)) { return lock; } } catch (Exception e) { LOG.error("get lock error", e); } releaseLock(lock); return null; } /** * 释放锁 */ public void releaseLock(InterProcessMutex lock) { if (lock != null && lock.isAcquiredInThisProcess()) { try { lock.release(); } catch (Exception e) { LOG.warn("release lock error", e); } } } }
import javax.annotation.Resource; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; /** * 抽象分布式Job(自动获取和释放ZK分布式锁) <br> * 子类实现process()方法进行业务处理 */ public abstract class AbstractDistributedJob { protected final Logger LOG = LoggerFactory.getLogger(getClass()); /** 至少锁60秒 */ private static final long LOCK_MIN_TIME = 60000; @Resource private CuratorUtil curatorUtil; public void run() { InterProcessMutex lock = curatorUtil.getLock(getClass().getSimpleName() + "/lock", 1); if (lock == null) { LOG.info("can not get lock, exit job."); return; } long st = System.currentTimeMillis(); LOG.info("start job..."); try { process(); } catch (Exception e) { LOG.error("job error", e); } finally { long cost = System.currentTimeMillis() - st; LOG.info("job finished, cost {} ms.", cost); if (cost < LOCK_MIN_TIME) { try { Thread.sleep(LOCK_MIN_TIME - cost); } catch (InterruptedException e) {} } curatorUtil.releaseLock(lock); } } public abstract void process() throws Exception; }
使用场景:当部署多台服务器时,有一个任务须要,若是没有分布式锁,则多台服务器都会执行这个任务,可是咱们每每只想让其中一台服务器执行这个任务。git
1. 5台tomcat服务器,部署相同的war包,每一个tomcat服务器都会在凌晨2点执行一次消息推送,为了防止5台服务器都推送消息,部署三台zookeeper 服务器,5台Tomcat服务器都链接上zookeeper服务器,而后在推送消息的时候,获取锁的那台服务器执行任务,从而保证了Tomcat服务器集群只有一台服务器获取锁,执行任务。github
2. 分布式调度,一台消息队列服务器MQ,多个业务逻辑服务器,多个业务逻辑服务器能够使用一个分布式锁去竞争消息队列数据,获取到锁的服务器获取数据,保证了消息队列的每条数据只被一台服务器获取,从而保证多台服务器并发执行任务。redis
redis实现分布式锁 spring
https://yq.aliyun.com/articles/307547?spm=5176.100239.blogrightarea309637.19.af4dc28ybSYCyapache
redisson分布式锁tomcat
https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#81-lock服务器