生产-消费者队列,用于多节点的分布式数据结构,生产和消费数据。生产者创建一个数据对象,并放到队列中;消费者从队列中取出一个数据对象并进行处理。在ZooKeeper中,队列可以使用一个容器节点下创建多个子节点来实现;创建子节点时,CreateMode使用 PERSISTENT_SEQUENTIAL,ZooKeeper会自动在节点名称后面添加唯一序列号。EPHEMERAL_SEQUENTIAL也有同样的特点,区别在于会话结束后是否会自动删除。
敲小黑板:*_SEQUENTIAL是ZooKeeper的一个很重要的特性,分布式锁、选举制度都依靠这个特性实现的。
1 对前续代码的重构
之前的文章,我们已经用实现了Watcher和Barrier,创建ZooKeeper连接的代码已经复制了一遍。后续还需要类似的工作,因此先对原有代码做一下重构,让代码味道干净一点。

以下是 process(WatchedEvent)的代码
final public void process(WatchedEvent event) {
if (Event.EventType.None.equals(event.getType())) {
// 连接状态发生变化
if (Event.KeeperState.SyncConnected.equals(event.getState())) {
// 连接建立成功
connectedSemaphore.countDown();
}
} else if (Event.EventType.NodeCreated.equals(event.getType())) {
processNodeCreated(event);
} else if (Event.EventType.NodeDeleted.equals(event.getType())) {
processNodeDeleted(event);
} else if (Event.EventType.NodeDataChanged.equals(event.getType())) {
processNodeDataChanged(event);
} else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {
processNodeChildrenChanged(event);
}
}
以ZooKeeperBarrier为例,看看重构之后的构造函数和监听Event的代码
ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName)
throws IOException {
super(address);
this.tableSerial = createRootNode(tableSerial);
this.tableCapacity = tableCapacity;
this.customerName = customerName;
}
protected void processNodeChildrenChanged(WatchedEvent event) {
log.info("{} 接收到了通知 : {}", customerName, event.getType());
// 子节点有变化
synchronized (mutex) {
mutex.notify();
}
}
2 队列的生产者
生产者的关键代码
String elementName = queueName + "/element";
ArrayList<ACL> ids = ZooDefs.Ids.OPEN_ACL_UNSAFE;
CreateMode createMode = CreateMode.PERSISTENT_SEQUENTIAL;
getZooKeeper().create(elementName, value, ids, createMode);
注意,重点是PERSISTENT_SEQUENTIAL,PERSISTENT是表示永久存储直到有命令删除,SEQUENTIAL表示自动在后面添加自增的唯一序列号。这样,尽管elementName都一样,但实际生成的zNode名字在 “element”后面会添加格式为%010d的10个数字,如0000000001。如一个完整的zNode名可能为/queue/element0000000021。
3 队列的消费者
消费者尝试从子节点列表获取zNode名最小的一个子节点,如果队列为空则等待NodeChildrenChanged事件。关键代码
/** 队列的同步信号 */
private static Integer queueMutex = Integer.valueOf(1);
@Override
protected void processNodeChildrenChanged(WatchedEvent event) {
synchronized (queueMutex) {
queueMutex.notify();
}
}
/**
* 从队列中删除第一个对象
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException {
while (true) {
synchronized (queueMutex) {
List<String> list = getZooKeeper().getChildren(queueName, true);
if (list.size() == 0) {
queueMutex.wait();
} else {
// 获取第一个子节点的名称
String firstNodeName = getFirstElementName(list);
// 删除节点,并返回节点的值
return deleteNodeAndReturnValue(firstNodeName);
}
}
}
}
网友评论