Создаю универсальный workflow для обработки наших геномных данных. Для этого пишется более или менее стандартный в нашей среде workflow, что-то вроде такого:
Каждая панель на рисунке - отдельный экземпляр определенного класса (наследника TShape, но не думаю, что это важно) со своими методами, графические и математические проблемы вроде более-менее решены.
У каждого класса есть хотя бы один метод ввода или вывода (бывает, и по нескольку), формат данных на входе и на выходе у разных классов - разные. Одному, условно говоря, нужен массив строк, другому - просто строка, третий выдает самостоятельные массивы строк и массивы целых и т.п.
Поэтому первый вопрос - как проверять соответствие типов данных? Вот хочу я сделать связь между двумя экземплярами на рисунке, а вдруг нельзя! Либо совсем нельзя, либо нужен еще посредник (конвертор или обработчик). Ничего иного не придумал, кроме как ввести единый энумератор, в котором прописать все возможные варианты типов данных и запрашивать у экземпляров, которые хочу связать, совпадают ли у них типы ввод-вывод по энумератору или нет. Или можно проще придумать способ?
Второй момент. Если использовать TMemoryStream для обмена данными, то как я должен обрабатывать такие ситуации:
вывод в Stream экземпляром-донором начат. Нужно сообщить экземпляру-акцептору, что можно начинать обрабатывать данные. Как? Создать поток акцептора уснувшим, а затем Resume?
Экземпляр-акцептор обрабатывает данные быстрее, чем их записывает в Stream экземпляр-донор. Через что разумнее обработать эту ситуацию? Через CriticalSection?
Ответ
По первому вопросу: нормальное решение.
По второму вопросу: Для синхронизации работы потоков придумано много разных объектов синхронизации, в том числе и события (TEvent) и критические секции (TCriticalSection). Делать потоку Resume - это не по феншую. Чтобы передавать данные через TMemoryStream между потоками и одновременно читать и писать, его нужно сделать потокобезопасным (добавить в него критическую секцию). Возможно, для этой задачи лучше подойдет использование каналов (pipe), но мне пока не доводилось с ними работать и я про них мало чего знаю.
Итак, наш наследник от TMemoryStream
TMyCustomMemoryStream = class(TMemoryStream)
private
FLock: TCriticalSection;
FCanReadUpTo: Int64;
FAllDataWritten: Boolean;
protected
function GetSize: Int64; override;
procedure SetSize(const NewSize: Int64); override;
public
constructor Create;
destructor Destroy; override;
property Lock: TCriticalSection read FLock;
property CanReadUpTo: Int64 read FCanReadUpTo write FCanReadUpTo;
property AllDataWritten: Boolean read FAllDataWritten write FAllDataWritten;
end;
{ TMyCustomMemoryStream }
constructor TMyCustomMemoryStream.Create;
begin
inherited Create;
FLock := TCriticalSection.Create;
end;
destructor TMyCustomMemoryStream.Destroy;
begin
FLock.Free;
inherited Destroy;
end;
function TMyCustomMemoryStream.GetSize: Int64;
begin
FLock.Acquire;
try
Result := inherited GetSize;
finally
FLock.Release;
end;
end;
procedure TMyCustomMemoryStream.SetSize(const NewSize: Int64);
begin
FLock.Acquire;
try
inherited SetSize(NewSize);
finally
FLock.Release;
end;
end;
Lock - критическая секция для доступа к данным. Как-то так:
var
DataStream: TMyCustomMemoryStream;
DataStream.Lock.Acquire;
try
DataStream.Position := 0;
DataStream.Write(...);
DataStream.AllDataWritten := True;
finally
DataStream.Lock.Release;
end;
Чтобы другой поток мог читать данные, нужно периодически отпускать критическую секцию, а не писать все за один заход.
Не забываем, что Position, после Lock.Release может быть совсем не таким, как мы ожидаем, так что его нужно будет запоминать в контексте каждого потока и устанавливать каждый раз в нужное место при записи или чтении.
Теперь поток. который читает данные:
TCustomThread= class(TThread)
private
FTerminatedEvent: TEvent;
FCanReadData: TEvent;
FData: TMyCustomMemoryStream;
procedure DoReadData;
function CheckAndWaitNewData(BytesReaded: Int64): Boolean;
procedure ClearData;
protected
procedure Execute; override;
procedure TerminatedSet; override;
public
constructor Create;
destructor Destroy; override;
procedure ReadData(Data: TMyCustomMemoryStream);
end;
{ TCustomThread }
constructor TCustomThread.Create;
begin
inherited Create(False);
FTerminatedEvent := TEvent.Create(nil, True, False, '');
FCanReadData := TEvent.Create(nil, True, False, '');
end;
destructor TCustomThread.Destroy;
begin
Terminate;
if not Suspended then
WaitFor;
ClearData;
FTerminatedEvent.Free;
FCanReadData.Free;
inherited Destroy;
end;
procedure TCustomThread.ClearData;
begin
FCanReadData.ResetEvent;
FreeAndNil(FData);
end;
procedure TCustomThread.DoReadData;
var
DataPosition: Int64;
CountToRead: Int64;
begin
DataPosition := 0;
while CheckAndWaitNewData(DataPosition) do
begin
// read new data
FData.Lock.Acquire;
try
CountToRead := FData.Size - DataPosition;
FData.Position := DataPosition;
DataPosition := DataPosition + FData.Read(Buffer, CountToRead);
finally
FData.Lock.Release;
end;
// process new data
// ...
end;
// all data readed
ClearData;
end;
function TCustomThread.CheckAndWaitNewData(BytesReaded: Int64): Boolean;
begin
// returns True on data available or False on AllDataWritten
Result := FData.Size > BytesReaded;
while not Result and not FData.AllDataWritten do
begin
// wait for 100 ms to check data available again
if FTerminatedEvent.WaitFor(100) <> wrTimeout then
Abort;
Result := FData.Size > BytesReaded;
end;
end;
procedure TCustomThread.Execute;
var
WOHA: TWOHandleArray;
begin
WOHA[0] := FCanReadData.Handle;
WOHA[1] := FTerminatedEvent.Handle;
try
while not Terminated do
begin
if WaitForMultipleObjects(2, @WOHA, False, INFINITE) = WAIT_OBJECT_0 then
DoReadData
else
Exit;
end;
except
on EAbort do
Exit;
end;
end;
procedure TCustomThread.ReadData(Data: TMyCustomMemoryStream);
begin
if Assigned(FData) then
raise Exception.Create('Thread is busy!');
FData := Data;
FCanReadData.SetEvent;
end;
procedure TCustomThread.TerminatedSet;
begin
inherited TerminatedSet;
FTerminatedEvent.SetEvent;
end;
Предполагается, что поток, читающий данные, ответственен за уничтожение потока данных (возможно не лучшее решение). Поток создается и ожидает события для начала чтения FCanReadData или события, сигнализирующего о необходимости завершения работы потока FTerminatedEvent. Метод Execute можно модифицировать, если нужно делать еще какую-то работу, тогда ожидание должно быть не бесконечным либо вообще нулевым. Чтение данных запускается вызовом метода ReadData. Чтение данных продолжается до тех пор, пока не закончатся данные, после чего, если в потоке данных не установлен признак AllDataWritten начинается ожидание поступления новых данных с проверкой наличия новых данных каждые 100 мс. Если данные закончились и установлен признак AllDataWritten, чтение данных прекращается и поток данных уничтожается.
Комментариев нет:
Отправить комментарий