iT邦幫忙

2024 iThome 鐵人賽

DAY 25
1
AI/ ML & Data

一個Kaggle金牌解法是如何誕生的?跟隨Kaggle NLP競賽高手的討論,探索解題脈絡系列 第 25

[Day25]誰說打kaggle比賽一定要訓練模型?從第三名的解法看 Self-Consistency + Code Reasoning 之外的比賽工程技巧

  • 分享至 

  • xImage
  •  

前言

昨天我們介紹了第一名的作法,雖然很精彩,但是他們在前期的兩階段訓練中(可能也是最重要的一部分)耗費了對個人參賽者(非實驗室)來說巨大的算力(八張 H100 GPU)。

如果我們仔細看 Leaderboard 金牌區的得分情況,會發現除了第一名答出 29 題,第二名答對 22 題,第三四名答對 21 題,第五到第十二名都答出 20 題。所以其實除了第一名,後面幾位金牌得獎者解出的題數都差不多。
在這些做法中,會不會有些方法是我們可以低成本復現、用 LoRA 微調模型就好,或甚至連訓練都不用訓練就可以得分的作法呢?

.......

還真的有。

不想 Fine-Tune 組

🎖️早期分享獎

在開始介紹前他優勝解法前,我們要先介紹 @AbdurRafae 大大在比賽的早期曾經分享一個改良版本的Zero-shot MMOS-DeepSeekMath-7B with self-consistency and generated code reasoning evaluation,他的前一個版本我們在 [Day 23]有詳細地帶大家 go through 一遍他的 code,並在 training data 上做一些小實驗,歡迎有興趣的朋友先去看看~

今天要分享的版本做了以下改動:

  1. 使用量化壓縮過後的 DeepSeekMath-7B。在前一個版本中,他 load quantized 到 4bits 版本的模型來 inference;但在今天的 improved 版本中,他選擇直接load float16 版本的模型。
  2. 增加每個問題做 self-consistency 的次數。原先是一個問題會重複問五次,現在增加到一個問題最多問 22 次。
  3. 設計兩種不同的 prompt,每次推理時隨機選擇一種 prompt,增加模型推理的多樣性。
  4. 因為重複問的次數增加有可能會超時,所以增加對 time limit 的限制
  5. 如果一個問題很簡單,模型回答的問題一致性很高的話,就跳過不需要一直問了;限制每一道題最多可以輸出的 token 數量,這樣如果太難的問題模型需要花很多token 來解釋,那就不用在這種題目重複 inference 很多遍了,反正很大可能做出來是錯的。
  6. 如果模型有生成可執行的代碼,就將代碼運行後的結果回傳給模型,讓模型根據這些資訊做出最終答案;如果生成的代碼無法執行的話,就再給模型生成答案的一次機會,就是把所有 prompt 以及剛剛模型的生成的回答都再丟回去讓模型再回答一次。

但其實第六點我覺得有點奇怪,如果模型生成無法執行的代碼,應該要把錯誤訊息傳回給模型,讓模型將生成的代碼改一下,然後我們再執行一遍。如果這樣做效果應該會更好。

我們把他的 code 重構一下比較好理解~

  • main_processing_loop 主邏輯
    下面這個 main function 主要做的事情就是控制模型對每個問題都做多次 self-consistency(22次),並在每次詢問的過程中,分別截取 llm 自己推理出的答案,以及我們執行 llm 生成的代碼後實際 run 出來的數值。

其中每個 function 的工作可參考代碼中的註釋。

def main_processing_loop(df, model, tokenizer, total_tokens, repetitions, time_limit, stopping_criteria, temperature, top_p):
    """主循環,處理每個問題的重複推理並更新結果。"""
    start_time = time.time()
    best_stats, total_outputs, question_type_counts = {}, {}, {}
    starting_counts = (2, 3)

    for repetition in tqdm(range(repetitions)):
        for i in tqdm(range(len(df))):
            elapsed_time = time.time() - start_time
            if elapsed_time > time_limit:
                print(f"Time limit exceeded: {elapsed_time:.0f} seconds")
                return

            problem_id = df['id'].iloc[i]
            problem = df['problem'].iloc[i]
            print(f"\nQUESTION {i} - Repetition {repetition} - Elapsed Time: {elapsed_time:.0f} seconds")

            best, best_count = best_stats.get(i, (-1, -1))
            // 根據最佳答案出現次數和重複次數的平方根來判斷是否應該跳過問題。
            if should_skip_question(best_count, repetition):
                print("Skipping due to sufficient best count.")
                continue

            outputs = total_outputs.get(i, [])
            text_answers, code_answers = question_type_counts.get(i, starting_counts)
            results, answers = [], []

            clear_memory()
            // 負責生成初始提示,根據文本推理和代碼推理的次數來決定選擇哪種提示。
            prompt = generate_initial_prompt(problem, text_answers, code_answers)
            print(f"Initial Prompt: {prompt}")
            // 將模型的文本生成邏輯分離出來,使其單獨處理。
            decoded_output, input_len = execute_model_generation(prompt, model, tokenizer, total_tokens, 
                                                                 USE_PAST_KEY, temperature, top_p, stopping_criteria)

            cummulative_code = ""
            // 負責在生成的文本中處理代碼部分,並更新 prompt。
            prompt, cummulative_code = process_and_update_results(
                decoded_output, len(prompt), cummulative_code, stop_words, total_tokens, temperature, top_p)

            result_output = process_text_output(decoded_output)

            try:
                code_output = round(float(eval(cummulative_code))) % 1000
            except Exception as e:
                print(f"Final evaluation error: {e}")
                code_output = -1

            if code_output != -1:
                outputs.append(code_output)
                code_answers += 1

            if result_output != -1:
                outputs.append(result_output)
                text_answers += 1

            if len(outputs) > 0:
                most_common_output = Counter(outputs).most_common(1)[0]
                if most_common_output[1] > best_count:
                    best = most_common_output[0]
                    best_count = most_common_output[1]

                if most_common_output[1] > 5:
                    print("Answer found!")
                    break

            best_stats[i] = (best, best_count)
            question_type_counts[i] = (text_answers, code_answers)
            total_outputs[i] = outputs

            print(f"Text answers: {text_answers - starting_counts[0]}, Code answers: {code_answers - starting_counts[1]}")

            if DEBUG:
                break

下面是這個 function 維護的變量:

// 用來跟蹤每個問題的最佳答案和其出現的次數,格式為 {index: (best_answer, best_count)}
best_stats = {}
// 跟蹤每個問題的所有生成輸出。
total_outputs = {}
// 用來統計文本和代碼推理的次數,格式為 {index: (text_count, code_count)}。
question_type_counts = {}
// 預設的文本和代碼推理的次數。
starting_counts = (2, 3)
// 記錄主迴圈的起始時間,用於計算運行時間。
note_start_time = time.time()

接下來我們詳細解釋每一個 function 的實作。

def should_skip_question(best_count, repetition_index):
    """判斷是否應該跳過這個問題,根據當前最佳答案的出現次數。"""
    return best_count > np.sqrt(repetition_index)

should_skip_question()會根據問題的索引 index,從 best_stats 中獲取該問題的最佳答案出現的次數 best_count。如果 best_count 大於當前重複次數的平方根 np.sqrt(repetition),則返回 True,表示可以跳過該問題。
這部分也是上一版本中沒有做的優化,這個邏輯的目的是避免對已經獲得穩定答案的問題進行多次推理,以節省計算資源。

def generate_initial_prompt(problem, text_answers, code_answers):
    """生成初始提示,根據文本和代碼推理次數決定提示內容。"""
    counts = np.array([text_answers, code_answers])
    selected_prompt = choice(prompt_options, 1, p=counts / counts.sum())[0]
    initial_message = selected_prompt.format(problem, "{}")
    return f"User: {initial_message}"

接下來把這次選擇的 prompt 傳入模型,讓模型生成回答:

def execute_model_generation(prompt, model, tokenizer, max_tokens, use_past_key, temperature, top_p, stopping_criteria):
    """執行模型的文本生成,返回解碼後的輸出。"""
    model_inputs = tokenizer(prompt, return_tensors='pt').to(model.device)
    input_len = len(model_inputs['input_ids'][0])

    generation_output = model.generate(
        **model_inputs, 
        max_new_tokens=max_tokens,
        return_dict_in_generate=use_past_key,
        do_sample=True,
        temperature=temperature,
        top_p=top_p,
        num_return_sequences=1,
        stopping_criteria=stopping_criteria
    )

    output_ids = generation_output.sequences[0] if use_past_key else generation_output[0]
    decoded_output = tokenizer.decode(output_ids, skip_special_tokens=True)
    return decoded_output, input_len

接下來是最重要的一個 function:process_and_update_results()

這個function的主要任務是:

  1. 在生成的文本中尋找代碼片段。
  2. 執行生成的代碼
  3. 將執行的結果結合前面的代碼再次傳給 model 讓它根據上述資訊給出最終答案
def process_and_update_results(decoded_output, current_printed, cummulative_code, stop_words, total_tokens, temperature, top_p):
    """處理生成的代碼並更新生成結果,返回新的 prompt 和代碼累積。"""
    while (any(decoded_output.endswith(stop_word) for stop_word in stop_words)) and (len(cummulative_code) < total_tokens):
        if decoded_output.endswith("
python"):
            temperature_inner = temperature_coding
            top_p_inner = top_p_coding
            prompt = decoded_output
        else:
            temperature_inner = temperature
            top_p_inner = top_p

        try:
            code_text = extract_code_segment(decoded_output)
            cummulative_code += code_text

            code_output, code_status = process_code(cummulative_code, return_shell_output=True)
            if not code_status:
                cummulative_code = cummulative_code[:-len(code_text)]

        except Exception as e:
            print("ERROR PARSING CODE", e)
            code_output = -1

        prompt = f"{decoded_output}\n
output\n{code_output}\n
" if code_output != -1 else decoded_output
        decoded_output = execute_model_generation(prompt, model, tokenizer, total_tokens - len(cummulative_code), 
                                                  use_past_key, temperature_inner, top_p_inner, stopping_criteria)[0]
        current_printed += len(decoded_output)

    return prompt, cummulative_code

下面 process_text_output()負責 parsing模型生成的最終答案:

def process_text_output(output):
    try:
        boxed_result = re.findall(r'\\boxed\{(\d+)\}', output)
        return round(float(boxed_result[-1])) % 1000 if boxed_result else round(float(naive_parse(output))) % 1000
    except Exception as e:
        print(e, 'ERROR PARSING TEXT')
        return -1

總之,提交上面這段代碼,最高可以在 LB 取得 20 分的成績,最低只有 13 分,因為這種方法的穩定性還是不高。

🥉 3rd Solution

第三名也沒有 fine-tuned 任何模型,他們採用跟上面作法很像的策略,但是在實作上增加一些細節:

  • vLLM的應用與修改
    第三名的團隊選擇使用 vLLM 而不是 HuggingFace 上的 model api。vLLM 是一個經過高度優化的開源大型語言模型(LLM)服務框架,專門用來提升 GPU 上的服務效能,具備以下幾個特色:
    1. 支援 PagedAttention 技術的優化版 Transformer
    2. 支援batch inference,能提升整體的服務效能
    3. 支援多 GPU 的平行運算和分散式服務
      關於 vLLM 的介紹與使用方法可以參考這篇好文:LLM 加速推論的工具-vLLM
from vllm import LLM, SamplingParams
MAX_MODEL_LEN = 4000

llm = LLM(model="/kaggle/input/deepseek-math" if ON_KAGGLE else "deepseek-ai/deepseek-math-7b-rl",
          dtype='half',
          enforce_eager=True,
          gpu_memory_utilization=0.99,
          swap_space=4,
          max_model_len=MAX_MODEL_LEN,
          kv_cache_dtype="auto" if ON_KAGGLE else 'fp8', # We found that using fp16 for the kv_cache improved the score
          tensor_parallel_size=2)

tokenizer = llm.get_tokenizer()

在實作上他們發現使用 bf16 會比 fp16 效果更好,但是 vLLm 在 kaggle 的 T4 GPU 上不支援 bf16 的 floating type,所以他們修改 vLLM 的代碼,將檢查 bf16 alert 的部分去掉;並且他們發現 transformer 的 MLP 部分使用 bf16 計算會變超級慢(這可能就是 vLLM 不支援在 T4 上用 bf16 運算的原因),所以他們最終改成除了 MLP 的部分還是用 fp16 計算以外,其他所有計算都改成 bf16
詳細的代碼實現可以參考這邊

其他 improvement 我們直接重構第三名的 inference notebook 來解釋~

下面是他們的 main function:

from dataclasses import dataclass

@dataclass
class SolutionDetails:
    prompt: str
    cummulative_code: str = ""
    last_executed_code_result: str = -1
    error_count: int = 0
    forced_output: bool = False


MAX_NUM_ITERATIONS = 7
sampling_params = SamplingParams(
    temperature=0.9,
    max_tokens=2000,
    top_p=1.0,
    logits_processors=[process_token],
    stop=stop_words
)

problems_solved = 0
for test, sample_submission in iter_test:
    # 開始時間管理
    time_management.record_start()
    
    # 最大化 prompt 的數量
    prompt_counts = time_management.maximize_prompt_counts(prompt_counts, PROBLEMS_TO_SOLVE - problems_solved)
    print("New prompt counts:", prompt_counts)
    
    # 獲取當前問題
    problem = test['problem'].iloc[0] if PRIVATE else test['problem']
    prompts_and_code = [SolutionDetails(prompt=p) for p in get_prompts(problem)]
    
    times_iterated = 0
    solutions = []
    _solutions = defaultdict(list)
    
    # 迭代生成和執行代碼
    while len(prompts_and_code) > 0 and times_iterated < MAX_NUM_ITERATIONS:
        # 生成所有提示的回應
        prompts = [pc.prompt for pc in prompts_and_code]
        responses = llm.generate(prompts, sampling_params)
        responses = [LLMResult(response.prompt, out.text) for response in responses for out in response.outputs]
        
        prompt_to_code = [-1] * len(prompts_and_code)
        files_needed = 0
        
        # 提取並處理生成的代碼
        for i, (pc, response) in enumerate(zip(prompts_and_code, responses)):
            extracted_code = extract_code_from_text(response.completion)
            if extracted_code:
                code = preprocess_code(pc.cummulative_code + "\n" + extracted_code)
                prompt_to_code[i] = files_needed
                with open(f'codes/code{files_needed}.py', 'w') as f:
                    f.write(code)
                files_needed += 1

        # 批量執行代碼
        outputs = run_batch_of_codes(files_needed)
        prompts_and_code_new = []
        
        # 更新提示和代碼結果
        for i, (pc, candidate) in enumerate(zip(prompts_and_code, responses)):
            cummulative_code = pc.cummulative_code
            last_executed_code_result = pc.last_executed_code_result
            code_error_count = pc.error_count
            
            print("================================")
            text = candidate.completion
            prompt = candidate.prompt
            
            if prompt_to_code[i] != -1:
                # 執行代碼並更新提示
                executed_code, CODE_STATUS = outputs[prompt_to_code[i]]
                if executed_code != -1:
                    text += f"\n```output\n{executed_code}\n```\n"
                    if CODE_STATUS:
                        extracted_code = extract_code_from_text(text)
                        cummulative_code += extracted_code
                else:
                    cummulative_code = ""
                
                # 構建新提示
                new_prompt = prompt + text
                if last_executed_code_result == executed_code:
                    code_error_count += 1
                else:
                    code_error_count = 0
                
                last_executed_code_result = executed_code

                if len(tokenizer(new_prompt)['input_ids']) < MAX_MODEL_LEN and not pc.forced_output:
                    if code_error_count >= 1:
                        print("REPEATED ERRORS OR RESULT, PRUNING")
                        continue
                    
                    prompts_and_code_new.append(SolutionDetails(
                        prompt=new_prompt,
                        cummulative_code=cummulative_code,
                        last_executed_code_result=last_executed_code_result,
                        error_count=code_error_count
                    ))
                else:
                    last_executed_code_result = extract_final_answer_from_code_result(last_executed_code_result)
            else:
                # 處理非代碼結果
                output = process_text_output(prompt + text)
                if output != -1:
                    solutions.append([output, prompt + text])
                    _solutions[output].append(prompt + text)
                elif not '\\boxed{' in text and not pc.forced_output:
                    print("Forcing output on next turn.")
                    prompts_and_code_new.append(SolutionDetails(
                        prompt=prompt + text + "\nThe final answer is \\boxed{",
                        cummulative_code=cummulative_code,
                        last_executed_code_result=-1,
                        error_count=code_error_count,
                        forced_output=True
                    ))
                last_executed_code_result = extract_final_answer_from_code_result(last_executed_code_result)

        # 更新提示列表和迭代計數
        prompts_and_code = prompts_and_code_new
        times_iterated += 1
        
        # 提前退出條件檢查
        number_of_votes = sum([len(_solutions[k]) for k in _solutions])
        threshold = 10
        if len(_solutions) < 3 and number_of_votes < threshold:
            continue
        
        sorted_occurrences = sorted([len(_solutions[k]) for k in _solutions], reverse=True)
        if number_of_votes >= threshold:
            if sorted_occurrences[0] > number_of_votes / 2:
                print("EARLY EXIT")
                break
        
        max_new_votes = len(prompts_and_code_new)
        diff_to_cover = sorted_occurrences[0] - sorted_occurrences[1]
        if diff_to_cover > max_new_votes:
            print("EARLY EXIT")
            break
        
        if number_of_votes > 15 and len(_solutions) < 3:
            print('early exit')
            break
    
    # 對解決方案進行計分
    counts = defaultdict(int)
    for solution in solutions:
        try:
            solution[1] = solution[1].lower()
            solution[0] = process_text_output(solution[1])
            
            # 對純文本答案給予較少的分數
            counts[solution[0]] += 0.05
            code_outputs = [part.split('\n```')[0] for part in solution[1].split('```output\n')[1:]]
            
            # 若代碼結果與文本結果一致,給予更高分數
            for code_output in code_outputs:
                try:
                    if solution[0] == int(code_output):
                        counts[solution[0]] += 0.8
                except:
                    nums = extract_numbers_from_text(code_output)
                    for num in nums:
                        if solution[0] == num:
                            counts[num] += 1.0
        except:
            pass

    counts[-1] = -123123  # 懲罰無效的結果
    
    # 懲罰小數字和問題中出現的數字
    for i in range(0, 11):
        counts[i] -= 0.05 * (prompt_counts[0] + prompt_counts[1])
    
    for num in extract_numbers_from_text(problem):
        counts[num] -= 0.10 * (prompt_counts[0] + prompt_counts[1])
    
    print(counts)
    
    # 選擇最優解決方案
    best_solution = max(counts, key=counts.get)
    sample_submission['answer'] = best_solution
    env.predict(sample_submission)
    print("FINAL ANSWER:", best_solution)
    print("=" * 90)
    
    # 記錄時間結束
    time_management.record_end(prompt_counts)
    problems_solved += 1

邏輯跟前面差不多,但是新增了 TimeManagement,他們創建了一個 TimeManager 類來管理時間分配。它會追蹤每個問題所花費的時間,並動態調整後續問題的嘗試次數,以確保在給定的時間內解決盡可能多的問題。

SOFT_TIME_LIMIT = 60 * 60 * 8.2 if PRIVATE else 60 * 60 * 1# 8 hours
HARD_TIME_LIMIT = 60 * 60 * 8.4 if PRIVATE else 60 * 60 * 2# 8 and a half hours

class TimeManagement:
    
    def __init__(self, prompt_counts):
        # Initial counts, base time
        self.recorded_times = {
            (3, 4, 0, 0): 76, 
            (7, 10, 0, 0): 90.11047387123108, 
            (10, 14, 0, 0): 158.42619514465332, 
            (11, 15, 0, 0): 163.69477319717407, 
            (12, 17, 0, 0): 162.39567685127258, 
            (14, 19, 0, 0): 148.9604458808899, 
            (16, 23, 0, 0): 219.65354228019714, 
            (15, 20, 0, 0): 112.13560438156128, 
            (17, 24, 0, 0): 131.51007294654846, 
            (25, 35, 0, 0): 196.37973880767822, 
            (36, 50, 0, 0): 250.18024468421936, 
            (3, 5, 0, 0): 61.941444873809814, 
            (81, 108, 0, 0): 1550.27548289299, 
            (59, 78, 0, 0): 603.0817701816559, 
            (69, 93, 0, 0): 629.2428097724915, 
            (75, 100, 0, 0): 622.1524519920349, 
            (77, 103, 0, 0): 550.9045436382294, 
            (79, 105, 0, 0): 601.617894411087, 
            (89, 118, 0, 0): 517.1569063663483, 
            (149, 199, 0, 0): 2357.601425409317, 
            (142, 189, 0, 0): 1592.241544008255}
        self.time_start = None
        self.initial_prompt_counts = prompt_counts
    
    def _get_estimated_time(self, prompt_counts):
        # Calculate estimated time based on closest historical data or base time
        prompt_counts_tuple = tuple(prompt_counts)
        if prompt_counts_tuple in self.recorded_times:
            return self.recorded_times[prompt_counts_tuple]
        
        closest_key = min(self.recorded_times.keys(), key=lambda k: self._normalized_distance(k, prompt_counts_tuple))
        base_time = self.recorded_times[closest_key]
        # Adjust time by scalar of difference in sum of counts
        sum_initial = sum(closest_key)
        sum_current = sum(prompt_counts_tuple)
        return base_time + (sum_current - sum_initial) * 8  # Adding or subtracting time based on counts change
    
    def _normalized_distance(self, key1, key2):
        # Calculate Euclidean distance between two tuples
        max_length = max(len(key1), len(key2))
        extended_key1 = key1 + (0,) * (max_length - len(key1))
        extended_key2 = key2 + (0,) * (max_length - len(key2))
        return sum((x - y) ** 2 for x, y in zip(extended_key1, extended_key2)) ** 0.5
    
    def record_start(self):
        self.time_start = time.time()
        
    def record_end(self, prompt_counts):
        if self.time_start is None:
            return
        prompt_counts_tuple = tuple(prompt_counts)
        elapsed_time = time.time() - self.time_start
        if prompt_counts_tuple in self.recorded_times:
            self.recorded_times[prompt_counts_tuple] = 0.3*elapsed_time + 0.7*self.recorded_times[prompt_counts_tuple]
        else:
            self.recorded_times[prompt_counts_tuple] = elapsed_time
        self.time_start = None
    
    def exit_early(self, prompt_counts) -> bool:
        global NOTEBOOK_START_TIME, HARD_TIME_LIMIT
        time_left = HARD_TIME_LIMIT - (time.time() - NOTEBOOK_START_TIME)
        estimated_time = self._get_estimated_time(prompt_counts)
        return time_left < estimated_time
    
    def maximize_prompt_counts(self, prompt_counts, problems_left):
        global NOTEBOOK_START_TIME, SOFT_TIME_LIMIT, HARD_TIME_LIMIT
        time_left = (SOFT_TIME_LIMIT if problems_left != 1 else HARD_TIME_LIMIT) - (time.time() - NOTEBOOK_START_TIME)

        last_scale_factor = None
        scale_factor = 0.1
        
        while scale_factor < 50.0:
            scaled_counts = [int(x * scale_factor) for x in self.initial_prompt_counts]
            estimated_time = problems_left * self._get_estimated_time(scaled_counts)
            
            if estimated_time > time_left:
                break;
            last_scale_factor = scale_factor
            
            scale_factor += 0.01
            
        if last_scale_factor == None:
            return prompt_counts
        

        updated_prompt_counts = [int(x * last_scale_factor) for x in self.initial_prompt_counts]
        if sum(updated_prompt_counts) == 0:
            return prompt_counts

        return updated_prompt_counts


    
time_management = TimeManagement(prompt_counts)

另外為了提升準確性,他們讓 llm 對同一個問題 inference 多次(self-consistency),但這樣就會產生很多要執行的 code。為了加速整體的計算速度,他們先將 llm 產生的 code 都集中到一個 folder,接著會批次平行地執行這些代碼:

def run_batch_of_codes(files_count):
    """批量執行代碼,並解析每個文件的執行結果。"""
    subprocess.run(['./codes/runner.sh', str(files_count), '4'])
    results = []
    for i in range(files_count):
        try:
            with open(f'codes/result{i}.txt', 'r') as f:
                result = f.read()
                parsed_result = postprocess_code(result)
                results.append(parsed_result)
        except Exception as e:
            results.append(("Error reading file", False))
            print(f"Error reading file {i}/{files_count-1}: {e}")
    return results

為了節省時間,不花時間重複做太簡單的題目,他們會設定一個提前退出的機制。那什麼是太簡單的題目呢?就是模型給出的 10 次回答中,有超過一半都是一樣的答案,那代表這道題目對模型來說,用不同解法都能得到相同答案,一致性很高沒什麼搞不懂、模糊不清的地方,對這樣的題目就可以提前終止計算。

另外,他們還對模型做錯題分析,發現模型的最終結果常常出現兩種類型的錯誤:

  1. 答案的數字很小(<10),這通常是因為模型寫的code中有錯誤導致的。
  2. 答案是題目陳述中的一部分。也就是說模型直接把題目中出現過的數字貼到答案區這邊,而且他們發現在所有錯題情況中,這種情況的頻率非常高,遠高於答案可能真的是題目有提到過的數字的可能性。

因此,他們的解法就是做加權投票來決定最終的答案。也就是說以前可能模型生成 10 種不同解法,其中得到 3 種不同的答案,我們只需要去看哪一個答案出現的頻率最高,就將它當作最終答案輸出即可,這叫做多數決;但現在他們發現有些答案很可能是錯的,那我們就給這些答案一個很低的權重,最終將答案出現的頻率與權重相乘,根據這個加權過的頻率來選擇最終的答案。

舉例來說,
每個模型生成的答案都會有一個基本分(0.05),接下來會根據下列三個情況來調整配分:

  • 判斷「llm 自己推理出來得到的答案」與「執行 llm 生成的 code 所得到的答案」是否一樣,如果一樣的話,該個答案的含金量(配分)就會提高
    假設有以下解決方案:
solutions = [[42, "The answer is ```output\n42\n```"], [99, "The answer is ```output\n99\n```"]]

處理第一個解決方案(42):

solution[0] = 42
counts[42] += 0.05,初始分數為 0.05
code_outputs = ["42"]
42 == 42,匹配成功,因此 counts[42] += 0.8,最終分數為 0.85

處理第二個解決方案(99):

solution[0] = 99
counts[99] += 0.05,初始分數為 0.05
code_outputs = ["109"]
99 != 109,匹配成功,因此 counts[99] 還是只有 0.05 分。
  • 判斷該個答案數值是不是<10。這通常是代碼有問題導致的,這種答案會被扣分
  • 判斷該個答案是否出現在題目中。這通常是模型直接把題目中提到的答案抄一遍導致的,這種答案也會被扣分。

所以假設現在模型生成五個可能的答案:
第一次生成:42(得分 +0.05)和代碼結果匹配(得分 +0.8)
第二次生成:99(得分 +0.05),代碼結果不匹配
第三次生成:42(得分 +0.05)和代碼結果匹配(得分 +0.8)
第四次生成:123(得分 +0.05)和代碼結果匹配(得分 +0.8)
第五次生成:42(得分 +0.05),代碼結果不匹配

那們最終可以整理成一個 count 字典:

counts = {
    42: 2.55,  # 出現3次,2次代碼結果匹配
    99: 0.05,  # 出現1次,沒有代碼匹配
    123: 0.85  # 出現1次,有代碼匹配
}

最高分的答案是 42 ,所以將 42 當作該題的最終答案輸出。

總之,第三名從下面四個面向來提升模型在 zero-shot 的情況下的答題能力:

  1. 使用 vLLM 來生成大量的提示(大多數情況下超過 100 個)。
  2. 平行執行代碼,大幅縮短了所需的時間。節省更多時間就可以留給 llm 多次 inference 的機會,多次 inference 就可以提升得到正確解答的機率。
  3. 一些簡單的提前退出規則。
  4. 根據在validation set 做錯題分析的觀察,制定了一個有效地加權投票計分規則。

他們的完整代碼請參考這裡

小結

整體來說,第三名的解法很難給我們眼睛一亮、新的思路的那種驚喜,但裡面有一些工程技巧直得我們學習。
例如在kaggle有限的運算資源上如何加速模型的 inference 速度(ex:改 vLLM 的 code)與平行處理檔案、如何使用 TimeManagement 在有限的時間內,調配每個題目最佳的嘗試次數、如何在 validation set 上做錯題分析,發現多數決可以做一些優化。

看起來都是一些很小的觀察和改進,但積沙成塔,這些小 tricks 成為最終拉開與其他參賽者分數的關鍵。

除了第三名以外,第四名以及第七名也採用類似的self-consistency + code reasoning 的方式當作他們的 solution。

在前 12 名金牌搖滾區中,有五名參賽者公布他們的解法,其中兩位有 fine-tuned 模型並且是全參數微調,剩下的三位皆在不改變模型參數的情況,透過設計一些思維鏈、答案選擇的方式來取得佳績。

有別於以前參加 Kaggle 比賽一定要用主辦方提供的 training data 或是自己想辦法擴增可以用的訓練資料來 train model;在這次比賽中我們可以發現,其實現在有非常優秀的開源大語言模型,可以讓我們在「不 fine-tuned 」的前提下,透過設計一些精妙的「思維鏈」或搭配其他計算工具激發大模型的思考推理能力,其實就具有可以和在領域資料上全參數微調模型一較高下的能力了。

我個人比較喜歡這種小而美的方法,因為這些方法的經驗能低成本拿到其他領域或議題應用、快速迭代觀察結果。

不過我真的很好奇第三名第四名他們,為什麼都沒有像第一名一樣把 llm 前幾次生成的錯誤代碼,連同 error message 丟回去再叫 llm 修正呢?我想這樣應該可以有更好的表現。而且現在其實有先進的「思維鏈」設計方法,例如使用蒙地卡羅搜尋解法等。
之後打算修改一下第三名的代碼,late submit 上去看看效果再和大家分享~

不過說回來,前兩名效果最好的方案都還是有微調模型,並且都用 8 張 H100 訓練 10 個小時左右,第二名使用強化學習的訓練方法,所以一次還要 train 兩個模型,但也只比上述提供的方法多做出來一題。
(其實第一名也有嘗試用過強化學習的作法,但是遇到很多困難並沒有得到好的成果。)

關於 AIMO 賽題的系列文就到這邊結束啦~明天開始又會進入一個新的賽題了,大家明天見~


謝謝讀到最後的你,希望你會覺得有趣!
如果喜歡這系列,別忘了按下訂閱,才不會錯過最新更新,也可以按讚⭐️給我鼓勵唷!
如果有任何回饋和建議,歡迎在留言區和我說✨✨


Kaggle - AI Mathematical Olympiad - Progress Prize 1 解法分享系列)


上一篇
[Day24]Try and Error! 淺談整合 Tool-Integrated Reasoning 和 Code Debugging 能力的 Decode 策略
下一篇
[Day 26]"是人是AI,一照便知" - 沒想到最終能找出LLM槍手的原因,是因為LLM太完美了?!
系列文
一個Kaggle金牌解法是如何誕生的?跟隨Kaggle NLP競賽高手的討論,探索解題脈絡30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言