Подтвердить что ты не робот

Синхронизация загрузки многопоточных файлов

В настоящее время я работаю над клиентским/серверным приложением Delphi XE3 для передачи файлов (с помощью компонентов Indy FTP). Клиентская часть контролирует папку, получает список файлов внутри, загружает их на сервер и удаляет оригиналы. Загрузка выполняется отдельным потоком, который обрабатывает файлы один за другим. Файлы могут варьироваться от 0 до нескольких тысяч, а их размеры также сильно различаются.

Это приложение Firemonkey, скомпилированное как для OSX, так и для Windows, поэтому мне пришлось использовать TThread вместо OmniThreadLibrary, который я предпочитал. Мой клиент сообщает, что приложение случайно зависает. Я не мог его дублировать, но поскольку у меня не так много опыта работы с TThread, я мог бы где-то поставить тупик. Я читал довольно много примеров, но я до сих пор не уверен в некоторых многопоточных особенностях.

Структура приложения проста:
Таймер в основном потоке проверяет папку и получает информацию о каждом файле в запись, которая переходит в общий TList. В этом списке хранится информация об именах файлов, размере, прогрессе, полностью ли загружен файл или его нужно повторить. Все это отображается в сетке с индикаторами выполнения и т.д. Этот список доступен только по основному потоку. После этого элементы из списка отправляются в поток, вызывая метод AddFile (код ниже). Поток хранит все файлы в потокобезопасной очереди, подобной этой http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
Когда файл загружается, поток uploader уведомляет основной поток о вызове Synchronize.
Основной поток периодически вызывает метод Uploader.GetProgress, чтобы проверить текущий ход файла и отобразить его. Эта функция не является поточно-безопасной, но может ли она вызвать тупик или вернутся только неверные данные?

Каким будет безопасный и эффективный способ проверки прогресса?

Итак, такой подход, или я что-то пропустил? Как вы это сделаете? Например, я хочу создать новый поток только для чтения содержимого папки. Это означает, что TList, который я использую, должен быть потокобезопасным, но он должен быть доступен все время, чтобы обновить отображаемую информацию в сетке графического интерфейса. Не все ли синхронизация просто замедляют работу графического интерфейса?

Я опубликовал упрощенный код ниже, если кто-то захочет посмотреть на него. Если нет, я был бы рад услышать некоторые мнения о том, что я должен использовать в целом. Основные задачи - работать как с ОС, так и с ОС Windows; чтобы иметь возможность отображать информацию обо всех файлах и о текущем ходе; и реагировать независимо от количества и размера файлов.

Это код потока пользователя. Я удалил некоторые из них для удобства чтения:

type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue<TFileInfo>;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue<TFileInfo>.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

И части основного потока, которые взаимодействуют с загрузчиком:

......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList<TFileInfo>;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count > 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
          FileInfo.ID := FUniqueID;
          Inc(FUniqueID);
          FileInfo.Path := NewFiles[I];
          FileInfo.Size := GetFileSizeByName(NewFiles[I]);
          FileInfo.UploadedSize := 0;
          FileInfo.Status := fsToBeQueued;
          FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;
4b9b3361

Ответ 1

Возможно, это не проблема, но TFileInfo - это запись.

Это означает, что при передаче в качестве параметра (non const/var) он копируется. Это может привести к проблемам с такими вещами, как строки в записи, которые не получают количество ссылок, обновляемое при копировании записи.

Можно попытаться сделать это классом и передать экземпляр в качестве параметра (т.е. указателя на данные в куче).

Что-то еще, на что можно обратить внимание, - это совлокальный Int64 (например, ваши значения размера) в поточных 32-битных системах.

Обновление/чтение не выполняется атомарно, и у вас нет каких-либо конкретных защит, поэтому можно прочитать значение, чтобы получить несоответствие верхних и нижних 32-бит из-за потоковой передачи. (например, прочитать верхние 32 бита, записать верхние 32 биты, записать нижние 32 бита, прочитать нижние 32биты, читать и записывать в разных потоках). Вероятно, это не вызывает проблем, которые вы видите, и если вы не работаете с передачами файлов размером > 4 ГБ, вряд ли когда-нибудь возникнут какие-либо проблемы.

Ответ 2

Тупики определенно трудно обнаружить, но это может быть проблемой. В вашем коде я не видел, чтобы вы добавили тайм-аут в очередь, заглядывать или деактивировать - что означает, что он примет значение по умолчанию для Infinite.

В этой очереди есть эта строка - это означает, что любой объект синхронизации будет блокироваться до тех пор, пока не будет завершен ввод (он блокирует монитор) или время ожидания (поскольку у вас нет тайм-аута, он будет ждать вечно )

TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
...    
if not TMonitor.Enter(FQueue, Timeout)

Я также сделаю предположение, что вы внедрили PEEK самостоятельно на основе Dequeue - только вы фактически не удаляете элемент.

Кажется, что он реализует свой собственный тайм-аут - однако у вас все еще есть следующее:

function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
...
if not TMonitor.Enter(FQueue, Timeout)

Если тайм-аут Infinite - поэтому, если вы находитесь в режиме peek, ожидая, что он будет сигнализирован с бесконечным тайм-аутом, вы не сможете переписать что-то из второго потока без блокировки этого потока, ожидающего, что метод peek станет завершена с бесконечным таймаутом.

Вот фрагмент комментария от TMonitor

Enter locks the monitor object with an optional timeout (in ms) value. 
Enter without a timeout will wait until the lock is obtained. 
If the procedure returns it can be assumed that the lock was acquired. 
Enter with a timeout will return a boolean status indicating whether or 
not the lock was obtained (True) or the attempt timed out prior to 
acquire the lock (False). Calling Enter with an INFINITE timeout 
is the same as calling Enter without a timeout.

Поскольку реализация по умолчанию использует Infinite, а значение TMonitor.Spinlock не предусмотрено, это блокирует поток, пока он не сможет получить объект FQueue.

Мое предложение было бы изменить ваш код следующим образом:

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while true do
    case fQueue.Peek(fCurrentFile,10) 
      wrSignaled:
        try
          if UploadFile(fCurrentFile) = '' then
          begin
            fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
            SignalComplete;
          end;
        except
          on E: Exception do
            SignalError(E.Message);
        end;
      wrTimeout: sleep(10);
      wrIOCompletion,
      wrAbandoned,
      wrError: break;
    end; //case

Таким образом, peek не будет удерживать блокировку FQueue неограниченно, оставляя окно для Enqueue для его получения и добавления файла из основного потока (UI).

Ответ 3

Это может быть длинный выстрел, но здесь есть еще одна возможность [первый ответ может быть более вероятным] (что-то, с чем я только что столкнулся, но знал раньше): использование Synchronize может вызвать тупик. Вот блог о том, почему это происходит: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

Соответствующая точка статьи:

Thread A вызывает синхронизацию (MethodA)

Запросы Thread B Синхронизировать (MethodB)

Затем в контексте основного потока:

Вызов основного потока CheckSynchronize() при обработке сообщений

CheckSynchronize реализуется для пакетной обработки всех ожидающих вызовов (*). Поэтому он поднимает очередь ожидающих вызовов (содержащих MethodA и MethodB) и циклов через них один за другим.

MethodA выполняется в основном потоке контекст. Предположим, что методA вызывает ThreadB.WaitFor

Ожидание на вызовы CheckSynchronize для обработки любых ожидающих вызовов для синхронизации

В теории это должно затем обрабатывать ThreadB Synchronize (MethodB), позволяя Thread B завершить. Однако MethodB уже обладание первым вызовом CheckSynchronize, поэтому он никогда не получает называется.

ТУПИК!

статья QC Embarcadero, описывающая проблему более подробно.

Пока я не вижу вызовы ProcessMessages в вышеуказанном коде или, если на то пошло, WaitFor, которые будут вызываться во время Synchronize, все равно может быть проблема, которая в момент вызова synchronize вызывает другой поток синхронизация также - но основной поток уже синхронизирован и блокируется.

Сначала это не касалось меня, потому что я стараюсь избегать синхронных вызовов, таких как чума, и обычно разрабатывать обновления пользовательского интерфейса из потоков, используя другие методы, такие как передача сообщений и потокобезопасные списки с уведомлением о сообщении вместо синхронизации вызовов.