`
丁树同
  • 浏览: 9149 次
  • 性别: Icon_minigender_1
  • 来自: 济南
社区版块
存档分类
最新评论

3)用zookeeper实现分布式锁

 
阅读更多

    在我们的项目当中,可能有许多组件在并行的运行。这些组件可能需要更新我们的数据存储。所以这些组件应该有解决冲突的能力,避免脏数据。但是这些组件往往运行在不同的服务器上,甚至编程语言,文件系统结构都是不同的。这些差异给我们的分布式协作带来困难。

    zookeeper给我们提供了解决方案。那么如何借助zookeeper来管理我们的分布式系统,达到系统之间的协作呢?我们可以通过zookeeper来实现分布式锁的概念,从而达到系统之间的协作目的。

 

public class DistributedLock {

  private final ZooKeeper zk;
  private final String lockBasePath;
  private final String lockName;

  private String lockPath;

  public DistributedLock(ZooKeeper zk, String lockBasePath, String lockName) {
    this.zk = zk;
    this.lockBasePath = lockBasePath;
    this.lockName = lockName;
  }

  public void lock() throws IOException {
    try {
      // lockPath will be different than (lockBasePath + "/" + lockName) becuase of the sequence number ZooKeeper appends
      lockPath = zk.create(lockBasePath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

      final Object lock = new Object();

      synchronized(lock) {
        while(true) {
          List<String> nodes = zk.getChildren(lockBasePath, new Watch() {
            @Override
            public void process(WatchedEvent event) {
              synchronized (lock) {
                lock.notifyAll();
              }
            }
          });
          Collections.sort(nodes); // ZooKeeper node names can be sorted lexographically
          if (lockPath.endsWith(nodes.get(0)) {
            return;
          } else {
            lock.wait();
          }
        }
      }
    } catch (KeeperException e) {
      throw new IOException (e);
    } catch (InterruptedException e) {
      throw new IOException (e);
    }
  }

  public void unlock() throws IOException {
  try {
	  zk.delete(lockPath, -1);
	  lockPath = null;
	} catch (KeeperException e) {
	  throw new IOException (e);
    } catch (InterruptedException e) {
      throw new IOException (e);
    }
  }
}

 

  1.  构造方法
    public DistributedLock(ZooKeeper zk, String lockBasePath, String lockName) {
        this.zk = zk;
        this.lockBasePath = lockBasePath;
        this.lockName = lockName;
      }
     需要给构造方法提供三个参数,zk 表示可以连接到zookeeper服务器的客户端。lockBasePath 准备将节点创建在那个目录路径下。lockName指创建的子目录名称。
  2. 创建节点
    lockPath = zk.create(lockBasePath + "/" + lockName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
     这里注意的地方就是节点的类型是ephemeral并且是sequetial的。也就是说我们可以在lockBasePath下面可以创建多个名为lockName的多个临时节点,这些节点时有序的。sequetial节点的名称可以相同,因为zookeeper将transactionID后缀到节点路径后面以用于区分。这也是实现分布式锁的关键。另外这些节点时临时的,保证当申请锁的程序失去session以后,可以由zookeeper自动去除该节点。
  3. 申请锁
    final Object lock = new Object();
    
          synchronized(lock) {
            while(true) {
              List<String> nodes = zk.getChildren(lockBasePath, new Watch() {
                @Override
                public void process(WatchedEvent event) {
                  synchronized (lock) {
                    lock.notifyAll();
                  }
                }
              });
              Collections.sort(nodes); // ZooKeeper node names can be sorted lexographically
              if (lockPath.endsWith(nodes.get(0)) {
                return;
              } else {
                lock.wait();
              }
            }
          }
     因为watcher 里面的process()是有外部线程触发的,所以为了保证两次时间上相近的触发操作能够同步执行使用两层的synchronized block。
  • 首先从zookeeper lockBasePath目录下面获得所有的名字为lockName的节点。
  • 将子节点排序
  • 如果该线程创建的节点时最小的节点也就是相对其他节点时最先创建的,那么就获得了锁,跳出while循环执行相应的操作逻辑。否则释放锁,等待节点变化事件触发下一次循环重新申请锁。
  • 当执行完操作逻辑以后,就可以释放锁了,也就是删除该线程创建的节点。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics