
Optimizing DeepSeek with EdgeMoE

First, implement the hierarchical storage technique. The existing code is modified so that non-expert weights are kept in device memory, while expert weights live in external storage and are loaded into memory only when needed. The implementation steps are as follows:

  1. Create an ExpertStorage class to manage the external storage:
import torch
import os

class ExpertStorage:
    def __init__(self, config, storage_path):
        self.config = config
        self.storage_path = storage_path
        os.makedirs(storage_path, exist_ok=True)

    def save_expert(self, expert_id, expert_state):
        path = os.path.join(self.storage_path, f"expert_{expert_id}.pt")
        torch.save(expert_state, path)

    def load_expert(self, expert_id):
        path = os.path.join(self.storage_path, f"expert_{expert_id}.pt")
        return torch.load(path, map_location='cpu')
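
A minimal usage sketch of ExpertStorage (the storage path and dummy state dict below are illustrative only):

# Round-trip a dummy expert state dict through external storage.
storage = ExpertStorage(config=None, storage_path="expert_storage_demo")
dummy_state = {"weight": torch.randn(8, 8)}
storage.save_expert(0, dummy_state)
restored = storage.load_expert(0)
assert torch.equal(dummy_state["weight"], restored["weight"])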
  2. Modify the DeepseekMoE class to use ExpertStorage:
class DeepseekMoE(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.num_experts_per_tok = config.num_experts_per_tok
        self.n_routed_experts = config.n_routed_experts
        self.gate = MoEGate(config)
        
        # Create the expert storage (experts live on disk)
        self.expert_storage = ExpertStorage(config, "expert_storage")
        
        # In-memory buffer for currently loaded experts
        self.expert_buffer = {}
        
        # Non-expert (shared expert) weights stay in device memory
        if config.n_shared_experts is not None:
            intermediate_size = config.moe_intermediate_size * config.n_shared_experts
            self.shared_experts = DeepseekMLP(config=config, intermediate_size=intermediate_size)
    
    def _create_expert(self):
        return DeepseekMLP(self.config, intermediate_size=self.config.moe_intermediate_size)

    def _get_expert(self, expert_id):
        if expert_id not in self.expert_buffer:
            # Not in the buffer: load the expert from external storage
            expert_state = self.expert_storage.load_expert(expert_id)
            expert = self._create_expert()
            expert.load_state_dict(expert_state)
            self.expert_buffer[expert_id] = expert
        return self.expert_buffer[expert_id]

    def forward(self, hidden_states):
        identity = hidden_states
        orig_shape = hidden_states.shape
        topk_idx, topk_weight, aux_loss = self.gate(hidden_states)
        hidden_states = hidden_states.view(-1, hidden_states.shape[-1])
        flat_topk_idx = topk_idx.view(-1)
        # Each token is routed to num_experts_per_tok experts, so replicate the token
        # representations to line up with the flattened routing indices below
        hidden_states = hidden_states.repeat_interleave(self.num_experts_per_tok, dim=0)
        
        outputs = []
        for i in range(self.n_routed_experts):
            mask = (flat_topk_idx == i)
            if mask.any():
                expert = self._get_expert(i)
                expert_output = expert(hidden_states[mask])
                outputs.append((mask, expert_output))
        
        y = torch.zeros_like(hidden_states)
        for mask, expert_output in outputs:
            y[mask] = expert_output
        
        y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1)
        y = y.view(*orig_shape)
        
        if self.config.n_shared_experts is not None:
            y = y + self.shared_experts(identity)
        
        return y
  3. Modify the DeepseekModel class to support initializing and storing the experts:
class DeepseekModel(DeepseekPreTrainedModel):
    def __init__(self, config: DeepseekConfig):
        super().__init__(config)
        # ... (other initialization code unchanged)
        
        # Initialize the experts and write them to external storage
        self._init_and_store_experts()

    def _init_and_store_experts(self):
        for layer in self.layers:
            if isinstance(layer.mlp, DeepseekMoE):
                for i in range(layer.mlp.n_routed_experts):
                    expert = layer.mlp._create_expert()
                    expert_state = expert.state_dict()
                    layer.mlp.expert_storage.save_expert(i, expert_state)
                # Clear the in-memory experts
                layer.mlp.expert_buffer.clear()

    # ... (other methods unchanged)
  4. Modify the DeepseekForCausalLM class so that the experts are handled correctly when the model is loaded:
class DeepseekForCausalLM(DeepseekPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)
        self.model = DeepseekModel(config)
        # ... (other initialization code unchanged)

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path, *model_args, **kwargs):
        model = super().from_pretrained(pretrained_model_name_or_path, *model_args, **kwargs)
        # Ensure the experts are initialized and stored in external storage
        # (note: _init_and_store_experts saves freshly initialized experts, so when
        # converting a pretrained checkpoint the pretrained expert weights should be
        # exported to ExpertStorage here instead)
        model.model._init_and_store_experts()
        return model

    # ... (other methods unchanged)

These modifications implement the following:

  1. Non-expert weights (such as the shared experts and the other model components) stay in device memory.
  2. Expert weights are kept in external storage (the file system in this implementation).
  3. Experts are loaded into the in-memory expert buffer only when they are needed.
  4. A simple buffering policy is used: every loaded expert is kept in the buffer. In practice, a more sophisticated policy such as LRU (least recently used) may be needed; a sketch is given after this list.
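
A minimal sketch of such an LRU buffer, assuming a fixed capacity (the class and its capacity parameter are illustrative and not part of the existing code):

from collections import OrderedDict

class LRUExpertBuffer:
    """Keeps at most `capacity` experts in memory, evicting the least recently used one."""
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = OrderedDict()

    def get(self, expert_id):
        if expert_id in self.buffer:
            self.buffer.move_to_end(expert_id)  # mark as most recently used
            return self.buffer[expert_id]
        return None

    def put(self, expert_id, expert):
        self.buffer[expert_id] = expert
        self.buffer.move_to_end(expert_id)
        if len(self.buffer) > self.capacity:
            self.buffer.popitem(last=False)  # drop the least recently used expert

_get_expert would then query get() first and, on a miss, load the expert from ExpertStorage and put() it into the buffer.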

This implementation provides a basic framework for hierarchical storage. In practice, further optimization may be needed, for example:

  • A more efficient external storage mechanism (such as memory-mapped files or a database); see the sketch after this list.
  • Smarter expert loading and unloading policies to balance memory usage against compute efficiency.
  • Concurrent access and thread safety.
  • Error handling and recovery mechanisms.
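
As one example of the first point, a memory-mapped variant of load_expert; this is a hedged sketch that relies on the mmap argument of torch.load (available in PyTorch 2.1+), so expert tensors are paged in from disk lazily instead of being fully deserialized up front:

    def load_expert_mmap(self, expert_id):
        # Memory-map the saved state dict; tensors stay backed by the file and are
        # only paged into RAM when they are actually touched.
        path = os.path.join(self.storage_path, f"expert_{expert_id}.pt")
        return torch.load(path, map_location='cpu', mmap=True)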

Note that these changes may affect the model's performance and memory usage. Further testing and tuning may be needed to reach the best performance for a particular use case.


Next, implement the expert bitwidth adaptation technique. The implementation consists of the following parts:

  1. Create a quantization function that quantizes weights to INT2/4/8:
import torch

def quantize_weights(weights, bits):
    if bits not in [2, 4, 8]:
        raise ValueError("Bits must be 2, 4, or 8")
    
    # Compute the quantization range (asymmetric, per-tensor)
    min_val, max_val = weights.min(), weights.max()
    scale = (max_val - min_val) / (2**bits - 1)
    scale = torch.clamp(scale, min=1e-8)  # avoid division by zero for constant tensors
    zero_point = min_val

    # Quantize; values fall in [0, 2**bits - 1], so store them in an unsigned 8-bit
    # container (int8 would overflow in the INT8 case, where values go up to 255)
    quantized = torch.round((weights - zero_point) / scale).clamp(0, 2**bits - 1).to(torch.uint8)

    # Dequantize (used to simulate the quantization effect)
    dequantized = scale * quantized + zero_point

    return quantized, scale, zero_point, dequantized
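
A quick sanity check of the quantization error at different bitwidths (illustrative only):

w = torch.randn(256, 256)
for bits in [8, 4, 2]:
    _, _, _, dequantized = quantize_weights(w, bits)
    err = (w - dequantized).abs().mean().item()
    print(f"INT{bits}: mean absolute error = {err:.5f}")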
  2. Modify the DeepseekMLP class to support quantized weights:
class QuantizedDeepseekMLP(nn.Module):
    def __init__(self, config, hidden_size=None, intermediate_size=None):
        super().__init__()
        self.config = config
        self.hidden_size = hidden_size if hidden_size is not None else config.hidden_size
        self.intermediate_size = intermediate_size if intermediate_size is not None else config.intermediate_size
        self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
        self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
        self.act_fn = ACT2FN[config.hidden_act]
        
        self.quantized = False
        self.quantization_bits = None
        self.quantized_weights = {}
        self.scales = {}
        self.zero_points = {}

    def quantize(self, bits):
        self.quantized = True
        self.quantization_bits = bits
        
        for name, param in self.named_parameters():
            if 'weight' in name:
                quantized, scale, zero_point, _ = quantize_weights(param.data, bits)
                self.quantized_weights[name] = quantized
                self.scales[name] = scale
                self.zero_points[name] = zero_point

    def forward(self, x):
        if not self.quantized:
            # Full-precision path (nn.Module itself has no forward to fall back to)
            return self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
        
        def dequantize(name):
            # Reconstruct a floating-point weight from the stored integer tensor
            w = self.scales[name] * self.quantized_weights[name].float() + self.zero_points[name]
            return w.to(x.dtype)
        
        gate_output = F.linear(x, dequantize('gate_proj.weight'))
        up_output = F.linear(x, dequantize('up_proj.weight'))
        
        intermediate_output = self.act_fn(gate_output) * up_output
        down_output = F.linear(intermediate_output, dequantize('down_proj.weight'))
        
        return down_output
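
A small usage sketch with a stand-in config (SimpleNamespace here is only for illustration; in the real code DeepseekConfig supplies these fields, and the modeling file already imports nn, F and ACT2FN):

from types import SimpleNamespace

cfg = SimpleNamespace(hidden_size=64, intermediate_size=128, hidden_act="silu")
mlp = QuantizedDeepseekMLP(cfg)

x = torch.randn(4, 64)
ref = mlp(x)            # full-precision path
mlp.quantize(bits=4)    # switch to simulated INT4 weights
approx = mlp(x)
print("max abs deviation:", (ref - approx).abs().max().item())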
  3. Implement an offline procedure that progressively lowers the bitwidth of each expert:
import copy

def adaptive_quantization(model, dataset, tolerance, initial_bits=8):
    original_accuracy = evaluate_model(model, dataset)
    print(f"Original accuracy: {original_accuracy}")

    quantized_experts = {}
    for layer_idx, layer in enumerate(model.model.layers):
        if isinstance(layer.mlp, DeepseekMoE):
            for expert_idx in range(layer.mlp.n_routed_experts):
                expert = layer.mlp._get_expert(expert_idx)
                best_bits = initial_bits
                best_accuracy = original_accuracy

                for bits in [8, 4, 2]:
                    quantized_expert = copy.deepcopy(expert)
                    quantized_expert.quantize(bits)
                    
                    # Temporarily swap in the quantized expert for evaluation
                    layer.mlp.expert_buffer[expert_idx] = quantized_expert
                    accuracy = evaluate_model(model, dataset)
                    
                    # Keep lowering the bitwidth as long as accuracy stays within
                    # `tolerance` of the original model's accuracy
                    if accuracy >= original_accuracy - tolerance:
                        best_bits = bits
                        best_accuracy = accuracy
                    else:
                        break
                
                print(f"Layer {layer_idx}, Expert {expert_idx}: Best bits = {best_bits}, Accuracy = {best_accuracy}")
                quantized_experts[(layer_idx, expert_idx)] = (best_bits, best_accuracy)
                
                # Restore the original (unquantized) expert
                layer.mlp.expert_buffer[expert_idx] = expert

    return quantized_experts

def evaluate_model(model, dataset):
    # Implement the model evaluation logic here and return an accuracy score;
    # this depends on the specific task and dataset (see the sketch below)
    pass
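
A minimal sketch of such an evaluation function for a causal-LM setting, assuming the dataset yields dicts with an input_ids tensor; it scores the model by negated average language-modeling loss so that "higher is better", matching how adaptive_quantization compares scores (the function and parameter names are illustrative):

def evaluate_model_lm(model, dataset, device="cpu", max_batches=50):
    model.eval()
    total_loss, n = 0.0, 0
    with torch.no_grad():
        for i, batch in enumerate(dataset):
            if i >= max_batches:
                break
            input_ids = batch["input_ids"].to(device)
            # Standard Hugging Face causal-LM loss: labels are the inputs themselves
            outputs = model(input_ids=input_ids, labels=input_ids)
            total_loss += outputs.loss.item()
            n += 1
    return -total_loss / max(n, 1)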
  4. Modify the DeepseekMoE class to use the quantized experts:
class DeepseekMoE(nn.Module):
    def __init__(self, config):
        # ... (other initialization unchanged)
        self.quantized_bits = {}

    def _create_expert(self):
        return QuantizedDeepseekMLP(self.config, intermediate_size=self.config.moe_intermediate_size)

    def _get_expert(self, expert_id):
        if expert_id not in self.expert_buffer:
            expert_state = self.expert_storage.load_expert(expert_id)
            expert = self._create_expert()
            expert.load_state_dict(expert_state)
            if expert_id in self.quantized_bits:
                expert.quantize(self.quantized_bits[expert_id])
            self.expert_buffer[expert_id] = expert
        return self.expert_buffer[expert_id]

    def set_quantization_bits(self, expert_id, bits):
        self.quantized_bits[expert_id] = bits
        if expert_id in self.expert_buffer:
            self.expert_buffer[expert_id].quantize(bits)

With this implementation, expert bitwidth adaptation proceeds as follows:

  1. Train the original model.
  2. Run the adaptive_quantization function to determine the best quantization bitwidth for each expert.
  3. Apply the resulting bitwidths to the corresponding experts in the model.

Example:

model = DeepseekForCausalLM.from_pretrained("path_to_your_model")
dataset = load_your_dataset()  # Load the dataset

quantized_experts = adaptive_quantization(model, dataset, tolerance=0.01)

for (layer_idx, expert_idx), (bits, accuracy) in quantized_experts.items():
    model.model.layers[layer_idx].mlp.set_quantization_bits(expert_idx, bits)

# Save the quantized model
model.save_pretrained("path_to_save_quantized_model")
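
One caveat: save_pretrained stores module parameters but not the quantized_bits bookkeeping, so the chosen bitwidths should be persisted separately and re-applied after loading. A hedged sketch (the JSON file name is illustrative):

import json

# Persist the chosen bitwidths alongside the checkpoint
bits_config = {f"{layer_idx}.{expert_idx}": bits
               for (layer_idx, expert_idx), (bits, _) in quantized_experts.items()}
with open("path_to_save_quantized_model/expert_bits.json", "w") as f:
    json.dump(bits_config, f)

# After from_pretrained, restore them
with open("path_to_save_quantized_model/expert_bits.json") as f:
    for key, bits in json.load(f).items():
        layer_idx, expert_idx = map(int, key.split("."))
        model.model.layers[layer_idx].mlp.set_quantization_bits(expert_idx, bits)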

This implementation provides a basic framework for expert bitwidth adaptation. In practice, further optimization and tuning may be needed, for example:

  • More sophisticated quantization schemes, such as symmetric or non-linear quantization (see the sketch after this list).
  • A faster evaluation procedure, for instance using a smaller validation set.
  • A smarter search strategy for the best combination of bitwidths.
  • Accounting for the different quantization sensitivity of different layers and parameter types (e.g. attention weights vs. feed-forward weights).
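
For reference, a hedged sketch of a symmetric per-tensor variant (zero point fixed at 0, signed integer range), which pairs more naturally with integer matmul kernels:

def quantize_weights_symmetric(weights, bits):
    # Symmetric variant: the range is centered on zero, so no zero point is needed
    qmax = 2 ** (bits - 1) - 1            # e.g. 127 for INT8, 7 for INT4
    scale = weights.abs().max() / qmax
    scale = torch.clamp(scale, min=1e-8)  # guard against all-zero tensors
    quantized = torch.round(weights / scale).clamp(-qmax - 1, qmax).to(torch.int8)
    dequantized = scale * quantized
    return quantized, scale, dequantized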

Note that this implementation can significantly increase model complexity and training/inference time. Accuracy and efficiency have to be balanced against the specific requirements and hardware constraints.


Finally, implement the expert management technique, which consists of the following parts:

  1. Create an expert activation prediction model:
import torch
import torch.nn as nn

class ExpertActivationPredictor(nn.Module):
    def __init__(self, num_layers, num_experts):
        super().__init__()
        self.predictor = nn.Linear(num_experts, num_experts)
        self.num_layers = num_layers

    def forward(self, previous_activations):
        return torch.sigmoid(self.predictor(previous_activations))

    def train_predictor(self, activation_data):
        # Implement the training logic here (see the sketch below)
        pass
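
A minimal sketch of the training logic, assuming activation_data is a list of (previous_layer_activations, this_layer_activations) pairs of 0/1 vectors of shape (num_experts,); it fits the linear predictor with a binary cross-entropy loss (epochs and lr are illustrative defaults):

def train_predictor_sketch(predictor, activation_data, epochs=5, lr=1e-3):
    optimizer = torch.optim.Adam(predictor.parameters(), lr=lr)
    loss_fn = nn.BCELoss()  # the predictor already outputs sigmoid probabilities
    predictor.train()
    for _ in range(epochs):
        for prev, target in activation_data:
            pred = predictor(prev.float())
            loss = loss_fn(pred, target.float())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    predictor.eval()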
  2. Modify the DeepseekMoE class to include expert management:
class DeepseekMoE(nn.Module):
    def __init__(self, config, layer_idx):
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        self.num_experts_per_tok = config.num_experts_per_tok
        self.n_routed_experts = config.n_routed_experts
        self.gate = MoEGate(config)
        
        self.expert_storage = ExpertStorage(config, f"expert_storage_layer_{layer_idx}")
        self.expert_buffer = {}
        self.expert_predictor = ExpertActivationPredictor(config.num_hidden_layers, self.n_routed_experts)
        
        self.max_buffer_size = config.max_expert_buffer_size  # new config field: expert buffer capacity
        self.activation_history = {}  # expert_id -> activation count
        self.quantized_bits = {}      # expert_id -> bitwidth (see the bitwidth adaptation step)
        
        if config.n_shared_experts is not None:
            intermediate_size = config.moe_intermediate_size * config.n_shared_experts
            self.shared_experts = QuantizedDeepseekMLP(config=config, intermediate_size=intermediate_size)

    def _create_expert(self):
        return QuantizedDeepseekMLP(self.config, intermediate_size=self.config.moe_intermediate_size)

    def _get_expert(self, expert_id):
        if expert_id not in self.expert_buffer:
            if len(self.expert_buffer) >= self.max_buffer_size:
                self._evict_expert()
            expert_state = self.expert_storage.load_expert(expert_id)
            expert = self._create_expert()
            expert.load_state_dict(expert_state)
            if expert_id in self.quantized_bits:
                expert.quantize(self.quantized_bits[expert_id])
            self.expert_buffer[expert_id] = expert
        return self.expert_buffer[expert_id]

    def _evict_expert(self):
        # Cache eviction policy: drop the buffered expert with the lowest recorded
        # activation count (experts that were preloaded but never activated count as 0)
        expert_to_evict = min(self.expert_buffer,
                              key=lambda eid: self.activation_history.get(eid, 0))
        del self.expert_buffer[expert_to_evict]
        self.activation_history.pop(expert_to_evict, None)

    def preload_experts(self, previous_activations):
        predicted_activations = self.expert_predictor(previous_activations)
        top_k_experts = predicted_activations.topk(self.num_experts_per_tok).indices
        for expert_id in top_k_experts:
            self._get_expert(expert_id.item())

    def update_activation_history(self, activated_experts):
        for expert_id in activated_experts:
            if expert_id in self.activation_history:
                self.activation_history[expert_id] += 1
            else:
                self.activation_history[expert_id] = 1

    def forward(self, hidden_states, previous_activations=None):
        if previous_activations is not None:
            self.preload_experts(previous_activations)

        identity = hidden_states
        orig_shape = hidden_states.shape
        topk_idx, topk_weight, aux_loss = self.gate(hidden_states)
        hidden_states = hidden_states.view(-1, hidden_states.shape[-1])
        flat_topk_idx = topk_idx.view(-1)
        # Each token is routed to num_experts_per_tok experts, so replicate the token
        # representations to line up with the flattened routing indices below
        hidden_states = hidden_states.repeat_interleave(self.num_experts_per_tok, dim=0)
        
        outputs = []
        activated_experts = set()
        for i in range(self.n_routed_experts):
            mask = (flat_topk_idx == i)
            if mask.any():
                expert = self._get_expert(i)
                expert_output = expert(hidden_states[mask])
                outputs.append((mask, expert_output))
                activated_experts.add(i)
        
        self.update_activation_history(activated_experts)
        
        y = torch.zeros_like(hidden_states)
        for mask, expert_output in outputs:
            y[mask] = expert_output
        
        y = (y.view(*topk_weight.shape, -1) * topk_weight.unsqueeze(-1)).sum(dim=1)
        y = y.view(*orig_shape)
        
        if self.config.n_shared_experts is not None:
            y = y + self.shared_experts(identity)
        
        # Return a float 0/1 activation vector (on the same device as the output)
        # so it can be fed directly to the next layer's activation predictor
        activation_vector = torch.tensor(
            [1.0 if i in activated_experts else 0.0 for i in range(self.n_routed_experts)],
            device=y.device,
        )
        return y, activation_vector
  3. Modify the DeepseekModel class to support expert management:
class DeepseekModel(DeepseekPreTrainedModel):
    def __init__(self, config: DeepseekConfig):
        super().__init__(config)
        # ... (other initialization code unchanged)
        
        # Each decoder layer receives its index; DeepseekDecoderLayer is assumed to
        # pass it on to DeepseekMoE so the MoE block can label its expert storage
        self.layers = nn.ModuleList([DeepseekDecoderLayer(config, idx) for idx in range(config.num_hidden_layers)])
        
        # 初始化并存储专家
        self._init_and_store_experts()

    def _init_and_store_experts(self):
        for layer in self.layers:
            if isinstance(layer.mlp, DeepseekMoE):
                for i in range(layer.mlp.n_routed_experts):
                    expert = layer.mlp._create_expert()
                    expert_state = expert.state_dict()
                    layer.mlp.expert_storage.save_expert(i, expert_state)
                # Clear the in-memory experts
                layer.mlp.expert_buffer.clear()

    def forward(self, input_ids, attention_mask=None, position_ids=None, past_key_values=None, inputs_embeds=None, use_cache=None, output_attentions=None, output_hidden_states=None, return_dict=None):
        # ... (rest of the code unchanged)
        
        previous_activations = None
        for idx, decoder_layer in enumerate(self.layers):
            # ... (rest of the per-layer code unchanged)
            
            # Simplified sketch: in the actual code the MLP is invoked inside
            # DeepseekDecoderLayer.forward, which would need to pass
            # previous_activations through and return the activation vector
            if isinstance(decoder_layer.mlp, DeepseekMoE):
                layer_outputs, current_activations = decoder_layer.mlp(layer_outputs, previous_activations)
                previous_activations = current_activations
            else:
                layer_outputs = decoder_layer.mlp(layer_outputs)
            
            # ... (rest of the per-layer code unchanged)
        
        # ... (rest of the code unchanged)
  4. Implement an offline training procedure for the expert activation predictors:
def train_expert_activation_predictors(model, dataset):
    activation_data = []
    
    # Collect expert activation data by running the model over the dataset
    model.eval()
    with torch.no_grad():
        for batch in dataset:
            inputs = batch['input_ids']
            outputs = model(inputs)
            # Assumes the model output has been extended to carry the per-layer
            # expert activation vectors produced by the modified forward pass
            activation_data.append(outputs.expert_activations)
    
    # Train the per-layer predictors
    for layer in model.model.layers:
        if isinstance(layer.mlp, DeepseekMoE):
            layer_activation_data = [data[layer.mlp.layer_idx] for data in activation_data]
            layer.mlp.expert_predictor.train_predictor(layer_activation_data)

# Usage example
model = DeepseekForCausalLM.from_pretrained("path_to_your_model")
dataset = load_your_dataset()  # Load the dataset

train_expert_activation_predictors(model, dataset)

# Save the model with the trained predictors
model.save_pretrained("path_to_save_model_with_predictors")

This implementation provides a basic framework for the expert management technique, including:

  • An expert activation prediction model.
  • An expert preloading mechanism.
  • A simple cache eviction policy.

In practice, further optimization and tuning may be needed, for example:

  • A more sophisticated expert activation predictor built with more advanced machine learning techniques.
  • A preloading policy that weighs the cost of preloading against its expected benefit.
  • A more elaborate eviction policy that accounts for additional factors such as expert size and compute cost.
  • Overlapping I/O with computation to maximize efficiency (see the sketch after this list).
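
As a hedged sketch of the last point, expert loading can be overlapped with computation by prefetching on a background thread (the class below is illustrative; production code would also need a lock around the shared expert buffer):

from concurrent.futures import ThreadPoolExecutor

class AsyncExpertPrefetcher:
    """Loads predicted experts on a background thread while the current layer computes."""
    def __init__(self, moe_layer, max_workers=1):
        self.moe_layer = moe_layer
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.pending = {}

    def prefetch(self, expert_ids):
        for expert_id in expert_ids:
            if expert_id not in self.pending:
                # Kick off the disk read and load_state_dict without blocking compute
                self.pending[expert_id] = self.executor.submit(self.moe_layer._get_expert, expert_id)

    def wait(self, expert_id):
        # Call right before the expert is actually needed
        future = self.pending.pop(expert_id, None)
        if future is not None:
            return future.result()
        return self.moe_layer._get_expert(expert_id)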

Note that this implementation adds complexity to the model and may affect inference speed. The benefit of preloading has to be weighed against the added complexity for the specific requirements and hardware constraints.
