基于zookeeper
使用Zookeeper实现分布式锁
思路
①建立一个节点,假如名为:lock 。节点类型为持久节点(PERSISTENT)
②每当进程需要访问共享资源时,会调用分布式锁的lock()或tryLock()方法获得锁,这个时候会在第一步创建的lock节点下建立相应的顺序子节点,节点类型为临时顺序节点(EPHEMERAL_SEQUENTIAL),通过组成特定的名字name+lock+顺序号。
③在建立子节点后,对lock下面的所有以name开头的子节点进行排序,判断刚刚建立的子节点顺序号是否是最小的节点,假如是最小节点,则获得该锁对资源进行访问。
④假如不是该节点,就获得该节点的上一顺序节点,并给该节点是否存在注册监听事件。同时在这里阻塞。等待监听事件的发生,获得锁控制权。
⑤当调用完共享资源后,调用unlock()方法,关闭zk,进而可以引发监听事件,释放该锁。
这种分布式锁是严格的按照顺序访问的并发锁。
参考实现
public class SimpleDistributedLock implements Lock, Watcher {
private ZooKeeper zk;
private String root = "/distributedlock";//分布式锁的根节点(即多个分布式锁都在该节点下)
private String lockName;//锁,即具体的某个分布式锁的名称
private String waitNode;//当前结点的前一个结点
private String curNode;//当前结点:即该SimpleDistributedLock实例对应的zookeeper上的结点
/**
* //用来等待获取锁的计数器
*/
private CountDownLatch lockLatch;
/**
* 用来保证zk连接建立的计数器,
* 因为new zookeeper()这种方式建立连接是异步的
*/
private CountDownLatch connLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
private SimpleDistributedLock() {
}
public SimpleDistributedLock(String connAddress, String lockName) {
this.lockName = lockName;
// 创建zk连接
try {
zk = new ZooKeeper(connAddress, sessionTimeout, this);
//等待连接完全建立
connLatch.await();
Stat statRoot = zk.exists(root, false);
if (statRoot == null) {
// 创建根节点
zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 获取锁
* <p>
* 获取不到,则阻塞当前线程,直到获取到锁才返回
*/
@Override
public void lock() {
try {
if(this.tryLock()){
return;
} else{
Stat stat = zk.exists(root + "/" + waitNode,true);
if(stat != null){
System.out.println(Thread.currentThread().getName() + "等待前面的节点释放锁(或等待并释放锁):" + root + "/" + waitNode);
this.lockLatch = new CountDownLatch(1);
//一直等待,直到获取锁,即前一个节点被删除
lockLatch.await();
System.out.println(Thread.currentThread().getName() + "获取了锁,对应节点为:" + curNode);
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
/**
* 尝试获取锁
* <p>
* 非阻塞
*
* @return 获取到锁,返回true;否则返回false
*/
@Override
public boolean tryLock() {
try {
String splitStr = "_lock_";
if (lockName.contains(splitStr)) {
throw new RuntimeException("lockName can not contains '_lock_'");
}
//创建临时子节点
curNode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(Thread.currentThread().getName() + "创建临时节点" + curNode);
//取出所有子节点
List<String> subNodes = zk.getChildren(root , false);
//取出所有lockName的锁
List<String> lockObjNodes = new ArrayList<String>();
for (String node : subNodes) {
String _node = node.split(splitStr)[0];
if (_node.equals(lockName)) {
lockObjNodes.add(node);
}
}
Collections.sort(lockObjNodes);
if (curNode.equals(root + "/" + lockObjNodes.get(0))) {
//如果是最小的节点,则表示取得锁
System.out.println(Thread.currentThread().getName() + "获取了锁,对应节点为:" + curNode);
return true;
}
//如果不是最小的节点,找到比自己小1的节点
String subCurNode = curNode.substring(curNode.lastIndexOf("/") + 1);
waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subCurNode) - 1);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return false;
}
/**
* 尝试获取锁
* <p>
* 至多阻塞time这么久的时间
*
* @param time
* @param unit
* @return
* @throws InterruptedException
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
try {
if(this.tryLock()){
return true;
}
//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
Stat stat = zk.exists(root + "/" + waitNode,true);
if(stat != null){
System.out.println(Thread.currentThread().getName() + "等待前面的节点释放锁(或等待并释放锁):" + root + "/" + waitNode);
this.lockLatch = new CountDownLatch(1);
//等待有限时间
if(this.lockLatch.await(time, TimeUnit.MILLISECONDS)){
System.out.println(Thread.currentThread().getName() + "获取了锁,对应节点为:" + curNode);
return true;
}
}else {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* 释放锁
*/
@Override
public void unlock() {
try {
System.out.println(Thread.currentThread().getName() + "释放了锁,并删除临时节点:" + curNode);
zk.delete(curNode,-1);
curNode = null;
zk.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
@Override
public Condition newCondition() {
return null;
}
/**
* zk监听器
*
* @param event
*/
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected){
connLatch.countDown();
}
//节点删除,说明其他线程放弃锁
if(event.getType().equals(Event.EventType.NodeDeleted)){
if(this.lockLatch != null) {
this.lockLatch.countDown();
}
}
}
}
Curator对Zookeeper的封装
虽然zookeeper原生客户端暴露的API已经非常简洁了,但是实现一个分布式锁还是比较麻烦的
所以,我们可以直接使用curator这个开源项目提供的zookeeper分布式锁实现。
public class CuratorDistributedLockSample {
@Test
public void test(){
String connectString = "47.93.161.88:2181";
for(int i = 0; i < 10; i++){
Thread thread = new Thread(()->{
//创建zookeeper的客户端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, retryPolicy);
client.start();
//创建分布式锁, 锁空间的根节点路径为/curator/lock
InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock");
try {
mutex.acquire();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "获取到了锁,开始干活");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "干活完毕");
try {
System.out.println(Thread.currentThread().getName() + "释放锁");
mutex.release();
} catch (Exception e) {
e.printStackTrace();
}
//关闭客户端
client.close();
});
thread.setName("【线程 "+ i +"】");
thread.start();
}
while (true){}
}
}
Last updated