Deap源码学习
base模块
Toolbox类
Toolbox类是用来管理算子的,包含如下属性和方法:
- clone属性:对应的就是deepcopy函数。
- map属性:对应的就是map函数。
- register方法:这个方法是用来给Toolbox类添加属性(这个属性的值是函数)的。实现的关键使用了functool的partial函数,这个函数的作用是去将函数的部分参数给固定住。
- unregister方法:和register方法对应的方法,就是将制定名称的属性删除。
- decorate方法:就是将register得到的属性对应的函数用指定装饰器装饰。
Fitness类
Fitness类是用来管理个体的适应度值的,包含以下属性和方法:
- weights属性:Fitness类是不能直接使用的,必须创建它的子类,并且在创建Fitness类子类的过程中确定好weights属性。在设计上要求weights属性必须是一个元组,这时为了处理多目标的情形。 如果我们把values理解成个体在各个目标下对应的目标值,那么weights的作用就是将目标函数标定为适应度值wvalues。对于最小化问题,weights对应位置元素应该设置为负数,对于最大化问题weights 对应位置应该设置为正数,这样无论是最大化还是最小化目标函数,一个的好坏都可以通过判断wvalues是否更大得出。
- wvalues属性:设计上wvalues同样是一个元组,可以理解成各个目标标定后的适应度值,wvalues中的元素都是越大说明个体越好,这个属性会在设置values属性的时候被计算。
- values属性:设计上values同样是一个元组,并且大小和weight、wvalues属性应该匹配。可以理解为个体在各个目标下对应的目标值。
- valid属性:valid本身是一个方法,通过property装饰成为了一个属性,用于检查Fitness的实例的wvalues属性是不是空的。
- dominate方法:通过比较wvalues参数对应的每一个元素来检查两个Fitness实例的优良关系。当且仅当第一个实例的wvalues中每一个元素都不严格小于的第二个实例的wvalues对应元素且至少有一个元素严格大于时才返回真。
- 各种关系运算符对应的魔术方法:这里就是实现的逻辑时对Fitness类的wvalues属性进行大小比较。由于wvalues是元组,所以比较规则和元组之间的关系运算是一样的。
- 用于显示的魔术方法
一定要把目标函数和适应度值区分清楚,官方文档在这块搞得比较混乱,按理讲适应度值就得是越大越好的…
ConstrainedFitness类
这个类是Fitness类的子类,它是在Fitness类的基础上新增了一个交constraint_violation的实例属性,并重载了关系运算符的比较规则。
creator模块
这个模块的实现很有技术含量,但关键其实就是它里面的create函数,这个函数的功能是继承某个类并设置相应的类属性和实例属性得到一个新的类。这里属性的设置规则是,如果传递给create的参数是一个类,那么就会使用这个类创建一个实例属性,如果传递给create的参数是某个类的实例,那么就会创建对应的类属性。
create函数工作原理浅析create('Class2',Class1,param1=dict,param2=1)
等价于。
create函数工作原理浅析class Class2(Class1): params2 = 1 # 因为1是int类的实例,所以Class2类新添了一个类属性 def __init__(self): self.param1 = dict() # 因为dict是一个类,所以Class2使用dict添加了一个实例属性
tools模块
这个tools模块下边实际上还有非常多的子模块,不过作者已经在__init__模块中提前导入这些模块了,所以我们用tool模块的时候就写个import tools就可以用tools模块下的所有子模块了。
tools模块下的__init__.py文件from .constraint import * from .crossover import * from .emo import * from .indicator import * from .init import * from .migration import * from .mutation import * from .selection import * from .support import *
selection子模块
这个模块里面定义了常见的选择算子,由于选择后得到的交配池中可能有同样的父代个体,而交叉和编译算子都是就地操作,所以选择操作完了之后必须有Toolbox类的克隆方法深拷贝一下交配池中的每一个个体。
算子名称 | 选择策略 | 描述 | 适用场景 |
---|---|---|---|
selRandom |
随机选择 | 完全随机选择个体,无任何偏好 | 基线对比/多样性保持 |
selBest |
精英选择 | 直接选择适应度最高的前N个个体 | 保留最优解 |
selWorst |
反向选择 | 选择适应度最差的个体(常用于特殊用途) | 异常检测/反向优化 |
selRoulette |
轮盘赌选择 | 按适应度比例的概率选择(适应度越高选中概率越大) | 传统遗传算法 |
selTournament |
锦标赛选择 | 随机选取K个个体竞争,选择其中最优的 | 通用场景/平衡选择压力 |
selDoubleTournament |
双锦标赛选择 | 两阶段锦标赛:先按大小选择,再按适应度选择 | 多目标优化/兼顾多样性与收敛 |
selStochasticUniversalSampling |
随机通用采样(SUS) | 改进的轮盘赌,单轮均匀采样多个个体 | 减少轮盘赌的随机偏差 |
selLexicase |
字典序选择 | 按随机测试用例顺序筛选,适合多准则优化 | 程序合成/符号回归 |
selEpsilonLexicase |
ε-字典序选择 | Lexicase的宽松版本,允许ε误差范围内的个体通过 | 噪声环境下的多目标优化 |
selAutomaticEpsilonLexicase |
自适应ε-字典序选择 | 自动调整ε阈值的Lexicase变体 | 动态环境优化 |
crossover子模块
这个模块里边定义了常见的交叉算子,这些算子的一大特点就是就地操作。
算子名称 | 类型 | 描述 | 状态 |
---|---|---|---|
cxOnePoint |
单点交叉 | 在随机选择的一个位置切割两个父代染色体并交换片段 | 活跃 |
cxTwoPoint |
两点交叉 | 在随机选择的两点位置切割并交换中间片段 | 活跃 |
cxUniform |
均匀交叉 | 按概率逐个基因交换(类似遗传算法的均匀交叉) | 活跃 |
cxPartialyMatched (PMX) |
部分匹配 | 保持排列有效性的部分匹配交叉,适用于TSP问题 | 活跃 |
cxUniformPartialyMatched |
均匀PMX | PMX的均匀交叉变种 | 活跃 |
cxOrdered (OX) |
顺序交叉 | 保持相对顺序的顺序交叉,适用于排列编码 | 活跃 |
cxBlend |
混合交叉 | 对实值编码进行线性混合(α-混合) | 活跃 |
cxSimulatedBinary (SBX) |
模拟二进制 | 模拟二进制交叉的实值编码方法 | 活跃 |
cxSimulatedBinaryBounded |
有界SBX | 带边界约束的SBX变种 | 活跃 |
cxMessyOnePoint |
乱序单点 | 允许不等长基因组的单点交叉 | 活跃 |
cxESBlend |
ES混合 | 进化策略专用的混合交叉 | 活跃 |
cxESTwoPoint |
ES两点 | 进化策略专用的两点交叉 | 活跃 |
cxTwoPoints (旧名) |
两点交叉 | cxTwoPoint 的旧命名(功能相同) |
已弃用 |
cxESTwoPoints (旧名) |
ES两点 | cxESTwoPoint 的旧命名(功能相同) |
已弃用 |
mutation子模块
这个模块里边定义了常见的编译算子,这些算子的一大特点还是就地操作。
算子名称 | 适用编码类型 | 描述 | 典型应用场景 |
---|---|---|---|
mutGaussian |
实值编码 | 高斯(正态分布)变异,添加均值为0的随机噪声 | 连续优化问题 |
mutPolynomialBounded |
实值编码(边界约束) | 多项式变异,保持解在边界范围内 | 有约束的实数优化 |
mutShuffleIndexes |
排列编码 | 随机打乱部分基因的位置 | TSP/调度等排列问题 |
mutFlipBit |
二进制编码 | 按概率翻转位值(0↔︎1) | 二进制/布尔型基因 |
mutUniformInt |
整数编码 | 在指定范围内用均匀分布随机生成新整数值 | 离散整数优化 |
mutInversion |
排列编码 | 反转染色体中选定区间的基因顺序 | 保持排列有效性的变异 |
mutESLogNormal |
进化策略(ES) | 对数正态分布变异,常用于ES算法的策略参数更新 | 进化策略的自适应变异 |
init子模块
这个模块比较简单,包含3个函数。
- initRepeat:重复调用某个函数填充一个容器对象。
- initIterate:调用一个返回可迭代对象的函数填充一个容器对象。
- initCircle:重复依次调用一个函数序列中的每个函数填充一个容器对象。
constraint子模块
这个模块实现了两个装饰器,用于装饰evaluate函数(也就是计算目标函数值的那个算子)。其中DeltaPenalty的策略是如果个体可行,就使用原有的evaluate函数计算目标函数,否则将目标函数设置为参数delta,这里需要特别注意delta的取值,否则就不是惩罚而是奖励了,如果是max问题,delta应该选足够小的数,如果是min问题,delta应该取足够大的数。而ClosestValidPenalty的策略是如果个体可行,就使用原有的evaluate函数计算目标函数,否则就对该个体进行修复,让他变得尽可能可行,然后再用原有的evaluate函数计算目标函数。
support子模块
History类
这个类可以用来存储遗传算法运行过程中产生的所有个体,并记录每个个体对应的父代,还可以配合networkx库绘制家谱图。

Statistics类
这个类可以看作是一个统计工具,比如可以用它来统计遗传算法运行过程中每一代的平均适应度和最大适应度。这个类包含如下属性和方法:
- key属性:这个属性对应的是一个函数,用于对统计对象做预处理,默认情况下是不处理。
- functions属性:这个属性是一个字典,存放的是在Statistics类实例注册的函数。
- fields属性:实际上就是functions属性调用keys()函数的返回值。
- register方法:这个方法的实现逻辑和Toolbox类的register方法实现逻辑一样。
- compile方法:输入统计对象,这个方法会用key对统计对象先做一遍预处理,处理完之后依次调用被注册的函数,并把结果写到一个字典里面并返回。
MultiStatistics类
这个类是dict类的子类,它用于一次性创建多个具有不同预处理方式的Statistcs类,并提供了相应的register和compile方法,实际上,多次使用Statiscs类也能实现相同的效果。
Logbook类
Logbook类是lits类的子类,这个类用于在遗传算法中记录Statistics类得出的统计信息,并提供了对应的格式化输出。这个类包含如下属性和方法:
- headers属性:字符串的列表,用于设置所展示信息,展示顺序由sorted(headers)决定
- log_header属性:bool值。用于控制使用Logbook类的stream或__str__格式化输出记录结果时是否要输出标题行,也就是headers。
- record方法:简单将这个方法就是将输入的关键字参数彻底展开,整理成一个字典,然后再把这个字典存到Logbook类的实例(列表)中。
- select方法:这个方法和record方法相对,是将Logbook中所有指定键对应的值存到列表中返回,如果有多个指定键就返回一个列表元组。
- stream方法:格式化输出Logbook中存储的一次迭代的信息(对应一个字典),如果log_header为true,则第一次调用stream方法输出的是headers的格式化字符串。注意这个方法使用property装饰器装饰的,所以可以当成属性用。
- __txt__方法:这个方法是为stream方法和__str__方法服务的,实现比较复杂,但简单讲就是在讲信息格式化为字符串。
- __str__方法:这个方法再调用print函数时将直接把Logbook中的所有信息给格式化输出出来,如果log_header为true,第一行会是headers的格式化输出结果。
HallOfFame类
HallOfFame类的作用有点类似与gurobi当中的解池,它的主要属性和方法是:
- maxsize属性:设置解池的大小。
- update方法:传递当前的种群,用于更新HallOfFame的实例,实例中的个体会按照适应度值的优劣进行排序,fitness最好的个体会被放到HallOfFame的实例(一个可迭代对象)的最左边。
- remove方法:溢出某个个体。
- clear方法:清空HallOfFame实例。
algorithms模块
varAnd函数
这个函数是用来做遗传运算的,关键需要注意以下几点:
- 输入的population应该是经过选择操作得到的交配池,并且调用这个函数前不需要做深拷贝操作,这个函数会帮我们做。
- 输入的toolbox类实例对alias是有要求的,对于交叉操作alias要设置为mate,对于变异操作alias要设置成mutate。
- 这个函数内部已经帮我们删除发生variation的个体的fitness属性了。
varAnd函数源码def varAnd(population, toolbox, cxpb, mutpb): offspring = [toolbox.clone(ind) for ind in population] # 深拷贝一下交配池 # Apply crossover and mutation on the offspring for i in range(1, len(offspring), 2): if random.random() < cxpb: offspring[i - 1], offspring[i] = toolbox.mate(offspring[i - 1], offspring[i]) del offspring[i - 1].fitness.values, offspring[i].fitness.values # 后代已经改变了,删掉对应的fitness属性 for i in range(len(offspring)): if random.random() < mutpb: offspring[i], = toolbox.mutate(offspring[i]) del offspring[i].fitness.values # 后代已经改变了,删掉对应的fitness属性 return offspring
esSimple函数
eaSimple函数实现了最基础的遗传算法,关键需要注意以下几点:
- 输入的population是初始化种群,默认没有计算适应度值,eaSimple函数会帮我们做。
- toolbox的alias是有要求的,选择是select,交叉是mate,变异是mutate,计算目标函数值是evaluate。
- verbose参数控制的是要不要在运行过程中格式化输出Logbook类对象的记录结果。
eaSimple函数源码def eaSimple(population, toolbox, cxpb, mutpb, ngen, stats=None, halloffame=None, verbose=__debug__): logbook = tools.Logbook() # 创建Logbook实例,用于record用Statitics类计算得到的各代统计信息,并展示格式化输出结果 logbook.header = ['gen', 'nevals'] + (stats.fields if stats else []) # 设置Logbook格式化输出信息的标题行顺序 # Evaluate the individuals with an invalid fitness invalid_ind = [ind for ind in population if not ind.fitness.valid] # 输入的是初始化种群,他这里假设的是个体都的fitness属性都还是空的 fitnesses = toolbox.map(toolbox.evaluate, invalid_ind) # 计算目标函数值 for ind, fit in zip(invalid_ind, fitnesses): ind.fitness.values = fit # 设置values属性的时候就自动计算wvalues了 if halloffame is not None: halloffame.update(population) # 更新名人堂 record = stats.compile(population) if stats else {} # 如果给了Statistics对象就用Statistics对象统计得到一个字典。算”当前一代“的统计信息 logbook.record(gen=0, nevals=len(invalid_ind), **record) # 使用Logbook对象将record记录起来 if verbose: print(logbook.stream) # 这里就是打印标题头 # Begin the generational process for gen in range(1, ngen + 1): # Select the next generation individuals offspring = toolbox.select(population, len(population)) # 选择 # Vary the pool of individuals offspring = varAnd(offspring, toolbox, cxpb, mutpb) # 遗传运算,交叉和变异 # Evaluate the individuals with an invalid fitness invalid_ind = [ind for ind in offspring if not ind.fitness.valid] # 使用varAnd做交叉和变异会自动将发生变化的个体的fitness属性置为空,通过列表推导找到这个个体,补上fitness属性 fitnesses = toolbox.map(toolbox.evaluate, invalid_ind) for ind, fit in zip(invalid_ind, fitnesses): ind.fitness.values = fit # Update the hall of fame with the generated individuals if halloffame is not None: halloffame.update(offspring)# 更新名人堂 # Replace the current population by the offspring population[:] = offspring # 一代结束,更新种群 # Append the current generation statistics to the logbook record = stats.compile(population) if stats else {} # 计算当前一代的统计信息 logbook.record(gen=gen, nevals=len(invalid_ind), **record) # 记录当前一代的统计信息 if verbose: # 这里就是在打印标题头下面的信息了 print(logbook.stream) return population, logbook # 返回最终种群和Logbook类对象
案例
使用deap框架写遗传算法求解问题的基本思路:(1)先写一个类封装一下要解决的问题,这个类至少应该包含与这个问题有关的数据、计算目标函数的方法、计算约束违反的方法(如果存在违反的话);(2)写一个用于求解的类,封装一下遗传算法的相关参数,与(1)中提到的类进行交互,使用遗传算法完成求解。这个思路其实和用gurobi求解问题很想,gurobi中是写一个类封装数据,再写一个类封装求解器相关参数,两个类之间交互完成求解。
添加精英策略求解TSP问题
这个部分包含两个模块,tsp_problem模块的作用是将从TSPlib上获取标准算例,将tsp问题封装起来。tsp_solver模块实现了带精英策略的遗传算法。
精英策略
运用遗传算法时有一个技巧——绘制一下各代种群的平均目标函数值和最优目标函数值的图像,观察两个曲线趋于重合的速度,如果两个曲线很快就重合了,那就说明exploitation(早熟)多了,如果趋于重合的速度很慢就说明exploration(随机性太高)多了,要让遗传算法有好的性能必须得在exploration和exploitation之间折中。精英策略就是给好的个体留后门,让它们不参与选择过程直接进入交配池,显然这个策略的作用就是降低exploration。
tsp_problem模块#!/usr/bin/env python # -*- coding: utf-8 -*- # @Date : 2025-05-13 10:01:46 # @Author : syuansheng (Dalian Maritime University) import requests import gzip import io from math import sqrt from typing import List from matplotlib import pyplot as plt class TSP: """ 这个类的作用就是封装一个TSP问题,包含对应TSP问题的数据 """ def __init__(self,name:str=None): """ 读取name对应tsp问题的相关数据,来源:http://comopt.ifi.uni-heidelberg.de/software/TSPLIB95/tsp/ """ self.location = [] self.distance = [] self.size_of_tsp = 0 self._initial(name) def _initial(self,name): url = 'http://comopt.ifi.uni-heidelberg.de/software/TSPLIB95/tsp/'+name+'.tsp.gz' try: response = requests.get(url) # 发送请求 with gzip.GzipFile(fileobj=io.BytesIO(response.content)) as f: # GzipFile类模拟了 file object 的大部分方法 content = f.read().decode('utf-8') # 将文件内容解码为字符串 start_read = False for line in content.split('\n'): if line == 'DISPLAY_DATA_SECTION': start_read = True continue if line == 'EOF': break if start_read: self.location.append(list(map(float,line.strip().split()))[1:]) self.size_of_tsp = len(self.location) self.distance = [[round(sqrt((loc2[0]-loc1[0])**2+(loc2[1]-loc1[1])**2),2) for loc2 in self.location] for loc1 in self.location] print("成功读取算例:{}".format(name+'.tsp')) except requests.exceptions.ConnectionError as e: print('算例获取失败,请连接网络') def plot(self,solution:List=None): """ 可视化任意一个可行解对应的路径 """ try: if solution is None: solution = range(self.size_of_tsp) loc_in_order = [self.location[i] for i in solution] loc_in_order.append(self.location[solution[0]]) plt.close('all') fig,ax = plt.subplots() ax.scatter(*zip(*self.location),marker='o',color='red',s=5) # 使用*和zip函数解压嵌套列表:两个列表【1,2,3】,【4,5,6】变成【【1,4】,【2,5】,【3,6】】叫压缩,反之叫解压 ax.plot(*zip(*loc_in_order),lw=1,color='grey') # 折线上所有点有顺序的分成x和y给plot plt.show() except IndexError: print('无法绘制图像,请首先连接网络获取算例') def get_total_distance(self,solution:List=None): """ 计算任意一个可行解对应的目标函数值 """ try: if solution is None: solution = range(self.size_of_tsp) loc_in_order = [self.location[i] for i in solution] loc_in_order.append(self.location[solution[0]]) total_distance = 0 for i in range(self.size_of_tsp): loc1,loc2 = loc_in_order[i],loc_in_order[i+1] total_distance += sqrt((loc2[0]-loc1[0])**2+(loc2[1]-loc1[1])**2) return round(total_distance,2) except IndexError: print('无法计算目标函数值,请首先连接网络获取算例') __all__ = ['Tsp'] if __name__ == "__main__": # 测试 tsp = TSP('bayg29') best_solution = [0,27,11,5,20,4,8,25,2,28,1,19,9,3,14,17,13,16,21,10,18,12,23,7,26,22,6,24,15] print("测试可行解的目标函数值:{}".format(tsp.get_total_distance(best_solution))) tsp.plot(best_solution)
与文件读取有关的补充
这里用到了从互联网上获取文件的技术,补充一下。基本思路就是先用requests库对有你想要的文件的那个网址发送请求,一般返回给我们的内容都是字节。接着使用BytesIo类把返回回来的东西暂存到内存(和放磁盘不一样,放内存里面更快,适合暂存文件的场景)里面。然后在使用对应的类去处理这个文件,比如咱这里的文件是gz后缀的压缩文件,百度一下就知道可以用GzipFile类(实现了__enter__和__exit__魔术方法,是上下文对象)去处理。
tsp_solver模块#!/usr/bin/env python # -*- coding: utf-8 -*- # @Date : 2025-06-01 22:50:45 # @Author : syuansheng (Dalian Maritime University) import random import tsp_problem import numpy as np from deap import creator from deap.base import Fitness,Toolbox from deap.tools.support import Logbook,HallOfFame,Statistics from deap.tools.init import initRepeat,initIterate from deap.tools.selection import selTournament from deap.tools.crossover import cxOrdered from deap.tools.mutation import mutShuffleIndexes class TSPSolver: """ 基于遗传算法的TSP问题求解器 """ def __init__(self,POPULATION_SIZE:int,MAX_GENERATIONS:int,P_CROSSOVER:float,P_MUTATION:float): self.POPULATION_SIZE = POPULATION_SIZE # 种群规模 self.MAX_GENERATIONS = MAX_GENERATIONS # 最大迭代次数 self.P_CROSSOVER = P_CROSSOVER # 交叉概率 self.P_MUTATION = P_MUTATION # 变异概率 def solve(self,population,toolbox,stats,poolSolutions:int=1,verbose=True): """ 对deap的algorithms模块的eaSimple函数的复现,实现了精英策略 """ # 设置日志、名人堂(解池) logbook = Logbook() # Logbook的作用就是记录Statistics类的计算结果 logbook.headers = ['generation','nevals']+(stats.fields if stats else {}) # 设置一下logbook要展示什么,这里我们想展示当前的代数(generation)、当前交配池中发生交叉或变异的个体数量(nevals),和stats统计的信息,顺序由sorted(headers)决定 logbook.log_header = True # 是否展示表头(headers) halloffame = HallOfFame(maxsize=poolSolutions) # 将解池的大小设置为poolSolutions,默认只保留找到的解里面最好的那一个 # 计算适应度函数值,也就是Fitness类的wvalues属性 invalid_individual = [individual for individual in population if not individual.fitness.valid] # 筛选出来没有wvalues属性的个体 values = toolbox.map(toolbox.evaluate,invalid_individual) # 计算没有wvalues属性的个体对应的目标函数值values,结果是一个元组列表 for individual,value in zip(invalid_individual,values): individual.fitness.values = value # 设置values属性的同时会调用Fitness类的setValues方法将wvalues属性设置好,间源代码206行 # 更新名人堂(解池)并记录统计信息 halloffame.update(population) infos = stats.compile(population) # 返回结果是一个字典,装的是population在预处理后在各个register的函数下的取值 logbook.record(generation=0,nevals=len(invalid_individual),**infos) # 输入会被展平,作为一个整体的字典存在列表中(logbook集成的是列表嘛) # 打印求解进程 if verbose: print(logbook.stream) # stream方法用于格式化输出logbook中的一条信息,如果log_header=true,第一条信息会是headers,且由于该方法被property装饰器修饰,所以可以当成属性使用 for i in range(1,1+self.MAX_GENERATIONS): # 选择操作 offsprings = toolbox.select(population,self.POPULATION_SIZE-poolSolutions) # 留后门,让精英全部被选中 offsprings.extend(halloffame.items) # 这里写成offsprings=toolbox.clone(offsprings)就不对了,想一想为什么 https://pythontutor.com/ offsprings = [toolbox.clone(ind) for ind in offsprings] # 这里得深拷贝一下,因为编程上offsprings指向的还是population中的对象,而选择操作是可能把population中的某一个个体选择多次的,如果不深拷贝,就会相会影响 # 交叉操作 for j in range(1,self.POPULATION_SIZE,2): if random.random() < self.P_CROSSOVER: offsprings[j-1],offsprings[j] = toolbox.mate(offsprings[j-1],offsprings[j]) del offsprings[j-1].fitness.values,offsprings[j].fitness.values # 删除目标函数只values的同时会调用delValues方法删除wvalues属性 # 变异操作 for j in range(self.POPULATION_SIZE): if random.random() < self.P_MUTATION: offsprings[j], = toolbox.mutate(offsprings[j]) del offsprings[j].fitness.values # 删除目标函数只values的同时会调用delValues方法删除wvalues属性 # 计算后代的适应度函数值 invalid_individual = [ind for ind in offsprings if not ind.fitness.valid] values = toolbox.map(toolbox.evaluate,invalid_individual) for individual,value in zip(invalid_individual,values): individual.fitness.values = value # 更新种群 population[:] = offsprings # 更新解池,记录统计信息 halloffame.update(population) infos = stats.compile(population) logbook.record(generation=i,nevals=len(invalid_individual),**infos) # 打印求解进程 if verbose: print(logbook.stream) return population,logbook,halloffame if __name__ == '__main__': # 参数设置 POPULATION_SIZE = 500 MAX_GENERATIONS = 300 P_CROSSOVER = 0.9 P_MUTATION = 0.1 # 从Tsplib网站读取指定测试算例 tsp_problem = tsp_problem.TSP('bayg29') # 创建必要的类 creator.create('FitnessMin',Fitness,weights=(-1,)) # 创建一个叫FitnessMin的类,它继承自Fitness类,weights类属性被设置为(-1,),这个属性在用values计算wvalues时会被用到,对于min问题,weights对应元素应该设置为负值 creator.create('Individual',list,fitness=creator.FitnessMin) # 创建一个叫Individual的类,它继承自Lisr类,有一个实例属性fitness(它是FitnessMin类的实例) # 使用Toolbox类管理相关算子 toolbox = Toolbox() toolbox.register("randomSolution",random.sample,range(tsp_problem.size_of_tsp),tsp_problem.size_of_tsp) # 生成tsp的一个随机的可行解 toolbox.register("createIndividual",initIterate,creator.Individual,toolbox.randomSolution) # 创建个体 toolbox.register("initializePopulation",initRepeat,list,toolbox.createIndividual,POPULATION_SIZE) # 创建初始种群 toolbox.register("evaluate",lambda individual:(tsp_problem.get_total_distance(individual),)) # 目标函数计算 toolbox.register("select", selTournament, tournsize=3) # 选择方法 toolbox.register("mate", cxOrdered) # 交叉方法 toolbox.register("mutate", mutShuffleIndexes, indpb=1.0/tsp_problem.size_of_tsp) # 变异方法 # 设置如何计算统计信息 stats = Statistics(lambda ind: ind.fitness.values) # 预处理方式为取出目标函数值values stats.register("pavg", np.mean) stats.register("pmin", np.min) # 搭建求解器 tsp_solver = TSPSolver(POPULATION_SIZE,MAX_GENERATIONS,P_CROSSOVER,P_MUTATION) population,logbook,halloffame = tsp_solver.solve(toolbox.initializePopulation(),toolbox,poolSolutions=20,stats=stats) tsp_problem.plot(halloffame[0])
复现过程中遇到的一个与深拷贝相关的问题
对population做完选择操作后需要深拷贝一下,因为选择操作可能会把相同的individual选择到offsprings中,如果不深拷贝,对某一个个体的遗传运算就会导致另一个个体相应的发生改变,这是不对的,我们希望它们之间是独立的。开始我写的是offsprings=toolbox.clone(offsprings)
我以为这样就能让各个个体之间独立了(实际上不能),结果运行起来算法完全是在乱搜索。解释见下图,必须做元素级的深拷贝才能让每个个体都独立。

遗传算法求解带约束条件的问题
在用遗传算法求解带约束的问题时可以把约束分成两类:软约束和硬约束。软约束是指最好不要违反,但实在难以遵守时可以违反的约束,它的处理思路就是惩罚策略,即当违反软约束时给予目标函数一定的惩罚。而硬约束则指绝对不能违反的约束,一旦解违反了硬约束那它就是不可行的,处理硬约束一般有一下四种思路:
- 设计合适的编码方式和遗传算子使得约束始终不被违反
- 舍弃违反硬约束的解
- 修复违反硬约束的解
- 惩罚违反硬约束的解,当一个问题既有软约束又有硬约束时,对硬约束使用惩罚策略实际上是将其作为软约束对待,为了区分,必须使用更大的惩罚因子
这里以惩罚策略为例,求解护士排班问题(NSP)。
nsp_problem#!/usr/bin/env python # -*- coding: utf-8 -*- # @Date : 2025-06-07 18:44:26 # @Author : syuansheng (Dalian Maritime University) import random from typing import List,Dict class NSP: """ 封装护士排班问题,包含相关数据、目标函数、约束违反的计算方法 """ def __init__( self,namelst:List[str],preferlst:List[List],MaxShiftPerweek:int,MaxConsecShift:int,\ MinShiftNumConstr:List[int],MaxShiftNumConstr:List[int],WorkDayNum:int=7,\ Weeks2Schedule:int=1,DailyShiftNum:int=3,HardConstrPenalty:float=9999 ): """ namelst:员工姓名列表 preferlst:各个员工的班次偏好,shape是(员工人数,每日的班次数) MaxShiftPerweek:每个员工每周最多上几个班次 MaxConsecShift:一个员工最多连续上几个班次 MinShiftNumConstr:每个班次所需人数的下限,shape是(每日班次数,) MaxShiftNumConstr:每个班次所需人数的上线,shape是(每日班次,) WorkDayNum:每周上几天班,总规划班次=Weeks2Schedule*WorkDayNum*每日班次数 Weeks2Schedule:规划几周 HardConstrPenalty: 硬约束的惩罚系数 """ self.namelst = namelst self.preferlst = preferlst self.MaxShiftPerweek = MaxShiftPerweek self.MaxConsecShift = MaxConsecShift self.MinShiftNumConstr = MinShiftNumConstr self.MaxShiftNumConstr = MaxShiftNumConstr self.WorkDayNum = WorkDayNum self.Weeks2Schedule = Weeks2Schedule self.DailyShiftNum = DailyShiftNum self.TotalShiftNum = DailyShiftNum*WorkDayNum*Weeks2Schedule self.HardConstrPenalty = HardConstrPenalty def __len__(self): return len(self.namelst)*self.TotalShiftNum # 编码这个NSP问题所需要的二进制位数 def splitSolution(self,solution:List)->Dict[str,List]: """ solution是一个长度为self.__len__()的二进制序列,这个方法的作用是把这个二进制序列拆分为字典,键是员工姓名,值是员工的班次信息,长度是self.TotalShiftNum 相当于是编码空间转换到解空间 """ solution_dict = {} for idx,i in enumerate(range(0,self.__len__(),self.TotalShiftNum)): solution_dict[self.namelst[idx]] = solution[i:i+self.TotalShiftNum] return solution_dict def getPreferViolation(self,solution:List): violation = 0 for person,shifts in enumerate(self.splitSolution(solution).values()): for i in range(0,self.TotalShiftNum,self.WorkDayNum*self.DailyShiftNum): one_week_shifts = shifts[i:i+self.WorkDayNum*self.DailyShiftNum] for idx,shift in enumerate(one_week_shifts): preference = self.preferlst[person][idx%self.DailyShiftNum] if preference==0 and shift == 1: violation += 1 return violation def getMaxShiftPerweekViolation(self,solution:List): """ 每个员工每周上的班次数是由MaxShiftPerWeek限制的,计算这类违反的总数,要注意Weeks2Schedule!!!需要设计一个周拆分函数吗? """ violation = 0 for shifts in self.splitSolution(solution).values(): for i in range(0,self.TotalShiftNum,self.WorkDayNum*self.DailyShiftNum): one_week_shifts = shifts[i:i+self.WorkDayNum*self.DailyShiftNum] total_shift = sum(one_week_shifts) # 当前员工在当前一个week的总班次 if total_shift > self.MaxShiftPerweek: violation += total_shift-self.MaxShiftPerweek return violation def getMaxConsecShiftViolation(self,solution:List): """ 每个员工能连续上的班次受到MaxConsecShiftViolation限制,计算这类违反的总数,要注意Weeks2Schedule!!!需要设计一个周拆分函数吗? """ violation = 0 for shifts in self.splitSolution(solution).values(): for i in range(0,self.TotalShiftNum,self.WorkDayNum*self.DailyShiftNum): one_week_shifts = shifts[i:i+self.WorkDayNum*self.DailyShiftNum] match = list(zip(*[one_week_shifts[i:] for i in range(self.MaxConsecShift)])) # 每self.MaxConsecShift个shift打包成一个元组 for shift_tuple in match: if sum(shift_tuple) == self.MaxConsecShift: violation += 1 return violation def getShiftNumConstrViolation(self,solution:List): """ 每天各个班次的人数范围是有要求的,人数必须在MinShiftNumConstr和MaxShiftNumConstr """ violation = 0 total_per_shift = [sum(this_shift_tuple) for this_shift_tuple in zip(*self.splitSolution(solution).values())] # 计算每个班次对应的人数,长度(TotalShiftNum,) for idx,shift in enumerate(total_per_shift): # 当前班次人数下限 min_num = self.MinShiftNumConstr[idx%self.DailyShiftNum] max_num = self.MaxShiftNumConstr[idx%self.DailyShiftNum] if shift < min_num: violation += min_num - shift if shift > max_num: violation += shift - max_num return violation def getTotalCost(self,solution:List): """ 提供这个问题目标函数的计算接口 """ return self.HardConstrPenalty*(self.getMaxShiftPerweekViolation(solution)+self.getMaxConsecShiftViolation(solution)+\ self.getShiftNumConstrViolation(solution))+self.getPreferViolation(solution) if __name__ == '__main__': # test namelst = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'] preferlst = [[1, 0, 0], [1, 1, 0], [0, 0, 1], [0, 1, 0], [0, 0, 1], [1, 1, 1], [0, 1, 1], [1, 1, 1]] MaxShiftPerweek = 5 MaxConsecShift = 2 MinShiftNumConstr = [2, 2, 1] MaxShiftNumConstr = [3, 4, 2] nsp = NSP(namelst,preferlst,MaxShiftPerweek,MaxConsecShift,MinShiftNumConstr,MaxShiftNumConstr) randomSolution = [random.randint(0,1) for _ in range(len(nsp))] # 产生一个随机解 print("测试splitSolution效果:\n",nsp.splitSolution(randomSolution)) print("测试getMaxShiftPerweekViolation效果:\n",nsp.getMaxShiftPerweekViolation(randomSolution)) print("测试getMaxConsecShiftViolation效果:\n",nsp.getMaxConsecShiftViolation(randomSolution)) print("测试getShiftNumConstrViolation效果:\n",nsp.getShiftNumConstrViolation(randomSolution)) print("测试getPreferViolation效果:\n",nsp.getPreferViolation(randomSolution)) print("测试getTotalCost效果:\n",nsp.getTotalCost(randomSolution))
nsp_solver#!/usr/bin/env python # -*- coding: utf-8 -*- # @Date : 2025-06-08 15:46:29 # @Author : syuansheng (Dalian Maritime University) import random import nsp_problem import numpy as np from matplotlib import pyplot as plt from deap import creator from deap.base import Fitness,Toolbox from deap.tools.support import Logbook,HallOfFame,Statistics from deap.tools.init import initRepeat,initIterate from deap.tools.selection import selTournament from deap.tools.crossover import cxTwoPoint from deap.tools.mutation import mutFlipBit class NSPSolver: """ 基于遗传算法的TSP问题求解器 """ def __init__(self,POPULATION_SIZE:int,MAX_GENERATIONS:int,P_CROSSOVER:float,P_MUTATION:float): self.POPULATION_SIZE = POPULATION_SIZE # 种群规模 self.MAX_GENERATIONS = MAX_GENERATIONS # 最大迭代次数 self.P_CROSSOVER = P_CROSSOVER # 交叉概率 self.P_MUTATION = P_MUTATION # 变异概率 def solve(self,population,toolbox,stats,poolSolutions:int=1,verbose=True): """ 对deap的algorithms模块的eaSimple函数的复现,实现了精英策略 """ # 设置日志、名人堂(解池) logbook = Logbook() # Logbook的作用就是记录Statistics类的计算结果 logbook.headers = ['generation','nevals']+(stats.fields if stats else {}) # 设置一下logbook要展示什么,这里我们想展示当前的代数(generation)、当前交配池中发生交叉或变异的个体数量(nevals),和stats统计的信息,顺序由sorted(headers)决定 logbook.log_header = True # 是否展示表头(headers) halloffame = HallOfFame(maxsize=poolSolutions) # 将解池的大小设置为poolSolutions,默认只保留找到的解里面最好的那一个 # 计算适应度函数值,也就是Fitness类的wvalues属性 invalid_individual = [individual for individual in population if not individual.fitness.valid] # 筛选出来没有wvalues属性的个体 values = toolbox.map(toolbox.evaluate,invalid_individual) # 计算没有wvalues属性的个体对应的目标函数值values,结果是一个元组列表 for individual,value in zip(invalid_individual,values): individual.fitness.values = value # 设置values属性的同时会调用Fitness类的setValues方法将wvalues属性设置好,间源代码206行 # 更新名人堂(解池)并记录统计信息 halloffame.update(population) infos = stats.compile(population) # 返回结果是一个字典,装的是population在预处理后在各个register的函数下的取值 logbook.record(generation=0,nevals=len(invalid_individual),**infos) # 输入会被展平,作为一个整体的字典存在列表中(logbook集成的是列表嘛) # 打印求解进程 if verbose: print(logbook.stream) # stream方法用于格式化输出logbook中的一条信息,如果log_header=true,第一条信息会是headers,且由于该方法被property装饰器修饰,所以可以当成属性使用 for i in range(1,1+self.MAX_GENERATIONS): # 选择操作 offsprings = toolbox.select(population,self.POPULATION_SIZE-poolSolutions) # 留后门,让精英全部被选中 offsprings.extend(halloffame.items) # 这里写成offsprings=toolbox.clone(offsprings)就不对了,想一想为什么 https://pythontutor.com/ offsprings = [toolbox.clone(ind) for ind in offsprings] # 这里得深拷贝一下,因为编程上offsprings指向的还是population中的对象,而选择操作是可能把population中的某一个个体选择多次的,如果不深拷贝,就会相会影响 # 交叉操作 for j in range(1,self.POPULATION_SIZE,2): if random.random() < self.P_CROSSOVER: offsprings[j-1],offsprings[j] = toolbox.mate(offsprings[j-1],offsprings[j]) del offsprings[j-1].fitness.values,offsprings[j].fitness.values # 删除目标函数只values的同时会调用delValues方法删除wvalues属性 # 变异操作 for j in range(self.POPULATION_SIZE): if random.random() < self.P_MUTATION: offsprings[j], = toolbox.mutate(offsprings[j]) del offsprings[j].fitness.values # 删除目标函数只values的同时会调用delValues方法删除wvalues属性 # 计算后代的适应度函数值 invalid_individual = [ind for ind in offsprings if not ind.fitness.valid] values = toolbox.map(toolbox.evaluate,invalid_individual) for individual,value in zip(invalid_individual,values): individual.fitness.values = value # 更新种群 population[:] = offsprings # 更新解池,记录统计信息 halloffame.update(population) infos = stats.compile(population) logbook.record(generation=i,nevals=len(invalid_individual),**infos) # 打印求解进程 if verbose: print(logbook.stream) return population,logbook,halloffame def check(self,ax,pavg,pbest): """ 绘制各代平均目标函数值和最优目标函数值曲线,判断exploration和exploitation的程度 """ le1, = ax.plot(range(self.MAX_GENERATIONS+1),pavg,color='blue') le2, = ax.plot(range(self.MAX_GENERATIONS+1),pbest,color='green') ax.set_xlabel('Generation') ax.set_ylabel('objval') ax.legend([le1,le2],['pavg','pbest']) if __name__ == '__main__': # 参数设置 POPULATION_SIZE = 300 MAX_GENERATIONS = 1000 P_CROSSOVER = 0.9 P_MUTATION = 0.2 namelst = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H'] preferlst = [[1, 0, 0], [1, 1, 0], [0, 0, 1], [0, 1, 0], [0, 0, 1], [1, 1, 1], [0, 1, 1], [1, 1, 1]] MaxShiftPerweek = 5 MaxConsecShift = 2 MinShiftNumConstr = [2, 2, 1] MaxShiftNumConstr = [3, 4, 2] nsp_problem = nsp_problem.NSP(namelst,preferlst,MaxShiftPerweek,MaxConsecShift,MinShiftNumConstr,MaxShiftNumConstr) # 创建必要的类 creator.create('FitnessMin',Fitness,weights=(-1,)) # 创建一个叫FitnessMin的类,它继承自Fitness类,weights类属性被设置为(-1,),这个属性在用values计算wvalues时会被用到,对于min问题,weights对应元素应该设置为负值 creator.create('Individual',list,fitness=creator.FitnessMin) # 创建一个叫Individual的类,它继承自Lisr类,有一个实例属性fitness(它是FitnessMin类的实例) # 使用Toolbox类管理相关算子 toolbox = Toolbox() toolbox.register("randomSolution",lambda:[random.randint(0,1) for _ in range(len(nsp_problem))]) # 生成nsp的一个随机的可行解 toolbox.register("createIndividual",initIterate,creator.Individual,toolbox.randomSolution) # 创建个体 toolbox.register("initializePopulation",initRepeat,list,toolbox.createIndividual,POPULATION_SIZE) # 创建初始种群 toolbox.register("evaluate",lambda individual:(nsp_problem.getTotalCost(individual),)) # 目标函数计算 toolbox.register("select", selTournament, tournsize=2) # 选择方法 toolbox.register("mate",cxTwoPoint) # 交叉方法 toolbox.register("mutate", mutFlipBit, indpb=1.0/len(nsp_problem)) # 变异方法 # 设置如何计算统计信息 stats = Statistics(lambda ind: ind.fitness.values) # 预处理方式为取出目标函数值values stats.register("pavg", np.mean) stats.register("pmin", np.min) # 搭建求解器 nsp_solver = NSPSolver(POPULATION_SIZE,MAX_GENERATIONS,P_CROSSOVER,P_MUTATION) population,logbook,halloffame = nsp_solver.solve(toolbox.initializePopulation(),toolbox,poolSolutions=30,stats=stats) # 打印求解结果 print('-'*20+'最优排班'+'-'*20) print(nsp_problem.splitSolution(halloffame[0])) print('偏好违反数:{}'.format(nsp_problem.getPreferViolation(halloffame[0]))) print('硬约束违反数:{}'.format(nsp_problem.getTotalCost(halloffame[0])-nsp_problem.getPreferViolation(halloffame[0]))) print('-'*44) # 检查exploitation和exploration pavg,pbest = logbook.select('pavg','pmin') fig,ax = plt.subplots() nsp_solver.check(ax,pavg,pbest) plt.show()