#!/usr/bin/env python3
"""
Genetic-algorithm parameter optimization, implemented with the DEAP library.

Searches the TTA-E neural-decoding parameter space: five test-time-augmentation
(TTA) weights plus one GRU/LSTM ensemble weight, minimizing the phoneme error
rate (PER) computed on cached base predictions.
"""

import os
import sys
import numpy as np
import pickle
import argparse
import time
import json
from typing import Dict, List, Tuple
import random

# DEAP library
from deap import algorithms, base, creator, tools

# Optional GPU acceleration: when CuPy is missing, ``cp`` aliases NumPy so the
# array code below runs unchanged on the CPU.
try:
    import cupy as cp
    GPU_AVAILABLE = True
    print("GPU acceleration available with CuPy")
except ImportError:
    import numpy as cp
    GPU_AVAILABLE = False
    print("Using CPU computation with NumPy")

# Fix the random seeds for reproducible runs.
random.seed(42)
np.random.seed(42)


def to_cpu(x):
    """Return ``x`` as a host (NumPy) array when it is a CuPy array.

    Objects without a ``get`` method, and everything when CuPy is not in use,
    are returned unchanged.
    """
    if GPU_AVAILABLE and hasattr(x, 'get'):
        return x.get()
    return x


def load_base_predictions(cache_file='base_predictions_cache.pkl'):
    """Load the pre-computed base predictions from ``cache_file``.

    Falls back to :func:`generate_mock_predictions` when the file does not
    exist.
    """
    if os.path.exists(cache_file):
        print(f"Loading base predictions from {cache_file}")
        # SECURITY NOTE: pickle.load executes arbitrary code embedded in the
        # file. Only point this at cache files you generated yourself.
        with open(cache_file, 'rb') as f:
            return pickle.load(f)

    print(f"Cache file {cache_file} not found. Generating mock data...")
    return generate_mock_predictions()


def generate_mock_predictions():
    """Generate synthetic prediction data for testing.

    Returns:
        A list of 20 trial dicts with keys ``'gru_probs'`` and
        ``'lstm_probs'`` (arrays of shape ``(5, 40, 31)``, normalized along
        the last axis) and ``'true_chars'`` (integer array of length 40).
    """
    print("Generating mock base predictions for testing...")

    n_trials = 20    # number of simulated trials
    seq_len = 40     # sequence length
    vocab_size = 31  # vocabulary size
    n_tta = 5        # number of TTA samples

    base_predictions = []
    np.random.seed(42)  # fixed seed so the mock data is reproducible
    positions = np.arange(seq_len)

    for _ in range(n_trials):
        # Ground-truth character sequence.
        true_chars = np.random.randint(0, vocab_size, seq_len)

        # Simulated GRU and LSTM scores.
        gru_probs = np.random.rand(n_tta, seq_len, vocab_size)
        lstm_probs = np.random.rand(n_tta, seq_len, vocab_size)

        # Bias every TTA sample toward the true character at each position so
        # the mock models are better than chance.
        gru_probs[:, positions, true_chars] += 2.0
        lstm_probs[:, positions, true_chars] += 2.0

        # Normalize into probability distributions over the vocabulary.
        gru_probs = gru_probs / np.sum(gru_probs, axis=2, keepdims=True)
        lstm_probs = lstm_probs / np.sum(lstm_probs, axis=2, keepdims=True)

        base_predictions.append({
            'gru_probs': gru_probs,
            'lstm_probs': lstm_probs,
            'true_chars': true_chars
        })

    return base_predictions


def calculate_per(predicted, true):
    """Compute the phoneme error rate (PER) between two sequences.

    The PER here is the Levenshtein edit distance divided by the length of
    the longer sequence.

    Returns:
        0.0 when both sequences are empty, 1.0 when exactly one is empty,
        otherwise ``edit_distance / max(len(predicted), len(true))``.
    """
    n, m = len(predicted), len(true)
    if n == 0 and m == 0:
        return 0.0
    if n == 0 or m == 0:
        return 1.0

    # Two-row dynamic programme: ``previous`` is row i-1 of the edit-distance
    # table and ``current`` is row i.
    previous = list(range(m + 1))
    for i in range(1, n + 1):
        current = [i] + [0] * m
        symbol = predicted[i - 1]
        for j in range(1, m + 1):
            if symbol == true[j - 1]:
                current[j] = previous[j - 1]
            else:
                current[j] = 1 + min(previous[j], current[j - 1], previous[j - 1])
        previous = current

    return previous[m] / max(n, m)


def evaluate_individual(individual, base_predictions_data):
    """Evaluate the fitness of one individual (PER; lower is better).

    Args:
        individual: Sequence of six numbers. The first five are TTA weights,
            the sixth is the GRU ensemble weight (LSTM gets ``1 - weight``).
        base_predictions_data: Iterable of trial dicts with keys
            ``'gru_probs'``, ``'lstm_probs'`` and ``'true_chars'``.

    Returns:
        A one-element tuple ``(per,)`` as required by DEAP. ``(1.0,)`` is
        returned when there is nothing to score or when evaluation raises.
    """
    try:
        # Decode the individual's parameters.
        tta_weights = cp.array(individual[:5])  # first 5 genes: TTA weights
        gru_weight = individual[5]              # 6th gene: GRU weight

        # Normalize the TTA weights once; they do not depend on the trial.
        tta_sum = cp.sum(tta_weights)
        if tta_sum > 0:
            tta_weights_norm = tta_weights / tta_sum
        else:
            # Degenerate all-zero weights: fall back to a uniform average.
            tta_weights_norm = cp.ones_like(tta_weights) / len(tta_weights)
        tta_scale = tta_weights_norm[:, None, None]

        total_per = 0.0
        total_chars = 0

        for trial_data in base_predictions_data:
            # ``cp`` is CuPy on GPU and NumPy otherwise, so asarray is either
            # a host-to-device transfer or a no-op.
            gru_probs = cp.asarray(trial_data['gru_probs'])
            lstm_probs = cp.asarray(trial_data['lstm_probs'])
            true_chars = trial_data['true_chars']

            # Weighted average over the TTA samples (axis 0).
            gru_weighted = cp.sum(gru_probs * tta_scale, axis=0)
            lstm_weighted = cp.sum(lstm_probs * tta_scale, axis=0)

            # Model ensemble.
            ensemble_probs = gru_weighted * gru_weight + lstm_weighted * (1 - gru_weight)

            # Greedy decode, moved back to the host for the edit distance.
            predicted_chars = to_cpu(cp.argmax(ensemble_probs, axis=1))

            # Accumulate the PER weighted by sequence length.
            per = calculate_per(predicted_chars, true_chars)
            total_per += per * len(true_chars)
            total_chars += len(true_chars)

        avg_per = total_per / total_chars if total_chars > 0 else 1.0
        return (float(avg_per),)  # DEAP expects a tuple

    except Exception as e:
        # Deliberate best-effort: a failing individual gets the worst score
        # instead of aborting the whole optimization run.
        print(f"Error in evaluate_individual: {e}")
        return (1.0,)


class GeneticOptimizer:
    """Genetic-algorithm optimizer built on DEAP.

    Args:
        bounds: Mapping ``name -> (low, high)``; the insertion order defines
            the gene order of each individual.
        population_size: Number of individuals kept per generation.
        generations: Number of generations to evolve.
        crossover_prob: Probability that a pair of offspring is crossed over.
        mutation_prob: Probability that an offspring is passed to the mutation
            operator; the operator reuses it as the per-gene mutation rate.
        tournament_size: Number of contestants per selection tournament.
        elite_size: Stored but not read by this class; survivors are chosen by
            truncating parents + offspring to ``population_size``.
    """

    def __init__(self, bounds, population_size=50, generations=100,
                 crossover_prob=0.8, mutation_prob=0.2, tournament_size=3,
                 elite_size=2):
        self.bounds = bounds
        self.population_size = population_size
        self.generations = generations
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob
        self.tournament_size = tournament_size
        self.elite_size = elite_size

        # Parameter bounds, in gene order.
        self.param_names = list(bounds.keys())
        self.n_params = len(self.param_names)
        self.lower_bounds = [bounds[name][0] for name in self.param_names]
        self.upper_bounds = [bounds[name][1] for name in self.param_names]

        # Set by optimize(); defined here so the attribute always exists.
        self.base_predictions = None

        self.setup_deap()

    def setup_deap(self):
        """Create the DEAP fitness/individual classes and the toolbox."""
        # Drop class definitions left over from a previous optimizer so that
        # creator.create() does not complain about redefinition.
        if hasattr(creator, "FitnessMin"):
            del creator.FitnessMin
        if hasattr(creator, "Individual"):
            del creator.Individual

        # Fitness class (minimize PER).
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMin)

        # Toolbox. base.Toolbox already provides ``clone`` (copy.deepcopy) and
        # ``map``, so neither is registered here.
        self.toolbox = base.Toolbox()

        # One attribute generator per parameter, sampling uniformly within its
        # bounds; register() binds (low, high) per parameter.
        for name, low, high in zip(self.param_names, self.lower_bounds,
                                   self.upper_bounds):
            self.toolbox.register(f"attr_{name}", random.uniform, low, high)

        # Individual and population generators.
        self.toolbox.register(
            "individual", tools.initCycle, creator.Individual,
            [getattr(self.toolbox, f"attr_{name}") for name in self.param_names],
            n=1)
        self.toolbox.register("population", tools.initRepeat, list,
                              self.toolbox.individual)

        # Genetic operators.
        self.toolbox.register("mate", tools.cxBlend, alpha=0.3)  # blend crossover
        self.toolbox.register("mutate", self.gaussian_mutation)  # Gaussian mutation
        self.toolbox.register("select", tools.selTournament,
                              tournsize=self.tournament_size)    # tournament selection
        self.toolbox.register("evaluate", self.evaluate_wrapper)

    def evaluate_wrapper(self, individual):
        """Evaluate ``individual`` against the predictions passed to optimize()."""
        return evaluate_individual(individual, self.base_predictions)

    def gaussian_mutation(self, individual):
        """Mutate ``individual`` in place with bounded Gaussian noise.

        Each gene is perturbed with probability ``mutation_prob`` using a
        standard deviation of 10% of that gene's range, then clamped to its
        bounds.

        Returns:
            A one-element tuple containing the individual (DEAP convention).
        """
        for i, (low, high) in enumerate(zip(self.lower_bounds, self.upper_bounds)):
            if i >= len(individual):
                break
            if random.random() < self.mutation_prob:
                sigma = (high - low) * 0.1
                individual[i] += random.gauss(0, sigma)
                # Hard bound constraint.
                individual[i] = max(low, min(high, individual[i]))
        return (individual,)

    def constraint_repair(self, individual):
        """Clamp every gene of ``individual`` to its bounds, in place.

        Returns:
            The same individual object.
        """
        for i in range(len(individual)):
            individual[i] = max(self.lower_bounds[i],
                                min(self.upper_bounds[i], individual[i]))
        return individual

    def _params_dict(self, individual):
        """Map parameter names to the gene values of ``individual``."""
        return {self.param_names[i]: individual[i] for i in range(self.n_params)}

    def optimize(self, base_predictions):
        """Run the genetic algorithm.

        Args:
            base_predictions: Trial data forwarded to evaluate_individual().

        Returns:
            Dict with keys ``'best_params'``, ``'best_score'``, ``'history'``
            (one ``(params, score)`` tuple per generation),
            ``'total_evaluations'`` (number of fitness evaluations actually
            performed), ``'elapsed_time'`` (seconds) and ``'hall_of_fame'``
            (list of ``{'params', 'score'}`` dicts, best first).
        """
        self.base_predictions = base_predictions

        print("Starting Genetic Algorithm Optimization")
        print(f"Population size: {self.population_size}")
        print(f"Generations: {self.generations}")
        print(f"Crossover probability: {self.crossover_prob}")
        print(f"Mutation probability: {self.mutation_prob}")
        print(f"Tournament size: {self.tournament_size}")

        start_time = time.time()

        # Initial population, clamped to the bounds.
        population = self.toolbox.population(n=self.population_size)
        for individual in population:
            self.constraint_repair(individual)

        # Evaluate the initial population.
        fitnesses = list(map(self.toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit
        total_evaluations = len(population)

        # Statistics.
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("min", np.min)
        stats.register("max", np.max)
        stats.register("std", np.std)

        # Hall of fame (best individuals seen). Seeded from the initial
        # population so a result exists even when generations == 0.
        hall_of_fame = tools.HallOfFame(maxsize=10)
        hall_of_fame.update(population)

        # Per-generation history of the best solution.
        history = []

        print(f"Initial population evaluated. Best fitness: {min(fitnesses)[0]:.6f}")

        # Evolution loop.
        for generation in range(self.generations):
            # Select and clone the parents of the next generation.
            offspring = self.toolbox.select(population, len(population))
            offspring = list(map(self.toolbox.clone, offspring))

            # Crossover on consecutive pairs.
            for child1, child2 in zip(offspring[::2], offspring[1::2]):
                if random.random() < self.crossover_prob:
                    self.toolbox.mate(child1, child2)
                    # Blend crossover can leave the bounds; repair.
                    self.constraint_repair(child1)
                    self.constraint_repair(child2)
                    del child1.fitness.values
                    del child2.fitness.values

            # Mutation.
            for mutant in offspring:
                if random.random() < self.mutation_prob:
                    self.toolbox.mutate(mutant)
                    self.constraint_repair(mutant)
                    del mutant.fitness.values

            # Re-evaluate only the individuals whose fitness was invalidated.
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            for ind in invalid_ind:
                ind.fitness.values = self.toolbox.evaluate(ind)
            total_evaluations += len(invalid_ind)

            # Elitism: keep the best of parents + offspring.
            combined = population + offspring
            combined.sort(key=lambda x: x.fitness.values[0])
            population = combined[:self.population_size]

            # Update the hall of fame and statistics.
            hall_of_fame.update(population)
            record = stats.compile(population)

            # Record the best solution so far.
            best_individual = hall_of_fame[0]
            history.append((self._params_dict(best_individual),
                            best_individual.fitness.values[0]))

            if generation % 10 == 0 or generation == self.generations - 1:
                print(f"Generation {generation+1:3d}: "
                      f"Min={record['min']:.6f}, "
                      f"Avg={record['avg']:.6f}, "
                      f"Std={record['std']:.6f}")

        elapsed_time = time.time() - start_time

        # Best result.
        best_individual = hall_of_fame[0]

        return {
            'best_params': self._params_dict(best_individual),
            'best_score': best_individual.fitness.values[0],
            'history': history,
            'total_evaluations': total_evaluations,
            'elapsed_time': elapsed_time,
            'hall_of_fame': [
                {
                    'params': self._params_dict(ind),
                    'score': ind.fitness.values[0]
                }
                for ind in hall_of_fame
            ]
        }


def save_results(result, output_file='genetic_optimization_results.json'):
    """Write the optimization ``result`` dict to ``output_file`` as JSON."""
    print(f"Saving results to {output_file}")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(result, f, indent=2)
    print("Results saved successfully")


def main():
    """Parse CLI arguments, run the optimization, print and save the results."""
    parser = argparse.ArgumentParser(
        description='Genetic Algorithm for TTA-E Parameter Optimization')
    parser.add_argument('--cache_file', type=str,
                        default='base_predictions_cache.pkl',
                        help='Base predictions cache file')
    parser.add_argument('--output_file', type=str,
                        default='genetic_optimization_results.json',
                        help='Output file for results')

    # Genetic-algorithm hyper-parameters.
    parser.add_argument('--population_size', type=int, default=50,
                        help='Population size')
    parser.add_argument('--generations', type=int, default=100,
                        help='Number of generations')
    parser.add_argument('--crossover_prob', type=float, default=0.8,
                        help='Crossover probability')
    parser.add_argument('--mutation_prob', type=float, default=0.2,
                        help='Mutation probability')
    parser.add_argument('--tournament_size', type=int, default=3,
                        help='Tournament selection size')
    parser.add_argument('--elite_size', type=int, default=2,
                        help='Elite size for preservation')

    args = parser.parse_args()

    print(f"{'='*60}")
    print("TTA-E Genetic Algorithm Parameter Optimization")
    print(f"{'='*60}")

    # Load the base prediction data.
    base_predictions = load_base_predictions(args.cache_file)

    # Parameter search space.
    bounds = {
        'tta_weight_0': (0.0, 2.0),  # original-sample weight
        'tta_weight_1': (0.0, 2.0),  # noise-augmentation weight
        'tta_weight_2': (0.0, 2.0),  # scale-augmentation weight
        'tta_weight_3': (0.0, 2.0),  # shift-augmentation weight
        'tta_weight_4': (0.0, 2.0),  # smoothing-augmentation weight
        'gru_weight': (0.0, 1.0)     # GRU model weight
    }

    print("Parameter search space:")
    for param, (low, high) in bounds.items():
        print(f"  {param}: [{low}, {high}]")
    print()

    # Build the optimizer.
    optimizer = GeneticOptimizer(
        bounds=bounds,
        population_size=args.population_size,
        generations=args.generations,
        crossover_prob=args.crossover_prob,
        mutation_prob=args.mutation_prob,
        tournament_size=args.tournament_size,
        elite_size=args.elite_size
    )

    # Run the optimization.
    result = optimizer.optimize(base_predictions)

    # Report the results.
    print(f"\n{'='*60}")
    print("OPTIMIZATION RESULTS")
    print(f"{'='*60}")
    print(f"Best Score (PER): {result['best_score']:.6f}")
    print("Best Parameters:")
    for param, value in result['best_params'].items():
        print(f"  {param}: {value:.4f}")

    print(f"Total Evaluations: {result['total_evaluations']:,}")
    print(f"Elapsed Time: {result['elapsed_time']:.2f} seconds")
    print()

    print("Top 5 Solutions from Hall of Fame:")
    for i, solution in enumerate(result['hall_of_fame'][:5], 1):
        print(f"{i}. Score: {solution['score']:.6f}")
        params_str = ", ".join([f"{k}={v:.3f}" for k, v in solution['params'].items()])
        print(f"   Parameters: {params_str}")

    # Save the results.
    save_results(result, args.output_file)

    print("\nOptimization completed successfully!")


if __name__ == '__main__':
    main()