Windows Azure Platform 的 Queue Storage 是最簡單的儲存服務,它也是使用範圍最明確的服務-先進先出與擬機器間通訊。
學過資料結構的人一定都聽過Stack和Queue吧,Stack是後進先出(LIFO),而Queue則是先進先出(FIFO)的資料結構,商用系統的實務開發上,Queue的應用範圍比Stack要大的多了,因為在實務上會用到先進先出的案例太多了,舉凡線上訂位(購買)、抽號碼牌、選位等等商用的需求都會要求先進先出的條件,故Queue的應用範圍會比Stack大的多,微軟當然也很清楚這一點,所以在Storage中也實作了一個專門處理Queue的服務,即為Queue Storage。
Queue Storage和BLOB、Table一樣,都有容器的概念,Queue的容器被稱為通道(tunnel),每個通道內的訊息都是先進先出,一支應用程式可以同時生成多個通道,並且在通道內放不同的訊息,通道的另一端可以是程式,也可以是在不同虛擬機器內的不同角色,像是Web Role傳遞訊息給後面負責處理的Worker Role,就能運用Queue Storage來實作,所以Queue Storage也會用在不同角色或不同虛擬機器間程式的通訊。
佇列訊息(Queue Message)則是在通道內流通的資料,為佇列通訊的主體,它可以裝載可二進位序列化(Binary Serializable)的物件資料,這代表在佇列中可用的資料類型非常彈性,我們可以直接將物件裝到佇列中,只要兩端應用程式都擁有相同的型別資訊的話,就可以很容易的將物件還原回原始的物件型別。但佇列內的資料最長為8KB,所以不能將完整的資料放到佇列中,但能配合BLOB或Table來存放資料,再以佇列訊息搭載參數與資料所在的URL給處理程式即可。
Queue通道提供了不同的方法,以支援佇列通道內的處理訊息插入與擷取的各項作業,Queue通道支援兩種不同的訊息擷取方法:
每一個佇列訊息都有一個有效期間,預設為7天,只要程式沒有下指令將訊息明確自通道中刪除,則訊息會一直存在,直到有效期間到期為止(GetMessage()會讓訊息狀態設為不可見,但並不會刪除訊息),這樣的作法是基於佇列的強固性(Reliability),讓佇列另一端的處理程式一定可以得到訊息。但副作用就是毒訊息(Poison Message),當處理程式有多個個體時,當其中一個個體取回訊息但在處理時當機了,其他的個體會在訊息恢復可見(visible)時再次由佇列取得,如果沒有做檢查的話,很容易讓訊息被重覆處理,所以應用程式必須要做檢查,確認訊息沒有被處理,否則就會被毒訊息的副作用影響。
Queue的程式處理非常簡單,服務的入口為CloudQueueClient,首先要先建立訊息通道,使用CloudQueueClient.GetQueueReference()得到CloudQueue的參考後,再用CloudQueue.CreateIfNotExist()建立通道:
public override bool OnStart()
{
...
this._storageAccount = CloudStorageAccount.FromConfigurationSetting("QueueDataSource");
this._queueClient = this._storageAccount.CreateCloudQueueClient();
this._queue = this._queueClient.GetQueueReference("cmdqueue");
this._queue.CreateIfNotExist();
this._queueOutput = this._queueClient.GetQueueReference("resultqueue");
this._queueOutput.CreateIfNotExist();
return base.OnStart();
}
然後,使用CloudQueueMessage物件生成佇列訊息後,用AddMessage()將訊息推入通道內,而處理程式則可用GetMessage()取回訊息,處理完後呼叫DeleteMessage()刪除它,以避免被重覆處理:
protected void cmdCalc_Click(object sender, EventArgs e)
{
CloudStorageAccount storageAccount =
CloudStorageAccount.FromConfigurationSetting("QueueDataSource");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("cmdqueue");
CloudQueueMessage queueMessage =
new CloudQueueMessage(this.txtPrimeFrom.Text + ";" + this.txtPrimeTo.Text);
queue.AddMessage(queueMessage);
storageAccount = null;
queueClient = null;
queue = null;
queueMessage = null;
Response.Redirect("~/default.aspx");
}
protected void Page_Load(object sender, EventArgs e)
{
DataTable queueResults = null;
if (Session["lastQueueResult"] == null)
{
queueResults = new DataTable();
queueResults.Columns.Add("ReturnTime", typeof(string));
queueResults.Columns.Add("Range", typeof(string));
queueResults.Columns.Add("Result", typeof(string));
Session["lastQueueResult"] = queueResults;
}
CloudStorageAccount storageAccount =
CloudStorageAccount.FromConfigurationSetting("QueueDataSource");
CloudQueueClient queueClient = storageAccount.CreateCloudQueueClient();
CloudQueue queue = queueClient.GetQueueReference("resultqueue");
queue.RetrieveApproximateMessageCount();
if (queue.ApproximateMessageCount > 0)
{
queueResults = Session["lastQueueResult"] as DataTable;
IEnumerable<CloudQueueMessage> queueResultMessages = queue.GetMessages(32);
foreach (CloudQueueMessage queueResultMessage in queueResultMessages)
{
queueResults.Rows.Add(new object[] {
DateTime.Now.ToString(),
queueResultMessage.AsString.Split('$')[0],
queueResultMessage.AsString.Split('$')[1]
});
queue.DeleteMessage(queueResultMessage);
}
Session["lastQueueResult"] = queueResults;
}
this.gvPrimeQueueResult.DataSource = (Session["lastQueueResult"] as DataTable);
this.gvPrimeQueueResult.DataBind();
}
與BLOB和Table Storage一樣,Queue Storage也有一些使用限制: