Как использовать семафоры между процессами с использованием общей памяти

Мне нужно синхронизировать N клиентских процессов с одним сервером. Эти процессы разветвляются основной функцией, в которой я объявил 3 семафора. Я решил использовать семафоры POSIX, но я не знаю, как делиться ими между этими процессами. Я думал, что общая память должна работать правильно, но у меня есть некоторые вопросы:

  • Как я могу выделить нужное пространство памяти в моем сегменте?
  • Можно ли использовать sizeof(sem_t) в size_t поле shmget для того, чтобы выделить именно то пространство, в котором я нуждаюсь?
  • Есть ли у кого-нибудь примеры, подобные этой ситуации?

Ответ 1

Легко делиться семафорами POSIX

  • Выберите имя для вашего семафора

    #define SNAME "/mysem"
  • Используйте sem_open с O_CREAT в процессе, который создает их

    sem_t *sem = sem_open(SNAME, O_CREAT, 0644, 3); /* Initial value is 3. */
  • Открыть семафоры в других процессах

    sem_t *sem = sem_open(SEM_NAME, 0); /* Open a preexisting semaphore. */

Если вы настаиваете на использовании разделяемой памяти, это, безусловно, возможно.

int fd = shm_open("shmname", O_CREAT, O_RDWR);
ftruncate(fd, sizeof(sem_t));
sem_t *sem = mmap(NULL, sizeof(sem_t), PROT_READ | PROT_WRITE,
    MAP_SHARED, fd, 0);

sem_init(sem, 1, 1);

Я не тестировал выше, так что это могут быть абсолютно бредеры.

Ответ 2

Я написал пример приложения, в котором родительский (производитель) порождает N дочерних (потребительских) потоков и использует глобальный массив для обмена данными между ними. Глобальный массив является круглой памятью и защищен, когда данные записываются в массив.

Файл заголовка:

#define TRUE 1
#define FALSE 0

#define SUCCESS 0
#define FAILURE -1

 * Function Declaration.
void Parent( void );
int CalcBitmap(unsigned int Num);
void InitSemaphore( void );
void DeInitSemaphore( void );
int ComputeComplexCalc( int op1, int op2);

 * Thread functions.
void *Child( void *arg );
void *Report( void *arg1 );

 * Macro Definition.
#define INPUT_FILE  "./input.txt"
#define NUM_OF_CHILDREN 10
#define BUF_SIZE 128
//Set the bits corresponding to all threads.
#define SET_ALL_BITMAP( a ) a = uiBitmap;     
//Clears the bit corresponding to a thread after
//every read. 
#define CLEAR_BIT( a, b ) a &= ~( 1 << b );   

 * Have a dedicated semaphore for each buffer
 * to synchronize the acess to the shared memory
 * between the producer (parent) and the consumers. 
sem_t ReadCntrMutex;
sem_t semEmptyNode[SIZE_CIRCULAR_BUF];

 * Global Variables.
unsigned int uiBitmap = 0;
//Counter to track the number of processed buffers.
unsigned int Stats;                           
unsigned int uiParentCnt = 0;
char inputfile[BUF_SIZE];
int EndOfFile = FALSE;

 * Data Structure Definition. ( Circular Buffer )
struct Message
    int id;
    unsigned int BufNo1;
    unsigned int BufNo2;
    /* Counter to check if all threads read the buffer. */
    unsigned int uiBufReadCntr;
struct Message ShdBuf[SIZE_CIRCULAR_BUF];

Исходный код:

# ipc_communicator is a IPC binary where a parent
# create ten child threads, read the input parameters
# from a file, sends that parameters to all the children.
# Each child computes the operations on those input parameters
# and writes them on to their own file.
#  Author: Ashok Vairavan              Date: 8/8/2014

 * Generic Header Files.
#include "stdio.h"
#include "unistd.h"
#include "string.h"
#include "pthread.h"
#include "errno.h"
#include "malloc.h"
#include "fcntl.h"
#include "getopt.h"
#include "stdlib.h"
#include "math.h"
#include "semaphore.h"

 * Private Header Files.
#include "ipc*.h"

 * Function to calculate the bitmap based on the 
 * number of threads. This bitmap is used to 
 * track which thread read the buffer and the
 * pending read threads.
int CalcBitmap(unsigned int Num)
   unsigned int uiIndex;

   for( uiIndex = 0; uiIndex < Num; uiIndex++ )
      uiBitmap |= 1 << uiIndex;
   return uiBitmap;
 * Function that performs complex computation on
 * numbers.
int ComputeComplexCalc( int op1, int op2)
    return sqrt(op1) + log(op2);

 * Function to initialise the semaphores. 
 * semEmptyNode indicates if the buffer is available
 * for write. semFilledNode indicates that the buffer
 * is available for read.
void InitSemaphore( void )
   unsigned int uiIndex=0;


   sem_init( &ReadCntrMutex, 0, 1);
   while( uiIndex<SIZE_CIRCULAR_BUF )
    sem_init( &semEmptyNode[uiIndex], 0, 1 );

 * Function to de-initilaise semaphore.
void DeInitSemaphore( void )
   unsigned int uiIndex=0;

   sem_destroy( &ReadCntrMutex);
   while( uiIndex<SIZE_CIRCULAR_BUF )
    sem_destroy( &semEmptyNode[uiIndex] );

/* Returns TRUE if the parent had reached end of file */
int isEOF( void )
   return __sync_fetch_and_add(&EndOfFile, 0);

 * Thread functions that prints the number of buffers
 * processed per second.
void *Report( void *arg1 )
   int CumStats = 0;

   while( TRUE )
      sleep( 1 );
      CumStats += __sync_fetch_and_sub(&Stats, 0);
      printf("Processed Per Second: %d Cumulative Stats: %d\n", 
               __sync_fetch_and_sub(&Stats, Stats), CumStats);
   return NULL;

 * Function that reads the data from the input file
 * fills the shared buffer and signals the children to
 * read the data.
void Parent( void )
   unsigned int op1, op2;
   unsigned int uiCnt = 0;

   /* Read the input parameters and process it in
    * while loop till the end of file.

   FILE *fp = fopen( inputfile, "r" );
   while( fscanf( fp, "%d %d", &op1, &op2 ) == 2 )
     /* Wait for the buffer to become empty */

     /* Access the shared buffer */
     ShdBuf[uiCnt].BufNo1 = op1;
     ShdBuf[uiCnt].BufNo2 = op2;

     /* Bitmap to track which thread read the buffer */
     sem_wait( &ReadCntrMutex );
     SET_ALL_BITMAP( ShdBuf[uiCnt].uiBufReadCntr );
     sem_post( &ReadCntrMutex );

     __sync_fetch_and_add( &uiParentCnt, 1);
     uiCnt = uiParentCnt % SIZE_CIRCULAR_BUF;

   /* If it is end of file, indicate it to other
    * children using global variable.
   if( feof( fp ) )
      __sync_add_and_fetch(&EndOfFile, TRUE);
      fclose( fp ); 


 * Child thread function which takes threadID as an
 * argument, creates a file using threadID, fetches the
 * parameters from the shared memory, computes the calculation,
 * writes the output to their own file and will release the 
 * semaphore when all the threads read the buffer.
void *Child( void *ThreadPtr )
   unsigned int uiCnt = 0, uiTotalCnt = 0;
   unsigned int op1, op2;
   char filename[BUF_SIZE];
   int ThreadID;

   ThreadID = *((int *) ThreadPtr);
   free( ThreadPtr );

   snprintf( filename, BUF_SIZE, "Child%d.txt", ThreadID );

   FILE *fp = fopen( filename, "w" );
   if( fp == NULL )
      perror( "fopen" );
      return NULL;

   while (TRUE)
      /* Wait till the parent fills the buffer */
      if( __sync_fetch_and_add( &uiParentCnt, 0 ) > uiTotalCnt )
         /* Access the shared memory */
         op1 = ShdBuf[uiCnt].BufNo1;
         op2 = ShdBuf[uiCnt].BufNo2;

         fprintf(fp, "%d %d = %d\n", op1, op2, 
                               ComputeComplexCalc( op1, op2) ); 

         sem_wait( &ReadCntrMutex );
         ShdBuf[uiCnt].uiBufReadCntr &= ~( 1 << ThreadID );
         if( ShdBuf[uiCnt].uiBufReadCntr == 0 )
            __sync_add_and_fetch(&Stats, 1);

            /* Release the semaphore lock if 
               all readers read the data */
         sem_post( &ReadCntrMutex );

         uiCnt = uiTotalCnt % SIZE_CIRCULAR_BUF;

         /* If the parent reaches the end of file and 
            if the child thread read all the data then break out */
         if( isEOF() && ( uiTotalCnt ==  uiParentCnt ) )
            //printf("Thid %d p %d c %d\n", ThreadID, uiParentCnt, uiCnt );
         /* Sleep for ten micro seconds before checking 
            the shared memory */
   fclose( fp );
   return NULL;

void usage( void )
    printf(" Usage:\n");
    printf("         -f - the absolute path of the input file where the input parameters are read from.\n\t\t Default input file: ./input.txt");
    printf("         -?  - Help Menu. \n" );

int main( int argc, char *argv[])
   pthread_attr_t ThrAttr;
   pthread_t ThrChild[NUM_OF_CHILDREN], ThrReport;
   unsigned int uiThdIndex = 0;
   unsigned int *pThreadID;
   int c;

   strncpy( inputfile, INPUT_FILE, BUF_SIZE );

   while( ( c = getopt( argc, argv, "f:" )) != -1 )
       switch( c )
           case 'f':
                strncpy( inputfile, optarg, BUF_SIZE );


   if( access( inputfile, F_OK ) == -1 )
      perror( "access" );
      return FAILURE;


   pthread_attr_init( &ThrAttr );
   while( uiThdIndex< NUM_OF_CHILDREN )
      /* Allocate the memory from the heap and pass it as an argument
       * to each thread to avoid race condition and invalid reference
       * issues with the local variables.
      pThreadID = (unsigned int *) malloc( sizeof( unsigned int ) );
      if( pThreadID == NULL )
     perror( "malloc" );
     /* Cancel pthread operation */
     return FAILURE;
      *pThreadID = uiThdIndex;
      if( pthread_create( 
             &ThrChild[uiThdIndex], NULL, &Child, pThreadID) != 0 )
    printf( "pthread %d creation failed. Error: %d\n", uiThdIndex, errno);
    perror( "pthread_create" );
    return FAILURE;

   /* Report thread reports the statistics of IPC communication
    * between parent and childs threads.
   if( pthread_create( &ThrReport, NULL, &Report, NULL) != 0 )
    perror( "pthread_create" );
    return FAILURE;


   uiThdIndex = 0;
   while( uiThdIndex < NUM_OF_CHILDREN )
      pthread_join( ThrChild[uiThdIndex], NULL );

    * Cancel the report thread function when all the 
    * children exits.
   pthread_cancel( ThrReport );


   return SUCCESS;

Ответ 3

Я действительно сомневаюсь, что подход с общей памятью когда-либо будет работать.

AFAIK семафор - фактически просто дескриптор, действительный для процесса, в котором он открыт. Реальная информация о семафоре лежит в памяти ядра.

Таким образом, использование того же значения дескриптора в другом процессе не будет работать.

Ответ 4

Да, мы можем использовать именованный семафор между двумя процессами. Мы можем открыть файл.

snprintf(sem_open_file, 128, "/sem_16");
ret_value = sem_open((const char *)sem_open_file, 0, 0, 0);

if (ret_value == SEM_FAILED) {
     * No such file or directory
    if (errno == ENOENT) {
        ret_value = sem_open((const char *)sem_open_file,
                           O_CREAT | O_EXCL, 0777, 1);

И другой процесс может использовать это.