Подтвердить что ты не робот

Vert.x Event loop - Как это асинхронно?

Я играю с Vert.x и совершенно новичок в серверах на основе цикла событий, в отличие от модели потока/подключения.

public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

          r.response().end(
                  new Gson().toJson(model)
                 );
        })
        .listen(4568, result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }
  • Я просто пытаюсь смоделировать длинный обработчик запросов, чтобы понять, как работает эта модель.
  • То, что я наблюдал, так называемый цикл событий блокируется до тех пор, пока мой первый запрос не завершится. Какое бы небольшое время он ни потребовал, последующий запрос не действует до тех пор, пока предыдущий не завершится.
  • Очевидно, что мне не хватает части здесь и того вопроса, который у меня есть здесь.

Отредактировано на основе ответов:

  • Не принимает все запросы, которые считаются асинхронными? Если новый соединение может быть принято только при удалении предыдущего как это происходит?
    • Предположим, что типичный запрос занимает от 100 мс до 1 с (в зависимости от характера и характера запроса). Таким образом, это означает, что цикл события не может принять новое соединение до предыдущего запроса заканчивается (даже если он затягивается в секунду). И если я как программист должны продумать все это и направить таких обработчиков запросов на рабочий поток, то как он отличается от потока/соединения модель?
    • Я просто пытаюсь понять, как эта модель лучше от традиционных моделей потоков/коннекторов? Предположим, что нет операций ввода-вывода или все операции ввода/вывода обрабатываются асинхронно? Как это решить? c10k, когда он не может одновременно запускать все параллельные запросы и ждать, пока предыдущий не завершится?
  • Даже если я решит нажать все эти операции на рабочий поток (объединенный), я вернусь к той же проблеме, не так ли? Переключение контекста между потоками? Редактирование и использование этого вопроса для награды

    • Не совсем понимаю, как эта модель претендует на асинхронность.
    • У Vert.x есть async JDBC-клиент (Asyncronous - это ключевое слово), которое я пытался адаптировать с помощью RXJava.
    • Вот пример кода (соответствующие части)

    server.requestStream(). toObservable(). subscribe (req → {

            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME));
            jdbc.getConnectionObservable().subscribe(
                    conn -> {
    
                        // Now chain some statements using flatmap composition
                        Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                        // Subscribe to the final result
                        resa.subscribe(resultSet -> {
    
                            req.response().end(resultSet.getRows().toString());
                            System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                        }, err -> {
                            System.out.println("Database problem");
                            err.printStackTrace();
                        });
                    },
    
                    // Could not connect
                    err -> {
                        err.printStackTrace();
                    }
                    );
    
    });
    server.listen(4568);
    
    • Запрос выбора занимает около 3 секунд, чтобы вернуть полный дамп таблицы.
    • Когда я запускаю одновременные запросы (только с двумя), я вижу, что второй запрос полностью ждет завершения первого.
    • Если выбор JDBC является асинхронным, не так ли справедливое ожидание того, что инфраструктура обрабатывает второе соединение, пока он ожидает, что запрос выбора будет возвращать что-либо.
4b9b3361

Ответ 1

Цикл событий Vert.x, по сути, является классическим циклом событий, существующим на многих платформах. И, конечно же, большинство объяснений и документов можно найти для Node.js, поскольку он является самой популярной платформой, основанной на этом шаблоне архитектуры. Посмотрите на более или менее хорошее объяснение механики в цикле событий Node.js. В учебнике по Vert.x также есть хорошее объяснение между словами "Не звони нам, хорошо звони" и "Вертикали".

Изменить для своих обновлений:

Прежде всего, когда вы работаете с циклом событий, основной поток должен работать очень быстро для всех запросов. Вы не должны делать длинную работу в этом цикле. И, конечно же, вам не следует ждать ответа на ваш звонок в базу данных. - Расписание звонков асинхронно - Назначить обратный вызов (обработчик) для результата - Обратный вызов будет выполняться в рабочем потоке, а не в потоке цикла событий. Этот обратный вызов, например, вернет ответ сокету. Таким образом, ваши операции в цикле обработки событий должны просто планировать все асинхронные операции с обратными вызовами и переходить к следующему запросу, не ожидая никаких результатов.

Предположим, что типичный запрос занимает от 100 мс до 1 секунды (в зависимости от типа и характера запроса).

В этом случае ваш запрос содержит некоторые дорогостоящие части вычислений или доступ к IO - ваш код в цикле обработки событий не должен ждать результатов этих операций.

Я просто пытаюсь понять, как эта модель лучше, чем традиционные модели потоков /conn-серверов? Предположим, что нет операции ввода-вывода или все операции ввода-вывода обрабатываются асинхронно?

Когда у вас слишком много одновременных запросов и традиционная модель программирования, вы будете создавать поток для каждого запроса. Что этот поток будет делать? В основном они будут ожидать операций ввода-вывода (например, результат из базы данных). Это пустая трата ресурсов. В нашей модели цикла событий у вас есть один основной поток, который планирует операции и предварительно выделяет количество рабочих потоков для длинных задач. + Ни один из этих работников на самом деле не ждет ответа, он просто может выполнить другой код в ожидании результата ввода-вывода (это может быть реализовано как обратные вызовы или периодическая проверка состояния выполняемых в данный момент заданий ввода-вывода). Я бы порекомендовал вам пройти через Java NIO и Java NIO 2, чтобы понять, как этот асинхронный ввод-вывод может быть реализован внутри фреймворка. Зеленые нити тоже очень родственная концепция, которую было бы хорошо понять. Зеленые потоки и сопрограммы представляют собой тип теневого цикла обработки событий, который пытается достичь того же - меньше потоков, потому что мы можем повторно использовать системный поток, пока зеленый поток чего-то ждет.

Как это даже решает проблему c10k, когда он не может запустить все параллельные запросы параллельно и должен ждать, пока предыдущий не завершится?

Конечно, мы не ждем в основном потоке отправки ответа на предыдущий запрос. Получить запрос, запланировать выполнение длинных /IO задач, следующий запрос.

Даже если я решу перенести все эти операции в рабочий поток (в виде пула), я вернусь к той же проблеме, не так ли? Переключение контекста между потоками?

Если все сделать правильно - нет. Более того, вы получите хорошую локализацию данных и прогнозирование потока выполнения. Одно ядро ЦП будет выполнять ваш короткий цикл обработки событий и планировать асинхронную работу без переключения контекста и ничего более. Другие ядра делают вызов в базу данных и возвращают ответ и только это. Переключение между обратными вызовами или проверка различных каналов на состояние ввода-вывода фактически не требует переключения контекста системного потока - оно фактически работает в одном рабочем потоке. Итак, у нас есть один рабочий поток на ядро, и этот системный поток ожидает/проверяет доступность результатов, например, из нескольких соединений с базой данных. Пересмотрите концепцию Java NIO, чтобы понять, как она может работать таким образом. (Классический пример для NIO - прокси-сервер, который может принимать много параллельных соединений (тысячи), запросы прокси к некоторым другим удаленным серверам, прослушивать ответы и отправлять ответы клиентам, и все это с помощью одного или двух потоков)

Что касается вашего кода, я сделал пример проекта для вас, чтобы продемонстрировать, что все работает как положено:

public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class", "org.hsqldb.jdbcDriver")
                .put("max_pool_size", 30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key, name varchar(255))", create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                    client.getConnection(conn -> {
                         if (conn.failed()) {throw new RuntimeException(conn.cause());}

                         final SQLConnection connection = conn.result();

                         connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
                             // query some data with arguments
                             connection
                                 .queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
                                     connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
                                     System.out.println("Result " + requestId + " returned");
                                     r.response().end("Hello");
                                 });
                         });
                     });
            })
            .listen(8080, result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),
        context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080, "localhost", "/",
                            response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}

Выход:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned

Итак, мы принимаем запрос - планируем запрос к базе данных, переходим к следующему запросу, мы используем их все и отправляем ответ на каждый запрос, только когда все сделано с базой данных.

Что касается вашего примера кода, я вижу две возможные проблемы - во-первых, похоже, что у вас нет соединения close(), что важно для его возврата в пул. Во-вторых, как настроен ваш пул? Если есть только одно бесплатное соединение - эти запросы будут сериализованы в ожидании этого соединения.

Я рекомендую вам добавить печать временной метки для обоих запросов, чтобы найти место, где вы сериализуете. У вас есть что-то, что делает вызовы в цикле событий блокирующими. Или... проверьте, что вы отправляете запросы параллельно в своем тесте. Не следующий после получения ответа после предыдущего.

Ответ 2

Как это асинхронно? Ответ в самом вашем вопросе

Я заметил, что так называемый цикл событий блокируется, пока мой Первый запрос завершен. Сколько бы времени не понадобилось, последующее запрос не выполняется до тех пор, пока предыдущий не завершит

Идея состоит в том, чтобы вместо каждого нового HTTP-запроса использовать новый, используется тот же поток, который вы заблокировали своей долгосрочной задачей.

Цель цикла обработки событий - сэкономить время, затрачиваемое на переключение контекста с одного потока на другой, и использовать идеальное время ЦП, когда задача использует операции ввода-вывода/сети. Если при обработке вашего запроса он выполнял другие операции ввода-вывода/сети, например: извлечение данных из удаленного экземпляра MongoDB в течение этого времени, ваш поток не будет заблокирован, и вместо этого другой поток будет обслуживать другой запрос, что является идеальным вариантом использования модель цикла обработки событий (учитывая, что на ваш сервер поступают параллельные запросы).

Если у вас есть долго выполняющиеся задачи, которые не включают работу с сетью/вводом-выводом, вам следует рассмотреть возможность использования вместо этого пула потоков, если вы заблокируете сам поток основного цикла обработки событий, другие запросы будут отложены. то есть, если вы выполняете долго выполняемые задачи, вы можете заплатить цену за переключение контекста, чтобы сервер реагировал.

EDIT: Способ, которым сервер может обрабатывать запросы, может различаться:

1) Создайте новый поток для каждого входящего запроса (в этой модели переключение контекста будет высоким, и каждый раз возникает дополнительная плата за создание нового потока)

2) Использование пула потоков для обработки запроса (тот же набор потоков будет использоваться для обслуживания запросов, а дополнительные запросы будут поставлены в очередь)

3) Используйте цикл обработки событий (один поток для всех запросов. Незначительное переключение контекста. Потому что будут некоторые запущенные потоки, например: для постановки в очередь входящих запросов)

Во-первых, переключение контекста неплохо, требуется, чтобы сервер приложений реагировал, но слишком большое переключение контекста может стать проблемой, если число одновременных запросов слишком велико (примерно более 10 тыс.). Если вы хотите понять более подробно, я рекомендую вам прочитать статью C10K

Предположим, что типичный запрос занимает от 100 мс до 1 с (на основе по виду и характеру запроса). Значит, цикл событий не может принять новое соединение, пока не закончится предыдущий запрос (даже если его заводит в секунду).

Если вам нужно ответить на большое количество одновременных запросов (более 10 тыс.), Я бы рассмотрел более 500 мс как более длительную операцию. Во-вторых, как я уже говорил, некоторые потоки/переключение контекста задействованы, например, для постановки в очередь входящих запросов, но переключение контекста между потоками будет значительно сокращено, поскольку одновременно будет слишком мало потоков. В-третьих, если при разрешении первого запроса задействована операция сети/ввода-вывода, второй запрос получит возможность быть разрешенным до разрешения первого запроса, и именно здесь эта модель хорошо работает.

И если я как программист должен думать через все это и отправить такие обработчики запросов в рабочий поток, тогда чем он отличается от модели потока/соединения?

Vertx пытается предоставить вам лучшие потоки и цикл обработки событий, поэтому, как программист, вы можете сделать запрос о том, как сделать ваше приложение эффективным в обоих сценариях, т.е. при длительной работе с сетью/операцией ввода-вывода и без нее.

Я просто пытаюсь понять, как эта модель лучше от традиционные модели потоков /conn-серверов? Предположим, что нет операции ввода-вывода или все операции ввода/вывода обрабатываются асинхронно? Как это вообще решает с10к проблема, когда он не может запустить все параллельные запросы параллельно и должны ждать, пока предыдущий не закончится?

Приведенное выше объяснение должно ответить на это.

Даже если я решу перенести все эти операции на работника нить (объединенная), тогда я возвращаюсь к той же проблеме, не так ли? контекст переключение между потоками?

Как я уже сказал, у обоих есть свои плюсы и минусы, и Vertx дает вам обе модели, и в зависимости от вашего варианта использования вы должны выбрать то, что идеально подходит для вашего сценария.

Ответ 3

В таких механизмах обработки вы должны включить длительные задачи в асинхронно выполняемые операции, и это методология для этого, так что критический поток может как можно быстрее завершить работу и вернуться к выполнению другой задачи. то есть любые операции ввода-вывода передаются в фреймворк, чтобы перезвонить вам при выполнении ввода-вывода.

Структура асинхронна в том смысле, что она поддерживает создание и выполнение этих асинхронных задач, но это не изменяет ваш код от синхронного к асинхронному.