在我們的Lab架構中,Object的分割是在API層做的。其實API層要做的事情滿多的,比較理想的做法是再把API層拆成兩部分,這部分就留給有興趣的讀者自行實作
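在API層接收到Object之後,我們的流程是:第1步,用RSCodec對Object做encode;第2步,把encode後的資料切成PARTITION份;第3步,把每一份送到隨機挑選的data server;第4步,確認傳送成功後記錄該份的位置,全部都成功才把metadata寫入DB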
程式邏輯如下
# Number of consecutive slices splitData cuts the encoded object into.
PARTITION = 6
def splitData(data):
    """Encode ``data`` with RSCodec and cut the result into PARTITION slices.

    The codec is constructed as ``RSCodec(len(data))`` and the encoded buffer
    is divided into exactly PARTITION consecutive slices. When the encoded
    length is not a multiple of PARTITION, the first
    ``len(encoded) % PARTITION`` slices each carry one extra element.

    Returns:
        A list of PARTITION slices, in order; concatenating them yields the
        encoded buffer again.

    NOTE(review): the codec argument grows with the payload size; confirm
    against the RSCodec documentation that this is valid for large objects.
    """
    encoded = RSCodec(len(data)).encode(data)  # step 1: encode the payload
    base, extra = divmod(len(encoded), PARTITION)

    # step 2: walk the buffer once, handing out the wider slices first
    pieces = []
    offset = 0
    for index in range(PARTITION):
        width = base + 1 if index < extra else base
        pieces.append(encoded[offset:offset + width])
        offset += width
    return pieces
def produce_object(content):
    """Split an object into fragments, store them, and record their locations.

    Args:
        content: Mapping that must contain ``name``, ``version``, ``hash``
            and ``obj`` (the payload passed to ``splitData``).

    Returns:
        True when every fragment was accepted by a data server and the
        metadata was written through ``DB.addMetadata``. False when a
        required key is missing or not every fragment could be placed.
    """
    # Check every field that is read below before doing any work, so a
    # malformed request fails cleanly instead of raising KeyError after
    # fragments have already been sent.
    if any(key not in content for key in ('name', 'version', 'hash', 'obj')):
        return False

    size = len(content['obj'])
    components = splitData(content['obj'])
    # Work on a private copy: servers that reject a fragment are dropped from
    # the candidate list, and the list returned by getDataServers() should
    # not be modified as a side effect.
    servers = list(getDataServers() or [])
    locats = []
    for i, comp in enumerate(components):
        while servers:
            idx = random.randrange(len(servers))
            server = servers[idx]
            # step 3: send the fragment, keyed by "<hash>-<fragment index>"
            res = Producer.getInstance().send(
                server, value={f"{content['hash']}-{i}": comp})
            if res:  # step 4: remember where this fragment was stored
                locats.append((server, i, len(comp)))
                break
            # This server did not accept the fragment; stop offering it.
            del servers[idx]
        else:
            # No candidate servers remain, so neither this fragment nor any
            # later one can be placed.
            break

    if len(locats) == len(components):
        DB.addMetadata(f"{content['name']}", content['version'], content['hash'], size, locats)
        return True

    print(f"Numer of components: {len(components)} and number of locates: {len(locats)} are not match.", flush=True)
    return False
由於每一份資料是以「hash-分片編號」當作檔案名稱存在data server上,所以讀Object的時候,只要有hash和metadata中記錄的分片位置就可以把資料讀出來,程式邏輯如下
# Excerpt from the object-read path (the enclosing function is not shown):
# request every fragment listed in `locate`, append the responses to `data`
# in order, then hand the concatenation to combineData.
for server, idx, size in locate:
try:
res = requests.get(f"http://{server}:{PORT}/partition/{hash}-{idx}", timeout=1)
if res.status_code == 201:
# NOTE(review): a 201 response aborts the whole read with -1; confirm the
# data server's status-code contract. Every other status, including
# errors, has its body appended as fragment data.
return -1
else:
data += res.content
except Exception as e:
print('request error: %s' % e, flush=True)
data += b'x'*size # if the server has a problem, pad with `size` arbitrary filler bytes
print(data, flush=True)
try:
res = combineData(data)
return res
# NOTE(review): the excerpt stops here; the handler for this try is not shown.
Encode和decode的演算法讀者可以自行變換使用