Подтвердить что ты не робот

PHP concurrency проблема, несколько одновременных запросов; мьютексы?

Итак, я только что понял, что PHP потенциально запускает несколько запросов одновременно. Журналы прошлой ночи, похоже, показывают, что вступили два запроса, были обработаны параллельно; каждый инициировал импорт данных с другого сервера; каждая попытка вставить запись в базу данных. Один запрос не удался, когда он попытался вставить запись, которую только что вставил другой поток (импортированные данные поступают с PK, я не использую увеличивающиеся идентификаторы): SQLSTATE[23000]: Integrity constraint violation: 1062 Duplicate entry '865020' for key 'PRIMARY' ....

  • Я правильно поставил диагноз?
  • Как мне обратиться к этому вопросу?

Ниже приведен код. Я удалил большую часть из него (ведение журнала, создание других объектов за пределами Пациента из данных), но следующее должно включать соответствующие фрагменты. Запросы попадают в метод import(), который вызывает importOne() для каждой записи для импорта, по существу. Обратите внимание на метод сохранения в importOne(); что метод Eloquent (используя Laravel и Eloquent), который будет генерировать SQL для вставки/обновления записи по мере необходимости.

public function import()
{
        $now = Carbon::now();
        // Get data from the other server in the time range from last import to current import
        $calls = $this->getCalls($this->getLastImport(), $now);
        // For each call to import, insert it into the DB (or update if it already exists)
        foreach ($calls as $call) {
            $this->importOne($call);
        }
        // Update the last import time to now so that the next import uses the correct range
        $this->setLastImport($now);
}

private function importOne($call)
{
    // Get the existing patient for the call, or create a new one
    $patient = Patient::where('id', '=', $call['PatientID'])->first();
    $isNewPatient = $patient === null;
    if ($isNewPatient) {
        $patient = new Patient(array('id' => $call['PatientID']));
    }
    // Set the fields
    $patient->given_name = $call['PatientGivenName'];
    $patient->family_name = $call['PatientFamilyName'];
    // Save; will insert/update appropriately
    $patient->save();
}

Я бы предположил, что для решения потребуется мьютекс вокруг всего блока импорта? И если запрос не смог получить мьютекс, он просто переместился бы с остальной частью запроса. Мысли?

РЕДАКТИРОВАТЬ: просто отметить, что это не критический провал. Исключение вылавливается и регистрируется, а затем на запрос отвечает как обычно. И импорт успешно выполняется по другому запросу, а затем на этот запрос отвечает как обычно. Пользователи не являются более умными; они даже не знают об импорте, и это не является основной целью запроса. Так что, действительно, я мог бы просто оставить это как есть, и, кроме случайного исключения, ничего плохого не происходит. Но если есть исправление, позволяющее предотвратить выполнение дополнительной работы или ненужное обращение нескольких запросов к этому другому серверу, это может быть оправдано.

EDIT2: Хорошо, я применил механизм блокировки с flock(). Мысли? Будет ли следующая работа? И как бы я unit test это дополнение?

public function import()
{
    try {
        $fp = fopen('/tmp/lock.txt', 'w+');
        if (flock($fp, LOCK_EX)) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            flock($fp, LOCK_UN);
            // Log success.
        } else {
            // Could not acquire file lock. Log this.
        }
        fclose($fp);
    } catch (Exception $ex) {
        // Log failure.
    }
}

EDIT3: Мысли о следующей альтернативной реализации блокировки:

public function import()
{
    try {
        if ($this->lock()) {
            $now = Carbon::now();
            $calls = $this->getCalls($this->getLastImport(), $now);
            foreach ($calls as $call) {
                $this->importOne($call);
            }
            $this->setLastImport($now);
            $this->unlock();
            // Log success
        } else {
            // Could not acquire DB lock. Log this.
        }
    } catch (Exception $ex) {
        // Log failure
    }
}

/**
 * Get a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function lock()
{
    return DB::SELECT("SELECT GET_LOCK('lock_name', 1) AS result")[0]->result === 1;
}

/**
 * Release a DB lock, returns true if successful.
 *
 * @return boolean
 */
public function unlock()
{
    return DB::select("SELECT RELEASE_LOCK('lock_name') AS result")[0]->result === 1;
}
4b9b3361

Ответ 1

Похоже, что у вас нет состояния гонки, потому что идентификатор поступает из файла импорта, и если ваш алгоритм импорта работает правильно, каждый поток будет иметь свой собственный осколок выполняемой работы и должен никогда не конфликтуют с другими. Теперь кажется, что 2 потока получают запрос на создание одного и того же пациента и конфликтуют друг с другом из-за плохого алгоритма.

conflictfree

Убедитесь, что каждый порожденный поток получает новую строку из файла импорта и повторяется только при ошибке.

Если вы не можете сделать это и хотите придерживаться мьютекса, использование блокировки файлов не похоже на очень приятное решение, так как теперь вы решили конфликт внутри приложения, в то время как он действительно встречается в вашей базе данных. Блокировка БД также должна быть намного быстрее, и в целом более достойное решение.

Запросить блокировку базы данных, например:

$db → exec ('LOCK TABLES table1 WRITE, table2 WRITE');

И вы можете ожидать SQL-ошибки, когда будете писать в заблокированную таблицу, так что объедините свой Patient- > save() с помощью try catch.

Еще лучше было бы использовать условный атомный запрос. Запрос БД, который также имеет условие внутри него. Вы можете использовать такой запрос:

INSERT INTO targetTable(field1) 
SELECT field1
FROM myTable
WHERE NOT(field1 IN (SELECT field1 FROM targetTable))

Ответ 2

Ваш примерный код блокирует второй запрос до завершения первого. Вам нужно будет использовать параметр LOCK_NB для flock(), чтобы немедленно возвращать ошибку и не ждать.

Да, вы можете использовать либо блокировку, либо семафоры, либо на уровне файловой системы, либо непосредственно в базе данных.

В вашем случае, когда вам нужно, чтобы каждый файл импорта обрабатывался только один раз, лучшим решением было бы иметь таблицу SQL со строкой для каждого файла импорта. В начале импорта вы вставляете информацию о том, что импорт выполняется, поэтому другие потоки будут знать, что не будут обрабатывать его снова. По завершении импорта вы помечаете это как таковое. (Через несколько часов вы можете проверить таблицу, чтобы убедиться, что импорт действительно завершен.)

Также лучше делать такие одноразовые долговременные вещи, такие как импорт по отдельным сценариям, а не показывая обычные веб-страницы посетителям. Например, вы можете запланировать ночное задание cron, которое забирает файл импорта и обрабатывает его.

Ответ 3

Я вижу три варианта:
- использовать мьютекс/семафор/некоторый другой флаг - не легко кодировать и поддерживать
- использовать встроенный механизм транзакций DB
- используйте очередь (например, RabbitMQ или 0MQ) для записи сообщений в БД в строке