之前在JavaTWO聆聽大師們介紹Java的Lambda支援,尤其用它來寫Map Reduce平行化處理,看起來很有趣,而且看起來用Javascript也可以做得出來,所以就想寫一個簡單的範例來做驗證...
如果使用node.js,它有ChildProcess, Cluster, thread_a_gogo模組等機制可以支援平行化的處理。如果想在瀏覽器實作的話,就得靠HTML5的Dedicated Worker了。
希望的使用方式是像這樣:
F(資料, 處理方法).map(分割資料丟給Worker的函數).reduce(彙總資料的函數).run(接收結果的callback);
根據之前在node.js使用ChildProcess以及Cluster的經驗,做這樣的處理,最好把所有過程設計成用事件來觸發,不做任何假設,這樣比較不會有問題。所以設計了幾個host跟Worker溝通的方式:
接收「處理方法」有一個困難點,就是postMessage只能傳遞立即值或Object,不能傳遞Function,因為他內部會用JSON.stringify跟JSON.parse來處理資料,避免傳參考造成資料同步/競爭的問題。所以只好傳Function.toString()過去,然後在Worker端利用簡單的parse把它還原。
先來看一下Worker的程式:
var executor = function(){};
onmessage = function(m) {
switch(m.data.cmd) {
case 'executor':
var v = m.data.payload.match(/\((.+)\)/g);
var param = v[0].substr(1, v[0].length-2);
var b = m.data.payload.replace(/\n|\r/g, '').match(/\{.+\}/g);
var body = b[0].substr(1, b[0].length-2);
executor = new Function(param, body);
break;
case 'response':
postMessage({cmd: 'result', payload: executor(m.data.payload)});
postMessage({cmd: 'request'});
break;
case 'run':
postMessage({cmd: 'request'});
break;
}
}
再來看一下host端:
<style>
div {
border: solid 1px #6699CC;
background: #99CCFF;
}
</style>
<script>
function F(arr, f) {
return new (function() {
var workers = new Array(4);
var parallel = true;
var requestHandler,resultHandler,result,callback;
var returned = false;
for(var i=0; i<4; i++) {
workers[i] = new Worker('test813.js');
workers[i].onmessage = function(m) {
switch(m.data.cmd) {
case 'request':
requestHandler.call(this);
break;
case 'result':
resultHandler.call(this, m.data.payload);
break;
}
};
workers[i].postMessage({cmd:'executor',payload:f.toString()});
}
this.map = function(f) {
requestHandler = function() {
var payload = f.call(arr);
if(payload) {
this.postMessage({cmd:'response',payload:payload})
} else {
if(!returned) {
returned = true;
callback(result);
}
this.terminate();
}
};
return this;
};
this.reduce = function(f) {
resultHandler = function(curr) {
result = result? result:{};
result = f(result, curr);
};
return this;
};
this.run = function(f) {
callback = f;
for(var i=0; i<workers.length; i++) {
workers[i].postMessage({cmd: 'run'});
}
};
})();
}
</script>
<div id="origin"></div>
<button id='run'>go</button>
<div id="result"></div>
<script>
var arr = [
'abdegiawhfheeufjosdifiawhfhe',
'iawhfheijsdowhfheeufjosifkvnawefiuu',
'difajifviawhfheaiowhfheewhfheeufjosufjos',
'ifiwsvwhfheeufjosiiawhfheejfijsdifj',
'isdiidifjfjjsidwhfheeufjiawhfheosjf'
];
document.getElementById('origin').innerHTML = '<table border="1" cellpadding="5" cellspacing="0"><tr><td>'+arr.join('</td></tr><tr><td>')+'</td></tr></table>';
var parallelreducer = F(arr, function(str) {
ret = {};
if(str) {
for(var i=0; i<str.length; i++) {
if(ret[str[i]]) {
ret[str[i]]++;
} else {
ret[str[i]] = 1;
}
}
}
return ret;
}).map(function() {
return this.shift();
}).reduce(function(prev, curr) {
for(var i in curr) {
if(typeof prev[i] !== 'undefined') {
prev[i] += curr[i];
} else {
prev[i] = curr[i];
}
}
return prev;
});
document.getElementById('run').onclick = function() {
parallelreducer.run(function(result) {
var target = document.getElementById('result');
var str = '<table border="1" cellpadding="5" cellspacing="0">';
for(var i in result) {
str += '<tr><td>'+i+'</td><td>'+result[i]+'</td></tr>';
}
str += '</table>';
target.innerHTML = str;
});
};
</script>
arr是要處理的資料,也同時顯示在畫面中。按下go按鈕,則會出現處理結果,這裡做的是資料中使用字母的統計。
這裡展示的是先呼叫F(陣列,處理方法).map(提供資料的方法).reduce(彙總資料的方法),然後到go按鈕的onclick事件再實際呼叫.run()來執行,並且顯示結果。目前預設會開四個Worker來跑,有些瀏覽器會限制每個頁面可以開啟的Worker數目,如果要修改Worker數目的話,應該是有上限的。
另外,我沒有clone要處理的資料...所以隨著處理過程會......
在Chrome跑起來大概像這樣:
按下go按鈕之後:
目前測試過在最新的Chrome跟FF可以執行無誤。不過這還只是概念驗證而已,應該很多細節還沒考慮清楚