昨天修正了從外部控制 event loop 的問題,導入 threading.Event
作為控制 event loop 的「信號」。另外,所有 event loop 都改以 thread 的方式執行,避免影響程式其他部份的執行。
今天實作主控端的部份:
說明玩家回合的過程:
實作完玩家回合後,要做遊戲流程就很快了
剛寫完,測試的部份還不是很完善。輸出範例暫時不附上,有興趣的人可以在自己的電腦上跑跑看。
至此,我們把玩家操作、主控端都模擬出來,雖然實際的遊戲指令還沒完善,但算是可以跑起來的架構。
剩下還有一些工作:
後面差不多就可以開始進行遊戲內容的開發了,我們明天見!
from queue import Queue
from time import sleep
from random import choice, randint
import threading
from threading import Event
class GameCtrl():
    """Turn controller for a two-player game, driven by events and queues.

    Each player (pid 0 or 1) has:
      * an inbound queue in ``_pipes_from`` that ``playerLoop`` reads,
      * an outbound queue in ``_pipes_to`` that ``send`` writes,
      * an ``Event`` in ``_actived`` that is set while it is that player's
        turn and cleared when the turn ends.

    ``_next`` is set by the end / force-end handlers to tell ``playerTurn``
    that the current turn is finished; ``_gameOver`` is set by
    ``setGameOver`` to stop ``gameLoop``.
    """

    def __init__(self):
        """Create the queues and events and start one event loop per player."""
        self._pipes_from = [Queue(), Queue()]
        self._pipes_to = [Queue(), Queue()]
        self._actived = [Event(), Event()]
        # Known command names; not referenced by the methods in this class.
        self._cmds = ["action", "end"]
        self._next = Event()  # signal for turn next player
        self._gameOver = Event()  # signal for game over
        # The player loops never return, so they run as daemon threads;
        # otherwise they would keep the interpreter alive after the game ends.
        p1_loop = threading.Thread(
            target=self.playerLoop, args=[0], daemon=True)
        p2_loop = threading.Thread(
            target=self.playerLoop, args=[1], daemon=True)
        p1_loop.start()
        p2_loop.start()

    def send(self, pid, cmd, *args):
        """Queue ``[cmd, args]`` on player ``pid``'s outbound queue."""
        print("[info] GameCtrl: send to player#%d: %s, %s"
              % (pid, cmd, str(args)))
        self._pipes_to[pid].put([cmd, args])

    def update(self, pid, *args):
        """Send an ``"update"`` command carrying ``args`` to player ``pid``."""
        print("[info] GameCtrl: update %s" % (str(args)))
        self.send(pid, "update", *args)

    def actionEventHandler(self, pid, *args):
        """Handle an action by ``pid``: forward ``args`` to the other player."""
        print("[info] action event handler: player#%d action %s"
              % (pid, str(args)))
        other_pid = (pid + 1) % 2
        self.update(other_pid, *args)

    def _closeTurn(self, pid):
        """Mark ``pid`` inactive and wake whoever waits for the turn to end."""
        self._actived[pid].clear()
        self._next.set()

    def endEventHandler(self, pid):
        """Handle a voluntary end-of-turn from player ``pid``."""
        print("[info] end event handler: player#%d" % pid)
        self._closeTurn(pid)

    def forceEndEventHandler(self, pid):
        """Handle the turn timer expiring for player ``pid``."""
        print("[info] forceEnd event handler: player#%d" % pid)
        self._closeTurn(pid)

    def setGameOver(self):
        """Raise the game-over signal checked by ``gameLoop``."""
        self._gameOver.set()

    def playerTurn(self, pid, timeout=10):
        """Run one turn for player ``pid`` and block until it ends.

        The turn ends when the player's "end" event is handled or when
        ``timeout`` seconds elapse, whichever comes first.

        Args:
            pid: index of the player whose turn it is (0 or 1).
            timeout: seconds before the turn is forcibly ended.
        """
        print("[info] GameCtrl: player#%d turn" % pid)
        # Clear the "turn finished" signal BEFORE activating the player so an
        # immediate "end" from the player cannot be erased by the clear.
        self._next.clear()
        self._actived[pid].set()  # turn on loop
        self.send(pid, "start")
        self.update(pid, "draw_card")
        # force end counter
        force_end = threading.Timer(timeout, self.forceEndEventHandler, [pid])
        force_end.start()
        try:
            self._next.wait()
        finally:
            # Without this, a timer left over from a turn that ended early
            # would fire later and cut short whichever turn is then running.
            force_end.cancel()
        print("[info] GameCtrl: player#%d turn end" % pid)

    def gameLoop(self, duration=60, turn_timeout=10):
        """Alternate turns between the two players until the game is over.

        Args:
            duration: seconds until the game-over signal is raised.
            turn_timeout: per-turn limit passed on to ``playerTurn``.
        """
        self._gameOver.clear()
        # count down to game over
        game_timer = threading.Timer(duration, self.setGameOver)
        game_timer.start()
        active_pid = 0  # first active player
        try:
            while not self._gameOver.is_set():
                self.playerTurn(active_pid, turn_timeout)
                active_pid = (active_pid + 1) % 2  # change player
        finally:
            game_timer.cancel()

    def _dispatch(self, pid, event):
        """Route one inbound ``event`` from player ``pid`` to its handler."""
        # Compare by value: ``is`` checks object identity and only appears to
        # work for strings that happen to be interned.
        if event == "action":
            self.actionEventHandler(pid)
        elif event == "end":
            self.endEventHandler(pid)
        else:
            # Wrap in a tuple so a tuple-valued event cannot break formatting.
            print("[warm] playerloop: get unsupported event %s" % (event,))

    def playerLoop(self, pid):  # run on thread, controled by signal
        """Event loop for player ``pid``; never returns.

        Sleeps until the player's turn is activated, then reads events from
        the player's inbound queue and dispatches them while the turn stays
        active.
        """
        while True:
            print("[info] GameCtrl: player event loop wait signal")
            self._actived[pid].wait()
            while self._actived[pid].is_set():
                # Blocking read: after a forced end this stays parked here
                # until the player's next event arrives.
                event = self._pipes_from[pid].get()  # block=True
                self._dispatch(pid, event)
def playerActionTest(ctrl, pid, n=None, interval=None):  # test
    """Simulate player ``pid`` forever: act on every turn, then end it.

    Each time the player's turn becomes active, push a burst of "action"
    commands followed by "end" onto the player's inbound queue of ``ctrl``.
    Never returns; intended to run on its own thread.

    Args:
        ctrl: controller exposing ``_actived`` and ``_pipes_from`` per player.
        pid: index of the simulated player.
        n: number of actions per turn; ``None`` picks 1-5 at random each turn.
        interval: total seconds spread over the actions; ``None`` picks 1-15
            at random each turn.
    """
    turn_active = ctrl._actived[pid]
    inbox = ctrl._pipes_from[pid]
    while True:
        print("[test] playerActionTest waiting")
        turn_active.wait()
        print("[test] player action")
        # Use per-turn locals: overwriting the parameters would freeze the
        # "random" values after the first turn.
        count = randint(1, 5) if n is None else n  # cmd
        span = randint(1, 15) if interval is None else interval  # sec
        pause = span / count if count else 0
        print("[test] action_n=%d interval_t=%.1fs" % (count, pause))
        for _ in range(count):
            if not turn_active.is_set():
                # Turn was ended for us; stale commands would otherwise be
                # consumed during a later turn.
                break
            inbox.put("action")
            sleep(pause)
        if turn_active.is_set():
            inbox.put("end")
        # Let the controller close this turn before waiting for the next one;
        # otherwise wait() returns at once and a second burst starts within
        # the same turn.
        while turn_active.is_set():
            sleep(0.01)
# Demo run: one controller, two simulated players, then a full game.
g = GameCtrl()
# The simulated players loop forever, so run them as daemon threads; as
# non-daemon threads they would keep the interpreter alive after the game.
p1_tester = threading.Thread(
    target=playerActionTest, args=[g, 0], daemon=True)
p1_tester.start()
p2_tester = threading.Thread(
    target=playerActionTest, args=[g, 1], daemon=True)
p2_tester.start()
# To exercise a single turn instead of a whole game, call g.playerTurn(0).
g.gameLoop()