Использование защитного барьера памяти в незашифрованных очередях - программирование

Использование защитного барьера памяти в незашифрованных очередях

Недавно я прочитал статью Белого Макдональда 2010 года, "Барьеры памяти: аппаратное обеспечение для программных хакеров" .

Я был бы очень признателен за некоторые отзывы/комментарии/рекомендации относительно небольших разделов кода C, приведенных ниже, которые реализуют функцию очереди очереди M & S, в частности в отношении ограничений памяти и компилятора.

Этот код использует пары указателей-счетчиков для обработки ABA и ради этого сообщения следует рассматривать как написанный и только для x86/x64.

Входящие комментарии все написаны сейчас, для этой публикации, и являются частью этого сообщения, в котором они выражают то, что я сейчас думаю, думаю.

Для краткости я разделил код утверждений и заполнения строки строки в структурах и т.д.

В настоящее время я думаю, что код довольно сломан, но я хочу убедиться, что так думаю по правильным причинам.


#define PAC_SIZE 2

struct lfds_queue_element
{
  struct lfds_queue_element
    *volatile next[PAC_SIZE];

  void
    *user_data;
};

struct lfds_queue_state
{
  struct lfds_queue_element
    *volatile enqueue[PAC_SIZE];

  struct lfds_queue_element
    *volatile dequeue[PAC_SIZE];

  atom_t volatile
    aba_counter;
};

void lfds_queue_internal_dcas_pac_enqueue( struct lfds_queue_state *lqs, struct lfds_queue_element *lqe )
{
  ALIGN(ALIGN_DOUBLE_POINTER) struct lfds_queue_element
    *local_lqe[PAC_SIZE], *enqueue[PAC_SIZE], *next[PAC_SIZE];
  unsigned char cas_result = 0;
  unsigned int backoff_iteration = 1;

  /* TRD : here we have been passed a new element to place
           into the queue; we initialize it and its next
           pointer/counter pair
  */

  local_lqe[POINTER] = lqe;
  local_lqe[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  local_lqe[POINTER]->next[POINTER] = NULL;
  local_lqe[POINTER]->next[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  /* TRD : now, I think there is a issue here, in that these values
           are by no means yet necessarily visible to other cores

           however, they only need to be visible once
           the element has entered the queue, and for that
           to happen, the contigious double-word CAS must
           have occurred - and on x86/x64, this carries
           with it an mfence

           however, that mfence will only act to empty our
           local store buffer - it will not cause other cores
           to flush their invalidation queues, so surely
           it can all still go horribly wrong?

           ah, but if all other cores are only accessing
           these variables using atomic operations, they
           too will be issuing mfences and so at that
           point flushing their invalidate queues
  */

  do
  {
    enqueue[COUNTER] = lqs->enqueue[COUNTER];
    enqueue[POINTER] = lqs->enqueue[POINTER];

    next[COUNTER] = enqueue[POINTER]->next[COUNTER];
    next[POINTER] = enqueue[POINTER]->next[POINTER];

    /* TRD : now, this is interesting

             we load the enqueue pointer and its next pointer
             we then (immediately below) check to see they're unchanged

             but this check is totally bogus!  we could be reading
             old values from our cache, where our invalidate queue
             has not been processed, so the initial read contains
             old data *and* we then go ahead and check from our cache
             the same old values a second time

             what worse is that I think we could also read the correct
             values for enqueue but an incorrect (old) value for its
             next pointer...!

             so, in either case, we easily mistakenly pass the if()
             and then enter into code which does things to the queue

             now, in both cases, the CAS will mfence, which will
             cause us to see from the data structure the true
             values, but how much will that help us - we need
             to look to see what is actually being done

             the if() checks next[POINTER] is NULL

             if we have read a NULL for next, then we think
             the enqueue pointer is correcly placed (it not
             lagging behind) so we don't need to help; we then
             try to add our element to the end of the queue

             now, it may be we have read enqueue properly but
             next improperly and so we now try to add our element
             where it will in fact truncate the queue!

             the CAS however will mfence and so at this point
             we will actually see the correct value for enqueue-next,
             and this will prevent that occurring

             if we look now at the else clause, here we have seen
             that enqueue->next is not NULL, so the enqueue pointer
             is out of place and we need to help, which we do by
             moving it down the queue

             here though we could have read enqueue correctly
             but next incorrectly; the CAS will mfence, which will
             update the cache, but since we're only comparing
             the enqueue pointer with our copy of the enqueue
             pointer, the fact our next pointer is wrong won't
             change!  so here, we move the enqueue pointer to
             some old element - which although it might be in the
             queue (so this would be an inefficiency, you'd have
             to do a bunch more queue walking to get the enqueue
             pointer to the final element) it might not be, too!
             it could in the meantime have been dequeued and
             that of course would be death
    */

    if( lqs->enqueue[POINTER] == enqueue[POINTER] and lqs->enqueue[COUNTER] == enqueue[COUNTER] )
    {
      if( next[POINTER] == NULL )
      {
        local_lqe[COUNTER] = next[COUNTER] + 1;
        cas_result = lfds_abstraction_atomic_dcas_with_backoff( (atom_t volatile *) enqueue[POINTER]->next, (atom_t *) local_lqe, (atom_t *) next, &backoff_iteration );
      }
      else
      {
        next[COUNTER] = enqueue[COUNTER] + 1;
        lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) next, (atom_t *) enqueue );
      }
    }
  }
  while( cas_result == 0 );

  local_lqe[COUNTER] = enqueue[COUNTER] + 1;

  lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) local_lqe, (atom_t *) enqueue );

  return;
}
4b9b3361

Ответ 1

CAS является атомарным, поэтому, если один поток завершается успешно, а другой поток пытается, другой поток будет терпеть неудачу и повторите попытку.

Он работает только в том случае, если все потоки пытаются получить доступ к одной и той же памяти с тем же механизмом, то есть все они имеют доступ к CAS. Если они этого не сделают, гарантии, связанные с CAS (в данном случае, заграждения памяти), выходят из окна.