Страницы

Поиск по вопросам

вторник, 21 мая 2019 г.

Синхронизация между объектами Workflow

Создаю универсальный workflow для обработки наших геномных данных. Для этого пишется более или менее стандартный в нашей среде workflow, что-то вроде такого:

Каждая панель на рисунке - отдельный экземпляр определенного класса (наследника TShape, но не думаю, что это важно) со своими методами, графические и математические проблемы вроде более-менее решены. У каждого класса есть хотя бы один метод ввода или вывода (бывает, и по нескольку), формат данных на входе и на выходе у разных классов - разные. Одному, условно говоря, нужен массив строк, другому - просто строка, третий выдает самостоятельные массивы строк и массивы целых и т.п.
Поэтому первый вопрос - как проверять соответствие типов данных? Вот хочу я сделать связь между двумя экземплярами на рисунке, а вдруг нельзя! Либо совсем нельзя, либо нужен еще посредник (конвертор или обработчик). Ничего иного не придумал, кроме как ввести единый энумератор, в котором прописать все возможные варианты типов данных и запрашивать у экземпляров, которые хочу связать, совпадают ли у них типы ввод-вывод по энумератору или нет. Или можно проще придумать способ?
Второй момент. Если использовать TMemoryStream для обмена данными, то как я должен обрабатывать такие ситуации:
вывод в Stream экземпляром-донором начат. Нужно сообщить экземпляру-акцептору, что можно начинать обрабатывать данные. Как? Создать поток акцептора уснувшим, а затем Resume? Экземпляр-акцептор обрабатывает данные быстрее, чем их записывает в Stream экземпляр-донор. Через что разумнее обработать эту ситуацию? Через CriticalSection?


Ответ

По первому вопросу: нормальное решение.
По второму вопросу: Для синхронизации работы потоков придумано много разных объектов синхронизации, в том числе и события (TEvent) и критические секции (TCriticalSection). Делать потоку Resume - это не по феншую. Чтобы передавать данные через TMemoryStream между потоками и одновременно читать и писать, его нужно сделать потокобезопасным (добавить в него критическую секцию). Возможно, для этой задачи лучше подойдет использование каналов (pipe), но мне пока не доводилось с ними работать и я про них мало чего знаю.
Итак, наш наследник от TMemoryStream
TMyCustomMemoryStream = class(TMemoryStream) private FLock: TCriticalSection; FCanReadUpTo: Int64; FAllDataWritten: Boolean; protected function GetSize: Int64; override; procedure SetSize(const NewSize: Int64); override; public constructor Create; destructor Destroy; override;
property Lock: TCriticalSection read FLock; property CanReadUpTo: Int64 read FCanReadUpTo write FCanReadUpTo; property AllDataWritten: Boolean read FAllDataWritten write FAllDataWritten; end;
{ TMyCustomMemoryStream }
constructor TMyCustomMemoryStream.Create; begin inherited Create;
FLock := TCriticalSection.Create; end;
destructor TMyCustomMemoryStream.Destroy; begin FLock.Free;
inherited Destroy; end;
function TMyCustomMemoryStream.GetSize: Int64; begin FLock.Acquire; try Result := inherited GetSize; finally FLock.Release; end; end;
procedure TMyCustomMemoryStream.SetSize(const NewSize: Int64); begin FLock.Acquire; try inherited SetSize(NewSize); finally FLock.Release; end; end;
Lock - критическая секция для доступа к данным. Как-то так:
var DataStream: TMyCustomMemoryStream;
DataStream.Lock.Acquire; try DataStream.Position := 0; DataStream.Write(...); DataStream.AllDataWritten := True; finally DataStream.Lock.Release; end;
Чтобы другой поток мог читать данные, нужно периодически отпускать критическую секцию, а не писать все за один заход.
Не забываем, что Position, после Lock.Release может быть совсем не таким, как мы ожидаем, так что его нужно будет запоминать в контексте каждого потока и устанавливать каждый раз в нужное место при записи или чтении.
Теперь поток. который читает данные:
TCustomThread= class(TThread) private FTerminatedEvent: TEvent; FCanReadData: TEvent; FData: TMyCustomMemoryStream; procedure DoReadData; function CheckAndWaitNewData(BytesReaded: Int64): Boolean; procedure ClearData; protected procedure Execute; override; procedure TerminatedSet; override; public constructor Create; destructor Destroy; override; procedure ReadData(Data: TMyCustomMemoryStream); end;
{ TCustomThread }
constructor TCustomThread.Create; begin inherited Create(False);
FTerminatedEvent := TEvent.Create(nil, True, False, ''); FCanReadData := TEvent.Create(nil, True, False, ''); end;
destructor TCustomThread.Destroy; begin Terminate; if not Suspended then WaitFor;
ClearData;
FTerminatedEvent.Free; FCanReadData.Free;
inherited Destroy; end;
procedure TCustomThread.ClearData; begin FCanReadData.ResetEvent; FreeAndNil(FData); end;
procedure TCustomThread.DoReadData; var DataPosition: Int64; CountToRead: Int64; begin DataPosition := 0;
while CheckAndWaitNewData(DataPosition) do begin // read new data FData.Lock.Acquire; try CountToRead := FData.Size - DataPosition; FData.Position := DataPosition; DataPosition := DataPosition + FData.Read(Buffer, CountToRead); finally FData.Lock.Release; end;
// process new data // ... end;
// all data readed ClearData; end;
function TCustomThread.CheckAndWaitNewData(BytesReaded: Int64): Boolean; begin // returns True on data available or False on AllDataWritten Result := FData.Size > BytesReaded; while not Result and not FData.AllDataWritten do begin // wait for 100 ms to check data available again if FTerminatedEvent.WaitFor(100) <> wrTimeout then Abort;
Result := FData.Size > BytesReaded; end; end;
procedure TCustomThread.Execute; var WOHA: TWOHandleArray; begin WOHA[0] := FCanReadData.Handle; WOHA[1] := FTerminatedEvent.Handle;
try while not Terminated do begin if WaitForMultipleObjects(2, @WOHA, False, INFINITE) = WAIT_OBJECT_0 then DoReadData else Exit; end; except on EAbort do Exit; end; end;
procedure TCustomThread.ReadData(Data: TMyCustomMemoryStream); begin if Assigned(FData) then raise Exception.Create('Thread is busy!');
FData := Data; FCanReadData.SetEvent; end;
procedure TCustomThread.TerminatedSet; begin inherited TerminatedSet;
FTerminatedEvent.SetEvent; end;
Предполагается, что поток, читающий данные, ответственен за уничтожение потока данных (возможно не лучшее решение). Поток создается и ожидает события для начала чтения FCanReadData или события, сигнализирующего о необходимости завершения работы потока FTerminatedEvent. Метод Execute можно модифицировать, если нужно делать еще какую-то работу, тогда ожидание должно быть не бесконечным либо вообще нулевым. Чтение данных запускается вызовом метода ReadData. Чтение данных продолжается до тех пор, пока не закончатся данные, после чего, если в потоке данных не установлен признак AllDataWritten начинается ожидание поступления новых данных с проверкой наличия новых данных каждые 100 мс. Если данные закончились и установлен признак AllDataWritten, чтение данных прекращается и поток данных уничтожается.

Комментариев нет:

Отправить комментарий