Подтвердить что ты не робот

Асинхронная ограниченная очередь в JS/TS с использованием async/wait

Я пытаюсь обернуть голову вокруг async/await, и у меня есть следующий код:

class AsyncQueue<T> {
    queue = Array<T>()
    maxSize = 1

    async enqueue(x: T) {
        if (this.queue.length > this.maxSize) {
            // Block until available
        }

        this.queue.unshift(x)
    }

    async dequeue() {
        if (this.queue.length == 0) {
            // Block until available
        }

        return this.queue.pop()!
    }
}

async function produce<T>(q: AsyncQueue, x: T) {
    await q.enqueue(x)
}

async function consume<T>(q: AsyncQueue): T {
    return await q.dequeue()
}

// Expecting 3 4 in the console
(async () => {
    const q = new AsyncQueue<number>()
    consume(q).then(console.log)
    consume(q).then(console.log)
    produce(q, 3)
    produce(q, 4)
    consume(q).then(console.log)
    consume(q).then(console.log)
})()

Моя проблема, конечно, находится в разделе "Блокировать до доступных" кода. Я ожидал, что смогу "остановить" выполнение до тех пор, пока что-то не произойдет (например, dequeue останавливается, пока не появится очередь, и наоборот, учитывая доступное пространство). У меня такое чувство, что мне, возможно, понадобится использовать сопрограммы для этого, но я действительно хотел убедиться, что я просто не пропускаю async/await магию здесь.

4b9b3361

Ответ 1

17/04/2019 Обновление: Короче говоря, в реализации AsyncSemaphore есть ошибка, обнаруженная при тестировании на основе свойств. Вы можете прочитать все об этой "сказке" здесь. Вот исправленная версия:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()!()
    }

    async wait() {
        this.permits -= 1
        if (this.permits < 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
    }
}

Наконец, после значительных усилий, вдохновленных ответом @Titian, я думаю, что решил это. Код заполнен отладочными сообщениями, но он может служить педагогическим целям относительно потока управления:

class AsyncQueue<T> {
    waitingEnqueue = new Array<() => void>()
    waitingDequeue = new Array<() => void>()
    enqueuePointer = 0
    dequeuePointer = 0
    queue = Array<T>()
    maxSize = 1
    trace = 0

    async enqueue(x: T) {
        this.trace += 1
        const localTrace = this.trace

        if ((this.queue.length + 1) > this.maxSize || this.waitingDequeue.length > 0) {
            console.debug('[${localTrace}] Producer Waiting')
            this.dequeuePointer += 1
            await new Promise(r => this.waitingDequeue.unshift(r))
            this.waitingDequeue.pop()
            console.debug('[${localTrace}] Producer Ready')
        }

        this.queue.unshift(x)
        console.debug('[${localTrace}] Enqueueing ${x} Queue is now [${this.queue.join(', ')}]')

        if (this.enqueuePointer > 0) {
            console.debug('[${localTrace}] Notify Consumer')
            this.waitingEnqueue[this.enqueuePointer-1]()
            this.enqueuePointer -= 1
        }
    }

    async dequeue() {
        this.trace += 1
        const localTrace = this.trace

        console.debug('[${localTrace}] Queue length before pop: ${this.queue.length}')

        if (this.queue.length == 0 || this.waitingEnqueue.length > 0) {
            console.debug('[${localTrace}] Consumer Waiting')
            this.enqueuePointer += 1
            await new Promise(r => this.waitingEnqueue.unshift(r))
            this.waitingEnqueue.pop()
            console.debug('[${localTrace}] Consumer Ready')
        }

        const x = this.queue.pop()!
        console.debug('[${localTrace}] Queue length after pop: ${this.queue.length} Popping ${x}')

        if (this.dequeuePointer > 0) {
            console.debug('[${localTrace}] Notify Producer')
            this.waitingDequeue[this.dequeuePointer - 1]()
            this.dequeuePointer -= 1
        }

        return x
    }
}

Обновление: здесь чистая версия с использованием AsyncSemaphore, которая действительно инкапсулирует способ, которым все обычно делается с использованием примитивов параллелизма, но адаптирована к асинхронному стилю JavaScript CPS-single-threadaded-event-loop ™ с помощью async/await. Вы можете видеть, что логика AsyncQueue становится намного более интуитивной, и двойная синхронизация через Promises делегируется двум семафорам:

class AsyncSemaphore {
    private promises = Array<() => void>()

    constructor(private permits: number) {}

    signal() {
        this.permits += 1
        if (this.promises.length > 0) this.promises.pop()()
    }

    async wait() {
        if (this.permits == 0 || this.promises.length > 0)
            await new Promise(r => this.promises.unshift(r))
        this.permits -= 1
    }
}

class AsyncQueue<T> {
    private queue = Array<T>()
    private waitingEnqueue: AsyncSemaphore
    private waitingDequeue: AsyncSemaphore

    constructor(readonly maxSize: number) {
        this.waitingEnqueue = new AsyncSemaphore(0)
        this.waitingDequeue = new AsyncSemaphore(maxSize)
    }

    async enqueue(x: T) {
        await this.waitingDequeue.wait()
        this.queue.unshift(x)
        this.waitingEnqueue.signal()
    }

    async dequeue() {
        await this.waitingEnqueue.wait()
        this.waitingDequeue.signal()
        return this.queue.pop()!
    }
}

Обновление 2: в приведенном выше коде, по- AsyncQueue ошибка, которая стала очевидной при попытке использовать AsyncQueue размера 0. Семантика имеет смысл: это очередь без буфера, где издатель всегда ожидает потребитель существует. Линии, которые мешали ему работать, были:

await this.waitingEnqueue.wait()
this.waitingDequeue.signal()

Если вы посмотрите внимательно, вы увидите, что dequeue() не совсем симметрична enqueue(). На самом деле, если поменять местами порядок этих двух инструкций:

this.waitingDequeue.signal()
await this.waitingEnqueue.wait()

Тогда все снова работает; мне кажется интуитивно понятным, что мы сигнализируем, что есть что-то заинтересованное в dequeuing() прежде чем фактически ожидать, что произойдет постановка в enqueuing.

Я до сих пор не уверен, что это не приведет к появлению мелких ошибок без тщательного тестирования. Я оставлю это как вызов;)