本篇同步發文於個人Blog: [讀書筆記] Threading in C# - PART 2: BASIC SYNCHRONIZATION
Simple blocking methods: 像是Sleep, Join, Task.Wait等
Locking constructs: 限制數個Thread做後續操作, Excluseive locking只有一個thread, 比如lock(Monitor.Enter, Monitor.Exit), Mutex 和 SpinLock. 而Nonexclusive locking像是Semaphore, SemaphoreSlim 和 reader/writer locks
Signaling constructs: Thread可以暫停, 直到收到通知才恢復, 這可避免沒效率的輪詢. 比如用 event wait handler, Monitor的Wait/Pulse, CountdownEvent和Barrier
Nonblocking synchronization constructs: Thread.MemoryBarrier, Thread.VolatileRead, Thread.VolatileWrite, volatile關鍵字 和 Interlocked類別
等待、Sleep等會讓thread暫停, 把time slice還給CPU.
可用ThreadState檢查是否Blocked
bool blocked = (someThread.ThreadState & ThreadState.WaitSleepJoin) != 0
當Thread發生block 或 unblock, 都會造成 Context Switch
unblock的觸發條件:
blocking條件已滿足
operation timing out(有指定timeout的時候)
被interrupt
被aborted
用loop一直查詢某條件, 造成CPU消耗很大的運算資源
一種比較好一點的寫法是, 在loop內加個Thread.Sleep
如果要用Spinning的寫法, 一種是保證該條件很快就滿足的運算, 另外一種是用SpinLock/SpinWait
區分 thread-safe和thread-unsafe的程式碼, 通常是有static變數, 如果有多個thread存取時會不會錯誤
最簡單的方式是用lock關鍵字做同步化, 綁住某個同步化物件, 只允許一個thread操作, 其他的thread變成blocked狀態, 且依照queue的順序來排隊
Construct | Purpose | Cross-Process ? | Overhead |
---|---|---|---|
lock (Monitor.Enter / Monitor.Exit) | 確保只有一個Thread能存取資源或一段code | 20 ns | |
Mutext | 同lock | Yes | 1000 ns |
SemaphoreSlim | 確保指定數量的thread能存取資源或一段code | 200 ns | |
Semaphore | 同Semaphore | Yes | 1000 ns |
ReaderWriterLockSlim | 允許多個reader能與一個writer共存 | 40 ns | |
ReaderWriterLock | 同ReaderWriterLockSlim | 100 ns |
Monitor.Enter(_locker);
try
{
DoSomething();
}
finally
{
Monitor.Exit(_locker);
}
上面寫法會有bug, 如果在Enter和try之間發生exception (比如thread被Abort或記憶體溢出), 則永遠不釋放該locker
更好的寫法是在try內用Enter, 且代入bool的變數, 用來判斷是否lock成功, 成功的話可以呼叫Exit
bool lockTaken = false;
try
{
Monitor.Enter(_locker, ref lockTaken)
}
finally
{
if(lockTaken)
{
Monitor.Exit(_locker);
}
}
必須是reference type的物件
一般是private的物件, 做邏輯封裝
精準的lock會用專門的locker物件
lock(this)或lock(typeof(SomeClass)), 很難預防死結和過多的blocking
基本的規則是, lock在存取可寫的共享物件
Thread safe與unsafe的寫法
class ThreadUnsafe
{
static int _x;
static void Increment() { ++_x; }
static void Assign() { _x = 123; }
}
class ThreadSafe
{
static readonly object _locker = new object();
static int _x;
static void Increment() { lock(_locker) { ++_x; }}
static void Assign() { lock(_locker) { _x = 123; }}
}
有一組變數, 寫跟讀總是在同一個lock, 則稱它們是Atomic
比如下面x與y的除法範例
lock (_locker)
{
if(x!=0)
y /= x;
}
有時會有破壞atomicity的bug, 比如有呼叫其他函式造成exception, 使某些變數沒完整計算完
建議其他函式先運算完再把它的值帶入到lock, 或者try的catch/finally做rollback
lock可以巢狀包裝
適用於lock的內容, 有call其他的函式, 這些函式實作再加上lock
當兩個thread都掌握對方的資源且等待對方釋放, 沒任何進展就是死結
基本的死結案例:
using System;
using System.Threading;
class TestDeadlocks
{
static void Main(string[] args){
object locker1 = new object();
object locker2 = new object();
new Thread(() => {
lock(locker1){
Thread.Sleep(1000);
Console.WriteLine("Ready to lock 2");
lock(locker2);
}
}).Start();
lock(locker2){
Thread.Sleep(1000);
Console.WriteLine("Ready to lock 1");
lock(locker1);
}
Console.WriteLine("Hi");
Console.Read();
}
}
更複雜的死結是,Thread 1 lock 而呼叫A class的X方法, X方法呼叫B class的Y 方法, 另外Thread 2 lock而呼叫B class的Y方法, Y方法呼叫A class的X方法.
考慮lock是否要用在別的class的函式
之後的declarative, data parallelism, immutable types 和 nonblock synchronization能減少lock的需求
另一些常見的死結發生在WPF的Dispatcher.Invoke或Winform的Control.Invoke, 解法是用BeginInvoke
基本上lock的速度很快
如果有很短暫的lock, 可以改用SpinLock, 減少Context Switch
Lock得太久, 會減少共時性的效能; Lock也是造成Deadlock的風險
跨Process的lock, 大約比lock慢50倍
使用WaitOne做lock, ReleaseMutex unlock, 而用close或dispose也是release
Mutex認出同樣的lock是用Name
如果是執行在Terminal Services環境, 一般的Mutex無法跨terminal server session, 要在Name加上Global\ 前綴字
using System;
using System.Threading;
class OneAtATimePlease
{
static void Main(string[] args){
using(var mutex = new Mutex(false, "test oreilly"))
{
if(!mutex.WaitOne(TimeSpan.FromSeconds(3), false)){
Console.WriteLine(" another is running");
Console.Read();
return;
}
RunProgram();
}
}
static void RunProgram()
{
Console.WriteLine("To exit");
Console.Read();
}
}
Semaphore允許多個Thread在同一區段執行, 超過此容量的thread會block等待
把Semaphore的容量設為1, 就和lock與mutex一樣, 但Semaphore的Release是任何thread都能呼叫
SemaphoreSlim有更低延遲, 且能帶cancellation token, 用在parallel programming
如果Semaphore有給名字, 也是能跨Process
下面範例是最多3個Thread進入
using System;
using System.Threading;
class SemaphoreClub
{
static SemaphoreSlim _sem = new SemaphoreSlim(3);
static void Main(string[] args){
for(int i = 0 ; i < 5; ++i){
new Thread(Enter).Start(i);
}
Console.Read();
}
static void Enter(object id)
{
Console.WriteLine(id + " wants to enter");
_sem.Wait();
Console.WriteLine(id + " is in!");
Thread.Sleep(500 * (int) id);
Console.WriteLine(id + " is leaving");
_sem.Release();
}
}
開發時要維護該type所有Thread-safe欄位
Thread-safety有效能上的花費, 即使沒有多執行緒也必要花費
使用Thread-safe的type不一定能讓執行程式thread-safe
基本的用法是用exclusive lock去鎖定特定的程式碼而達到thread-safe
另外是減少共享資料, 達到無狀態的功能, 比如ASP.NET Web的Request, 大都是獨自處理
最後是用automatic locking regime的方式, 對class或property加上ContextBoundObject和Synchronization屬性, 就能自動有鎖的功能. 但缺點是會產生另一種方式的死結、併發性差、意外重入等問題. 盡量用exlusive lock.
Enumeration是thread-unsafe的行為, 所以共同資料要enumeration時, 先宣告一個local變數, 再用lock的方式copy (ToList, ToArray等)到local變數
Enumeration的另一種解法是reader/writer lock
class ThreadSafe
{
static List<string> _list = new List<string>();
static void Main()
{
new Thread(AddItem).Start();
new Thread(AddItem).Start();
}
static void AddItem()
{
lock (_list) _list.Add("Item " + _list.Count());
string[] items;
lock (_list) items = _list.ToArray();
foreach(string s in items) Console.WriteLine(s);
}
}
Locking around thread-safe objects
Static members
.net framework設計static member是thread-safe, 而實例的member不是. 比如取DateTime.Now, 就不需要去用lock來取
static function不是thread-safe, 要確認功能對資料的共享性
Read-only thread safety
能在文件註明該collection是只讀訪問的thread-safe, 並要求使用者在只讀的方法做寫入
實作ToArray等, 本身會有thread-unsafe的issue
如果文件缺少說明, 要注意是否某些方法是read-only. 比如Random.Next(), 內部實作有更新private seed, 因此要用lock取值或者分開的Random物件
在WPF或Winform, UI的元件有Affinity特性, 代表哪個thread建立元件, 那元件只能被那thread存取.
所以別的thread需要marshal原本thread來控制元件, 比如Winform的Invoke或BeginInvoke, WPF的Invoke或BeginInvoke
Invoke是同步方法, 會block目前thread; BeginInvoke是非同步方法, 立即回傳caller而marshal的request會進到queue(和keyboard, mouse的事件使用同樣的message queue)
Worker threads versus UI threads
Rich client有兩大thread: UI Thread和Worker Thread
UI Thread專門建立UI元件, Worker thread一般用來執行long-running job
Rich client都會有一個UI Thread且是Main thread, 再由它生成work thread, 可直接生成或者用BackgroundWorker
Single Document Interface (SDI), 像是Word, 會有多個UI Thread
物件能封裝成裡面的狀態不能被內部與外部改變, 稱為immutable object. 決定它內部值是在Constructor且值是Read-only. 可減少lock的執行時間.
下面範例是建立一個immutable object, 只有assign新物件才會需要lock, 取值不需要
class ProgressStatus
{
public readonly int PercentComplete;
public readonly string StatusMessage;
public ProgressStatus(int percentComplete, string statusMessage)
{
PercentComplete = percentComplete;
StatusMessage = statusMessage;
}
}
class Program
{
readonly object _statusLocker = new object();
ProgressStatus _status;
void SomeFunction()
{
_status = new ProgressStatus(50, "Working on it");
ProgressStatus statusCopy;
lock(_statusLocker) statusCopy = _status;
int pc = statusCopy.PercentComplete;
string msg = statusCopy.StatusMessage;
}
}
在int pc = ... 的最後2行, 有隱含用Memory barrier包裝
後續不使用lock, 還會有顯示Memory barrier, Interlocked.CompareExchange, spin-waits等功能可用
Signaling是指thread會一直等待, 直到收到從別的Thread發的通知
和一般C#的event不相關
3種類型: AutoResetEvent, ManualResetEvent, CountdownEvent
Construct | Purpose | Cross-Process ? | Overhead |
---|---|---|---|
AutoResetEvent | 允許一個thread當收到singal時,執行一次unblock | Yes | 1000 ns |
ManualResetEvent | 允許一個thread當收到singal時,執行無限期的unblock (直到它重置) | Yes | 1000 ns |
ManualResetEventSlim (Net Framework 4) | 同ManualResetEvent | 40 ns | |
CountdownEvent (Net Framework 4) | 允許一個thread當收到預定數量的singal時,執行unblock | 40 ns | |
Barrier (Net Framework 4) | 實作Thread執行屏障 | 80 ns | |
Wait and Pulse | 允許一個thread block直到某條件達成 | 120 ns for a Pulse |
它像是一個票閘, 插入一張票只讓一個人過
Thread 在門閘時呼叫WaitOne來wait/block, 而呼叫Set插入票
如果有多個thread在門閘呼叫WaitOne, 變成queue排隊
Ticket可以來自任何thread, 代表任何unblock的thread可存取該AutoResetEvent物件並呼叫Set
在constructor代入true的話, 代表直接呼叫Set
用EventWaitHandle可達到相同的功能 (EventWaitHandle是AutoResetEvent的父類別)
var auto = new AutoResetEvent (false);
// 等同寫法
var auto2 = new EventWaitHandle(false, EventResetMode.AutoReset);
using System;
using System.Threading;
class TestAutoResetEvent
{
static EventWaitHandle _waitHandle = new AutoResetEvent(false);
static void Main(string[] args){
new Thread(() => {
Console.WriteLine("Wait...");
_waitHandle.WaitOne();
Console.WriteLine("awake");
}).Start();
Thread.Sleep(1000);
_waitHandle.Set();
Console.Read();
}
}
Producer/consumer queue
一個queue用來放進要執行的任務, 而其他thread在背景從這queue挑任務來做
用這種queue能有效管理要執行的thread數量, 比如IO密集型任務可只安排一個thread, 其他需要10個
CLR的Thread pool也是一種Producer/consumer queue
queue插入的資料會有對應的任務, 比如填入檔案名稱, 而對應的任務是加密該檔案
以下用AutoResetEvent實作範例
using System;
using System.Collections.Generic;
using System.Threading;
namespace ProducerConsumerTest
{
class Program
{
static void Main(string[] args)
{
using (ProducerConsumerQueue q = new ProducerConsumerQueue())
{
q.EnqueueTask("Hello");
for(int i = 0; i < 20; ++i)
{
q.EnqueueTask("Say " + i);
}
q.EnqueueTask("Good bye");
}
}
}
public class ProducerConsumerQueue : IDisposable
{
EventWaitHandle _wh = new AutoResetEvent(false);
Thread _worker;
readonly object _locker = new object();
Queue<string> _tasks = new Queue<string>();
public ProducerConsumerQueue()
{
_worker = new Thread(Work);
_worker.Start();
}
public void EnqueueTask(string task)
{
lock (_locker)
{
_tasks.Enqueue(task);
}
_wh.Set();
}
public void Dispose()
{
EnqueueTask(null); // signal the consumer to exit
_worker.Join(); // wait for the consumer's thread to finish
_wh.Close(); // release any OS Resources
}
private void Work()
{
while (true)
{
string task = null;
lock(_locker)
{
if(_tasks.Count > 0)
{
task = _tasks.Dequeue();
if(task == null)
{
return;
}
}
}
if(task != null)
{
Console.WriteLine("Performing task : " + task);
Thread.Sleep(1000); // simulate work...
}
else
{
_wh.WaitOne(); // no more tasks , wait for a signal
}
}
}
}
}
用lock去鎖定queue, 達到thread-safe
在enqueue之後, 呼叫Set, 通知在while(true)有wait的thread可以往下做
如果caller插入null的資料, 直接結束
queue如果是空的, 會呼叫WaitOne等待signal
在Dispose的實作, 呼叫Enqueue(null), 讓Work方法讀到null而return結束, 否則Thread的Join永遠不結束; 對EventWaitHandle呼叫Close, 可以釋放內部有用到的資源
.Net Framework 4 有BlockingCollection, 實作Producer/Consumer queue
上述用AutoResetEvent的Producer/Consumer queue是個好的範例, 未來加上cancellation或bounded queue, 都可以此為起點
和AutoResetEvent相比, ManualResetEvent是一般的閘門, 呼叫Set時, 讓所有等待(有呼叫過WaitOne)的Thread全都能進入
呼叫Reset能把閘門關上
呼叫WaitOne就會Block
等同的寫法
var manual1 = new ManualResetEvent(false);
var manual2 = new EventWaitHandle(false, EventResetModel.ManualReset);
另一個是ManualResetEventSlim能執行更快且支援CancellationToken, 但不能跨Process
ManualResetEvent是讓一個Thread允許多個Thread unblock, CountdownEvent則相反
用CountdownEvent可以等多個Thread執行後再往後執行
在.NET Framework 4之前, 可以用Wait and Pulse來實作CountdownEvent
建構CountdownEvent指定要的數量, 呼叫Wait則block該thread, 而呼叫Signal會降低count, 直到count為0, 該thread將unblock
以下範例是等待3個Thread執行後, 才繼續執行
using System;
using System.Threading;
public class Program
{
static CountdownEvent _countDown = new CountdownEvent(3);
public static void Main()
{
new Thread(SaySomething).Start("Thread 1");
new Thread(SaySomething).Start("Thread 2");
new Thread(SaySomething).Start("Thread 3");
_countDown.Wait();
Console.WriteLine("All threads have finished");
}
static void SaySomething(object msg)
{
Thread.Sleep(3000);
Console.WriteLine(msg);
_countDown.Signal();
}
}
Count可以用AddCount來加更多需等待的數量, 但如果已經達到count = 0而又呼叫AddCount, 將拋出exception
建議可用TryAddCount, 回傳false代表count已經是0
呼叫Reset將Count回到初始值
EventWaitHandle可以指定名字, 讓多個Process根據同一個名字而共同參考
基本用法:
EventWaitHandle wh = new EventWaitHandle(false, EventResetMode.AutoReset, "MyCompany.MyApp.Name");
using System;
using System.Threading;
namespace TestWaitHandleThreadPool
{
class Program
{
static ManualResetEvent _starter = new ManualResetEvent(false);
static void Main(string[] args)
{
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Go, "Some Data", -1, true);
Thread.Sleep(5000);
_starter.Set();
Console.ReadLine();
reg.Unregister(_starter);
}
static void Go(object data, bool timeOut)
{
Console.WriteLine("Start work : " + data);
}
}
}
參數-1代表不用timeout, 如果有timeout的話, 會檢測傳送的物件(範例是Some Data字串)的狀態; 參數true代表該Thread pool收到signal後, 不再重設要Wait.
假如原本用WaiOne的方式處理, Server收到100個任務就得new 100個Thread, 變成綁定太多且大量Block. 改寫的方法如下, 讓後續的委託工作都給‘hread Pool處理
void AppServerMethod()
{
_wh.WaitOne();
// ... continue execution
}
// 變成
void AppServerMethod()
{
RegisteredWaitHandle reg = ThreadPool.RegisterWaitForSingleObject(_starter, Resume, null, -1, true);
// ...
}
static void Resume(object data, bool timeOut)
{
// ... continue execution
}
WaitHandle提供static method, 包含WaitNay, WaitAll, SignalAndWait, 可以對有繼承WaitHandle的物件使用較複雜的Signal/Wait的功能
WaitAny: 等待任一個Thread收到Signal
WaitAll: 等待所有Thread都收到Signal
SignalAndWait: 對第一個參數的thread發出signal, 對第二個參數的thread做等待
Alternatives to WaitAll and SignalAndWait
WaitAll和SignalAndWait不能在單一執行緒的環境執行.
SignalAndWait的替代方案是Barrier類別, 而WaitAll的替代方案是Parallel class的Invoke方法
繼承ContextBoundObject且加上Synchronization屬性, CLR在這物件會自動使用lock
如下範例, 每個Demo函式會排隊執行
using System;
using System.Runtime.Remoting.Contexts;
using System.Threading;
namespace TestAutoLock
{
class Program
{
static void Main(string[] args)
{
AutoLock safeInstance = new AutoLock();
new Thread(safeInstance.Demo).Start();
new Thread(safeInstance.Demo).Start();
safeInstance.Demo();
}
}
[Synchronization]
public class AutoLock : ContextBoundObject
{
public void Demo()
{
Console.Write("Thread id : " + Thread.CurrentThread.ManagedThreadId);
Console.Write(" Start.....");
Thread.Sleep(1000);
Console.WriteLine("End");
}
}
}
自動lock不包含static的成員和沒有繼承ContextBoundObject的物件(比如Form)
想像是CLR將原始Class套上一層ContextBoundObject Proxy, 能呼叫原始Class的成員, 再為它的方法都加上同步化的功能
如果前面的AutoLock是個Collection, 則使用它的物件也必須是ContextBoundObject, 否則存取它的item需要手動加上lock
Synchronization Context預設會延伸從同一層Scope的Context, 也就是lock包含的深度一直向下
在Synchronization的attribute可以改變預設的行為, 有這些選項:
NOT_SUPPORTED: 就跟沒加上Synchronization的屬性一樣
SUPPORTED: 如果來自別的synchronized 物件做初始化, 則延伸它的context, 否則保持unsynchronized
REQUIRED (預設): 如果來自別的synchronized 物件做初始化, 則延伸它的context, 否則建立新的Context
REQUIRES_NEW: 總是建立新增Synchronization context
using System;
using System.Runtime.Remoting.Contexts;
using System.Threading;
namespace TestAutoLockDeadlock
{
[Synchronization]
public class Deadlock : ContextBoundObject
{
public Deadlock Other;
public void Demo()
{
Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(1000);
Console.WriteLine("Call other");
Other.Hello();
}
void Hello()
{
Console.WriteLine("hello");
}
}
class Program
{
private static void Main(string[] args)
{
Deadlock dead1 = new Deadlock();
Deadlock dead2 = new Deadlock();
dead1.Other = dead2;
dead2.Other = dead1;
new Thread(dead1.Demo).Start();
dead2.Demo();
Console.Read();
}
}
}
兩個Deadlock物件都是在Program建立, Program本身是unsynchronized, 所以Deadlock物件建立各自的Synchronization Context, 也有各自的lock
呼叫對方的Hello方法後, 即發生Deadlock
Reentrant的定義是, 如果有段程式碼被中斷, 執行緒去執行別的程式, 之後再回來執行這段程式而沒造成影響
通常Thread-safe和reentrant視為同等
如果[Synchronization(true)]這樣使用, 代表需要reentry, 當執行離開此程式碼時, 會把lock釋放, 可避免deadlock. 副作用是在這釋放期間, 任何thread可以進入該物件的context(比如呼叫它的方法)
[Synchronization(true)]是類別層級, 所以在非該context的呼叫都會當class層面的木馬(?)
如果沒有reentrancy, 則在一些場合比較難工作, 比如在一個synchronized class實作多執行緒, 將邏輯委託給其他worker thread, 則worker thread彼此間要溝通沒有reentrancy的話, 將會受阻.
同步自動鎖造成deadlock, reentrancy, 刪除併發等問題, 在一些應用場合沒有手動lock來的好用