Подтвердить что ты не робот

Что такое эквивалент async/await сервера ThreadPool?

Я работаю на сервере tcp, который выглядит примерно так, используя синхронный apis и пул потоков:

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    ThreadPool.QueueUserWorkItem(this.HandleConnection, client);
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

Предполагая, что моя цель состоит в том, чтобы обрабатывать как можно больше параллельных подключений при минимальном использовании ресурсов, похоже, что это будет быстро ограничено количеством доступных потоков. Я подозреваю, что с помощью неблокирующей задачи apis я смогу обрабатывать гораздо больше с меньшим количеством ресурсов.

Мое первое впечатление - это что-то вроде:

async Task Serve(){
  while(true){
    var client = await listener.AcceptTcpClientAsync();
    HandleConnectionAsync(client); //fire and forget?
  }
}

Но мне кажется, что это может вызвать узкие места. Возможно, HandleConnectionAsync займет необычно долгое время, чтобы нажать на первый вызов и остановит основной цикл принятия. Будет ли это использовать только один поток когда-либо, или будет ли время выполнения волшебным образом работать над несколькими потоками по своему усмотрению?

Есть ли способ объединить эти два подхода, чтобы мой сервер точно использовал количество потоков, необходимое для количества активных задач, но чтобы он не блокировал потоки без необходимости в операциях ввода-вывода?

Есть ли идиоматический способ максимизации пропускной способности в такой ситуации?

4b9b3361

Ответ 1

Я позволил бы Framework управлять потоками и не создавал бы никаких дополнительных потоков, если тесты профилирования не показывают, что мне это может понадобиться. Особенно, если вызовы внутри HandleConnectionAsync в основном связаны с вводом-выводом.

В любом случае, если вы хотите освободить вызывающий поток (диспетчер) в начале HandleConnectionAsync, есть очень простое решение. Вы можете перейти на новый поток из ThreadPool с помощью await Yield(). Это работает, если ваш сервер работает в среде выполнения, в которой не установлен какой-либо контекст синхронизации в начальном потоке (консольное приложение, Служба WCF), которая обычно используется для TCP-сервера.

Следующие примеры иллюстрируют это (код взят из здесь). Обратите внимание, что основной цикл while явно не создает потоков:

using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Program
{
    object _lock = new Object(); // sync lock 
    List<Task> _connections = new List<Task>(); // pending connections

    // The core server task
    private async Task StartListener()
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            // if already faulted, re-throw any error on the calling context
            if (task.IsFaulted)
                await task;
        }
    }

    // Register and handle the connection
    private async Task StartHandleConnectionAsync(TcpClient tcpClient)
    {
        // start the new connection task
        var connectionTask = HandleConnectionAsync(tcpClient);

        // add it to the list of pending task 
        lock (_lock)
            _connections.Add(connectionTask);

        // catch all errors of HandleConnectionAsync
        try
        {
            await connectionTask;
            // we may be on another thread after "await"
        }
        catch (Exception ex)
        {
            // log the error
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            // remove pending task
            lock (_lock)
                _connections.Remove(connectionTask);
        }
    }

    // Handle new connection
    private async Task HandleConnectionAsync(TcpClient tcpClient)
    {
        await Task.Yield();
        // continue asynchronously on another threads

        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    }

    // The entry point of the console app
    static async Task Main(string[] args)
    {
        Console.WriteLine("Hit Ctrl-C to exit.");
        await new Program().StartListener();
    }
}

В качестве альтернативы код может выглядеть следующим образом, без await Task.Yield(). Обратите внимание, я передаю лямбду async в Task.Run, потому что я все еще хочу воспользоваться асинхронными API-интерфейсами внутри HandleConnectionAsync и использовать там await:

// Handle new connection
private static Task HandleConnectionAsync(TcpClient tcpClient)
{
    return Task.Run(async () =>
    {
        using (var networkStream = tcpClient.GetStream())
        {
            var buffer = new byte[4096];
            Console.WriteLine("[Server] Reading from client");
            var byteCount = await networkStream.ReadAsync(buffer, 0, buffer.Length);
            var request = Encoding.UTF8.GetString(buffer, 0, byteCount);
            Console.WriteLine("[Server] Client wrote {0}", request);
            var serverResponseBytes = Encoding.UTF8.GetBytes("Hello from server");
            await networkStream.WriteAsync(serverResponseBytes, 0, serverResponseBytes.Length);
            Console.WriteLine("[Server] Response has been written");
        }
    });
}

Обновлено, основываясь на комментарии: если это будет библиотечный код, среда выполнения действительно неизвестна и может иметь контекст синхронизации не по умолчанию. В этом случае я бы предпочел запустить основной цикл сервера в потоке пула (который не имеет никакого контекста синхронизации):

private static Task StartListener()
{
    return Task.Run(async () => 
    {
        var tcpListener = TcpListener.Create(8000);
        tcpListener.Start();
        while (true)
        {
            var tcpClient = await tcpListener.AcceptTcpClientAsync();
            Console.WriteLine("[Server] Client has connected");
            var task = StartHandleConnectionAsync(tcpClient);
            if (task.IsFaulted)
                await task;
        }
    });
}

Таким образом, все дочерние задачи, созданные внутри StartListener, не будут затронуты контекстом синхронизации клиентского кода. Так что мне не пришлось бы нигде явно вызывать Task.ConfigureAwait(false).

Ответ 2

Существующие ответы правильно предложили использовать Task.Run(() => HandleConnection(client));, но не объяснили, почему.

Вот почему: Вы обеспокоены тем, что HandleConnectionAsync может занять некоторое время, чтобы попасть в первую очередь. Если вы придерживаетесь использования async IO (как и в этом случае), это означает, что HandleConnectionAsync выполняет работу с ЦП без каких-либо блокировок. Это идеальный случай для пула потоков. Это сделано для выполнения короткой, неблокирующей работы ЦП.

И вы правы, что цикл accept будет задерживаться на HandleConnectionAsync, занимая много времени перед возвратом (возможно, потому, что в нем есть значительная работа по работе с ЦП). Этого следует избегать, если вам нужна высокая частота новых соединений.

Если вы уверены, что нет существенной работы по дросселированию цикла, вы можете сохранить дополнительный пул потоков Task и не делать этого.

В качестве альтернативы вы можете одновременно запускать несколько принятых команд. Замените await Serve(); на (например):

var serverTasks =
    Enumerable.Range(0, Environment.ProcessorCount)
    .Select(_ => Serve());
await Task.WhenAll(serverTasks);

Это устраняет проблемы масштабируемости. Обратите внимание, что await проглотит здесь все, кроме одной ошибки.

Ответ 3

Try

TcpListener listener;
void Serve(){
  while(true){
    var client = listener.AcceptTcpClient();
    Task.Run(() => this.HandleConnection(client));
    //Or alternatively new Thread(HandleConnection).Start(client)
  }
}

Ответ 4

В соответствии с Microsoft http://msdn.microsoft.com/en-AU/library/hh524395.aspx#BKMK_VoidReturnType тип возврата void не должен использоваться, потому что он не может улавливать исключения. Как вы указали, вам нужны задачи "стрелять и забывать", поэтому я пришел к выводу, что вы всегда должны возвращать задачу (как заявила Microsoft), но вы должны поймать ошибку, используя:

TaskInstance.ContinueWith(i => { /* exception handler */ }, TaskContinuationOptions.OnlyOnFaulted);

Пример, который я использовал в качестве доказательства, приведен ниже:

public static void Main()
{
    Awaitable()
        .ContinueWith(
            i =>
                {
                    foreach (var exception in i.Exception.InnerExceptions)
                    {
                        Console.WriteLine(exception.Message);
                    }
                },
            TaskContinuationOptions.OnlyOnFaulted);
    Console.WriteLine("This needs to come out before my exception");
    Console.ReadLine();
}

public static async Task Awaitable()
{
    await Task.Delay(3000);
    throw new Exception("Hey I can catch these pesky things");
}

Ответ 5

Есть ли какая-то причина, по которой вам нужно принимать соединения async? Я имею в виду, ожидает ли какое-либо соединение с клиентом какое-либо значение? Единственная причина для этого - это потому, что на сервере есть еще одна работа, ожидающая соединения. Если есть, возможно, вы можете сделать что-то вроде этого:

    public async void Serve()
    {
        while (true)
        {
            var client = await _listener.AcceptTcpClientAsync();
            Task.Factory.StartNew(() => HandleClient(client), TaskCreationOptions.LongRunning);
        }
    }

Таким образом, принятие будет освобождать текущий вариант останова для других действий, и обработка выполняется в новом потоке. Единственные накладные расходы - это создать новый поток для обработки клиента, прежде чем он вернется к принятию нового соединения.

Изменить: Просто понял, что это тот же самый код, который вы написали. Думаю, мне нужно снова прочитать ваш вопрос, чтобы лучше понять, что вы на самом деле спрашиваете: S

Edit2:

Есть ли способ объединить эти два подхода, чтобы мой сервер использовал именно    количество потоков, необходимое для количества активно работающих задач, но чтобы оно    не блокировать потоки без необходимости при выполнении операций ввода-вывода?

Думаю, мое решение действительно отвечает на этот вопрос. Действительно ли это необходимо?

Edit3: Сделано Task.Factory.StartNew() фактически создает новый поток.