Zookeeper应用实操
1. 服务器动态上下线监听
1.1 需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
1.2 需求分析
1.3 具体实现
- 先在集群上创建/servers节点
sh
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
- 在Idea创建包名:com.jack.zkcase2
- 客户端代码,监听zk上目录的信息(也就是服务器注册主机名信息),动态获取上线的主机,来进行下一步操作。
java
package com.jack.zkcase2;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import static org.apache.zookeeper.Watcher.Event.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class DistributedClient {
private static String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
// 创建到 zk 的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, event -> {
try {
if (event.getType().getIntValue() == EventType.None.getIntValue()) {
if (Watcher.Event.EventType.None.getIntValue() == event.getType().getIntValue()) {
// 连接状态改变
if (event.getState().getIntValue() == KeeperState.SyncConnected.getIntValue()) {
// 连接成功
System.out.println("--连接上zk集群--");
} else if (event.getState().getIntValue() == KeeperState.Disconnected.getIntValue()) {
// 连接断开
System.out.println("--断开连接--");
} else if (event.getState().getIntValue() == KeeperState.Expired.getIntValue()) {
// 会话过期
System.out.println("--会话过期--");
}
}
} else {
// 再次启动监听节点变化
getServerList();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
// 获取服务器列表信息
public void getServerList() throws Exception {
// 1 获取服务器子节点信息,并且对父节点进行监听
List<String> children = zk.getChildren("/servers", true);
// 2 存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
System.out.println("/servers监听注册完成,目前路径有: ");
// 3 遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData("/servers/" + child, false, null);
servers.add(new String(data));
}
// 4 打印服务器列表信息
System.out.println(servers);
}
// 业务功能
public void business() throws Exception {
System.out.println("client is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributedClient client = new DistributedClient();
client.getConnect();
// 2 获取 servers 的子节点信息,从中获取服务器信息列表
client.getServerList();
// 3 业务进程启动
client.business();
}
}
- 服务器端向Zookeeper注册主机名,服务端内部代码通过运行main方法时从args传入变量主机名,向zk注册临时节点。实现代码如下:
java
import org.apache.zookeeper.*;
import java.io.IOException;
public class DistributedServer {
private static String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
private static int sessionTimeout = 2000;
private ZooKeeper zk = null;
private String parentNode = "/servers";
// 创建到 zk 的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
}
// 注册服务器
public void registServer(String hostname) throws Exception {
// 利用临时节点如果连接zk断开就消失特性
String create = zk.create("/servers/"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname + " is online " + create);
}
// 业务功能
public void business(String hostname) throws Exception {
System.out.println(hostname + " is working ...");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
// 1 获取 zk 连接
DistributeServer server = new DistributeServer();
server.getConnect();
// 2 利用 zk 连接注册服务器信息
server.registServer(args[0]);
// 3 启动业务功能
server.business(args[0]);
}
}
1.5 测试
- 启动DistributedClient客户端,监听可用的主机名。
- 启动DistributeServer服务, 需要在IDEA中设置运行参数,参数为主机名。
1️⃣ 点击Edit Configurations…2️⃣ 在弹出的窗口中(Program arguments)输入想启动的主机,例如,hadoop106
3️⃣ 回到DistributeServer的main方法,右键,在弹出的窗口中点击Run->"DistributeServer.main()"
4️⃣ 观察DistributedServer控制台,提示hadoop107 is working5️⃣ 观察DistributedClient控制台,提示hadoop107已经上线
2. ZooKeeper分布式锁
什么叫做分布式锁呢?
比如说"进程 1"在使用该资源的时候,会先去获得锁,"进程 1"获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,"进程 1"用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作分布式锁。
1.1 分布式锁分析
3. 原生Zookeeper实现分布式锁
java
public class DistributedLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
// 分布式锁 1
DistributedLock lock1 = new DistributedLock();
// 分布式锁 2
DistributedLock lock2 = new DistributedLock();
new Thread(() -> {
// 获取锁对象
lock1.zkLock();
System.out.println("线程1获取锁。。。");
try {
System.out.println("开始处理业务逻辑。。。。");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock1.zkUnlock();
System.out.println("线程1释放锁");
}).start();
new Thread(() -> {
// 获取锁对象
lock2.zkLock();
System.out.println("线程2获取锁。。。");
try {
System.out.println("开始处理业务逻辑。。。。");
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
lock2.zkUnlock();
System.out.println("线程2释放锁");
}).start();
}
}
java
// zookeeper server 列表
private String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
// 超时时间
private int sessionTimeout = 5000;
private ZooKeeper zk;
private String rootNode = "/locks";
private String subNode = "seq-";
// 当前 client 等待的子节点
private String createdPath;
private String waitPath;
//ZooKeeper 连接
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitLatch = new CountDownLatch(1);
DistributedLock() throws IOException, InterruptedException, KeeperException {
// 建立连接
zk = new ZooKeeper(connectString, sessionTimeout, event -> {
// 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
if(event.getState()== Watcher.Event.KeeperState.SyncConnected){
connectLatch.countDown();
}
// 发生了 waitPath 的删除事件
if(event.getType() == Watcher.Event.EventType.NodeDeleted && event.getPath().equals(waitPath)){
waitLatch.countDown();
}
});
// 等待zk连接成功
connectLatch.await();
// 判断根路径/locks,没有就创建
Stat stat = zk.exists(rootNode, false);
if (stat == null) {
// 创建根节点
zk.create(rootNode, "locks".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
// 加锁
public void zkLock() {
try {
createdPath = zk.create(rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
List<String> children = zk.getChildren(rootNode, false);
// 如果子节点只有一个,那么就是最小的
if (children.size() == 1) {
return;
} else {
// 子节点不止一个,比对自己的节点是否是子节点中最小的
Collections.sort(children);
String createChild = createdPath.substring((rootNode+"/").length());
int index = children.indexOf(createChild);
if(index ==-1){
throw new RuntimeException("数据异常");
} else if (index==0) {
return;
}else {
// 如果不是最小的,需要监听比自己小的节点路径,等待他消失
waitPath = rootNode+"/"+children.get(index - 1);
zk.getChildren(waitPath, true);
waitLatch.await();
}
}
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 解锁
public void zkUnlock() {
//删除节点
try {
zk.delete(createdPath, -1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (KeeperException e) {
throw new RuntimeException(e);
}
}
}
执行测试:
4. Curator框架实现分布式锁
4.1 Curator优点
- 原生的Java API开发存在的问题:
- 会话连接是异步的,需要自己去处理。比如使用CountDownLatch
- Watch 需要重复注册,不然就不能生效
- 开发的复杂性还是比较高的
- 不支持多节点删除和创建。需要自己去递归
- Curator是一个专门解决分布式锁的框架,解决了原生JavaAPI开发分布式遇到的问题。官网地址:https://curator.apache.org/index.html
4.2 Curator案例实操
- 添加依赖
xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>5.6.0</version>
</dependency>
- 代码实现
java
public class CuratorLockTest {
private String rootNode = "/locks";
// zookeeper server 列表
private String connectString = "hadoop105:2181,hadoop106:2181,hadoop107:2181";
// connection 超时时间
private int connectionTimeout = 2000;
// session 超时时间
private int sessionTimeout = 2000;
private CuratorFramework client;
@Before
public void init(){
//重试策略,初试时间 3 秒,重试 3 次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
client = CuratorFrameworkFactory.newClient(connectString, sessionTimeout, connectionTimeout, policy);
client.start();
System.out.println("zookeeper 初始化完成...");
}
@Test
public void testLock(){
final InterProcessLock lock1 = new InterProcessMutex(client, rootNode);
final InterProcessLock lock2 = new InterProcessMutex(client, rootNode);
new Thread(()->{
try {
lock1.acquire();
System.out.println("线程1获取到锁");
Thread.sleep(5);
// 测试锁重入
System.out.println("线程1再次获取到锁");
lock1.acquire();
lock1.release();
System.out.println("线程1释放锁");
Thread.sleep(5);
lock1.release();
System.out.println("线程1再次释放锁");
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "线程1").start();
new Thread(()->{
try {
lock2.acquire();
System.out.println("线程2获取到锁");
Thread.sleep(5);
lock2.acquire();
System.out.println("线程2再次获取到锁");
// 测试锁重入
lock2.release();
System.out.println("线程2释放锁");
Thread.sleep(5);
lock2.release();
System.out.println("线程2再次释放锁");
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "线程2").start();
}
}
执行代码,查看IDEA控制台: