Hunyuan.py vs hunyuan_modeling.py
563 lines
# coding=utf-8
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
# Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved.
#
#
# Licensed under the TENCENT HUNYUAN COMMUNITY LICENSE AGREEMENT (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://github.com/Tencent/Tencent-Hunyuan-Large/blob/main/License.docx
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
""" PyTorch HunYuan model."""
""" PyTorch HunYuan model."""
import math
import math
import warnings
import warnings
from typing import List, Optional, Tuple, Union
from typing import List, Optional, Tuple, Union
import torch
import torch
from torch import Tensor
from torch import Tensor
import torch.nn.functional as F
import torch.nn.functional as F
import torch.utils.checkpoint
import torch.utils.checkpoint
from torch import nn
from torch import nn
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from transformers.activations import ACT2FN
from transformers.activations import ACT2FN
from transformers.cache_utils import Cache, DynamicCache
from transformers.cache_utils import Cache, DynamicCache
from transformers.modeling_attn_mask_utils import (
from transformers.modeling_attn_mask_utils import (
AttentionMaskConverter,
AttentionMaskConverter,
_prepare_4d_attention_mask,
_prepare_4d_attention_mask,
_prepare_4d_causal_attention_mask,
_prepare_4d_causal_attention_mask,
_prepare_4d_causal_attention_mask_for_sdpa,
_prepare_4d_causal_attention_mask_for_sdpa,
)
)
from transformers.modeling_outputs import (
from transformers.modeling_outputs import (
BaseModelOutputWithPast,
BaseModelOutputWithPast,
CausalLMOutputWithPast,
CausalLMOutputWithPast,
SequenceClassifierOutputWithPast
SequenceClassifierOutputWithPast
)
)
from transformers.modeling_utils import PreTrainedModel
from transformers.modeling_utils import PreTrainedModel
from transformers.pytorch_utils import ALL_LAYERNORM_LAYERS, is_torch_greater_or_equal_than_1_13
from transformers.pytorch_utils import ALL_LAYERNORM_LAYERS, is_torch_greater_or_equal_than_1_13
from transformers.utils import (
from transformers.utils import (
add_start_docstrings,
add_start_docstrings,
add_start_docstrings_to_model_forward,
add_start_docstrings_to_model_forward,
is_flash_attn_2_available,
is_flash_attn_2_available,
is_flash_attn_greater_or_equal_2_10,
is_flash_attn_greater_or_equal_2_10,
logging,
logging,
replace_return_docstrings,
replace_return_docstrings,
)
)
from transformers.utils.import_utils import is_torch_fx_available
from transformers.utils.import_utils import is_torch_fx_available
from transformers.generation.utils import GenerateOutput
from .configuration_hunyuan import HunYuanConfig
from .configuration_hunyuan import HunYuanConfig
from .modeling_hunyuan import HunYuanDecoderLayer, HunYuanRMSNorm
if is_flash_attn_2_available():
if is_flash_attn_2_available():
from flash_attn import flash_attn_func, flash_attn_varlen_func
from flash_attn import flash_attn_func, flash_attn_varlen_func
from flash_attn.bert_padding import index_first_axis, pad_input, unpad_input # noqa
from flash_attn.bert_padding import index_first_axis, pad_input, unpad_input # noqa
# This makes `_prepare_4d_causal_attention_mask` a leaf function in the FX graph.
# This makes `_prepare_4d_causal_attention_mask` a leaf function in the FX graph.
# It means that the function will not be traced through and simply appear as a node in the graph.
# It means that the function will not be traced through and simply appear as a node in the graph.
if is_torch_fx_available():
if is_torch_fx_available():
if not is_torch_greater_or_equal_than_1_13:
if not is_torch_greater_or_equal_than_1_13:
import torch.fx
import torch.fx
_prepare_4d_causal_attention_mask = torch.fx.wrap(_prepare_4d_causal_attention_mask)
_prepare_4d_causal_attention_mask = torch.fx.wrap(_prepare_4d_causal_attention_mask)
logger = logging.get_logger(__name__)
_CONFIG_FOR_DOC = "HunYuanConfig"
_CONFIG_FOR_DOC = "HunYuanConfig"
HUNYUAN_START_DOCSTRING = r"""
def topkgating(logits: Tensor, topk: int):
This model inherits from [`PreTrainedModel`]. Check the superclass documentation for the generic methods the
logits = logits.float()
library implements for all its model (such as downloading or saving, resizing the input embeddings, pruning heads
gates = F.softmax(logits, dim=1)
etc.)
# expert_capacity = topk * gates.shape[0]
expert_capacity = max(topk, topk * gates.shape[0] // gates.shape[1])
num_experts = int(gates.shape[1])
# Top-k router probability and corresponding expert indices for each token.
# Shape: [tokens_per_group, num_selected_experts].
expert_gate, expert_index = torch.topk(gates, topk)
expert_mask = F.one_hot(expert_index, num_experts)
# For a given token, determine if it was routed to a given expert.
# Shape: [tokens_per_group, num_experts]
expert_mask_aux = expert_mask.max(dim=-2)[0]
tokens_per_group_and_expert = torch.mean(expert_mask_aux.float(), dim=-2)
router_prob_per_group_and_expert = torch.mean(gates.float(), dim=-2)
l_aux = num_experts**2 * torch.mean(tokens_per_group_and_expert * router_prob_per_group_and_expert)
This model is also a PyTorch [torch.nn.Module](https://pytorch.org/docs/stable/nn.html#torch.nn.Module) subclass.
gates_s = torch.clamp(
Use it as a regular PyTorch Module and refer to the PyTorch documentation for all matter related to general usage
torch.matmul(expert_mask.float(), gates.unsqueeze(-1)).sum(dim=1), min=torch.finfo(gates.dtype).eps
and behavior.
)
router_probs = gates / gates_s
# Make num_selected_experts the leading axis to ensure that top-1 choices
# have priority over top-2 choices, which have priority over top-3 choices,
# etc.
expert_index = torch.transpose(expert_index, 0, 1)
# Shape: [num_selected_experts * tokens_per_group]
expert_index = expert_index.reshape(-1)
Parameters:
# Create mask out of indices.
config ([`HunYuanConfig`]):
# Shape: [tokens_per_group * num_selected_experts, num_experts].
Model configuration class with all the parameters of the model. Initializing with a config file does not
expert_mask = F.one_hot(expert_index, num_experts).to(torch.int32)
load the weights associated with the model, only the configuration. Check out the
exp_counts = torch.sum(expert_mask, dim=0).detach()
[`~PreTrainedModel.from_pretrained`] method to load the model weights.
"""
# Experts have a fixed capacity that we cannot exceed. A token's priority
# within the expert's buffer is given by the masked, cumulative capacity of
# its target expert.
# Shape: [tokens_per_group * num_selected_experts, num_experts].
token_priority = torch.cumsum(expert_mask, dim=0) * expert_mask - 1
# Shape: [num_selected_experts, tokens_per_group, num_experts].
token_priority = token_priority.reshape((topk, -1, num_experts))
# Shape: [tokens_per_group, num_selected_experts, num_experts].
token_priority = torch.transpose(token_priority, 0, 1)
# For each token, across all selected experts, select the only non-negative
# (unmasked) priority. Now, for group G routing to expert E, token T has
# non-negative priority (i.e. token_priority[G,T,E] >= 0) if and only if E
# is its targeted expert.
# Shape: [tokens_per_group, num_experts].
token_priority = torch.max(token_priority, dim=1)[0]
@add_start_docstrings(
# Token T can only be routed to expert E if its priority is positive and
"The bare HunYuan Model outputting raw hidden-states without any specific head on top.",
# less than the expert capacity. One-hot matrix will ignore indices outside
HUNYUAN_START_DOCSTRING,
# the range [0, expert_capacity).
)
# Shape: [tokens_per_group, num_experts, expert_capacity].
class HunYuanPreTrainedModel(PreTrainedModel):
valid_mask = torch.logical_and(token_priority >= 0, token_priority < expert_capacity)
config_class = HunYuanConfig
token_priority = torch.masked_fill(token_priority, ~valid_mask, 0)
base_model_prefix = "model"
dispatch_mask = F.one_hot(token_priority, expert_capacity).to(torch.bool)
supports_gradient_checkpointing = True
valid_mask = valid_mask.unsqueeze(-1).expand(-1, -1, expert_capacity)
_no_split_modules = ["HunYuanDecoderLayer"]
dispatch_mask = torch.masked_fill(dispatch_mask, ~valid_mask, 0)
_skip_keys_device_placement = "past_key_values"
_supports_flash_attn_2 = True
_supports_sdpa = True
_supports_cache_class = True
def _init_weights(self, module):
# The combine array will be used for combining expert outputs, scaled by the
std = self.config.initializer_range
# router probabilities. Shape: [num_groups, tokens_per_group, num_experts,
if isinstance(module, nn.Linear):
# expert_capacity].
module.weight.data.normal_(mean=0.0, std=std)
combine_weights = torch.einsum("...te,...tec->...tec", router_probs, dispatch_mask)
if module.bias is not None:
exp_counts_capacity = torch.sum(dispatch_mask)
module.bias.data.zero_()
exp_capacity_rate = exp_counts_capacity / (logits.shape[0]*topk)
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=std)
return [l_aux, exp_capacity_rate], combine_weights, dispatch_mask, exp_counts
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
HUNYUAN_INPUTS_DOCSTRING = r"""
def top1gating(logits: Tensor, random_routing_dropped_token: bool = False):
Args:
"""Implements Top1Gating on logits."""
input_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`):
# everything is in fp32 in this function
Indices of input sequence tokens in the vocabulary. Padding will be ignored by default should you provide
logits = logits.float()
it.
gates = F.softmax(logits, dim=1)
capacity = gates.shape[0]
Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
# Create a mask for 1st's expert per token
[`PreTrainedTokenizer.__call__`] for details.
# noisy gating
indices1_s = torch.argmax(gates, dim=1)
num_experts = int(gates.shape[1])
mask1 = F.one_hot(indices1_s, num_classes=num_experts)
[What are input IDs?](../glossary#input-ids)
# gating decisions
attention_mask (`torch.Tensor` of shape `(batch_size, sequence_length)`, *optional*):
# exp_counts = torch.sum(mask1, dim=0).detach().to('cpu')
Mask to avoid performing attention on padding token indices. Mask values selected in `[0, 1]`:
exp_counts = torch.sum(mask1, dim=0).detach()
- 1 for tokens that are **not masked**,
# Compute l_aux
- 0 for tokens that are **masked**.
me = torch.mean(gates, dim=0)
ce = torch.mean(mask1.float(), dim=0)
l_aux = torch.sum(me * ce) * num_experts
mask1_rand = mask1
[What are attention masks?](../glossary#attention-mask)
top_idx = torch.topk(mask1_rand, k=capacity, dim=0)[1]
Indices can be obtained using [`AutoTokenizer`]. See [`PreTrainedTokenizer.encode`] and
new_mask1 = mask1 * torch.zeros_like(mask1).scatter_(0, top_idx, 1)
[`PreTrainedTokenizer.__call__`] for details.
mask1 = new_mask1
mask1_bk = mask1
if random_routing_dropped_token:
not_full = capacity - new_mask1.sum(dim=0)
sorted_notfull, indices_notfull = torch.sort(not_full, descending=True)
sorted_notfull = sorted_notfull.to(torch.int64)
not_full_experts_ids = torch.repeat_interleave(indices_notfull, sorted_notfull)
shuffle_not_full_ids = torch.randperm(not_full_experts_ids.shape[0])
not_full_experts_ids = not_full_experts_ids[shuffle_not_full_ids]
indices1_s_after_drop = torch.argmax(new_mask1, dim=1)
# get drop idx
drop_mask = 1 - new_mask1.sum(dim=1)
drop_mask = drop_mask.bool()
drop_idx = drop_mask.nonzero().view(-1)
drop_num = drop_mask.sum().to(torch.int64)
indices1_s_after_drop.scatter_(0, drop_idx, not_full_experts_ids[:drop_num])
nodrop_mask1 = F.one_hot(indices1_s_after_drop, num_classes=num_experts)
mask1 = nodrop_mask1
If `past_key_values` is used, optionally only the last `input_ids` have to be input (see
# Compute locations in capacity buffer
`past_key_values`).
locations1 = torch.cumsum(mask1, dim=0) - 1
If you want to change padding behavior, you should read [`modeling_opt._prepare_decoder_attention_mask`]
# Store the capacity location for each token
and modify to your needs. See diagram 1 in [the paper](https://arxiv.org/abs/1910.13461) for more
locations1_s = torch.sum(locations1 * mask1, dim=1)
information on the default strategy.
- 1 indicates the head is **not masked**,
# Normalize gate probabilities
- 0 indicates the head is **masked**.
mask1_float = mask1.float()
position_ids (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
gates = gates * mask1_float
Indices of positions of each input sequence tokens in the position embeddings. Selected in the range `[0,
config.n_positions - 1]`.
[What are position IDs?](../glossary#position-ids)
locations1_sc = F.one_hot(locations1_s, num_classes=capacity).float() # one hot to float
past_key_values (`Cache` or `tuple(tuple(torch.FloatTensor))`, *optional*):
combine_weights = torch.einsum("se,sc->sec", gates, locations1_sc)
Pre-computed hidden-states (key and values in the self-attention blocks and in the cross-attention
blocks) that can be used to speed up sequential decoding. This typically consists in the `past_key_values`
returned by the model at a previous stage of decoding, when `use_cache=True` or `config.use_cache=True`.
Two formats are allowed:
dispatch_mask = combine_weights.bool()
- a [`~cache_utils.Cache`] instance;
- Tuple of `tuple(torch.FloatTensor)` of length `config.n_layers`, with each tuple having 2 tensors of
shape `(batch_size, num_heads, sequence_length, embed_size_per_head)`). This is also known as the legacy
cache format.
The model will output the same cache format that is fed as input. If no `past_key_values` are passed, the
exp_counts_capacity = torch.sum(mask1_bk)
legacy cache format will be returned.
exp_capacity_rate = exp_counts_capacity / (logits.shape[0])
return [l_aux, exp_capacity_rate], combine_weights, dispatch_mask, exp_counts
If `past_key_values` are used, the user can optionally input only the last `input_ids` (those that don't
have their past key value states given to this model) of shape `(batch_size, 1)` instead of all `input_ids`
def _get_unpad_data(attention_mask):
of shape `(batch_size, sequence_length)`.
seqlens_in_batch = attention_mask.sum(dim=-1, dtype=torch.int32)
inputs_embeds (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`, *optional*):
indices = torch.nonzero(attention_mask.flatten(), as_tuple=False).flatten()
Optionally, instead of passing `input_ids` you can choose to directly pass an embedded representation. This
max_seqlen_in_batch = seqlens_in_batch.max().item()
is useful if you want more control over how to convert `input_ids` indices into associated vectors than the
cu_seqlens = F.pad(torch.cumsum(seqlens_in_batch, dim=0, dtype=torch.torch.int32), (1, 0))
model's internal embedding lookup matrix.
return (
use_cache (`bool`, *optional*):
indices,
If set to `True`, `past_key_values` key value states are returned and can be used to speed up decoding (see
cu_seqlens,
`past_key_values`).
max_seqlen_in_batch,
output_attentions (`bool`, *optional*):
)
Whether or not to return the attentions tensors of all attention layers. See `attentions` under returned
tensors for more detail.
output_hidden_states (`bool`, *optional*):
Whether or not to return the hidden states of all layers. See `hidden_states` under returned tensors for
more detail.
return_dict (`bool`, *optional*):
Whether or not to return a [`~utils.ModelOutput`] instead of a plain tuple.
"""
@add_start_docstrings(
def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int] = None):
"The bare HunYuan Model outputting raw hidden-states without any specific head on top.",
warnings.warn(
HUNYUAN_START_DOCSTRING,
"Calling `transformers.models.llama.modeling_llama._prepare_4d_attention_mask` is deprecated and will be "
)
"removed in v4.37. Use `transformers.modeling_attn_mask_utils._prepare_4d_attention_mask"
class HunYuanModel(HunYuanPreTrainedModel):
)
"""
return _prepare_4d_attention_mask(mask=mask, dtype=dtype, tgt_len=tgt_len)
Transformer decoder consisting of *config.num_hidden_layers* layers. Each layer is a [`HunYuanDecoderLayer`]
Args:
config: HunYuanConfig
"""
def __init__(self, config: HunYuanConfig):
def _make_causal_mask(
super().__init__(config)
input_ids_shape: torch.Size, dtype: torch.dtype, device: torch.device, past_key_values_length: int = 0
self.padding_idx = config.pad_token_id
):
self.vocab_size = config.vocab_size
warnings.warn(
self.add_classification_head = config.add_classification_head
"Calling `transformers.models.llama.modeling_llama._make_causal_mask` is deprecated and will be removed in "
self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
"v4.37. Use `transformers.models.llama.modeling_llama.AttentionMaskConverter._make_causal_mask"
self.layers = nn.ModuleList(
)
[HunYuanDecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
return AttentionMaskConverter._make_causal_mask(
input_ids_shape=input_ids_shape, dtype=dtype, device=device, past_key_values_length=past_key_values_length
)
class HunYuanRMSNorm(nn.Module):
    """Root-mean-square normalization over the last dimension with a learned scale.

    The input is not mean-centered and there is no bias term; each vector is
    divided by the square root of its mean squared value (plus ``eps``) and
    then multiplied element-wise by ``weight``.
    """

    def __init__(self, hidden_size, eps=1e-6):
        """
        HunYuanRMSNorm is equivalent to T5LayerNorm

        Args:
            hidden_size: Size of the last dimension of the inputs; also the
                length of the learned ``weight`` vector (initialized to ones).
            eps: Constant added to the mean square before the reciprocal
                square root.
        """
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.variance_epsilon = eps

    def forward(self, hidden_states):
        """Normalize ``hidden_states`` along its last dimension.

        The statistics are computed in float32 regardless of the input dtype;
        the normalized values are cast back to the input dtype before being
        scaled by ``weight``.
        """
        input_dtype = hidden_states.dtype
        hidden_states = hidden_states.to(torch.float32)
        variance = hidden_states.pow(2).mean(-1, keepdim=True)
        hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
        return self.weight * hidden_states.to(input_dtype)

    def extra_repr(self):
        """Show the weight shape and epsilon in the module's printed form."""
        return f"{tuple(self.weight.shape)}, eps={self.variance_epsilon}"
ALL_LAYERNORM_LAYERS.append(HunYuanRMSNorm)
class HunYuanRotaryEmbedding(nn.Module):
def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None):
    """Store the rotary parameters and pre-build the cos/sin cache.

    Args:
        dim: Rotary dimension; one inverse frequency is created for every
            second index in ``range(0, dim, 2)``.
        max_position_embeddings: Sequence length the cache is initially
            built for.
        base: Base of the geometric progression of inverse frequencies.
        device: Device the inverse-frequency tensor is moved to.
    """
    super().__init__()
    self.dim = dim
    self.max_position_embeddings = max_position_embeddings
    self.base = base

    # inv_freq[i] = 1 / base ** (2i / dim), computed in float32.
    exponents = torch.arange(0, self.dim, 2).float().to(device) / self.dim
    inv_freq = 1.0 / (self.base ** exponents)
    # Non-persistent: the buffer is rebuilt here rather than saved in the state dict.
    self.register_buffer("inv_freq", inv_freq, persistent=False)

    # Build here to make `torch.jit.trace` work.
    self._set_cos_sin_cache(
        seq_len=max_position_embeddings,
        device=self.inv_freq.device,
        dtype=torch.get_default_dtype(),
    )
)
self._use_sdpa = config._attn_implementation == "sdpa"
self._use_flash_attention_2 = config._attn_implementation == "flash_attention_2"
if not config.add_classification_head:
self.norm = HunYuanRMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.cla = config.use_cla
self.cla_share_factor = config.cla_share_factor
self.gradient_checkpointing = False
def _set_cos_sin_cache(self, seq_len, device, dtype):
# Initialize weights and apply final processing
self.max_seq_len_cached = seq_len
self.post_init()
t = torch.arange(self.max_seq_len_cached, device=device, dtype=torch.float32)
def get_input_embeddings(self):
self.inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2).float().to(device) / self.dim))
return self.embed_tokens
freqs = torch.outer(t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1).float()
self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
def set_input_embeddings(self, value):
def forward(self, x, seq_len=None):
self.embed_tokens = value
# x: [bs, num_attention_heads, seq_len, head_size]
if seq_len > self.max_seq_len_cached or self.inv_freq.dtype != torch.float32:
self._set_cos_sin_cache(seq_len=seq_len, device=x.device, dtype=x.dtype)
@add_start_docstrings_to_model_forward(HUNYUAN_INPUTS_DOCSTRING)
return (
def forward(
self.cos_cached[:seq_len].to(dtype=x.dtype),
self,
self.sin_cached[:seq_len].to(dtype=x.dtype),
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, BaseModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
)
use_cache = use_cache if use_cache is not None else self.config.use_cache
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# retrieve input_ids and inputs_embeds
class HunYuanLinearScalingRotaryEmbedding(HunYuanRotaryEmbedding):
# if input_ids is not None and inputs_embeds is not None:
"""HunYuanRotaryEmbedding extended with linear scaling. Credits to the Reddit user /u/kaiokendev"""
# raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
if input_ids is not None:
batch_size, seq_length = input_ids.shape[:2]
elif inputs_embeds is not None:
batch_size, seq_length = inputs_embeds.shape[:2]
else:
raise ValueError("You have to specify either input_ids or inputs_embeds")
if self.gradient_checkpointing and self.training:
def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None, scaling_factor=1.0):
if use_cache:
self.scaling_factor = scaling_factor
logger.warning_once(
super().__init__(dim, max_position_embeddings, base, device)
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
past_key_values_length = 0
def _set_cos_sin_cache(self, seq_len, device, dtype):
if use_cache:
self.max_seq_len_cached = seq_len
use_legacy_cache = not isinstance(past_key_values, Cache)
t = torch.arange(self.max_seq_len_cached, device=device, dtype=self.inv_freq.dtype)
if use_legacy_cache:
t = t / self.scaling_factor
past_key_values = DynamicCache.from_legacy_cache(past_key_values)
past_key_values_length = past_key_values.get_usable_length(seq_length)
if position_ids is None:
freqs = torch.outer(t, self.inv_freq)
device = input_ids.device if input_ids is not None else inputs_embeds.device
# Different from paper, but it uses a different permutation in order to obtain the same calculation
position_ids = torch.arange(
emb = torch.cat((freqs, freqs), dim=-1)
past_key_values_length, seq_length + past_key_values_length, dtype=torch.long, device=device
self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
)
self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
position_ids = position_ids.unsqueeze(0)
if inputs_embeds is None:
inputs_embeds = self.embed_tokens(input_ids)
# Fix lora with gradient checkpointing training
if self.training and inputs_embeds.is_leaf:
inputs_embeds.requires_grad = True
if self._use_flash_attention_2:
class HunYuanDynamicNTKScalingRotaryEmbedding(HunYuanRotaryEmbedding):
# 2d mask is passed through the layers
"""
attention_mask = attention_mask if (attention_mask is not None and 0 in attention_mask) else None
HunYuanRotaryEmbedding extended with Dynamic NTK scaling.
elif self._use_sdpa and not output_attentions:
Credits to the Reddit users /u/bloc97 and /u/emozilla
# output_attentions=True can not be supported when using SDPA, and we fall back on
"""
# the manual implementation that requires a 4D causal mask in all cases.
attention_mask = _prepare_4d_causal_attention_mask_for_sdpa(
attention_mask,
(batch_size, seq_length),
inputs_embeds,
past_key_values_length,
)
else:
# 4d mask is passed through the layers
attention_mask = _prepare_4d_causal_attention_mask(
attention_mask, (batch_size, seq_length), inputs_embeds, past_key_values_length
)
# embed positions
def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None, scaling_factor=1.0):
hidden_states = inputs_embeds
self.scaling_factor = scaling_factor
super().__init__(dim, max_position_embeddings, base, device)
# decoder layers
def _set_cos_sin_cache(self, seq_len, device, dtype):
all_hidden_states = () if output_hidden_states else None
self.max_seq_len_cached = seq_len
all_self_attns = () if output_attentions else None
next_decoder_cache = None
prev_kv_states = None
if seq_len > self.max_position_embeddings:
for layer_idx, decoder_layer in enumerate(self.layers):
base = self.base * (
if output_hidden_states:
(self.scaling_factor * seq_len / self.max_position_embeddings) - (self.scaling_factor - 1)
all_hidden_states += (hidden_states,)
) ** (self.dim / (self.dim - 2))
inv_freq = 1.0 / (base ** (torch.arange(0, self.dim, 2).float().to(device) / self.dim))
self.register_buffer("inv_freq", inv_freq, persistent=False)
if self.gradient_checkpointing and self.training:
t = torch.arange(self.max_seq_len_cached, device=device, dtype=self.inv_freq.dtype)
layer_outputs = self._gradient_checkpointing_func(
decoder_layer.__call__,
hidden_states,
attention_mask,
position_ids,
past_key_values,
output_attentions,
use_cache,
prev_kv_states,
)
else:
layer_outputs = decoder_layer(
hidden_states,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_values,
output_attentions=output_attentions,
use_cache=use_cache,
kv_states=prev_kv_states
)
hidden_states = layer_outputs[0]
freqs = torch.outer(t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1)
self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
if use_cache:
next_decoder_cache = layer_outputs[2 if output_attentions else 1]
if output_attentions:
class HunYuanDynamicNTKAlphaRotaryEmbedding(HunYuanRotaryEmbedding):
all_self_attns += (layer_outputs[1],)
"""
HunYuanRotaryEmbedding extended with Dynamic NTK scaling.
Credits to the Reddit users /u/bloc97 and /u/emozilla
"""
kv_states = layer_outputs[-1]
def __init__(self, dim, max_position_embeddings=2048, base=10000, device=None, scaling_alpha=1.0):
self.scaling_alpha = scaling_alpha
super().__init__(dim, max_position_embeddings, base, device)
if self.cla and layer_idx % self.cla_share_factor == 0:
def _set_cos_sin_cache(self, seq_len, device, dtype):
prev_kv_states = kv_states
self.max_seq_len_cached = seq_len
if not self.add_classification_head:
base = self.base * self.scaling_alpha ** (self.dim / (self.dim-2))
hidden_states = self.norm(hidden_states)
inv_freq = 1.0 / (base ** (torch.arange(0, self.dim, 2).float().to(device) / self.dim))
# add hidden states from the last decoder layer
self.register_buffer("inv_freq", inv_freq, persistent=False)
if output_hidden_states:
all_hidden_states += (hidden_states,)
next_cache = None
t = torch.arange(self.max_seq_len_cached, device=device, dtype=self.inv_freq.dtype)
if use_cache:
next_cache = next_decoder_cache.to_legacy_cache() if use_legacy_cache else next_decoder_cache
if not return_dict:
return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None)
return BaseModelOutputWithPast(
last_hidden_state=hidden_states,
past_key_values=next_cache,
hidden_states=all_hidden_states,
attentions=all_self_attns,
)
freqs = torch.outer(t, self.inv_freq)
# Different from paper, but it uses a different permutation in order to obtain the same calculation
emb = torch.cat((freqs, freqs), dim=-1)
self.register_buffer("cos_cached", emb.cos().to(dtype), persistent=False)
self.register_buffer("sin_cached", emb.sin().to(dtype), persistent=False)
class HunYuanMoEV1ForCausalLM(HunYuanPreTrainedModel):
_tied_weights_keys = ["lm_head.weight"]
def __init__(self, config: HunYuanConfig):
def rotate_half(x):
super().__init__(config)
"""Rotates half the hidden dims of the input."""
x1 = x[..., : x.shape[-1] // 2]
self.config = config
x2 = x[..., x.shape[-1] // 2:]
self.model = HunYuanModel(config)
return torch.cat((-x2, x1), dim=-1)
self.add_classification_head = config.add_classification_head
self.pad_id = config.pad_id
self.vocab_size = config.vocab_size
self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)
if config.add_classification_head:
self.pool_head = nn.Linear(config.hidden_size, config.hidden_size, bias=False)
self.pool_head2 = nn.Linear(config.hidden_size, config.class_num, bias=False)
# Initialize weights and apply final processing
self.post_init()
def get_input_embeddings(self):
    """Return the token-embedding module held by the wrapped ``self.model``."""
    return self.model.embed_tokens
def set_input_embeddings(self, value):
def apply_rotary_pos_emb(q, k, cos, sin, position_ids, unsqueeze_dim=1):
self.model.embed_tokens = value
"""Applies Rotary Position Embedding to the query and key tensors.
Args:
q (`torch.Tensor`): The query tensor.
k (`torch.Tensor`): The key tensor.
cos (`torch.Tensor`): The cosine part of the rotary embedding.
sin (`torch.Tensor`): The sine part of the rotary embedding.
position_ids (`torch.Tensor`):
The position indices of the tokens corresponding to the query and key tensors. For example, this can be
used to pass offsetted position ids when working with a KV-cache.
unsqueeze_dim (`int`, *optional*, defaults to 1):
The 'unsqueeze_dim' argument specifies the dimension along which to unsqueeze cos[position_ids] and
sin[position_ids] so that they can be properly broadcasted to the dimensions of q and k. For example, note
that cos[position_ids] and sin[position_ids] have the shape [batch_size, seq_len, head_dim]. Then, if q and
k have the shape [batch_size, heads, seq_len, head_dim], then setting unsqueeze_dim=1 makes
cos[position_ids] and sin[position_ids] broadcastable to the shapes of q and k. Similarly, if q and k have
the shape [batch_size, seq_len, heads, head_dim], then set unsqueeze_dim=2.
Returns:
`tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding.
"""
cos = cos[position_ids].unsqueeze(unsqueeze_dim)
sin = sin[position_ids].unsqueeze(unsqueeze_dim)
q_embed = (q * cos) + (rotate_half(q) * sin)
k_embed = (k * cos) + (rotate_half(k) * sin)
return q_embed, k_embed
def get_output_embeddings(self):
    """Return the output projection module stored as ``self.lm_head``."""
    return self.lm_head
def set_output_embeddings(self, new_embeddings):
class HunYuanMLP(nn.Module):
self.lm_head = new_embeddings
def __init__(self, config: HunYuanConfig, layer_idx=None, is_shared_mlp=False):
    """Build the gated feed-forward projections.

    Args:
        config: Model configuration; ``hidden_size``, ``intermediate_size``,
            ``hidden_act`` and (for shared MLPs) ``num_shared_expert`` are
            read here.
        layer_idx: Stored on the module as ``self.layer_idx``.
        is_shared_mlp: When true, the intermediate width is
            ``config.intermediate_size * config.num_shared_expert[0]``
            instead of ``config.intermediate_size``.
    """
    super().__init__()
    self.config = config
    self.layer_idx = layer_idx
    self.hidden_size = config.hidden_size

    if is_shared_mlp:
        self.intermediate_size = config.intermediate_size * config.num_shared_expert[0]
    else:
        self.intermediate_size = config.intermediate_size

    # All three projections are bias-free; gate/up widen, down narrows back.
    self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
    self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False)
    self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False)
    # Activation is looked up by name from the transformers registry.
    self.act_fn = ACT2FN[config.hidden_act]
def set_decoder(self, decoder):
def forward(self, x):
self.model = decoder
if self.config.pretraining_tp > 1:
slice = self.intermediate_size // self.config.pretraining_tp
gate_proj_slices = self.gate_proj.weight.split(slice, dim=0)
up_proj_slices = self.up_proj.weight.split(slice, dim=0)
down_proj_slices = self.down_proj.weight.split(slice, dim=1)
def get_decoder(self):
gate_proj = torch.cat(
return self.model
[F.linear(x, gate_proj_slices[i]) for i in range(self.config.pretraining_tp)], dim=-1
)
up_proj = torch.cat([F.linear(x, up_proj_slices[i]) for i in range(self.config.pretraining_tp)], dim=-1)
@add_start_docstrings_to_model_forward(HUNYUAN_INPUTS_DOCSTRING)
intermediate_states = (self.act_fn(gate_proj) * up_proj).split(slice, dim=2)
@replace_return_docstrings(output_type=CausalLMOutputWithPast, config_class=_CONFIG_FOR_DOC)
down_proj = [
def forward(
F.linear(intermediate_states[i], down_proj_slices[i]) for i in range(self.config.pretraining_tp)
self,
]
input_ids: torch.LongTensor = None,
down_proj = sum(down_proj)
attention_mask: Optional[torch.Tensor] = None,
else:
position_ids: Optional[torch.LongTensor] = None,
down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x))
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
labels: Optional[torch.LongTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
) -> Union[Tuple, CausalLMOutputWithPast]:
r"""
Args:
labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
Labels for computing the masked language modeling loss. Indices should either be in `[0, ...,
config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored
(masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`.
Returns:
return down_proj
Example:
```python
class HunYuanTopKGate(nn.Module):
>>> from transformers import AutoTokenizer, AutoModelForCausalLM
def __init__(self, config: HunYuanConfig, layer_idx: Optional[int] = None):
super().__init__()
self.config = config
self.layer_idx = layer_idx
self.moe_topk = config.moe_topk
self.drop_tokens = config.moe_drop_tokens
self.min_capacity = 8
self.random_routing_dropped_token = config.moe_random_routing_dropped_token
self.wg = nn.Linear(config.hidden_size, config.num_experts, bias=False, dtype=torch.float32)
>>> model = AutoModelForCausalLM.from_pretrained(PATH_TO_CONVERTED_WEIGHTS)
def forward(self, hidden_states):
>>> tokenizer = AutoTokenizer.from_pretrained(PATH_TO_CONVERTED_TOKENIZER)
bsz, seq_len, hidden_size = hidden_states.shape
hidden_states = hidden_states.reshape(-1, hidden_size)
if self.wg.weight.dtype == torch.float32:
hidden_states = hidden_states.float()
logits = self.wg(hidden_states)
if self.moe_topk == 1:
gate_output = top1gating(logits, random_routing_dropped_token=self.random_routing_dropped_token)
else:
gate_output = topkgating(logits, self.moe_topk[0])
>>> prompt = "Hey, are you conscious? Can you talk to me?"
return gate_output
>>> inputs = tokenizer(prompt, return_tensors="pt")
>>> # Generate
>>> generate_ids = model.generate(inputs.input_ids, max_length=30)
>>> tokenizer.batch_decode(generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0]
"Hey, are you conscious? Can you talk to me?\nI'm not conscious, but I can talk to you."
```"""
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
# decoder outputs consist of (dec_features, layer_state, dec_hidden, dec_attn)
class HunYuanMoE(nn.Module):
outputs = self.model(
def __init__(self, config: HunYuanConfig, layer_idx: Optional[int] = None):
input_ids=input_ids,
super().__init__()
attention_mask=attention_mask,
self.config = config
position_ids=position_ids,
self.layer_idx = layer_idx
past_key_values=past_key_values,
self.moe_topk = config.moe_topk
inputs_embeds=inputs_embeds,
self.num_experts = config.num_experts
use_cache=use_cache,
if config.use_mixed_mlp_moe:
output_attentions=output_attentions,
self.shared_mlp = HunYuanMLP(config, layer_idx=layer_idx, is_shared_mlp=True)
output_hidden_states=output_hidden_states,
self.gate = HunYuanTopKGate(config, layer_idx=layer_idx)
return_dict=return_dict,
self.experts = nn.ModuleList(
[HunYuanMLP(config, layer_idx=layer_idx, is_shared_mlp=False) for _ in range(config.num_experts)]
)
)
hidden_states = outputs[0]
def forward(self, hidden_states):
bsz, seq_len, hidden_size = hidden_states.shape
if not self.add_classification_head:
if self.config.use_mixed_mlp_moe:
if self.config.pretraining_tp > 1:
hidden_states_mlp = self.shared_mlp(hidden_states)
lm_head_slices = self.lm_head.weight.split(self.vocab_size // self.config.pretraining_tp, dim=0)
logits = [F.linear(hidden_states, lm_head_slices[i]) for i in range(self.config.pretraining_tp)]
logits = torch.cat(logits, dim=-1)
else:
logits = self.lm_head(hidden_states)
logits = logits.float()
else:
logits = hidden_states
logits = logits.float()
pooled_output = self.pool_head(logits)
pooled_output = torch.tanh(pooled_output)
pooled_output = self.pool_head2(pooled_output).contiguous() # bs * class_num
if len(pooled_output.shape) < 2:
raise ValueError("pooled_output does not have enough dimensions for transpose")
if self.config.pool_type == "mean":
l_moe, combine_weights, dispatch_mask, exp_counts = self.gate(hidden_states)
reward = pooled_output.mean(dim=1).squeeze(-1)
elif self.config.pool_type == "last":
# bs * hidden_size
seq_length = (input_ids != self.pad_id).long().sum(dim=1) - 1
batch_size = input_ids.size(0)
reward = pooled_output[torch.arange(batch_size, device=pooled_output.device), seq_length].squeeze(-1)
else:
reward = pooled_output[:, 0].squeeze(-1)
loss = None
reshaped_input = hidden_states.reshape(-1, hidden_size)
if labels is not None:
# Shift so that tokens < n predict n
shift_logits = logits[..., :-1, :].contiguous()
shift_labels = labels[..., 1:].contiguous()
# Flatten the tokens
loss_fct = CrossEntropyLoss()
shift_logits = shift_logits.reshape(-1, self.config.vocab_size)
shift_labels = shift_labels.reshape(-1)
# Enable model parallelism
shift_labels = shift_labels.to(shift_logits.device)
loss = loss_fct(shift_logits, shift_labels)
if not return_dict:
dispatched_input = torch.einsum("sec,sm->ecm", dispatch_mask.type_as(hidden_states), reshaped_input)
output = (logits,) + outputs[1:]
return (loss,) + output if loss is not None else output
output = CausalLMOutputWithPast(
chunks = dispatched_input.chunk(self.num_experts, dim=0)
loss=loss,
expert_outputs = []
logits=logits,
for chunk, expert in zip(chunks, self.experts):
past_key_values=outputs.past_key_values,
expert_outputs.append(expert(chunk))
hidden_states=outputs.hidden_states,
attentions=outputs.attentions,
expert_output = torch.cat(expert_outputs, dim=0)
)
combined_output = torch.einsum("sec,ecm->sm", combine_weights.type_as(hidden_states), expert_output)
if self.add_classification_head:
combined_output = combined_output.reshape(bsz, seq_len, hidden_size)
output['reward'] = reward
if self.config.use_mixed_mlp_moe:
output = hidden_states_mlp + combined_output
else:
output = combined_output
return output
return output
def prepare_inputs_for_generation(
self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs
):
if past_key_values is not None:
if isinstance(past_key_values, Cache):
cache_length = past_key_values.get_seq_length()
past_length = past_key_values.seen_tokens
max_cache_length = past_key_values.get_max_cache_shape()
else:
cache_length = past_length = past_key_values[0][0].shape[2]
max_cache_length = None
# Keep only the unprocessed tokens:
def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor:
# 1 - If the length of the attention_mask exceeds the length of input_ids, then we are in a setting where
"""
# some of the inputs are exclusively passed as part of the cache (e.g. when passing input_embeds as
This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch,
# input)
num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim)
if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]:
"""
input_ids = input_ids[:, -(attention_mask.shape[1] - past_length):]
batch, num_key_value_heads, slen, head_dim = hidden_states.shape
# 2 - If the past_length is smaller than input_ids', then input_ids holds all input tokens. We can discard
if n_rep == 1:
# input_ids based on the past_length.
return hidden_states
elif past_length < input_ids.shape[1]:
hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim)
input_ids = input_ids[:, past_length:]
return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim)
# 3 - Otherwise (past_length >= input_ids.shape[1]), let's assume input_ids only has unprocessed tokens.
# If we are about to go beyond the maximum cache length, we need to crop the input attention mask.
if (
max_cache_length is not None
and attention_mask is not None
and cache_length + input_ids.shape[1] > max_cache_length
):
attention_mask = attention_mask[:, -max_cache_length:]
position_ids = kwargs.get("position_ids", None)
class HunYuanAttention(nn.Module):
if attention_mask is not None and position_ids is None:
"""Multi-headed attention from 'Attention Is All You Need' paper"""
# create position_ids on the fly for batch generation
position_ids = attention_mask.long().cumsum(-1) - 1
position_ids.masked_fill_(attention_mask == 0, 1)
if past_key_values:
position_ids = position_ids[:, -input_ids.shape[1]:]
# if `inputs_embeds` are passed, we only want to use them in the 1st generation step
def __init__(self, config: HunYuanConfig, layer_idx: Optional[int] = None):
if inputs_embeds is not None and past_key_values is None:
super().__init__()
model_inputs = {"inputs_embeds": inputs_embeds}
self.config = config
else:
self.layer_idx = layer_idx
model_inputs = {"input_ids": input_ids}
# layer_idx starts at 0
self.attention_type = 'cross' if config.use_cla and layer_idx % config.cla_share_factor != 0 else 'self'
if layer_idx is None:
logger.warning_once(
f"Instantiating {self.__class__.__name__} without passing `layer_idx` is not recommended and will "
"to errors during the forward call, if caching is used. Please make sure to provide a `layer_idx` "
"when creating this class."
)
model_inputs.update(
self.attention_dropout = config.attention_dropout
{
self.hidden_size = config.hidden_size
"position_ids": position_ids,
self.num_heads = config.num_attention_heads
"past_key_values": past_key_values,
self.head_dim = self.hidden_size // self.num_heads
"
self.num_key_value_heads = config.num_key_value_heads
self.num_key_value_groups = self.num_heads // self.num_key_value_heads
self.max_position_embeddings = config.max_position_embeddings
self.rope_theta = config.rope_theta
self.is_causal = True
self.use_qk_norm = config.use_qk_norm
if (self.head_dim * self.num_heads) != self.hidden_size:
raise ValueError(
f"hidden_size must be divisible by num_heads (got `hidden_size`: {self.hidden_size}"
f" and `num_heads`: {self.num_heads})."
)
self.q_proj = nn.Linear(self.hidden_size, self.num_he