Калина Алексей блог программиста

Классические задачи теории многопоточности средствами C#

Сегодня мы снова пройдемся по классической теории. На этот раз мы будем заниматься многопоточностью, а если точнее, то синхронизацией потоков. Конечно, большое число процессов позволяет более полно использовать мощности вашего компьютера. Однако, когда ваши потоки требуют взаимодействия друг с другом, часто возникают проблемы. Во многих языках программирования многопоточности уделяется большое значение, в том числе и синхронизации. Вот мы и посмотрим какое выражение нашла эта теория в языке высокого уровня C# и порешаем известные задачи о взаимодействии нескольких процессов.

Основы теории

Начнем с рассмотрения основных понятий теории. В каждый момент времени можно описать выполнение программы с помощью ее состояния. Состояние складывается из значений всех переменных программы. Процесс выполняет последовательность операторов, которые в свою очередь разбиваются на последовательность неделимых действий. Такие действия изменяют состояние программы неделимым образом.

Выполнение программы можно представить как последовательность чередующихся неделимых действий, принадлежащих разным потокам (в этом посте я буду использовать понятия процесс и поток как синонимы). Такая последовательность называется историей (напоминает теорию транзакций, не правда ли?). В случае одного процесса такая история может быть только одна, но в контексте многопоточности мы получаем огромное число возможных историй. Цель синхронизации заключается в том, чтобы исключить нежелательные истории из программы.

Для задач будут представлены классические решения на псевдокоде и то, как их можно реализовать на C#. При этом некоторые конструкции псевдокода, нужно дополнительно пояснить. Для синхронизации используются два основных приема: взаимное исключение и условная синхронизация. Взаимное исключение реализуется за счет крупномодульных неделимых действий, то есть неделимых действий, выполняющихся как одно неделимое. Такая последовательность обозначается в псевдокоде угловыми скобками. Таким образом сложение и присвоение значений в следующем операторе будет выполнено неделимым образом: <a = b + c;>

Условная синхронизация - другой прием для управления взаимодействием процессов. Она позволяет “усыпить” процесс до выполнения некоторого условия. Синтаксис такой: < await (s > 0) s = s - 1; > . В этом примере процесс доходит до условия того, что значение s положительно, и в случае невыполнения этого условия висит и ничего не делает. Как только значение станет положительным, процесс преодолеет условие и неделимым образом уменьшит a на единицу.

Блокировки и барьеры

Начнем с двух классических задач, которые можно решать без использования каких-либо сложных конструкций, помимо тех операций, что мы уже ввели.

Задача критической секции

В этой задаче N процессов многократно повторяют сначала критическую а затем некритическую секции. Под критической секцией понимается последовательность операторов, содержащих работу с некоторым общим ресурсом, то есть работу, которую в один момент времени должен выполнять только один процесс. Задача состоит в том, чтобы разработать такие вход и выход из критической секции, которые удовлетворяют следующим свойствам:

  1. Взаимное исключение. Очевидное свойство, требующее того, что в каждый момент времени в секции находится только один процесс.
  2. Отсутствие взаимной блокировки. При желании входа в секцию несколькими потоками, хотя бы один это сделает.
  3. Отсутствие излишних задержек. Если процесс хочет войти в секцию и там нет других процессов, он это осуществит.
  4. Возможность входа. Процесс, который хочет войти в секцию, когда-нибудь сделает это.

Существуют достаточно тривиальные решения этой задачи, которые обеспечивают выполнения первых трех условий, но последнее условие в теории может не выполняться. Такое происходит, когда одни и те же процессы заходят в критическую секцию, а один ожидает своего момента в течение всего выполнения программы. Поэтому мы рассмотрим несколько справедливых решений этой задачи.

Алгоритм Питерсона

Рассмотри вариант алгоритма, который решает задачу с двумя потоками. Его идея заключается в том, что при одновременной попытке процессов войти в критическую секцию, решение о том, какой из них это сделает, принимается на основе переменной (last), хранящей информацию о последнем покинувшем секцию потоке. Псевдокод алгоритма:

boolean in1 = true, in2 = false;
int last = 1;
process CS1 {
    while(true) {
        last = 1; in1 = true;
        < await (!in2 and last == 2) ; >
        //критическая секция;
        in1 = false;
        //некритическая секция;
    }
}
process CS2 {
    while(true) {
        last = 2; in2 = true;
        < await (!in1 and last == 1) ; >
        //критическая секция;
        in2 = false;
        //некритическая секция;
    }
}

Настал момент обсудить как может быть реализован оператор await. Один из вариантов называется активное ожидание. Он заключается в том, что поток ждет своей очереди в пустом цикле, пока не будет выполнено условие синхронизации. Такой подход можно использовать и в C#, но это крайне не рекомендуется. Проблема в том, что активное ожидание не освобождается от времени CPU, а наоборот тратит ресурсы, заставляя думать, что делает полезную работу. Вместо этого лучше использовать метод Sleep(), который по-настоящему блокирует поток на заданное время. Напишем код метода для Питерсона на C# для одного потока с использованием гибридного подхода, который включает и активное ожидание и блокировку.

bool in1 = true, in2 = false;
int last = 1;
void DoWork()
{
    while(true) 
    {
        last = 1; 
        in1 = true;
        while(in2 and last == 1)
            Thread.Sleep(time);
        //критическая секция;
        in1 = false;
        //некритическая секция;
    }
}

Конечно, эта задача решается более правильно с использованием других конструкций, но это ждет нас впереди.

Алгоритм билета

Разберем два других классических решения задачи критической секции, уже в варианте с любым числом потоков. Алгоритм билета схож с системой, которая используется для очередей, например в Сбербанке. Каждый поток, который хочет войти в критическую секцию, обращается к счетчику, берет себе билет с номером этого счетчика и увеличивает его на единицу. Затем поток ждет своей очереди при входе в секцию, а при выходе указывает, что подошла очередь следующего потока.

int number1 = 1, next1 = 1, turn[1:n];
process CS[i = 1 to n] {
    while(true) {
        turn[i] = FA(number, 1);
        while (turn[i] != next) skip;
        //критическая секция;
        next = next + 1;
        //некритическая секция;
    }
}

В этом коде используется операция FA (Fetch and Add). Это операция реализована на многих машинах и выполняется неделимым образом. Она работает так: FA(var, incr): < int tmp = var; var = var + incr; return tmp; >

Алгоритм поликлиники

Суть алгоритма поликлиники заключается в том, что при выборе потоком себе номера, он не обращается к общему счетчику, а выбирает наибольший среди тех, что имеются у других потоков. При определении следующей очереди, так же как и в предыдущем алгоритме, ее получает поток с наименьшим номером. Рассмотрим псевдокод алгоритма:

int turn[1:n];
process CS[i = 1 to n] {
    while(true) {
        turn[i] = 1;
        turn[i] = max(turn[1:n] + 1);
        for [j = 1 to n where j != i]
            while (turn[j] != 0 and
                    (turn[i], i) > (turn[j], j)) skip;
        //критическая секция;
        turn[i] = 0;
        //некритическая секция;
    }
}

Проверка (turn[i], i) > (turn[j], j) нужна для разрешения ситуации, когда так вышло, что у двух процессов оказался один и тот же номер очереди. В таком случае, мы сравниваем номера самих потоков. Именно это и означает такая запись.

Барьеры

Барьером называют точку в программе, преодолеть которую можно только дождавшись всех потоков. Такие конструкции часто встречаются в итеративных методах, требующих синхронизации по окончании каждой итерации. Возможность повторного использования - еще одно важное условие реализации барьера, так как, как правило, те же самые процессы будут преодолевать этот барьер на следующей итерации.

Синхронизация с управляющим процессом

Для начала решим эту задачу с использованием отдельного процесса для координации рабочих потоков. Реализация содержит два массива флагов, соответствующих процессам. Первый массив сигнализирует координатору, что потоки подошли к барьеру, а благодаря второму они узнают от координатора, что можно продолжать работу. Псевдокод алгоритма:

int arrive[1:n], continue[1:n];
process Worker {
    while(true) {
        //решаем задачу i
        arrive[i] = 1;
        < await (continue[i] == 1) ; >
        continue[i] = 0;
    }
}
process Coordinator {
    while(true) {
        for [i = 1 to n] {
            < await (arrive[i] == 1) ; >
            arrive[i] = 0;
        }
        for [i = 1 to n] continue[i] = 1;
    }
}

Проблема такого решения в том, что мы тратим отдельный поток на синхронизацию вместо того, чтобы использовать его для вычислений. Другая проблема заключается в том, что время преодоления каждого барьера пропорционально числу рабочих потоков, так как координатор обходит их в цикле. Для решения этих проблем можно использовать древовидную структуру оповещения потоков. Тогда время будет пропорционально логарифму от числа потоков, но возникнет новая проблема: разные потоки выполняют разную работу при достижении барьера. Поэтому мы рассмотрим еще один класс алгоритмов для реализации барьеров, решающий и эту проблему.

Симметричные барьеры

Потоки в симметричных барьерах выполняют одинаковый код, из чего следует преимущество таких барьеров перед другими - теоретически процессы будут подходить к барьеру в одинаковое время, и ни один из них не будет ждать дольше остальных. Код для двух симметричных потоков:

Теперь возникает задача расширения этой реализации на случай большего числа потоков. Для этого необходимо организовать систему связей, такую что в результате каждый из потоков знал, что все остальные тоже достигли барьера. Есть разные способы решения этой задачи. Рассмотрим два примера. Схема барьера-бабочки:

barrier

Схема барьера с распространением:

barrier

В языке C# для решения этой задачи существует класс Barrier. Для использования этого класса необходимо:

  1. Создать экземпляр, указав количество потоков, которые будут встречаться одновременно (можно изменить это значение позднее путем вызова методов AddParticipants() / RemoveParticipants()).
  2. Каждый поток, ожидающий встречи, должен вызвать метод SignalAndWait().

Семафоры

Семафоры нашли свое место во всевозможных библиотеках многопоточного и параллельного программирования, и сейчас мы разберемся какими их придумали теоретики когда-то давным давно (1962 г.).

Определение

Семафор - это обычная целочисленная неотрицательная переменная, с которой можно работать только с помощью двух специальных неделимых операций P и V. Для определения семафора используется ключевое слово sem. Пусть s - семафор, тогда операции P и V определяются следующим образом:

P(s): < await (s > 0) s = s - 1 >

V(s): s = s + 1

Семафоры изначально обеспечивают взаимное исключение, поэтому с их помощью можно проще решить уже рассмотренные задачи.

Критические секции

Отчасти именно для решения этой задачи были придуманы семафоры, поэтому с их пользованием код становится совсем простым.

sem mutex = 1;
process CS[i = 1 to n] {
    while(true) {
        P(mutex);
        //критическая секция;
        V(mutex);
        //некритическая секция;
    }
}

Здесь используются двоичный семафор, то есть семафор, который может принимать только два значения - ноль и один. Такие семафоры принято называть мьютексами (mutex).

Барьеры

Для организации барьера необходимо использовать массив мьютексов, по одному для каждого процесса. Операция V сигнализирует о достижении барьера потоком, а на операциях P он дожидается остальных. Рассмотрим пример кода для двух потоков:

sem arrive1 = 0, arrive2 = 0;
process Worker1 {
    while(true) {
        //...
        V(arrive1);
        P(arrive2);
        //...
    }
}
process Worker2 {
    while(true) {
        //...
        V(arrive2);
        P(arrive1);
        //...
    }
}

Такую реализацию легко превратить в вариант с N потоками с помощью схем бабочки или распределения.

Очередь Производитель-Потребитель

С помощью семафоров можно решить другую классическую задачу многопоточного программирования - организацию очереди Производитель-Потребитель. По условию задачи есть общий буфер (разделяемый массив) и два вида потоков: первые пишут данные в буфер, вторые читают из него. Нужно организовать очередь обращений к буферу так, чтобы никакие из уже записанных данных не были утеряны (перезаписаны) и никакие не были прочитаны дважды.

Мы рассмотрим решение этой задачи, организовав кольцевой буфер (buf). Переменные front и rear хранят информацию об индексах первого сообщения в очереди и первой пустой ячейки после сообщения в конце очереди, соответственно. Обращение к индексам при записи и чтении элементов из буфера производится по модулю длины массива. Для понимания того, есть ли возможность писать и читать в буфер используются семафоры full и empty. Для организации же взаимного исключения для потоков одного типа применим мьютексы mutexD и mutexF.

typeT buf[n];
int front = 0, rear = 0;
sem empty = n, full = 0;
process Producer {
    while(true) {
        // создать сообщение data
        P(empty);
        buf[rear] = data; 
        rear = (rear + 1) % n;
        V(full);
    }
}
process Consumer {
    while(true) {
        P(full);
        buf[rear] = data; 
        result = buf[front];
        front = (front + 1) % n;
        V(empty);
        //...
    }
}

Использование в современных языках

Почти в любом языке, поддерживающим многопоточность, есть такие конструкции как семафоры и мьютексы. Не является исключением и C#. Помимо того, что мьютекс является двоичным семафором, в языках программирования между этими двумя объектами есть еще одно отличие. Освободить мьютекс может только тот процесс, который его установил.

Класс Mutex языка C# имеет два основных метода для организации синхронизации между потоками. Метод WaitOne() получает блокировку, блокируя поток, если это необходимо. Для снятия блокировки используется метод ReleaseMutex(). На практике класс Mutex используется редко, так почти ту же функциональность предоставляет класс Monitor и конструкция lock (читай дальше).

Конструктор класса Semaphore имеет два параметра: вместимость потоков семафора и количество еще доступных мест. Аналогично мьютексу он имеет два важных синхронизирующих метода: WaitOne() и Release(). Semaphore с емкостью, равной единице, соответствует Mutex, за исключением того, что не имеет потока хозяина, то есть любой поток может вызвать Release() для семафора.

Реализовать паттерн Производитель-Потребитель на языке C# можно разными способами. По моему мнению наиболее простым и правильным будет использование класса BlockingCollection<T>, который инкапсулирует все необходимое.

Мониторы

Мониторы представляют собой абстракцию, которая содержит состояния объекта и обеспечивает набор операций для работы с ними. Кроме того, они позволяют разделять взаимную блокировку и сигнализацию, которые по сути своей являются разными задачами.

Синтаксис и семантика

Монитор группирует представление и реализацию разделяемого класса. Он содержит переменные, которые инициализируются и остаются постоянными на время существования монитора, и процедуры работы с ним. Монитор обладает несколькими важными характеристиками:

  1. Взаимное исключение. Процедуры монитора по определению выполняются со взаимным исключением. Программисту не требуется заботиться об этом.
  2. Условные переменные. Этот новый тип данных (cond), который может принадлежать монитору. Каждая условная переменная по сути своей является очередью. При создании монитора такая очередь пуста. Она используется для приостановки потоков. В очередь добавляются потоки и ожидают сигнала для продолжения выполнения. Есть три основные операции для работы с условными переменными:
    1. empty(cv). Определяет пуста ли очередь условной переменной cv.
    2. wait(cv). Процесс блокируется на условной переменной cv.
    3. signal(cv). При вызове этого метода происходит проверка содержит ли очередь переменной cv потоки, и, в случае положительного результата, первый поток очереди будет освобожден.
  3. Порядок сигнализации. При вызове операции signal, поток находится в мониторе и “выпускает на свободу” еще другой поток. Так как монитор обеспечивает взаимное исключение, только один из них может продолжить выполнение. Для разрешения этого конфликта есть два пути:
    1. Сигнализировать и продолжить. Сигнализирующий процесс остается в мониторе, а извлеченный из очереди условной переменной отправляется в очередь потоков ожидающих на блокировке монитора.
    2. Сигнализировать и ожидать. В этом случае запускаемый процесс возвращается к выполнению своего кода, а процесс, который его запустил, возвращается на блокировку монитора.

Все разобранные ранее задачи решаются монитором без особого труда, поэтому рассмотрим еще одну распространенную задачу, которую решим с помощью новой для нас конструкции.

Задача о читателях и писателях

К базе данных имеют доступ два типа процессов - читатели и писатели. Первые могут выполнять транзакции, которые могут только читать данные, для вторых добавляется возможность их читать. Необходимо обеспечить такой доступ к базе данных, что в любой момент времени в ней находится либо один писатель и ни одного читателя, либо сколь угодно читателей, но ни одного писателя.

Распространенным решением для синхронизации потоков является представление разделяемого ресурса монитором, но в данной ситуации такой вариант не подходит, поскольку необходимо обеспечить читателям множественный доступ к базе данных. Вместо этого монитор используется для упорядочивания запросов к ресурсу.

Есть два вида процессов и два вида действий на процесс, поэтому получаем четыре процедуры. Они представлены в коде ниже. Также необходимо вести учет читающих и пишущих процессов - переменные nr и nv. Для условной блокировки потоков используются переменные oktowrite и oktoread.

monitor RW_Controller {
    int nr = 0, nw = 0;
    cond oktoread, oktowrite;
    procedure request_read() {
        while (nw > 0) wait(oktoread);
        nr = nr + 1;
    }
    procedure release_read() {
        nr = nr - 1;
        if (nr == 0) signal(oktowrite);        
    }
    procedure request_write() {
        while (nr > 0 || nw > 0) wait(oktowrite);
        nw = nw + 1;
    }
    procedure release_write() {
        nw = nw - 1;
        signal(oktowrite);
        signal_all(oktoread);
    }
}

Отмечу, что последняя процедура монитора использует метод signall_all() - одно из типичных расширений стандартных мониторов, позволяющее снять все потоки с условной переменной.

Реализация мониторов в C#

Класс Monitor языка C# достаточно точно отражает ту теорию, что была сегодня рассказана, за отличием того, что в этом мониторе используется лишь одна условная переменная. Как следствие просто разделяют две очереди - очередь готовности (входная) и очередь ожидания (на условной переменной).

Для входа в монитор используется метод Enter(), в который аргументом передается разделяемый ресурс, который хотим заблокировать. При выходе указывается метод Exit(), а метод Wait() отправляет поток в очередь ожидания. Для снятия потоков с этой очереди используются 2 метода - Pulse() и PulseAll(), которые соответствуют signal() и signal_all() в нашей нотации. Интересно, что для первого из них используется порядок “сигнализировать и ожидать”, но для второго “сигнализировать и продолжить” (в этом случае альтернатив нет).

Класс Monitor достаточно широко используем в языке C#, поэтому для операций входа и выхода был создан синтаксический сахар. В результате мы имеем конструкцию lock(), в которую передаем имя разделяемого ресурса, и в фигурных скобках после нее можем писать код, исполняющийся внутри монитора.

Тем не менее, для решения задачи о читателях и писателях стоит воспользоваться еще одной готовой реализацией стандартной библиотеки. Класс ReaderWriterLock обеспечивает разработчика всеми необходимыми свойствами, которые требует условие задачи.

Заключение

Задачи и концепции, которые сегодня были рассмотрены, возникли достаточно давно (например, задача критической секции возникла в начале 60-х годов). Тем не менее, они регулярно встречаются в сегодняшней практике. Я считаю, что любой программист должен знать как решать эти задачи, а для того чтобы лучше понимать, почему современные средства для работы с многопоточностью работают именно так, можно обратиться к классической теории, на которой все основано. Для более плотного знакомства с темой: Основы многопоточного, параллельного и распределенного программирования. Г. Эндрюс.

P.S. Интересное наблюдение: чем ближе экзамен, тем длиннее пост…