#delphi #классы #связи #workflow #биоинформатика
Создаю универсальный workflow для обработки наших геномных данных. Для этого пишется более или менее стандартный в нашей среде workflow, что-то вроде такого: Каждая панель на рисунке - отдельный экземпляр определенного класса (наследника TShape, но не думаю, что это важно) со своими методами, графические и математические проблемы вроде более-менее решены. У каждого класса есть хотя бы один метод ввода или вывода (бывает, и по нескольку), формат данных на входе и на выходе у разных классов - разные. Одному, условно говоря, нужен массив строк, другому - просто строка, третий выдает самостоятельные массивы строк и массивы целых и т.п. Поэтому первый вопрос - как проверять соответствие типов данных? Вот хочу я сделать связь между двумя экземплярами на рисунке, а вдруг нельзя! Либо совсем нельзя, либо нужен еще посредник (конвертор или обработчик). Ничего иного не придумал, кроме как ввести единый энумератор, в котором прописать все возможные варианты типов данных и запрашивать у экземпляров, которые хочу связать, совпадают ли у них типы ввод-вывод по энумератору или нет. Или можно проще придумать способ? Второй момент. Если использовать TMemoryStream для обмена данными, то как я должен обрабатывать такие ситуации: вывод в Stream экземпляром-донором начат. Нужно сообщить экземпляру-акцептору, что можно начинать обрабатывать данные. Как? Создать поток акцептора уснувшим, а затем Resume? Экземпляр-акцептор обрабатывает данные быстрее, чем их записывает в Stream экземпляр-донор. Через что разумнее обработать эту ситуацию? Через CriticalSection?
Ответы
Ответ 1
По первому вопросу: нормальное решение. По второму вопросу: Для синхронизации работы потоков придумано много разных объектов синхронизации, в том числе и события (TEvent) и критические секции (TCriticalSection). Делать потоку Resume - это не по феншую. Чтобы передавать данные через TMemoryStream между потоками и одновременно читать и писать, его нужно сделать потокобезопасным (добавить в него критическую секцию). Возможно, для этой задачи лучше подойдет использование каналов (pipe), но мне пока не доводилось с ними работать и я про них мало чего знаю. Итак, наш наследник от TMemoryStream: TMyCustomMemoryStream = class(TMemoryStream) private FLock: TCriticalSection; FCanReadUpTo: Int64; FAllDataWritten: Boolean; protected function GetSize: Int64; override; procedure SetSize(const NewSize: Int64); override; public constructor Create; destructor Destroy; override; property Lock: TCriticalSection read FLock; property CanReadUpTo: Int64 read FCanReadUpTo write FCanReadUpTo; property AllDataWritten: Boolean read FAllDataWritten write FAllDataWritten; end; { TMyCustomMemoryStream } constructor TMyCustomMemoryStream.Create; begin inherited Create; FLock := TCriticalSection.Create; end; destructor TMyCustomMemoryStream.Destroy; begin FLock.Free; inherited Destroy; end; function TMyCustomMemoryStream.GetSize: Int64; begin FLock.Acquire; try Result := inherited GetSize; finally FLock.Release; end; end; procedure TMyCustomMemoryStream.SetSize(const NewSize: Int64); begin FLock.Acquire; try inherited SetSize(NewSize); finally FLock.Release; end; end; Lock - критическая секция для доступа к данным. Как-то так: var DataStream: TMyCustomMemoryStream; DataStream.Lock.Acquire; try DataStream.Position := 0; DataStream.Write(...); DataStream.AllDataWritten := True; finally DataStream.Lock.Release; end; Чтобы другой поток мог читать данные, нужно периодически отпускать критическую секцию, а не писать все за один заход. Не забываем, что Position, после Lock.Release может быть совсем не таким, как мы ожидаем, так что его нужно будет запоминать в контексте каждого потока и устанавливать каждый раз в нужное место при записи или чтении. Теперь поток. который читает данные: TCustomThread= class(TThread) private FTerminatedEvent: TEvent; FCanReadData: TEvent; FData: TMyCustomMemoryStream; procedure DoReadData; function CheckAndWaitNewData(BytesReaded: Int64): Boolean; procedure ClearData; protected procedure Execute; override; procedure TerminatedSet; override; public constructor Create; destructor Destroy; override; procedure ReadData(Data: TMyCustomMemoryStream); end; { TCustomThread } constructor TCustomThread.Create; begin inherited Create(False); FTerminatedEvent := TEvent.Create(nil, True, False, ''); FCanReadData := TEvent.Create(nil, True, False, ''); end; destructor TCustomThread.Destroy; begin Terminate; if not Suspended then WaitFor; ClearData; FTerminatedEvent.Free; FCanReadData.Free; inherited Destroy; end; procedure TCustomThread.ClearData; begin FCanReadData.ResetEvent; FreeAndNil(FData); end; procedure TCustomThread.DoReadData; var DataPosition: Int64; CountToRead: Int64; begin DataPosition := 0; while CheckAndWaitNewData(DataPosition) do begin // read new data FData.Lock.Acquire; try CountToRead := FData.Size - DataPosition; FData.Position := DataPosition; DataPosition := DataPosition + FData.Read(Buffer, CountToRead); finally FData.Lock.Release; end; // process new data // ... end; // all data readed ClearData; end; function TCustomThread.CheckAndWaitNewData(BytesReaded: Int64): Boolean; begin // returns True on data available or False on AllDataWritten Result := FData.Size > BytesReaded; while not Result and not FData.AllDataWritten do begin // wait for 100 ms to check data available again if FTerminatedEvent.WaitFor(100) <> wrTimeout then Abort; Result := FData.Size > BytesReaded; end; end; procedure TCustomThread.Execute; var WOHA: TWOHandleArray; begin WOHA[0] := FCanReadData.Handle; WOHA[1] := FTerminatedEvent.Handle; try while not Terminated do begin if WaitForMultipleObjects(2, @WOHA, False, INFINITE) = WAIT_OBJECT_0 then DoReadData else Exit; end; except on EAbort do Exit; end; end; procedure TCustomThread.ReadData(Data: TMyCustomMemoryStream); begin if Assigned(FData) then raise Exception.Create('Thread is busy!'); FData := Data; FCanReadData.SetEvent; end; procedure TCustomThread.TerminatedSet; begin inherited TerminatedSet; FTerminatedEvent.SetEvent; end; Предполагается, что поток, читающий данные, ответственен за уничтожение потока данных (возможно не лучшее решение). Поток создается и ожидает события для начала чтения FCanReadData или события, сигнализирующего о необходимости завершения работы потока FTerminatedEvent. Метод Execute можно модифицировать, если нужно делать еще какую-то работу, тогда ожидание должно быть не бесконечным либо вообще нулевым. Чтение данных запускается вызовом метода ReadData. Чтение данных продолжается до тех пор, пока не закончатся данные, после чего, если в потоке данных не установлен признак AllDataWritten начинается ожидание поступления новых данных с проверкой наличия новых данных каждые 100 мс. Если данные закончились и установлен признак AllDataWritten, чтение данных прекращается и поток данных уничтожается.Ответ 2
Да, пожалуй. У нас в пакетах, где используются workflow, у всех таких экземпляров есть компонент-хозяин (потомок от TScrollBox с добавленным свойством Canvas), в обязанности которого входят отрисовка всяких стрелочек, создание очереди, автораспределение по поверхности и т.п., и в том числе хранение энумератора с типами данных, а также автосоздание цепочки, если это возможно (Скажем, на входе SAM, а процессу нужны FASTA, тогда пользователю автоматически предлагается создать экземпляр посредника). Мы от потоков отказались, для нас это неудобно, используем собственные классы-контейнеры для массивов. Поскольку не все алгоритмы могут работать с частичными данными, то каждый экземпляр-акцептор имеет свойство в том же энумераторе, нужны ли ему данные полностью или он может брать частично, и если второе - начиная с какого количества уже созданных данных ему можно отсемафорить о начале работы. Поскольку акцепторы никогда не подходят близко к адресам вновь создаваемых данных, критические секции не используются, акцепторы предупреждаются через TEvent. Но если нужен именно TMemoryStream, то @kot-da-vinci , на мой взгляд, очень хорошо ответил.
Комментариев нет:
Отправить комментарий