Лабораторная работа № 10 Потоки



Разработка многопоточного приложения. Синхронизация потоков.

Цель работы. Исследовать возможности многопоточности, а также вопросы создания и использования волокон на одно- и многоядерном процессорах.

Теоретические сведения

Процесс состоит из потоков. При инициализации процесса система создает всегда первичный поток. Поток определяет последовательность исполнения кода в процессе. В то время как приложение предназначено для выполнения всей задачи, поток – это мини-программа, принадлежащая приложению и выполняющая частную задачу.

Создание и запуск потоков

Для создания потоков используется конструктор класса Thread, принимающий в качестве параметра делегат типа ThreadStart, указывающий метод, который нужно выполнить. Делегат ThreadStart определяется так:

public delegate void ThreadStart();

Вызов метода Start начинает выполнение потока. Поток продолжается до выхода из исполняемого метода. Вот пример, использующий полный синтаксис C# для создания делегата ThreadStart:

class ThreadTest

{ static void Main() 

 { Thread t = new Thread(new ThreadStart(Go));

t.Start(); // Выполнить Go() в новом потоке.

   Go();   // Одновременно запустить Go() в главном потоке. 

}

 static void Go()

 { Console.WriteLine("hello!"); }

В этом примере поток выполняет метод Go() одновременно с главным потоком. Результат – два почти одновременных «hello»:

hello!hello!

Поток можно создать, используя для присваивания значений делегатам более удобный сокращенный синтаксис C#:

static void Main()

{ Thread t = new Thread(Go); // Безявногоиспользования ThreadStart

t.Start(); 

 }

static void Go() { ... }

В этом случае делегат ThreadStart выводится компилятором автоматически.

Передача данных в ThreadStart

Допустим, что в рассматриваемом выше примере мы захотим более явно различать вывод каждого из потоков, например, по регистру символов. Можно добиться этого, передавая соответствующий флаг в метод Go(), но в этом случае нельзя использовать делегат ThreadStart, так он не принимает аргументов. К счастью, .NET Framework определяет другую версию делегата – ParameterizedThreadStart, которая может принимать один аргумент:

public delegate void ParameterizedThreadStart(object obj);

Предыдущий пример можно переписать так:

class ThreadTest

{ static void Main()

{ Thread t = new Thread(Go);

t.Start(true);        // == Go(true)  

Go(false); } 

static void Go(object upperCase)

{ bool upper = (bool)upperCase;

Console.WriteLine(upper ? "HELLO!" : "hello!"); }}

Консольныйвывод:

hello!HELLO!

В этом примере компилятор автоматически выводит делегат ParameterizedThreadStart, так как метод Go() принимает в качестве параметра один object.

Особенность использования ParameterizedThreadStart состоит в том, что перед использованием нужно привести аргумент из типа object к нужному типу (в данном случае bool). К тому же существует только версия, принимающая единственный аргумент.

Свойства класса Thread

•   Статическое свойство CurrentContext позволяет получить контекст, в котором выполняется поток

•   Статическое свойство CurrentThread возвращает ссылку на выполняемый поток

•   Свойство IsAlive указывает, работает ли поток в текущий момент

•   Свойство IsBackground указывает, является ли поток фоновым

•   Свойство Name содержит имя потока

•   Свойство Priority хранит приоритет потока - значение перечисления ThreadPriority:

enum ThreadPriority { Lowest, BelowNormal, Normal, AboveNormal, Highest }

Значение приоритета становится существенным, когда одновременно исполняются несколько потоков.

•   Свойство ThreadState возвращает состояние потока - одно из значений перечисления ThreadState

Некоторые методы класса Thread:

•   Метод Abort уведомляет среду CLR о том, что надо прекратить поток, однако прекращение работы потока происходит не сразу, а только тогда, когда это становится возможно. Для проверки завершенности потока следует опрашивать его свойство ThreadState

•   Метод Interrupt прерывает поток на некоторое время

•   Метод Resume возобновляет работу ранее приостановленного потока

•   Метод Start запускает поток

•   Метод Suspend приостанавливает поток

Блокировка потоков

Класс имеет методы приостановки выполнения потоков:

· Sleep(миллисек) Блокировка на указанное время

Если быть более точным, Thread.Sleep отпускает CPU и сообщает, что потоку не должно выделяться время в указанный период. Thread.Sleep(0) отпускает CPU для выделения одного кванта времени следующему потоку в очереди на исполнение.При передаче значения Timeout. Infinite поток будет приостановлен на неопределенно долгий срок, пока это состояние потока не будет прервано другим потоком, вызвавшим метод приостановленного потока Thread.Interrupt.

· Второй способ приостановить исполнение потока — вызов метода Thread.Suspend. Между этими методиками несколько важных отличий. Во-первых, можно вызвать метод Thread. Suspend для потока, исполняемого в текущий момент, или для другого потока. Во-вторых, если таким образом приостановить выполнение потока, любой другой поток способен возобновить его выполнение с помощью метода Thread. Resume. Обратите внимание, что, когда один поток приостанавливает выполнение другого, первый поток не блокируется. Возврат управления после вызова происходит немедленно. Кроме того, единственный вызов Thread.Resume возобновит исполнение данного потока независимо от числа вызовов метода Thread.Suspend, выполненных ранее.

· Join()    Ожидание окончания другого потока

class JoinDemo

{ static void Main() 

{ Thread t = new Thread(delegate() { Console.ReadLine(); });

t.Start();

t.Join(); // ожидать, пока поток не завершится

Console.WriteLine("Thread t's ReadLine complete!");

}}

Метод Join может также принимать в качестве аргумента timeout - в миллисекундах или как TimeSpan. Если указанное время истекло, а поток не завершился, Join возвращает false. Join с timeout функционируеткак Sleep:

Thread.Sleep(1000);Thread.CurrentThread.Join(1000);

Синхронизация потоков

При параллельном выполнении потоков и процессов могут возникнуть проблемы с доступом к разделяемым ресурсам. Это могут быть общие переменные, файлы, другие ресурсы. Например:

class Program

{static int x=0;

static void Main(string[] args)

{ for (int i = 0; i < 5; i++)

{ Thread myThread = new Thread(Count);

       myThread.Name = "Поток " + i.ToString();

myThread.Start();

   }

Console.ReadLine();

}

public static void Count()

{ x = 1;

for (int i = 1; i < 9; i++)

{ Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

x++;

Thread.Sleep(100);

   }

}

}

Здесь запускаются пять потоков, которые работают с общей переменной x. И мы предполагаем, что метод выведет все значения x от 1 до 8. И так для каждого потока. Однако в реальности в процессе работы будет происходить переключение между потоками, и значение переменной x становится непредсказуемым.

В основу синхронизации положено понятие блокировки, посредством которой организуется управление доступом к кодовому блоку в объекте. Когда объект заблокирован одним потоком, остальные потоки не могут получить доступ к заблокированному кодовому блоку. Когда же блокировка снимается одним потоком, объект становится доступным для использования в другом потоке.

Lock

Средство блокировки встроено в язык C#. Благодаря этому все объекты могут быть синхронизированы. Синхронизация организуется с помощью ключевого слова lock.

Ниже приведена общая форма блокировки:

lock(lockObj) {

// синхронизируемые операторы

}

где lockObj обозначает ссылку на синхронизируемый объект.

Если же требуется синхронизировать только один оператор, то фигурные скобки не нужны. Оператор lock гарантирует, что фрагмент кода, защищенный блокировкой для данного объекта, будет использоваться только в потоке, получающем эту блокировку. А все остальные потоки блокируются до тех пор, пока блокировка не будет снята. Блокировка снимается по завершении защищаемого ею фрагмента кода.

Блокируемым считается такой объект, который представляет синхронизируемый ресурс. В некоторых случаях им оказывается экземпляр самого ресурса или же произвольный экземпляр объекта, используемого для синхронизации. Следует, однако, иметь в виду, что блокируемый объект не должен быть общедоступным, так как в противном случае он может быть заблокирован из другого, неконтролируемого в программе фрагмента кода и в дальнейшем вообще не разблокируется.

class Program

{ static int x=0;

static object locker = new object();

static void Main(string[] args)

{ for (int i = 0; i < 5; i++)

{ Thread myThread = new Thread(Count);

       myThread.Name = "Поток " + i.ToString();

myThread.Start();

   }

Console.ReadLine();

}

public static void Count()

{ lock (locker)

   { x = 1;

for (int i = 1; i < 9; i++)

{ Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

x++;

Thread.Sleep(100);

       }

   }

}

}

Для блокировки с ключевым словом lock используется объект-заглушка, в данном случае это переменная locker. Когда выполнение доходит до оператора lock, объект locker блокируется, и на время его блокировки монопольный доступ к блоку кода имеет только один поток. После окончания работы блока кода, объект locker освобождается и становится доступным для других потоков.

В прошлом для блокировки объектов очень часто применялась конструкция lock (this). Но она пригодна только в том случае, если this является ссылкой на закрытый объект. В связи с возможными программными и концептуальными ошибками, к которым может привести конструкция lock (this), применять ее больше не рекомендуется. Вместо нее лучше создать закрытый объект, чтобы затем заблокировать его.

Мониторы

Наряду с оператором lock для синхронизации потоков мы можем использовать мониторы, представленные классомSystem.Threading.Monitor.

Компилятор C# преобразует оператор lock в код, использующий класс Monitor. Например, показанный ниже оператор lock:

lock (obj)

{

// синхронизированная область для obj

}

будет преобразован в код, который вызывает метод Enter() и ожидает, пока поток не получит объектную блокировку. В каждый момент времени только один поток может быть владельцем объектной блокировки. После получения блокировки поток сможет входить в синхронизируемый раздел. Метод Exit() класса Monitor позволяет снимать блокировку.

Компилятор помещает вызов метода Exit()вобработчикfinally блока try, чтобы блокировка снималась даже в случае генерации исключения:

Monitor.Enter(obj);

try

{

// синхронизированная область для obj

}

finally

{

Monitor.Exit(obj);

}

Рассмотренный в прошлой теме пример будет эквивалентен следующему коду:

class Program

{ static int x=0;

static object locker = new object();

static void Main(string[] args)

{ for (int i = 0; i < 5; i++)

{ Thread myThread = new Thread(Count);

       myThread.Name = "Поток " + i.ToString();

myThread.Start();

   }

Console.ReadLine();

}

public static void Count()

{ Monitor.Enter(locker);

try

{ x = 1;

for (int i = 1; i < 9; i++)

{ Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

x++;

Thread.Sleep(100);

       }

   }

finally

{ Monitor.Exit(locker);

   }

}

}

Метод Monitor.Enter блокирует объект locker так же, как это делает оператор lock. А в блоке try...finally с помощью методаMonitor.Exit происходит освобождение объекта locker, и он становится доступным для других потоков.

Класс Monitor обладает одним важным преимуществом по сравнению с оператором lock в C#: он позволяет добавлять значение тайм-аута для ожидания получения блокировки. Таким образом, вместо того, чтобы ожидать блокировку до бесконечности, можно вызвать метод TryEnterи передать в нем значение тайм-аута, указывающее, сколько максимум времени должно ожидаться получение блокировки.

static void TryEnter(

    object obj,

    int millisecondsTimeout,

    ref bool lockTaken)

Когда блокировка obj получена, метод TryEnter()устанавливает булевский параметр ref в true и производит синхронизированный доступ к состоянию, охраняемому объектом obj. Если obj блокируется другим потоком на протяжении более заданного параметром millisecondsTimeout количества миллисекунд, то TryEnter()устанавливает параметр lockTaken в falseи поток больше не ожидает, а используется для выполнения другой работы.

Кроме блокировки и разблокировки объекта класс Monitor имеет еще ряд методов, которые позволяют управлять синхронизацией потоков. Так, метод Monitor.Wait освобождает блокировку объекта и переводит поток в очередь ожидания объекта. Следующий поток в очереди готовности объекта блокирует данный объект. А все потоки, которые вызвали метод Wait, остаются в очереди ожидания, пока не получат сигнала от метода Monitor.Pulse или Monitor.PulseAll, посланного владельцем блокировки. Если методMonitor.Pulse отправлен, поток, находящийся во главе очереди ожидания, получает сигнал и блокирует освободившийся объект. Если же метод Monitor.PulseAll отправлен, то все потоки, находящиеся в очереди ожидания, получают сигнал и переходят в очередь готовности, где им снова разрешается получать блокировку объекта.

/// Находит максимальный элемент при условии, что на элементах массивавыполняется

/// некоторое условие (> 100) Если условие не выполняется,поток приостанавливается,

///уступая место другому потоку, исправляющему ситуацию

   public void Max_resource()

{ lock (resource)

{ for (int i = 0; i < n; i++)

{ if (resource[i] < 100)

               { stop_index = i;

Monitor.Pulse(resource);

Monitor.Wait(resource);

               }

if (max < resource[i])

max = resource[i];

           }

finished = true;

Monitor.Pulse(resource);               

}

   }

Обратите внимание на связку Monitor.Pulse и Monitor.Wait, - уведомляем другой поток, чтобы он мог перейти в состояние готовности, и переходим в режим ожидания. А вот как выглядит код другого потока, исправляющего "некорректные" элементы:

/// Метод, исправляющий ситуациюМалые элементы заменяет большими

public void ChangeSituation()

   { lock (resource)

       {

if (stop_index == -1)

Monitor.Wait(resource);               

do

               {

resource[stop_index] = rnd.Next(100, 200);

Monitor.Pulse(resource);

Monitor.Wait(resource);

} while (!finished);

       }

   }

Если метод первым начинает работу, то он входит в режим ожидания. В противном случае он исправляет элемент и выполняет связку методов Pulse - Wait. Возобновление работы происходит с проверки условия завершения обработки массива. Если первый поток закончил работу, то завершает работу и второй поток.

Класс AutoResetEvent

Класс AutoResetEvent (System.Threading) также служит целям синхронизации потоков. Этот класс является оберткой над объектом ОС Windows "событие с автосбросом" и позволяет переключить данный объект-событие из сигнального в несигнальное состояние.

Set() Устанавливает сигнальное состояние события, что позволяет продолжить выполнение одному или нескольким ожидающим потокам (сигнализирует о том, что событие произошло).

WaitOne() Блокирует текущий поток до получения сигнала.

WaitOne принимает необязательный параметр timeout – метод возвращает false, если ожидание заканчивается по таймауту, а не по получению сигнала.

Метод ResetУстанавливает не сигнальное состояние события (сброс).

Метод Close нужно вызывать сразу же, как только объект события станет не нужен – для освобождения ресурсов операционной системы.

AutoResetEvent может быть создан двумя путями. Во-первых, с помощью своего конструктора:

EventWaitHandle wh = new AutoResetEvent(false);

Если аргумент конструктора true, метод Set будет вызван автоматически сразу после создания объекта.

Другой метод состоит в создании объекта базового класса, EventWaitHandle:

EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.Auto);

Конструктор EventWaitHandle также позволяет создавать именованные EventWaitHandle, способные действовать через границы процессов. Имя задается обыкновенной строкой и может быть любым. Если задаваемое имя уже используется на компьютере, возвращается ссылка на существующий EventWaitHandle, в противном случае операционная система создает новый. Вот пример:

EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.Auto,

"MyCompany.MyApp.SomeName");

Исполнив этот код, два приложения получили бы возможность сигнализировать друг другу из любого потока обоих процессов.

Пример из предыдущей темы мы можем переписать с использованием AutoResetEvent следующим образом:

       class Program

{ static AutoResetEvent waitHandler = new AutoResetEvent(true);

static int x=0;

static void Main(string[] args)

{ for (int i = 0; i < 5; i++)

{ Thread myThread = new Thread(Count);

       myThread.Name = "Поток " + i.ToString();

myThread.Start();

   }

Console.ReadLine();

}

public static void Count()

{ waitHandler.WaitOne();

   x = 1;

for (int i = 1; i < 9; i++)

{ Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

x++;

Thread.Sleep(100);

   }

waitHandler.Set();

}

}

Во-первых, создаем переменную типа AutoResetEvent. Передавая вконструктор значение true, мы тем самым указываем, что создаваемый объект изначально будет в сигнальном состоянии.

Когда начинает работать поток, то первым делом срабатывает определенный в методе Count вызов waitHandler.WaitOne(). Метод WaitOne указывает, что текущий поток переводится в состояние ожидания, пока объект waitHandler не будет переведен в сигнальное состояние. И так все потоки у нас переводятся в состояние ожидания.

После завершения работы вызывается метод waitHandler.Set, который уведомляет все ожидающие потоки, что объект waitHandler снова находится в сигнальном состоянии, и один из потоков "захватывает" данный объект, переводит в несигнальное состояние и выполняет свой код. А остальные потоки снова ожидают.

Так как в конструкторе AutoResetEvent мы указываем, что объект изначально находится в сигнальном состоянии, то первый из очереди потоков захватывает данный объект и начинает выполнять свой код.

Но если бы мы написали AutoResetEvent waitHandler = new AutoResetEvent(false), тогда объект изначально был бы в несигнальном состоянии, а поскольку все потоки блокируются методом waitHandler.WaitOne() до ожидания сигнала, то у нас попросту случилась бы блокировка программы, и программа не выполняла бы никаких действий.

Если у нас в программе используются несколько объектов AutoResetEvent, то мы можем использовать для отслеживания состояния этих объектов методы WaitAll и WaitAny, которые в качестве параметра принимают массив объектов класса WaitHandle - базового класса для AutoResetEvent.

Так, мы тоже можем использовать WaitAll в вышеприведенном примере. Для этого надо строку waitHandler.WaitOne(); заменить на следующую:

AutoResetEvent.WaitAll(new WaitHandle[] {waitHandler});

 

ManualResetEvent

ManualResetEvent – это разновидность AutoResetEvent. Отличие состоит в том, что он не сбрасывается автоматически, после того как поток проходит через WaitOne, и действует как шлагбаум – Set открывает его, позволяя пройти любому количеству потоков, вызвавших WaitOne. Reset закрывает шлагбаум, потенциально накапливая очередь ожидающих следующего открытия.

ManualResetEvent может использоваться для сигнализации о завершении какой-либо операции или инициализации потока и готовности к выполнению работы.

Конструктор EventWaitHandle также может использоваться для создания объекта ManualResetEvent (если задать в качестве параметра EventResetMode.Manual):

EventWaitHandle wh = new EventWaitHandle(false, EventResetMode. Manual);

Кроме этого у класса ManualResetEvent есть конструктор.аналогичный AutoResetEvent:

public ManualResetEvent(bool initialState)

Мьютексы

Еще один инструмент управления синхронизацией потоков представляет класс Mutex, также находящийся в пространстве имен System.Threading. Данный класс является классом-оболочкой над соответствующим объектом ОС Windows "мьютекс".

У Mutex имеется несколько конструкторов:

1) public Mutex()

      создается мьютекс, которым первоначально никто не владеет

2) public Mutex(bool initiallyOwned)

исходным состоянием мьютекса завладевает вызывающий поток, если параметр initiallyOwned имеет логическое значение true. В противном случае мьютексом никто не владеет.

3) public Mutex(bool initiallyOwned, String Name)

Вторым параметром задается строка, являющаяся именем мьютекса (для синхронизации потоков различных процессов)

4) public Mutex(bool initiallyOwned, String Name, out bool createdNew)

Третий параметр - логическое значение, которое при возврате метода показывает, предоставлено ли вызывающему потоку изначальное владение мьютексом.

Перепишем пример из прошлой темы, используя мьютексы:

class Program

{ static Mutex mutexObj = new Mutex();

static int x=0;   

static void Main(string[] args)

{ for (int i = 0; i < 5; i++)

{ Thread myThread = new Thread(Count);

       myThread.Name = "Поток " + i.ToString();

myThread.Start();

   }

Console.ReadLine();

}

public static void Count()

{ mutexObj.WaitOne();

   x = 1;

for (int i = 1; i < 9; i++)

{ Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

x++;

Thread.Sleep(100);

   }

mutexObj.ReleaseMutex();

}

}

Сначаласоздаемобъектмьютекса: Mutex mutexObj = new Mutex().

Основную работу по синхронизации выполняют методы WaitOne()и ReleaseMutex(). Метод mutexObj.WaitOne() приостанавливает выполнение потока до тех пор, пока не будет получен мьютекс mutexObj.

После выполнения всех действий, когда мьютекс больше не нужен, поток освобождает его с помощью метода mutexObj.ReleaseMutex().

Таким образом, когда выполнение дойдет до вызова mutexObj.WaitOne(), поток будет ожидать, пока не освободится мьютекс. И после его получения продолжит выполнять свою работу.

Мы использовали мьютекс для синхронизации потоков. Однако замечательная черта мьютексов состоит также в том, что они могут также применяться не только внутри одного процесса, но и между процессами.

Пример синхронизации потоков двух различных процессов. Первый поток инкрементирует переменную и выводит ее на экран, второй поток декрементирует это же значение и также выводит на экран. Для доступа к значению переменной каждый из потоков считывает ее из файла и после изменения снова записывает в файл.

Первыйпроцесс:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading;

using System.IO;

using System.Diagnostics;

 

namespace mutex

{

classProgram

{

staticMutex mutexObj = newMutex(false,"my_mutex");

staticint x;

staticvoid Main(string[] args)

   {

FileStream fs = newFileStream(@"C:\projectc#\x.dat", FileMode.Create);

BinaryWriter bw = newBinaryWriter(fs);

    bw.Write(x);

    bw.Close();

Process.Start(@"C:\ProjectC#\mutex_child\bin\Debug\mutex_child.exe");

Thread myThread = newThread(Count);

    myThread.Name = "Поток 1";

    myThread.Start();

Console.ReadLine();

   }

publicstaticvoid Count()

   {      

for (int i = 1; i < 90; i++)

       {mutexObj.WaitOne();

FileStream fs = newFileStream(@"C:\projectc#\x.dat", FileMode.Open);

BinaryReader br = newBinaryReader(fs);

       x= br.ReadInt32();

       br.Close();

Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

           x++;

Thread.Sleep(100);

FileStream fsw = newFileStream(@"C:\projectc#\x.dat", FileMode.Create);

BinaryWriter bw = newBinaryWriter(fsw);

       bw.Write(x);

       bw.Close();

       mutexObj.ReleaseMutex();

       }

 

   }

}

}

второйпроцесс:

using System;

using System.Collections.Generic;

using System.Linq;

using System.Text;

using System.Threading;

using System.IO;

using System.Diagnostics;

 

namespace mutex_child

{

classProgram

{

 

staticMutex mutexObj = newMutex(false,"my_mutex");

staticint x;

staticvoid Main(string[] args)

   {

Thread myThread = newThread(Count);

       myThread.Name = "Поток 2";

       myThread.Start();

Console.ReadLine();

   }

publicstaticvoid Count()

   {

for (int i = 1; i < 90; i++)

       {mutexObj.WaitOne();

FileStream fsr = newFileStream(@"C:\projectc#\x.dat", FileMode.Open);

BinaryReader br = newBinaryReader(fsr);

       x = br.ReadInt32();

       br.Close();

Console.WriteLine("{0}: {1}", Thread.CurrentThread.Name, x);

           x--;

 

Thread.Sleep(100);

FileStream fsw = newFileStream(@"C:\projectc#\x.dat", FileMode.Create);

BinaryWriter bw = newBinaryWriter(fsw);

       bw.Write(x);

       bw.Close();

mutexObj.ReleaseMutex();

       }    

   }

}

}

 

Семафоры (Semaphore)

Еще один инструмент, который предлагает нам платформа .NET для управления синхронизацией, представляют семафоры. Семафоры позволяют ограничить доступ определенным количеством объектов.

Семафор управляет доступом к общему ресурсу, используя для этой цели счетчик. Если значение счетчика больше нуля, то поток получает доступ к ресурсу, а счетчик семафора декрементируется. А если это значение равно нулю, то доступ к ресурсу запрещен и поток блокируется до тех пор, пока не получит разрешение.

Когда же потоку больше не требуется доступ к общему ресурсу, он высвобождает разрешение, а счетчик семафора инкрементируется. Если разрешения ожидает другой поток, то он получает его в этот момент. Количество одновременно разрешаемых доступов указывается при создании семафора. Так, если создать семафор, одновременно разрешающий только один доступ, то такой семафор будет действовать как мьютекс.

Семафоры особенно полезны в тех случаях, когда общий ресурс состоит из группы или пула ресурсов. Например, пул ресурсов может состоять из целого ряда сетевых соединений, каждое из которых служит для передачи данных. Поэтому потоку, которому требуется сетевое соединение, все равно, какое именно соединение он получит. В данном случае семафор обеспечивает удобный механизм управления доступом к сетевым соединениям.

Семафор реализуется в классе System.Threading.Semaphore, у которого имеется несколько конструкторов. Ниже приведена простейшая форма конструктора данного класса:

public Semaphore(int initialCount, int maximumCount)

где initialCount — это первоначальное значение для счетчика разрешений семафора, т.е. количество первоначально доступных разрешений; maximumCount — максимальное значение данного счетчика, т.е. максимальное количество разрешений, которые может дать семафор.

Если необходимо синхронизировать потоки в разных процессах, тогда в конструкторе семафора необходимо задать еще и имя:

public Semaphore(int initialCount, int maximumCount, String Name)

Семафор применяется таким же образом, как и описанный ранее мьютекс. В целях получения доступа к ресурсу в коде программы вызывается метод WaitOne() для семафора. Этот метод наследуется классом Semaphore от класса WaitHandle. Метод WaitOne() ожидает до тех пор, пока не будет получен семафор, для которого он вызывается. Таким образом, он блокирует выполнение вызывающего потока до тех пор, пока указанный семафор не предоставит разрешение на доступ к ресурсу.

Если коду больше не требуется владеть семафором, он освобождает его, вызывая методRelease(). Нижеприведеныдвеформыэтогометода:

public int Release()

public int Release(int releaseCount)

В первой форме метод Release() высвобождает только одно разрешение, а во второй форме — количество разрешений, определяемых параметром releaseCount. В обеих формах данный метод возвращает подсчитанное количество разрешений, существовавших до высвобождения.

В .NET 4 предлагается два класса с функциональностью семафора: Semaphore и SemaphoreSlim. Класс Semaphore может быть именован, использовать ресурсы в масштабе всей системы и обеспечивать синхронизацию между различными процессами. Класс SemaphoreSlimпредставляет собой облегченную версию класса Semaphore, которая оптимизирована для обеспечения более короткого времени ожидания.

Например, у нас такая задача: есть некоторое число читателей, которые приходят в библиотеку три раза в день и что-то там читают. И пусть у нас будет ограничение, что единовременно в библиотеке не может находиться больше трех читателей. Данную задачу очень легко решить с помощью семафоров:

using System;

using System.Threading;

namespace SemaphoreApp

{ class Program

{ static void Main(string[] args)

   {for (int i = 1; i < 6; i++)

           Reader reader = new Reader(i);

Console.ReadLine();

   }

}

 

class Reader

{ static Semaphore sem = new Semaphore(3, 3);   // создаемсемафор

   Thread myThread;

int count = 3;// счетчикчтения

public Reader(int i)

{ myThread = new Thread(Read);

       myThread.Name = "Читатель " + i.ToString();

myThread.Start();

   }

 

public void Read()

{while (count > 0)

   {sem.WaitOne();

Console.WriteLine("{0} вбиблиотеке", Thread.CurrentThread.Name);

Thread.Sleep(1000);

Console.WriteLine("{0} покидаетбиблиотеку", Thread.CurrentThread.Name);

sem.Release();

count--;

Thread.Sleep(1000);

}

   }

}

}

В данной программе читатель представлен классом Reader. Он инкапсулирует всю функциональность, связанную с потоками, через переменную Thread myThread.

Длясозданиясемафораиспользуетсякласс Semaphore: static Semaphore sem = new Semaphore(3, 3);. Его конструктор принимает два параметра: первый указывает, какому числу объектов изначально будет доступен семафор, а второй параметр указывает, какой максимальное число объектов будет использовать данный семафор. В данном случае у нас только три читателя могут одновременно находиться в библиотеке, поэтому максимальное число равно 3.

Основной функционал сосредоточен в методе Read, который и выполняется в потоке. В начале для ожидания получения семафора используется метод sem.WaitOne(). После того, как в семафоре освободится место, данный поток заполняет свободное место и начинает выполнять все дальнейшие действия. После окончания чтения мы освобождаем семафор с помощью метода sem.Release(). После этого в семафоре освобождается одно место, которое заполняет другой поток.

А в методе Main нам остается только создать читателей, которые запускают соответствующие потоки.

Волокна

Потоки в Windows реализуются на уровне ядра операционной системы, которое отлично осведомлено об их существовании и «коммутирует» их в соответствии с созданным Майкрософт алгоритмом. В то же время волокна реализованы на уровне кода пользовательского режима, ядро ничего не знает о них, и процессорное время распределяется между волокнами по алгоритму, определяемому нами. А раз так, то о вытеснении волокон говорить не приходится — по крайней мере, когда дело касается ядра.

В потоке может быть одно или несколько волокон. Для ядра поток — все то, что можно вытеснить и что выполняет код. Единовременно поток будет выполнять код лишь одного волокна — какого, решает сам программист. Приступая к работе с волокнами, прежде всего необходимо преобразовать существующий поток в волокно. Это делает функция ConvertThreadToFiber:

 PV0ID ConvertThreadToFiber(PVOID pvParam);

Она создает в памяти контекст волокна (размером около 200 байтов). В него входят следующие элементы:

■ определенное программистом значение; оно получает значение параметра pvParam, передаваемого в ConvertThreadToFiber;

■ заголовок цепочки структурной обработки исключения;

■ начальный и конечный адреса стека волокна; при преобразовании потока в волокно он служит и стеком потока;

■ регистры процессора, включая указатели стека и команд.

Создав и инициализировав контекст волокна, программист сопоставляет его адрес с потоком, преобразованным в волокно, и теперь оно выполняется в этом потоке. ConvertThreadToFiber возвращает адрес, по которому расположен контекст волокна. Этот адрес еще понадобится вам, но ни считывать, ни записывать по нему напрямую ни в коем случае нельзя — с содержимым этой структуры работают только функции, управляющие волокнами. При вызове ExitThread завершаются и волокно, и поток.  

Нет смысла преобразовывать поток в волокно, если вы не собираетесь создавать дополнительные волокна в том же потоке. Чтобы создать другое волокно, поток (выполняющий в данный момент волокно), вызывается функция CreateFiber:

PVOID CreateFiber(

DW0RD dwStackSize,

PFIBER_START_ROUTINE pfnStartAddress,

PVOID pvParam);

Сначала она пытается создать новый стек, размер которого задан в параметре dwStackSize. Обычно передают 0, и тогда максимальный размер стека ограничивается 1 Мб, а изначально ему передается две страницы памяти. Если вы укажете ненулевое значение, то для стека будет зарезервирован и передан именно такой объем памяти.

Функция CreateFiber создает и инициализирует новую структуру, представляющую контекст исполнения волокна. При этом пользовательское значение устанавливается по значению параметра pvParam, сохраняются начальный и конечный адреса памяти нового стека, а также адрес функции волокна (переданный в параметре pfnStartAddress).

Аргумент pfnStartAddress задает адрес функции волокна, которую программист реализовывает самостоятельно. Эта функция должна соответствовать следующему прототипу: 

VOID WINAPI FiberFunc(PVOID pvParam);

Когда  волокно получает процессорное время в первый раз, эта функция вызывается и получает значение pvParam, исходно переданное функции CreateFiber. В этой функции вы можете делать что угодно, но в прототипе тип возвращаемого ею значения определен как VOID — не потому, что это значение бессмысленно, а просто потому, что функция волокна не должна завершаться, пока существует волокно! Как только функция волокна завершится, поток и все созданные в нем волокна тут же будут уничтожены.

Подобно ConvertThreadToFiber, функция CreateFiber возвращает адрес контекста исполнения волокна. Но, в отличие от ConvertThreadToFiber, исполнение созданного функцией CreateFiber волокна не начинается, пока исполняется текущее волокно. Дело в том, что исполняться может только одно волокно потока одновременно. Чтобы запустить новое волокно, вызывается функция SwitchToFiber.

 VOID SwitchToFiber(PVOID pvFiberExecutionContext);

Эта функция принимает единственный параметр (pvFiberExecutionContext) — адрес контекста волокна, полученный в предшествующем вызове ConvertThreadToFiber или CreateFiber. По этому адресу она определяет, какому волокну предоставить процессорное время. SwitchToFiber осуществляет такие операции:

1. Сохраняет в контексте выполняемого в данный момент волокна ряд текущих регистров процессора, включая указатели команд и стека.

2. Загружает в регистры процессора значения, ранее сохраненные в контексте волокна, подлежащего выполнению. В их число входит указатель стека, и поэтому при переключении на другое волокно используется именно его стек.

3. Связывает контекст волокна с потоком, и тот выполняет указанное волокно.

4. Восстанавливает указатель команд. Поток (волокно) продолжает выполнение с того места, на каком волокно было прервано в последний раз.

Применение SwitchToFiber — единственный способ выделить волокну процессорное время. Поскольку ваш код должен явно вызывать эту функцию в нужные моменты, вы полностью управляете распределением процессорного времени для волокон. Помните: такой вид планирования не имеет ничего общего с планированием потоков. Поток, в рамках которого выполняются волокна, всегда может быть вытеснен операционной системой. Когда поток получает процессорное время, выполняется только выбранное волокно, и никакое другое не получит управление, нока вы сами не вызовете SwitchToFiber.

Для уничтожения волокна предназначена функция DeleteFiber:

 VOID DeleteFiber(PVOID pvFiberExecutionContext);

Она удаляет волокно, чей адрес контекста определяется параметром pvFiberExecutionContext, освобождает память, занятую стеком волокна, и уничтожает его контекст. Но, если передать адрес волокна, связанного в данный момент с потоком, DeleteFiber сама вызывает ExitThread — в результате поток и все созданные в нем волокна «погибают».

DeleteFiber обычно вызывается волокном, чтобы удалить другое волокно. Стек удаляемого волокна уничтожается, а его контекст освобождается.

И вот еще одна разница между волокнами и потоками: потоки, как правило, уничтожают себя сами, обращаясь к ExitThread. Использование с этой целью TerminateThread считается плохим тоном — ведь тогда система не уничтожает стек потока. Удаление волокна, занятого пересчетом, — операция вполне допустимая, стек волокна и его контекст корректно уничтожаются. Если использовать потоки, а не волокна, интерфейсный поток не смог бы корректно уничтожить поток, занятый пересчетом, - пришлось бы задействовать какой-нибудь механизм межпоточного взаимодействия и ждать, пока поток пересчета не завершится сам.

После удаления всех волокон также можно удалить их состояние из потока, исходного вызвавшего ConvertThreadToFiber, с помощью ConvertFiberToThread, — так удастся полностью освободить память, использованную для преобразования потока в волокно.

Каким образом работать с Win32 API функциями на платформе .NET с помощью языка С# ,было рассмотрено выше (стр.16). Поэтому здесь сразу приведен пример работы с волокнами с использованием описанных выше функций:

 

namespace fiber

{

 

classWork_fiber

{

[DllImport("kernel32.dll")]

externstaticIntPtr ConvertThreadToFiber(int fiberData);

 

[DllImport("kernel32.dll")]

externstaticIntPtr CreateFiber(int size, System.Delegate function, int handle);

 

[DllImport("kernel32.dll")]

externstaticIntPtr SwitchToFiber(IntPtr fiberAddress);

 

[DllImport("kernel32.dll")]

externstaticvoid DeleteFiber(IntPtr fiberAddress);

 

[DllImport("kernel32.dll")]

externstaticint GetLastError();

 

delegatevoidSetFuncFiber(int number);

System.IntPtr obj;

System.IntPtr retVal2;

 

void fiber_func1(int param)

{Console.WriteLine("работает волокно "+param);

// Console.ReadKey();

SwitchToFiber(retVal2);

}

void fiber_func2(int param)

{Console.WriteLine("работает волокно "+param);

//Console.ReadKey();

SwitchToFiber(obj);

}

public Work_fiber()

  {

Thread t1 = newThread(thread_func);

      t1.Start();

Console.WriteLine("create thread");

}

void thread_func()

   {

      obj = ConvertThreadToFiber(0);

SetFuncFiber fiber_delegat1 = newSetFuncFiber(fiber_func1);

SetFuncFiber fiber_delegat2 = newSetFuncFiber(fiber_func2);

      System.IntPtr retVal1 = CreateFiber(100, fiber_delegat1, 1);

if (GetLastError() != 0) thrownewException("Create Fiber1 failed!!");

elseConsole.WriteLine("create fiber1");

      retVal2 = CreateFiber(500, fiber_delegat2, 2);

if (GetLastError() != 0) thrownewException("Create Fiber failed!!");

elseConsole.WriteLine("create fiber2");

IntPtr fiber1return = SwitchToFiber(retVal1);

if (GetLastError() != 0) thrownewException("Run Fiber failed!!");

Console.WriteLine("основнойпоток");

Console.ReadKey();

      DeleteFiber(retVal1);

  }

}

classProgram

{

 

staticvoid Main(string[] args)

{

Work_fiber w=newWork_fiber();

Console.ReadLine();

}

}}


Дата добавления: 2018-04-05; просмотров: 1244; Мы поможем в написании вашей работы!

Поделиться с друзьями:






Мы поможем в написании ваших работ!