iT邦幫忙

9

利用Web Worker做平行化處理,以Map Reduce為例

之前在JavaTWO聆聽大師們介紹Java的Lambda支援,尤其用它來寫Map Reduce平行化處理,看起來很有趣,而且看起來用Javascript也可以做得出來,所以就想寫一個簡單的範例來做驗證...
如果使用node.js,它有ChildProcess, Cluster, thread_a_gogo模組等機制可以支援平行化的處理。如果想在瀏覽器實作的話,就得靠HTML5的Dedicated Worker了。

希望的使用方式是像這樣:

F(資料, 處理方法).map(分割資料丟給Worker的函數).reduce(彙總資料的函數).run(接收結果的callback);

根據之前在node.js使用ChildProcess以及Cluster的經驗,做這樣的處理,最好把所有過程設計成用事件來觸發,不做任何假設,這樣比較不會有問題。所以設計了幾個host跟Worker溝通的方式:

  1. executor:接收「處理方法」
  2. request:Worker告訴host,它有空處理資料。host收到後,會視情況response給Worker資料,或是關閉Worker(處理完畢)。
  3. response:Worker接收要處理的資料,處理完畢後,將結果傳回host。傳回host表示Worker有空處理資料了,所以會再發出一個request

接收「處理方法」有一個困難點,就是postMessage只能傳遞立即值或Object,不能傳遞Function,因為他內部會用JSON.stringify跟JSON.parse來處理資料,避免傳參考造成資料同步/競爭的問題。所以只好傳Function.toString()過去,然後在Worker端利用簡單的parse把它還原。

先來看一下Worker的程式:

var executor = function(){};
onmessage = function(m) {
  switch(m.data.cmd) {
    case 'executor':
      var v = m.data.payload.match(/\((.+)\)/g);
      var param = v[0].substr(1, v[0].length-2);
      var b = m.data.payload.replace(/\n|\r/g, '').match(/\{.+\}/g);
      var body = b[0].substr(1, b[0].length-2);
      executor = new Function(param, body);
      break;
    case 'response':
      postMessage({cmd: 'result', payload: executor(m.data.payload)});
      postMessage({cmd: 'request'});
      break;
    case 'run':
      postMessage({cmd: 'request'});
      break;
  }
}

再來看一下host端:

    <style>
      div {
        border: solid 1px #6699CC;
        background: #99CCFF;
      }
    </style>
    <script>
function F(arr, f) {
  return new (function() {
    var workers = new Array(4);
    var parallel = true;
    var requestHandler,resultHandler,result,callback;
    var returned = false;
    for(var i=0; i<4; i++) {
      workers[i] = new Worker('test813.js');
      workers[i].onmessage = function(m) {
        switch(m.data.cmd) {
          case 'request':
            requestHandler.call(this);
            break;
          case 'result':
            resultHandler.call(this, m.data.payload);
            break;
        }
      };
      workers[i].postMessage({cmd:'executor',payload:f.toString()});
    }
    this.map = function(f) {
      requestHandler = function() {
        var payload = f.call(arr);
        if(payload) {
          this.postMessage({cmd:'response',payload:payload})
        } else {
          if(!returned) {
            returned = true;
            callback(result);
          }
          this.terminate();
        }
      };
      return this;
    };
    this.reduce = function(f) {
      resultHandler = function(curr) {
        result = result? result:{};
        result = f(result, curr);
      };
      return this;
    };
    this.run = function(f) {
      callback = f;
      for(var i=0; i<workers.length; i++) {
        workers[i].postMessage({cmd: 'run'});
      }
    };
  })();
}
    </script>      
  

  <div id="origin"></div>
  <button id='run'>go</button>
  <div id="result"></div>


<script>
var arr = [
  'abdegiawhfheeufjosdifiawhfhe',
  'iawhfheijsdowhfheeufjosifkvnawefiuu',
  'difajifviawhfheaiowhfheewhfheeufjosufjos',
  'ifiwsvwhfheeufjosiiawhfheejfijsdifj',
  'isdiidifjfjjsidwhfheeufjiawhfheosjf'
];

document.getElementById('origin').innerHTML = '<table border="1" cellpadding="5" cellspacing="0"><tr><td>'+arr.join('</td></tr><tr><td>')+'</td></tr></table>';

var parallelreducer = F(arr, function(str) {
  ret = {};
  if(str) {
    for(var i=0; i<str.length; i++) {
      if(ret[str[i]]) {
        ret[str[i]]++;
      } else {
        ret[str[i]] = 1;
      }
    }
  }
  return ret;
}).map(function() {
  return this.shift();
}).reduce(function(prev, curr) {
  for(var i in curr) {
    if(typeof prev[i] !== 'undefined') {
      prev[i] += curr[i];
    } else {
      prev[i] = curr[i];
    }
  }
  return prev;
});

document.getElementById('run').onclick = function() {
  parallelreducer.run(function(result) {
    var target = document.getElementById('result');
    var str = '<table border="1" cellpadding="5" cellspacing="0">';
    for(var i in result) {
      str += '<tr><td>'+i+'</td><td>'+result[i]+'</td></tr>';
    }
    str += '</table>';
    target.innerHTML = str;
  });
};
</script>

arr是要處理的資料,也同時顯示在畫面中。按下go按鈕,則會出現處理結果,這裡做的是資料中使用字母的統計。

這裡展示的是先呼叫F(陣列,處理方法).map(提供資料的方法).reduce(彙總資料的方法),然後到go按鈕的onclick事件再實際呼叫.run()來執行,並且顯示結果。目前預設會開四個Worker來跑,有些瀏覽器會限制每個頁面可以開啟的Worker數目,如果要修改Worker數目的話,應該是有上限的。

另外,我沒有clone要處理的資料...所以隨著處理過程會......

在Chrome跑起來大概像這樣:

按下go按鈕之後:

目前測試過在最新的Chrome跟FF可以執行無誤。不過這還只是概念驗證而已,應該很多細節還沒考慮清楚XD


尚未有邦友留言

立即登入留言