# File: b2txt25/TTA-E/genetic_search.py (462 lines, 17 KiB, Python)
# Repository view timestamp: 2025-10-06 15:17:44 +08:00
#!/usr/bin/env python3
"""
遗传算法参数优化 - 使用DEAP库实现
针对TTA-E神经解码的参数搜索优化
"""
import os
import sys
import numpy as np
import pickle
import argparse
import time
import json
from typing import Dict, List, Tuple
import random
# DEAP库
from deap import algorithms, base, creator, tools
# GPU加速支持
try:
import cupy as cp
GPU_AVAILABLE = True
print("GPU acceleration available with CuPy")
except ImportError:
import numpy as cp
GPU_AVAILABLE = False
print("Using CPU computation with NumPy")
# 设置随机种子
random.seed(42)
np.random.seed(42)
def to_cpu(x):
    """Return ``x`` as a host-side object, converting CuPy arrays to NumPy.

    When GPU support is active and ``x`` exposes a ``get`` method, the
    result of ``x.get()`` is returned; any other value is passed through
    unchanged.
    """
    is_device_array = GPU_AVAILABLE and hasattr(x, 'get')
    return x.get() if is_device_array else x
def load_base_predictions(cache_file='base_predictions_cache.pkl'):
    """Load precomputed base predictions, falling back to mock data.

    Args:
        cache_file: Path of the pickle file holding the cached predictions.

    Returns:
        The unpickled object when ``cache_file`` exists, otherwise the
        result of ``generate_mock_predictions()``.
    """
    if not os.path.exists(cache_file):
        print(f"Cache file {cache_file} not found. Generating mock data...")
        return generate_mock_predictions()

    print(f"Loading base predictions from {cache_file}")
    # NOTE: unpickling can execute arbitrary code; only load trusted caches.
    with open(cache_file, 'rb') as cache_handle:
        cached_predictions = pickle.load(cache_handle)
    return cached_predictions
def generate_mock_predictions(n_trials=20, seq_len=40, vocab_size=31, n_tta=5, seed=42):
    """Generate synthetic base predictions for testing.

    Each trial gets a random target sequence plus "GRU" and "LSTM"
    probability tensors that are biased towards that target, so the argmax
    of every frame equals the true symbol.

    Args:
        n_trials: Number of mock trials to create.
        seq_len: Length of each target sequence.
        vocab_size: Number of symbols in the vocabulary.
        n_tta: Number of test-time-augmentation samples per trial.
        seed: Seed of the private random generator.

    Returns:
        A list of ``n_trials`` dicts with keys ``'gru_probs'`` and
        ``'lstm_probs'`` (arrays of shape ``(n_tta, seq_len, vocab_size)``
        whose last axis sums to 1) and ``'true_chars'`` (int array of
        length ``seq_len``).
    """
    print("Generating mock base predictions for testing...")

    # A private RandomState yields the same stream as seeding the global
    # NumPy generator with the same seed, but leaves the global generator
    # (and therefore any caller relying on it) untouched.
    rng = np.random.RandomState(seed)
    positions = np.arange(seq_len)

    base_predictions = []
    for _ in range(n_trials):
        # Draw order (targets, GRU, LSTM) is kept so the data stays stable.
        true_chars = rng.randint(0, vocab_size, seq_len)
        gru_probs = rng.rand(n_tta, seq_len, vocab_size)
        lstm_probs = rng.rand(n_tta, seq_len, vocab_size)

        # Boost the true symbol of every frame in every TTA sample.
        gru_probs[:, positions, true_chars] += 2.0
        lstm_probs[:, positions, true_chars] += 2.0

        # Normalize the vocabulary axis into probability distributions.
        gru_probs = gru_probs / np.sum(gru_probs, axis=2, keepdims=True)
        lstm_probs = lstm_probs / np.sum(lstm_probs, axis=2, keepdims=True)

        base_predictions.append({
            'gru_probs': gru_probs,
            'lstm_probs': lstm_probs,
            'true_chars': true_chars,
        })
    return base_predictions
def calculate_per(predicted, true):
    """Compute the phoneme error rate (PER) between two sequences.

    The rate is the Levenshtein edit distance between ``predicted`` and
    ``true`` divided by the length of the longer sequence.

    Args:
        predicted: Predicted symbol sequence.
        true: Reference symbol sequence.

    Returns:
        0.0 when both sequences are empty, 1.0 when exactly one of them is
        empty, otherwise ``edit_distance / max(len(predicted), len(true))``.
    """
    n_pred, n_true = len(predicted), len(true)
    if n_pred == 0 or n_true == 0:
        return 0.0 if n_pred == n_true else 1.0

    # dist[r, c] = edit distance between predicted[:r] and true[:c].
    dist = np.zeros((n_pred + 1, n_true + 1))
    dist[:, 0] = np.arange(n_pred + 1)
    dist[0, :] = np.arange(n_true + 1)

    for row, pred_sym in enumerate(predicted, start=1):
        for col, true_sym in enumerate(true, start=1):
            if pred_sym == true_sym:
                dist[row, col] = dist[row - 1, col - 1]
            else:
                cheapest = min(dist[row - 1, col],      # deletion
                               dist[row, col - 1],      # insertion
                               dist[row - 1, col - 1])  # substitution
                dist[row, col] = 1 + cheapest

    return dist[n_pred, n_true] / max(n_pred, n_true)
def evaluate_individual(individual, base_predictions_data):
    """Evaluate one individual and return its fitness (PER, lower is better).

    Args:
        individual: Sequence of at least six numbers. The first five are
            the TTA sample weights, the sixth is the GRU ensemble weight
            (the LSTM weight is ``1 - gru_weight``).
        base_predictions_data: Iterable of dicts with keys ``'gru_probs'``,
            ``'lstm_probs'`` (expected shape ``(5, seq_len, vocab_size)``
            per the original comments) and ``'true_chars'``.

    Returns:
        A one-element tuple ``(avg_per,)`` as DEAP requires. ``avg_per`` is
        the length-weighted mean PER over all trials, or 1.0 when there are
        no target symbols or when evaluation raises.
    """
    try:
        tta_weights = cp.array(individual[:5])
        gru_weight = individual[5]

        # The normalized TTA weights depend only on the individual, so they
        # are computed once instead of once per trial.
        tta_sum = cp.sum(tta_weights)
        if tta_sum > 0:
            tta_weights_norm = tta_weights / tta_sum
        else:
            # All-zero weights: fall back to a uniform average.
            tta_weights_norm = cp.ones_like(tta_weights) / len(tta_weights)
        tta_scale = tta_weights_norm[:, None, None]

        total_per = 0.0
        total_chars = 0
        for trial_data in base_predictions_data:
            # ``cp`` is CuPy when available and NumPy otherwise, so a single
            # code path serves both back ends.
            gru_probs = cp.asarray(trial_data['gru_probs'])
            lstm_probs = cp.asarray(trial_data['lstm_probs'])
            true_chars = trial_data['true_chars']

            # Weighted average over the TTA axis for each model.
            gru_weighted = cp.sum(gru_probs * tta_scale, axis=0)
            lstm_weighted = cp.sum(lstm_probs * tta_scale, axis=0)

            # Linear ensemble of the two models, then per-frame argmax.
            ensemble_probs = gru_weighted * gru_weight + lstm_weighted * (1 - gru_weight)
            predicted_chars = to_cpu(cp.argmax(ensemble_probs, axis=1))

            per = calculate_per(predicted_chars, true_chars)
            total_per += per * len(true_chars)
            total_chars += len(true_chars)

        avg_per = total_per / total_chars if total_chars > 0 else 1.0
        # Plain float keeps the fitness JSON-friendly regardless of back end.
        return (float(avg_per),)
    except Exception as e:
        # Deliberate best effort: one failing individual must not abort the
        # whole search, so it is reported and given the worst score.
        print(f"Error in evaluate_individual: {e}")
        return (1.0,)
class GeneticOptimizer:
    """Genetic-algorithm optimizer built on DEAP.

    Individuals are flat lists of floats, one gene per entry of ``bounds``
    (in the dict's iteration order). The single fitness value is minimized.
    Survivor selection keeps the best ``population_size`` individuals of
    parents plus offspring, so the best solution is never lost.
    """

    def __init__(self,
                 bounds,
                 population_size=50,
                 generations=100,
                 crossover_prob=0.8,
                 mutation_prob=0.2,
                 tournament_size=3,
                 elite_size=2):
        """Store the configuration and build the DEAP toolbox.

        Args:
            bounds: Mapping ``name -> (low, high)`` describing each gene.
            population_size: Number of individuals per generation.
            generations: Number of generations to evolve.
            crossover_prob: Probability that a pair of offspring is mated.
            mutation_prob: Used twice: as the probability that an offspring
                is passed to the mutation operator, and as the per-gene
                mutation probability inside ``gaussian_mutation``.
            tournament_size: Number of contestants per selection tournament.
            elite_size: Stored for interface compatibility only; it is not
                read anywhere in this class.
        """
        self.bounds = bounds
        self.population_size = population_size
        self.generations = generations
        self.crossover_prob = crossover_prob
        self.mutation_prob = mutation_prob
        self.tournament_size = tournament_size
        self.elite_size = elite_size

        # Per-gene bounds, in the iteration order of ``bounds``.
        self.param_names = list(bounds.keys())
        self.n_params = len(self.param_names)
        self.lower_bounds = [bounds[name][0] for name in self.param_names]
        self.upper_bounds = [bounds[name][1] for name in self.param_names]

        self.setup_deap()

    def setup_deap(self):
        """Create the DEAP fitness/individual classes and the toolbox."""
        # Drop classes left over from a previous optimizer instance so that
        # creator.create() does not overwrite them implicitly.
        if hasattr(creator, "FitnessMin"):
            del creator.FitnessMin
        if hasattr(creator, "Individual"):
            del creator.Individual

        # Single-objective fitness; the negative weight means "minimize".
        creator.create("FitnessMin", base.Fitness, weights=(-1.0,))
        creator.create("Individual", list, fitness=creator.FitnessMin)

        self.toolbox = base.Toolbox()

        # One uniform attribute generator per parameter, within its bounds.
        for name, low, high in zip(self.param_names, self.lower_bounds, self.upper_bounds):
            self.toolbox.register(f"attr_{name}", random.uniform, low, high)

        # Individual = one draw from each attribute generator, in order.
        self.toolbox.register(
            "individual", tools.initCycle, creator.Individual,
            [getattr(self.toolbox, f"attr_{name}") for name in self.param_names], n=1)
        self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)

        # Genetic operators. ``clone`` is not registered here because
        # base.Toolbox already provides it (a deep copy) by default.
        self.toolbox.register("mate", tools.cxBlend, alpha=0.3)
        self.toolbox.register("mutate", self.gaussian_mutation)
        self.toolbox.register("select", tools.selTournament, tournsize=self.tournament_size)
        self.toolbox.register("evaluate", self.evaluate_wrapper)

    def evaluate_wrapper(self, individual):
        """Evaluate ``individual`` against the predictions given to optimize()."""
        return evaluate_individual(individual, self.base_predictions)

    def gaussian_mutation(self, individual):
        """Mutate ``individual`` in place with per-gene Gaussian noise.

        Each gene is perturbed with probability ``mutation_prob`` using a
        standard deviation of 10% of that gene's range, then clamped to its
        bounds.

        Returns:
            A one-element tuple containing the (same) individual, as DEAP
            mutation operators do.
        """
        for i in range(len(individual)):
            if random.random() < self.mutation_prob:
                sigma = (self.upper_bounds[i] - self.lower_bounds[i]) * 0.1
                individual[i] += random.gauss(0, sigma)
                # Hard constraint: keep the gene inside its bounds.
                individual[i] = max(self.lower_bounds[i], min(self.upper_bounds[i], individual[i]))
        return (individual,)

    def constraint_repair(self, individual):
        """Clamp every gene of ``individual`` to its bounds, in place.

        Returns:
            The same individual object.
        """
        for i in range(len(individual)):
            individual[i] = max(self.lower_bounds[i], min(self.upper_bounds[i], individual[i]))
        return individual

    def _params_dict(self, individual):
        """Map parameter names to the gene values of ``individual``."""
        return {self.param_names[i]: individual[i] for i in range(self.n_params)}

    def optimize(self, base_predictions):
        """Run the genetic algorithm.

        Args:
            base_predictions: Data forwarded to ``evaluate_individual`` for
                every fitness evaluation.

        Returns:
            A dict with keys ``'best_params'``, ``'best_score'``,
            ``'history'`` (one ``(params, score)`` entry per generation),
            ``'total_evaluations'`` (number of fitness evaluations actually
            performed), ``'elapsed_time'`` (seconds) and ``'hall_of_fame'``
            (up to ten best solutions, best first).
        """
        self.base_predictions = base_predictions

        print("Starting Genetic Algorithm Optimization")
        print(f"Population size: {self.population_size}")
        print(f"Generations: {self.generations}")
        print(f"Crossover probability: {self.crossover_prob}")
        print(f"Mutation probability: {self.mutation_prob}")
        print(f"Tournament size: {self.tournament_size}")

        start_time = time.time()

        # Initial population, clamped to the bounds and fully evaluated.
        population = self.toolbox.population(n=self.population_size)
        for individual in population:
            self.constraint_repair(individual)

        fitnesses = list(map(self.toolbox.evaluate, population))
        for ind, fit in zip(population, fitnesses):
            ind.fitness.values = fit
        n_evaluations = len(population)

        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("min", np.min)
        stats.register("max", np.max)
        stats.register("std", np.std)

        hall_of_fame = tools.HallOfFame(maxsize=10)
        history = []

        print(f"Initial population evaluated. Best fitness: {min(fitnesses)[0]:.6f}")

        # Seed the hall of fame now so a best individual exists even when
        # ``generations`` is 0 and the loop below never runs.
        hall_of_fame.update(population)

        for generation in range(self.generations):
            # Tournament selection, then work on independent copies.
            offspring = self.toolbox.select(population, len(population))
            offspring = list(map(self.toolbox.clone, offspring))

            # Blend crossover on consecutive pairs; blending can leave the
            # bounds, so both children are repaired and invalidated.
            for child1, child2 in zip(offspring[::2], offspring[1::2]):
                if random.random() < self.crossover_prob:
                    self.toolbox.mate(child1, child2)
                    self.constraint_repair(child1)
                    self.constraint_repair(child2)
                    del child1.fitness.values
                    del child2.fitness.values

            # Mutation. The fitness is invalidated whenever the operator is
            # applied, even if no gene happened to change.
            for mutant in offspring:
                if random.random() < self.mutation_prob:
                    self.toolbox.mutate(mutant)
                    self.constraint_repair(mutant)
                    del mutant.fitness.values

            # Only individuals whose fitness was invalidated are evaluated.
            invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
            fitnesses = map(self.toolbox.evaluate, invalid_ind)
            for ind, fit in zip(invalid_ind, fitnesses):
                ind.fitness.values = fit
            n_evaluations += len(invalid_ind)

            # Elitist survivor selection over parents plus offspring.
            combined = population + offspring
            combined.sort(key=lambda x: x.fitness.values[0])
            population = combined[:self.population_size]

            hall_of_fame.update(population)
            record = stats.compile(population)

            best_individual = hall_of_fame[0]
            history.append((self._params_dict(best_individual),
                            best_individual.fitness.values[0]))

            if generation % 10 == 0 or generation == self.generations - 1:
                print(f"Generation {generation+1:3d}: "
                      f"Min={record['min']:.6f}, "
                      f"Avg={record['avg']:.6f}, "
                      f"Std={record['std']:.6f}")

        elapsed_time = time.time() - start_time

        best_individual = hall_of_fame[0]
        return {
            'best_params': self._params_dict(best_individual),
            'best_score': best_individual.fitness.values[0],
            'history': history,
            'total_evaluations': n_evaluations,
            'elapsed_time': elapsed_time,
            'hall_of_fame': [
                {
                    'params': self._params_dict(ind),
                    'score': ind.fitness.values[0],
                }
                for ind in hall_of_fame
            ],
        }
def save_results(result, output_file='genetic_optimization_results.json'):
    """Write the optimization result to ``output_file`` as indented JSON.

    Args:
        result: JSON-serializable object to store.
        output_file: Destination path; an existing file is overwritten.
    """
    print(f"Saving results to {output_file}")

    with open(output_file, 'w') as json_handle:
        json.dump(result, json_handle, indent=2)

    print("Results saved successfully")
def main():
    """Command-line entry point: parse options, run the search, save results.

    Loads the base predictions, runs ``GeneticOptimizer`` over the five TTA
    weights and the GRU ensemble weight, prints a summary and writes the
    result to the requested output file.
    """
    parser = argparse.ArgumentParser(description='Genetic Algorithm for TTA-E Parameter Optimization')

    # (flag, type, default, help) for every option; the last six configure
    # the genetic algorithm itself.
    cli_options = (
        ('--cache_file', str, 'base_predictions_cache.pkl', 'Base predictions cache file'),
        ('--output_file', str, 'genetic_optimization_results.json', 'Output file for results'),
        ('--population_size', int, 50, 'Population size'),
        ('--generations', int, 100, 'Number of generations'),
        ('--crossover_prob', float, 0.8, 'Crossover probability'),
        ('--mutation_prob', float, 0.2, 'Mutation probability'),
        ('--tournament_size', int, 3, 'Tournament selection size'),
        ('--elite_size', int, 2, 'Elite size for preservation'),
    )
    for flag, value_type, default_value, help_text in cli_options:
        parser.add_argument(flag, type=value_type, default=default_value, help=help_text)
    args = parser.parse_args()

    banner = '=' * 60
    print(banner)
    print("TTA-E Genetic Algorithm Parameter Optimization")
    print(banner)

    base_predictions = load_base_predictions(args.cache_file)

    # Search space. TTA weights 0..4 belong to the original, noise, scale,
    # shift and smoothing augmentations respectively; gru_weight is the
    # GRU share of the ensemble.
    bounds = {f'tta_weight_{idx}': (0.0, 2.0) for idx in range(5)}
    bounds['gru_weight'] = (0.0, 1.0)

    print("Parameter search space:")
    for param, (low, high) in bounds.items():
        print(f" {param}: [{low}, {high}]")
    print()

    optimizer = GeneticOptimizer(
        bounds=bounds,
        population_size=args.population_size,
        generations=args.generations,
        crossover_prob=args.crossover_prob,
        mutation_prob=args.mutation_prob,
        tournament_size=args.tournament_size,
        elite_size=args.elite_size,
    )
    result = optimizer.optimize(base_predictions)

    # Summary of the run.
    print(f"\n{banner}")
    print("OPTIMIZATION RESULTS")
    print(banner)
    print(f"Best Score (PER): {result['best_score']:.6f}")
    print("Best Parameters:")
    for param, value in result['best_params'].items():
        print(f" {param}: {value:.4f}")
    print(f"Total Evaluations: {result['total_evaluations']:,}")
    print(f"Elapsed Time: {result['elapsed_time']:.2f} seconds")
    print()

    print("Top 5 Solutions from Hall of Fame:")
    for rank, solution in enumerate(result['hall_of_fame'][:5], 1):
        print(f"{rank}. Score: {solution['score']:.6f}")
        params_str = ", ".join(f"{k}={v:.3f}" for k, v in solution['params'].items())
        print(f" Parameters: {params_str}")

    save_results(result, args.output_file)
    print("\nOptimization completed successfully!")
# Run the command-line entry point only when executed as a script.
if __name__ == '__main__':
    main()