Подтвердить что ты не робот

Does commitOffsets на высокоуровневом блоке потребителей?

В Java-клиенте (http://kafka.apache.org/documentation.html#highlevelconsumerapi) делает commitOffsets на потребительском блоке высокого уровня до тех пор, пока смещения не будут успешно совершены, или это будет fire-and-forget

4b9b3361

Ответ 1

Выполняет ли commitOffsets на блоке верхнего уровня пользователя до смещения?

Похоже, что commitOffsets() проходит через каждого пользователя и вызывает updatePersistentPath, если его смещение изменилось, и, если это так, записывает данные через zkClient.writeData(path, getBytes(data)). Похоже, что commitOffsets() блокирует, пока не будут выполнены все смещения.

Вот исходный код для commitOffsets() (ref):

public void commitOffsets() {
    if (zkClient == null) {
        logger.error("zk client is null. Cannot commit offsets");
        return;
    }
    for (Entry<String, Pool<Partition, PartitionTopicInfo>> e : topicRegistry.entrySet()) {
        ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(config.getGroupId(), e.getKey());
        for (PartitionTopicInfo info : e.getValue().values()) {
            final long lastChanged = info.getConsumedOffsetChanged().get();
            if (lastChanged == 0) {
                logger.trace("consume offset not changed");
                continue;
            }
            final long newOffset = info.getConsumedOffset();
            //path: /consumers/<group>/offsets/<topic>/<brokerid-partition>
            final String path = topicDirs.consumerOffsetDir + "/" + info.partition.getName();
            try {
                ZkUtils.updatePersistentPath(zkClient, path, "" + newOffset);
            } catch (Throwable t) {
                logger.warn("exception during commitOffsets, path=" + path + ",offset=" + newOffset, t);
            } finally {
                info.resetComsumedOffsetChanged(lastChanged);
                if (logger.isDebugEnabled()) {
                    logger.debug("Committed [" + path + "] for topic " + info);
                }
            }
        }
    }
}

и для updatePersistentPath(...) (ref):

public static void updatePersistentPath(ZkClient zkClient, String path, String data) {
    try {
        zkClient.writeData(path, getBytes(data));
    } catch (ZkNoNodeException e) {
        createParentPath(zkClient, path);
        try {
            zkClient.createPersistent(path, getBytes(data));
        } catch (ZkNodeExistsException e2) {
            zkClient.writeData(path, getBytes(data));
        }
    }
}