Страницы

Поиск по вопросам

воскресенье, 26 января 2020 г.

C# Файлы, сеть, многопоточность

#c_sharp #многопоточность #сеть #fileupload #очередь


Оговорюсь сразу, использую .NET 4.0. Это в случае будущих рекомендаций в пользу async-await =)

Изначальная постановка задачи - имеется несколько однотипных приложений, функция
которых одна - записывать файлы определенного формата заранее известной длины (например,
10 минут приложение пишет файл, постоянно его увеличивая, по прошествии этого времени
оно закрывает файл и открывает новый). По завершению записи очередного файла оно отправляет
моему приложению TCP сообщение (в качестве TCP сокета использую класс Ipdaemon от ipWorks
от nSoftware) с именем файла, которое оно только что закончила писать. Размер этого
файла может составлять более полутора гигабайт. Задача моего приложения - раз в 10-15
секунд переписывать очередную порцию того, что накатало то приложение, на сетевой диск
и по получению TCP сообщения о завершении - удостовериться в том, что файлы совпадают
по содержимому.

Я написал класс для решения этой проблемы и вот его ключевой метод для перекидывания
(в среднем, его исполнение занимает от 90мс до 8000мс - StopWatch, но теоретически
может и больше):

private void ArrayCopyFile()
    {
        //http://stackoverflow.com/questions/1246899/file-copy-vs-manual-filestream-write-for-copying-file
        //http://stackoverflow.com/questions/995320/file-writeallbytes-causes-error-insufficient-system-resources-exist-to-complete#995320
        const int bufferSize = 1024*1024;
        var headerBytes = new byte[_headerManager.HeaderSize()];
        FileStream src = new FileStream(SrcFile, FileMode.Open, FileAccess.Read,
FileShare.ReadWrite);
        FileStream dst = new FileStream(DstFile, FileMode.OpenOrCreate, FileAccess.Write,
FileShare.ReadWrite);
        try
        {
            // выясним, увеличился ли исходный файл
            if (src.Length > _currPos)
            {
                // чтение заголовка файла
                src.Read(headerBytes, 0, headerBytes.Length);
            }
            else 
            {
                Trace("Исходный файл '{0}' не изменился со времени последней синхронизации",
                    SrcFile);
                return;
            }
            if (!CheckDstDirectory())
            {                    
                return;
            }
            if (_currPos > 0)
            {
                // записываем заголовок
                dst.Write(headerBytes, 0, headerBytes.Length);
                // резервируем место
                dst.SetLength(src.Length);
                // устанавливаем позицию на такую же, как у исходного файла ДО считывания
очередной порции байт
                dst.Seek(_currPos, SeekOrigin.Begin);
            }
            src.Position = _currPos;
            int bytesRead;
            byte[] bytes = new byte[bufferSize];
            while ((bytesRead = src.Read(bytes, 0, bufferSize)) > 0)
            {
                dst.Write(bytes, 0, bytesRead);
                _currPos += bytesRead;
            }
        }
        finally
        {
            src.Dispose();
            dst.Dispose();
        }
    }


Для каждого объекта этого класса создается свой объект System.Threading.Timer с интервалом
10 секунд.

Для проверки идентичности использую такой метод:

public static bool CompareFiles(string filePath1, string filePath2)
    {
        long fileLength;
        if((fileLength = new FileInfo(filePath1).Length) != new FileInfo(filePath2).Length)
            return false;
        bool filesAreEquals = true;
        const int size = 1024*1024; //0x1000000;
        int countIteration = (int)Math.Ceiling(fileLength / (double)size);
        Parallel.For(0, countIteration, x =>
        {
            if(!filesAreEquals) return;
            var start = x * size;
            if (start >= fileLength) return;
            int realSize = (int) (x == countIteration - 1 ? fileLength - start : size);
            using (FileStream file = File.OpenRead(filePath1))
            using (FileStream file2 = File.OpenRead(filePath2))
            {
                var buffer = new byte[realSize];
                var buffer2 = new byte[realSize];
                file.Position = start;
                file2.Position = start;
                int count = file.Read(buffer, 0, realSize);
                file2.Read(buffer2, 0, realSize);
                for (int i = 0; i < count; i++)
                    if (buffer[i] != buffer2[i])
                    {
                        filesAreEquals = false;
                        return;
                    }
            }
        });
        return filesAreEquals;
    }


После приемки TCP сообщения оно помещается в очередь:

class QueueTasks : IDisposable
{
    private readonly Queue _tasks = new Queue();
    readonly object _syncObj = new object();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    private readonly ManualResetEvent _exitEvent = new ManualResetEvent(false);
    private bool _isRunning;

    public bool IsRunning
    {
        get { return _isRunning; }
        private set
        {
            _isRunning = value;
            if(!_isRunning) OnQueueTasksStopped();
        }
    }

    public delegate void QueueTasksStoppedEventHandler(object sender);
    public event QueueTasksStoppedEventHandler QueueTasksStopped;
    protected virtual void OnQueueTasksStopped()
    {
        var handler = QueueTasksStopped;
        if (handler != null) handler(this);
    }

    public delegate void TaskReceivedEventHandler(object sender, T eventArgs);
    public event TaskReceivedEventHandler TaskReceived;
    protected virtual void OnTaskReceived(T eventArgs)
    {
        var handler = TaskReceived;
        if (handler != null) handler(this, eventArgs);
    }

    public void Stop()
    {
        _exitEvent.Set();
        _autoResetEvent.Set();
    }

    public void Start()
    {
        if(IsRunning) throw new InvalidOperationException("Очередь задач уже работает");
        IsRunning = true;
        _autoResetEvent.WaitOne();
        _exitEvent.WaitOne();
        new Thread(Work) { Name = "Очередь задач" }.Start();
    }

    public void Add(T data)
    {
        SafeAdd(data);
    }

    void SafeAdd(T data)
    {
        lock (_syncObj) _tasks.Enqueue(data);
        _autoResetEvent.Set();
    }

    T SafeRemove()
    {
        lock (_syncObj)
            if (_tasks.Count > 0)
                return _tasks.Dequeue();
        return default(T);
    }

    void Work()
    {
        while (!_exitEvent.WaitOne(0, false))
        {
            _autoResetEvent.WaitOne();
            while (_tasks.Count != 0)
                OnTaskReceived(SafeRemove());
        }
        IsRunning = true;
    }

    public void Dispose()
    {
        Stop();
    }
}


На событии из этой очереди я анализирую список текущих обрабатываемых файлов, если
нахожу объект класса с именем файла, который совпадает с сообщением, я останавливаю
внутренний таймер для него, проверяю, увеличился ли исходный файл, и если да, докопирую
оставшееся.

Теперь, после всей этой возможно неинтересной лабуды, возможно уже решавшейся и не
раз, задаю вопрос:

Если файлов больше 100, то в очереди начинают копиться сообщения о завершении/начале
записи файлов. Серверная машина, на которой этот софт запускается, имеет 8 ядер чистого
интела и 32 гб оперативной памяти. Пиковая загрузка процессора не выше 80%, оперативная
память - занято около 2-3 гигабайт. Загрузка на сетевом адаптере - не более 55% от
гигабитной сети. Сетевой диск - SSD. В каком месте мое приложение, перекидывающее файлы,
можно и нужно оптимизировать?
    


Ответы

Ответ 1



Parallel.For(0, countIteration, x => Параллельное чтение файлов - это почти всегда плохая идея. Просто читай их в одном потоке последовательно достаточно крупными кусками. И нет смысла каждый раз создавать новый буферный массив - создай по одному на файл и используй. int count = file.Read(buffer, 0, realSize); file2.Read(buffer2, 0, realSize); for (int i = 0; i < count; i++) А это вообще на баг похоже. Stream не обязан прочитать ровно столько байтов, сколько ты ему передал, поэтому Не факт, что файлы одинаковы, если твоя функция так говорит Например, могли сравниться только префиксы блоков. Не факт, что файлы различны, если твоя функция так говорит Например, из первого файла было прочитано больше, чем из второго.

Комментариев нет:

Отправить комментарий