Files
BitNet/utils/generate-dummy-bitnet-model.py
potassiummmm 6cfd8831fd initial commit
2024-10-17 21:21:10 +08:00

1048 lines
44 KiB
Python

#!/usr/bin/env python3
# dummy model generation script based on convert-hf-to-gguf-bitnet.py
from __future__ import annotations
import sys
from pathlib import Path
import numpy as np
import configparser
import logging
import argparse
import contextlib
import json
import os
import re
import sys
from abc import ABC, abstractmethod
from enum import IntEnum
from pathlib import Path
from hashlib import sha256
from typing import TYPE_CHECKING, Any, Callable, ContextManager, Iterator, Sequence, TypeVar, cast, Tuple, Iterable
# Necessary to load the local gguf package
sys.path.insert(0, str(Path(__file__).parent.parent))
from gguf import GGUFWriter, GGUFReader, RopeScalingType, TokenType, GGMLQuantizationType # noqa: E402
if TYPE_CHECKING:
from torch import Tensor
import torch
import gguf
logger = logging.getLogger("generate-dummy-bitnet-model")
###### MODEL HPARAMS CONFIGURATION ######
model_config = {
"125M": {
"hidden_size": 768,
"intermediate_size": 3072,
"num_hidden_layers": 11,
"num_attention_heads": 12
},
"350M": {
"hidden_size": 1024,
"intermediate_size": 3072,
"num_hidden_layers": 24,
"num_attention_heads": 16
},
"1B": {
"hidden_size": 2048,
"intermediate_size": 3584,
"num_hidden_layers": 24,
"num_attention_heads": 32
},
"1.5B": {
"hidden_size": 1536,
"intermediate_size": 9216,
"num_hidden_layers": 28,
"num_attention_heads": 32
},
"2.7B": {
"hidden_size": 3072,
"intermediate_size": 7680,
"num_hidden_layers": 24,
"num_attention_heads": 32
},
"3.8B": {
"hidden_size": 3840,
"intermediate_size": 8192,
"num_hidden_layers": 24,
"num_attention_heads": 32
},
"7B": {
"hidden_size": 4096,
"intermediate_size": 12032,
"num_hidden_layers": 32,
"num_attention_heads": 32
},
"13B": {
"hidden_size": 5120,
"intermediate_size": 13824,
"num_hidden_layers": 40,
"num_attention_heads": 40
},
"30B": {
"hidden_size": 6656,
"intermediate_size": 16384,
"num_hidden_layers": 60,
"num_attention_heads": 52
},
"70B": {
"hidden_size": 8192,
"intermediate_size": 24576,
"num_hidden_layers": 80,
"num_attention_heads": 64
},
"100B": {
"hidden_size": 8192,
"intermediate_size": 45568,
"num_hidden_layers": 72,
"num_attention_heads": 64
}
}
###### MODEL DEFINITIONS ######
class SentencePieceTokenTypes(IntEnum):
NORMAL = 1
UNKNOWN = 2
CONTROL = 3
USER_DEFINED = 4
UNUSED = 5
BYTE = 6
AnyModel = TypeVar("AnyModel", bound="type[Model]")
class Model(ABC):
_model_classes: dict[str, type[Model]] = {}
def __init__(self, dir_model: Path, ftype: int, fname_out: Path, is_big_endian: bool, use_temp_file: bool):
self.dir_model = dir_model
self.ftype = ftype
self.fname_out = fname_out
self.is_big_endian = is_big_endian
self.endianess = gguf.GGUFEndian.BIG if is_big_endian else gguf.GGUFEndian.LITTLE
self.use_temp_file = use_temp_file
self.is_safetensors = self._is_model_safetensors()
self.num_parts = Model.count_model_parts(self.dir_model, ".safetensors" if self.is_safetensors else ".bin")
self.part_names = self._get_part_names()
self.hparams = Model.load_hparams(self.dir_model)
self.gguf_writer = gguf.GGUFWriter(fname_out, gguf.MODEL_ARCH_NAMES[self.model_arch], endianess=self.endianess, use_temp_file=self.use_temp_file)
self.block_count = self.find_hparam(["n_layers", "num_hidden_layers", "n_layer"])
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
@property
@abstractmethod
def model_arch(self) -> gguf.MODEL_ARCH:
pass
def find_hparam(self, keys: Sequence[str], optional: bool = False) -> Any:
key = next((k for k in keys if k in self.hparams), None)
if key is not None:
return self.hparams[key]
if optional:
return None
raise KeyError(f"could not find any of: {keys}")
def set_vocab(self):
self._set_vocab_gpt2()
def get_tensors(self) -> Iterator[tuple[str, Tensor]]:
for part_name in self.part_names:
logger.info(f"gguf: loading model part '{part_name}'")
ctx: ContextManager[Any]
if self.is_safetensors:
from safetensors import safe_open
ctx = cast(ContextManager[Any], safe_open(self.dir_model / part_name, framework="pt", device="cpu"))
else:
ctx = contextlib.nullcontext(torch.load(str(self.dir_model / part_name), map_location="cpu", mmap=True, weights_only=True))
with ctx as model_part:
for name in model_part.keys():
data = model_part.get_tensor(name) if self.is_safetensors else model_part[name]
yield name, data
def match_model_tensor_name(self, name: str, key: gguf.MODEL_TENSOR, bid: int | None, suffix: str = ".weight") -> bool:
if key not in gguf.MODEL_TENSORS[self.model_arch]:
return False
key_name: str = gguf.TENSOR_NAMES[key]
if "{bid}" in key_name:
if bid is None:
return False
key_name = key_name.format(bid=bid)
else:
if bid is not None:
return False
return name == (key_name + suffix)
def map_tensor_name(self, name: str, try_suffixes: Sequence[str] = (".weight", ".bias")) -> str:
new_name = self.tensor_map.get_name(key=name, try_suffixes=try_suffixes)
if new_name is None:
raise ValueError(f"Can not map tensor {name!r}")
return new_name
def set_gguf_parameters(self):
self.gguf_writer.add_name(self.dir_model.name)
self.gguf_writer.add_block_count(self.block_count)
if (n_ctx := self.find_hparam(["max_position_embeddings", "n_ctx"], optional=True)) is not None:
self.gguf_writer.add_context_length(n_ctx)
logger.info(f"gguf: context length = {n_ctx}")
n_embd = self.find_hparam(["hidden_size", "n_embd"])
self.gguf_writer.add_embedding_length(n_embd)
logger.info(f"gguf: embedding length = {n_embd}")
if (n_ff := self.find_hparam(["intermediate_size", "n_inner"], optional=True)) is not None:
self.gguf_writer.add_feed_forward_length(n_ff)
logger.info(f"gguf: feed forward length = {n_ff}")
n_head = self.find_hparam(["num_attention_heads", "n_head"])
self.gguf_writer.add_head_count(n_head)
logger.info(f"gguf: head count = {n_head}")
if (n_head_kv := self.hparams.get("num_key_value_heads")) is not None:
self.gguf_writer.add_head_count_kv(n_head_kv)
logger.info(f"gguf: key-value head count = {n_head_kv}")
if (rope_theta := self.hparams.get("rope_theta")) is not None:
self.gguf_writer.add_rope_freq_base(rope_theta)
logger.info(f"gguf: rope theta = {rope_theta}")
if (f_rms_eps := self.hparams.get("rms_norm_eps")) is not None:
self.gguf_writer.add_layer_norm_rms_eps(f_rms_eps)
logger.info(f"gguf: rms norm epsilon = {f_rms_eps}")
if (f_norm_eps := self.find_hparam(["layer_norm_eps", "layer_norm_epsilon", "norm_epsilon"], optional=True)) is not None:
self.gguf_writer.add_layer_norm_eps(f_norm_eps)
logger.info(f"gguf: layer norm epsilon = {f_norm_eps}")
if (n_experts := self.hparams.get("num_local_experts")) is not None:
self.gguf_writer.add_expert_count(n_experts)
logger.info(f"gguf: expert count = {n_experts}")
if (n_experts_used := self.hparams.get("num_experts_per_tok")) is not None:
self.gguf_writer.add_expert_used_count(n_experts_used)
logger.info(f"gguf: experts used count = {n_experts_used}")
self.gguf_writer.add_file_type(self.ftype)
logger.info(f"gguf: file type = {self.ftype}")
def write_tensors(self):
block_count = self.hparams.get("n_layers", self.hparams.get("num_hidden_layers", self.hparams.get("n_layer")))
tensor_map = gguf.get_tensor_name_map(self.model_arch, block_count)
for name, data_torch in self.get_tensors():
# we don't need these
if name.endswith((".attention.masked_bias", ".attention.bias", ".attention.rotary_emb.inv_freq")):
continue
old_dtype = data_torch.dtype
# convert any unsupported data types to float32
if data_torch.dtype not in (torch.float16, torch.float32):
data_torch = data_torch.to(torch.float32)
data = data_torch.squeeze().numpy()
# map tensor names
new_name = tensor_map.get_name(name, try_suffixes=(".weight", ".bias"))
if new_name is None:
raise ValueError(f"Can not map tensor {name!r}")
n_dims = len(data.shape)
data_dtype = data.dtype
# if f32 desired, convert any float16 to float32
if self.ftype == 0 and data_dtype == np.float16:
data = data.astype(np.float32)
# TODO: Why cant we use these float16 as-is? There should be not reason to store float16 as float32
if self.ftype == 1 and data_dtype == np.float16 and (n_dims == 1 or new_name.endswith("_norm.weight")):
data = data.astype(np.float32)
# if f16 desired, convert any float32 2-dim weight tensors to float16
if self.ftype == 1 and data_dtype == np.float32 and name.endswith(".weight") and n_dims == 2:
data = data.astype(np.float16)
logger.info(f"{new_name}, n_dims = {n_dims}, {old_dtype} --> {data.dtype}")
self.gguf_writer.add_tensor(new_name, data)
def write(self):
self.write_tensors()
self.gguf_writer.write_header_to_file()
self.gguf_writer.write_kv_data_to_file()
self.gguf_writer.write_tensors_to_file()
self.gguf_writer.close()
def write_vocab(self):
self.gguf_writer.write_header_to_file()
self.gguf_writer.write_kv_data_to_file()
self.gguf_writer.close()
@staticmethod
def count_model_parts(dir_model: Path, prefix: str) -> int:
num_parts = 0
for filename in os.listdir(dir_model):
if filename.endswith(prefix):
num_parts += 1
return num_parts
@staticmethod
def load_hparams(dir_model):
with open(dir_model / "config.json", "r", encoding="utf-8") as f:
return json.load(f)
@classmethod
def register(cls, *names: str) -> Callable[[AnyModel], AnyModel]:
assert names
def func(modelcls: type[Model]):
for name in names:
cls._model_classes[name] = modelcls
return modelcls
return func
@classmethod
def from_model_architecture(cls, arch):
try:
return cls._model_classes[arch]
except KeyError:
raise NotImplementedError(f'Architecture {arch!r} not supported!') from None
def _is_model_safetensors(self) -> bool:
return Model.count_model_parts(self.dir_model, ".safetensors") > 0
def _get_part_names(self):
if self.is_safetensors:
if self.num_parts == 1: # there's only one .safetensors file
return ("model.safetensors",)
return (f"model-{n:05}-of-{self.num_parts:05}.safetensors" for n in range(1, self.num_parts + 1))
if self.num_parts == 1: # there's only one .bin file
return ("pytorch_model.bin",)
return (f"pytorch_model-{n:05}-of-{self.num_parts:05}.bin" for n in range(1, self.num_parts + 1))
# used for GPT-2 BPE and WordPiece vocabs
def get_vocab_base(self) -> tuple[list[str], list[int], str]:
tokens: list[str] = []
toktypes: list[int] = []
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(self.dir_model)
vocab_size = self.hparams.get("vocab_size", len(tokenizer.vocab))
assert max(tokenizer.vocab.values()) < vocab_size
tokpre = self.get_vocab_base_pre(tokenizer)
reverse_vocab = {id_: encoded_tok for encoded_tok, id_ in tokenizer.vocab.items()}
added_vocab = tokenizer.get_added_vocab()
for i in range(vocab_size):
if i not in reverse_vocab:
tokens.append(f"[PAD{i}]")
toktypes.append(gguf.TokenType.USER_DEFINED)
elif reverse_vocab[i] in added_vocab:
tokens.append(reverse_vocab[i])
if tokenizer.added_tokens_decoder[i].special:
toktypes.append(gguf.TokenType.CONTROL)
else:
toktypes.append(gguf.TokenType.USER_DEFINED)
else:
tokens.append(reverse_vocab[i])
toktypes.append(gguf.TokenType.NORMAL)
return tokens, toktypes, tokpre
# NOTE: this function is generated by convert-hf-to-gguf-update.py
# do not modify it manually!
# ref: https://github.com/ggerganov/llama.cpp/pull/6920
def get_vocab_base_pre(self, tokenizer) -> str:
# encoding this string and hashing the resulting tokens would (hopefully) give us a unique identifier that
# is specific for the BPE pre-tokenizer used by the model
# we will use this unique identifier to write a "tokenizer.ggml.pre" entry in the GGUF file which we can
# use in llama.cpp to implement the same pre-tokenizer
chktxt = '\n \n\n \n\n\n \t \t\t \t\n \n \n \n \n🚀 (normal) 😶\u200d🌫️ (multiple emojis concatenated) ✅ 🦙🦙 3 33 333 3333 33333 333333 3333333 33333333 3.3 3..3 3...3 កាន់តែពិសេសអាច😁 ?我想在apple工作1314151天~ ------======= нещо на Български \'\'\'\'\'\'```````""""......!!!!!!?????? I\'ve been \'told he\'s there, \'RE you sure? \'M not sure I\'ll make it, \'D you like some tea? We\'Ve a\'lL'
chktok = tokenizer.encode(chktxt)
chkhsh = sha256(str(chktok).encode()).hexdigest()
logger.debug(f"chktok: {chktok}")
logger.debug(f"chkhsh: {chkhsh}")
res = None
# NOTE: if you get an error here, you need to update the convert-hf-to-gguf-update.py script
# or pull the latest version of the model from Huggingface
# don't edit the hashes manually!
if chkhsh == "0ef9807a4087ebef797fc749390439009c3b9eda9ad1a097abbe738f486c01e5":
# ref: https://huggingface.co/meta-llama/Meta-Llama-3-8B
res = "llama-bpe"
if chkhsh == "049ecf7629871e3041641907f3de7c733e4dbfdc736f57d882ba0b0845599754":
# ref: https://huggingface.co/deepseek-ai/deepseek-llm-7b-base
res = "deepseek-llm"
if chkhsh == "347715f544604f9118bb75ed199f68779f423cabb20db6de6f31b908d04d7821":
# ref: https://huggingface.co/deepseek-ai/deepseek-coder-6.7b-base
res = "deepseek-coder"
if chkhsh == "8aeee3860c56296a157a1fe2fad249ec40aa59b1bb5709f4ade11c4e6fe652ed":
# ref: https://huggingface.co/tiiuae/falcon-7b
res = "falcon"
if chkhsh == "0876d13b50744004aa9aeae05e7b0647eac9d801b5ba4668afc01e709c15e19f":
# ref: https://huggingface.co/BAAI/bge-small-en-v1.5
res = "bert-bge"
if chkhsh == "b6dc8df998e1cfbdc4eac8243701a65afe638679230920b50d6f17d81c098166":
# ref: https://huggingface.co/mosaicml/mpt-7b
res = "mpt"
if chkhsh == "35d91631860c815f952d711435f48d356ebac988362536bed955d43bfa436e34":
# ref: https://huggingface.co/bigcode/starcoder2-3b
res = "starcoder"
if chkhsh == "3ce83efda5659b07b1ad37ca97ca5797ea4285d9b9ab0dc679e4a720c9da7454":
# ref: https://huggingface.co/openai-community/gpt2
res = "gpt-2"
if chkhsh == "6221ad2852e85ce96f791f476e0b390cf9b474c9e3d1362f53a24a06dc8220ff":
# ref: https://huggingface.co/smallcloudai/Refact-1_6-base
res = "refact"
if chkhsh == "9c2227e4dd922002fb81bde4fc02b0483ca4f12911410dee2255e4987644e3f8":
# ref: https://huggingface.co/CohereForAI/c4ai-command-r-v01
res = "command-r"
if res is None:
logger.warning("\n")
logger.warning("**************************************************************************************")
logger.warning("** WARNING: The BPE pre-tokenizer was not recognized!")
logger.warning("** There are 2 possible reasons for this:")
logger.warning("** - the model has not been added to convert-hf-to-gguf-update.py yet")
logger.warning("** - the pre-tokenization config has changed upstream")
logger.warning("** Check your model files and convert-hf-to-gguf-update.py and update them accordingly.")
logger.warning("** ref: https://github.com/ggerganov/llama.cpp/pull/6920")
logger.warning("**")
logger.warning(f"** chkhsh: {chkhsh}")
logger.warning("**************************************************************************************")
logger.warning("\n")
raise NotImplementedError("BPE pre-tokenizer was not recognized - update get_vocab_base_pre()")
logger.debug(f"tokenizer.ggml.pre: {repr(res)}")
logger.debug(f"chkhsh: {chkhsh}")
return res
def _set_vocab_sentencepiece(self):
from sentencepiece import SentencePieceProcessor
tokenizer_path = self.dir_model / 'tokenizer.model'
tokens: list[bytes] = []
scores: list[float] = []
toktypes: list[int] = []
if not tokenizer_path.is_file():
raise FileNotFoundError(f"File not found: {tokenizer_path}")
tokenizer = SentencePieceProcessor(str(tokenizer_path))
vocab_size = self.hparams.get('vocab_size', tokenizer.vocab_size())
for token_id in range(tokenizer.vocab_size()):
piece = tokenizer.id_to_piece(token_id)
text = piece.encode("utf-8")
score = tokenizer.get_score(token_id)
toktype = SentencePieceTokenTypes.NORMAL
if tokenizer.is_unknown(token_id):
toktype = SentencePieceTokenTypes.UNKNOWN
elif tokenizer.is_control(token_id):
toktype = SentencePieceTokenTypes.CONTROL
elif tokenizer.is_unused(token_id):
toktype = SentencePieceTokenTypes.UNUSED
elif tokenizer.is_byte(token_id):
toktype = SentencePieceTokenTypes.BYTE
tokens.append(text)
scores.append(score)
toktypes.append(toktype)
added_tokens_file = self.dir_model / 'added_tokens.json'
if added_tokens_file.is_file():
with open(added_tokens_file, "r", encoding="utf-8") as f:
added_tokens_json = json.load(f)
for key in added_tokens_json:
key = key.encode("utf-8")
if key not in tokens:
tokens.append(key)
scores.append(-1000.0)
toktypes.append(SentencePieceTokenTypes.USER_DEFINED)
if vocab_size > len(tokens):
pad_count = vocab_size - len(tokens)
logger.debug(f"Padding vocab with {pad_count} token(s) - [PAD1] through [PAD{pad_count}]")
for i in range(1, pad_count + 1):
tokens.append(f"[PAD{i}]")
scores.append(-1000.0)
toktypes.append(SentencePieceTokenTypes.UNUSED)
assert len(tokens) == vocab_size
self.gguf_writer.add_tokenizer_model("llama")
self.gguf_writer.add_tokenizer_pre("default")
self.gguf_writer.add_token_list(tokens)
self.gguf_writer.add_token_scores(scores)
self.gguf_writer.add_token_types(toktypes)
special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
special_vocab.add_to_gguf(self.gguf_writer)
# TL1
def process_tl1(weight, BM, BY, bm, by, M, K):
final_weight = []
# split in row with size of BM (160)
outer_BM_weights = np.split(weight, (M // BM), axis=0)
for outer_BM_weight in outer_BM_weights:
# split in col with size of by (16index * 2 == 32nums)
outer_BY_weights = np.split(outer_BM_weight, (K // BY), axis=1)
for outer_BY_weight in outer_BY_weights:
# split in row with size of bm (32)
inner_bm_weights = np.split(outer_BY_weight, (BM // bm), axis=0)
for inner_bm_weight in inner_bm_weights:
# split in col with size of by (2index * 2 == 4nums)
inner_by_weights = np.split(inner_bm_weight, (BY // by), axis=1)
for inner_by_weight in inner_by_weights:
# 16 * 6 minor
minor_bm_weights = np.split(inner_by_weight, (bm // 16), axis=0)
for minor_bm_weight in minor_bm_weights:
minor_by_weights = np.split(minor_bm_weight, (by // 4), axis=1)
for minor in minor_by_weights:
minor_weight = np.split(minor, 2, axis=1)
hi_weight = minor_weight[0].astype(np.uint8) << 4
lo_weight = minor_weight[1].astype(np.uint8)
func_weight = lo_weight + hi_weight
final_weight.append(func_weight)
weight = np.array(final_weight, dtype=np.uint8)
return weight
# based on t_mac.utils.preprocess_weights
def preprocess_weights_tl1(
w: np.ndarray,
bits = 2,
g = 4,
) -> Tuple[np.ndarray, np.ndarray]:
M, K = w.shape
weight = w
weight = np.where(np.abs(weight) < 1e-6, 0, weight).astype(np.float32)
weight = np.sign(weight)
weight_num = np.prod(weight.shape)
model_size = args.model_size
KEMD = model_config[model_size]['hidden_size']
# outer loop
BMEMD = 256
BYEMD = 256
# inner loop (32row 32num/16index)
bmEMD = 32
byEMD = 8
KGQA = model_config[model_size]['intermediate_size']
BMGQA = 256
BYGQA = 256
bmGQA = 32
byGQA = 8
weight = np.reshape(weight, (weight_num // 2, 2))
hi_weight = np.multiply(np.split(weight, 2, axis=1)[0], 3)
lo_weight = np.split(weight, 2, axis=1)[1]
weight = np.reshape((hi_weight + lo_weight), weight_num // 2)
# row-major index
weight = weight + 4
weight = np.reshape(weight, (M, K // 2)).astype(np.uint8)
if K == KEMD:
weight = process_tl1(weight, BMEMD, BYEMD, bmEMD, byEMD, M, K)
elif K == KGQA:
weight = process_tl1(weight, BMGQA, BYGQA, bmGQA, byGQA, M, K)
else:
raise NotImplementedError
return weight
def preprocess_two_weights_tl2(M, K, weight_num, BM, BY, bm, by, weight, final_weight):
weight = np.reshape(weight, (weight_num // 2, 2))
hi_weight = np.multiply(np.split(weight, 2, axis=1)[0], 3)
lo_weight = np.split(weight, 2, axis=1)[1]
weight = np.reshape((hi_weight + lo_weight), weight_num // 2)
# row-major index
weight = weight + 4
weight = np.reshape(weight, (M, K // 2)).astype(np.uint8)
outer_BM_weights = np.split(weight, (M // BM), axis=0)
for outer_BM_weight in outer_BM_weights:
# split in col with size of by (32index * 3 == 96nums)
outer_BY_weights = np.split(outer_BM_weight, (K // BY), axis=1)
for outer_BY_weight in outer_BY_weights:
# split in row with size of bm (32)
inner_bm_weights = np.split(outer_BY_weight, (BM // bm), axis=0)
for inner_bm_weight in inner_bm_weights:
# split in col with size of by (2index * 2 == 4nums)
inner_by_weights = np.split(inner_bm_weight, (BY // by), axis=1)
for inner_by_weight in inner_by_weights:
func_weights = np.split(inner_by_weight, 2, axis=1)
left_weight = func_weights[0]
left_sub_weights = np.split(left_weight, 4, axis=0)
new_left_weight = np.reshape(
np.concatenate([left_sub_weights[0], left_sub_weights[2],
left_sub_weights[1], left_sub_weights[3]], axis=0, dtype=np.uint8),
(bm))
right_weight = func_weights[1]
right_sub_weights = np.split(right_weight, 4, axis=0)
new_right_weight = np.reshape(
np.concatenate([right_sub_weights[0], right_sub_weights[2],
right_sub_weights[1], right_sub_weights[3]], axis=0, dtype=np.uint8),
(bm))
hi_weight = new_left_weight.astype(np.uint8) << 4
lo_weight = new_right_weight
func_weight = hi_weight + lo_weight
func_weight = np.reshape(func_weight, bm * by // 4)
final_weight.append(func_weight)
def preprocess_three_weights_tl2(M, K, weight_num, BM, BY, bm, by, weight, final_weight):
weight = np.reshape(weight, (weight_num // 3, 3))
split_weights = np.split(weight, 3, axis=1)
first_weight = np.multiply(split_weights[0], 9)
second_weight = np.multiply(split_weights[1], 3)
third_weight = split_weights[2]
weight = np.reshape((first_weight + second_weight + third_weight), weight_num // 3)
sign_weight = np.sign(weight) + 2
sign_weight = np.where(sign_weight > 1, 0, sign_weight)
weight = np.abs(weight)
# row-major index
weight = np.reshape(weight, (M, K // 3)).astype(np.uint8)
sign_weight = np.reshape(sign_weight, (M, K // 3)).astype(np.uint8)
# print(weight)
# split in row with size of BM (160)
outer_BM_weights = np.split(weight, (M // BM), axis=0)
for outer_BM_weight in outer_BM_weights:
# split in col with size of by (32index * 3 == 96nums)
outer_BY_weights = np.split(outer_BM_weight, (K // BY), axis=1)
for outer_BY_weight in outer_BY_weights:
# split in row with size of bm (32)
inner_bm_weights = np.split(outer_BY_weight, (BM // bm), axis=0)
for inner_bm_weight in inner_bm_weights:
# split in col with size of by (2index * 3 == 6nums)
inner_by_weights = np.split(inner_bm_weight, (BY // by), axis=1)
for inner_by_weight in inner_by_weights:
func_weights = np.split(inner_by_weight, 2, axis=1)
left_weight = func_weights[0]
left_sub_weights = np.split(left_weight, 4, axis=0)
new_left_weight = np.reshape(
np.concatenate([left_sub_weights[0], left_sub_weights[2],
left_sub_weights[1], left_sub_weights[3]], axis=0, dtype=np.uint8),
(bm))
right_weight = func_weights[1]
right_sub_weights = np.split(right_weight, 4, axis=0)
new_right_weight = np.reshape(
np.concatenate([right_sub_weights[0], right_sub_weights[2],
right_sub_weights[1], right_sub_weights[3]], axis=0, dtype=np.uint8),
(bm))
hi_weight = new_left_weight.astype(np.uint8) << 4
lo_weight = new_right_weight
func_weight = hi_weight + lo_weight
func_weight = np.reshape(func_weight, bm * by // 6)
final_weight.append(func_weight)
sign_weight_list = []
sign_outer_BM_weights = np.split(sign_weight, (M // BM), axis=0)
for sign_outer_BM_weight in sign_outer_BM_weights:
# split in col with size of by (32index * 3 == 96nums)
sign_outer_BY_weights = np.split(sign_outer_BM_weight, (K // BY), axis=1)
for sign_outer_BY_weight in sign_outer_BY_weights:
# split in row with size of bm (32)
sign_inner_bm_weights = np.split(sign_outer_BY_weight, (BM // bm), axis=0)
for sign_inner_bm_weight in sign_inner_bm_weights:
# split in col with size of by (4index * 3 == 12nums)
sign_inner_by_weights = np.split(sign_inner_bm_weight, (BY // (by * 4)), axis=1)
for sign_inner_by_weight in sign_inner_by_weights:
func_weight = np.split(sign_inner_by_weight, 8, axis=1)
combine_weight = np.zeros((16, 1), dtype=np.uint16)
for i in range(len(func_weight)):
min_weight = np.split(func_weight[i], 2)
min_top_weight = min_weight[0].astype(np.uint16) << 15 - (2 * i)
min_bot_weight = min_weight[1].astype(np.uint16) << 15 - (2 * i + 1)
combine_weight += min_top_weight
combine_weight += min_bot_weight
combine_weight = combine_weight.view(np.uint8)
# combine_weight = combine_weight[:, [1, 0]]
combine_weight = np.reshape(combine_weight, bm)
sign_weight_list.append(combine_weight)
final_weight.extend(sign_weight_list)
final_weight.extend(sign_weight_list)
def preprocess_weights_tl2(
w: np.ndarray,
bits = 2,
g = 4,
) -> Tuple[np.ndarray, np.ndarray]:
M, K = w.shape
weight = w
weight = np.where(np.abs(weight) < 1e-6, 0, weight).astype(np.float32)
weight = np.sign(weight)
weight_num = np.prod(weight.shape)
# for three num 6 bit ->
# outer loop
KEMD = 1536
BMEMD = 256
BYEMD = 96
KGQA = 4096
BMGQA = 128
BYGQA = 96
# inner loop (32row 32num/16index)
bm3 = 32
by3 = 6
if K == KEMD:
BM3 = BMEMD
BY3 = BYEMD
elif K == KGQA:
BM3 = BMGQA
BY3 = BYGQA
else:
raise NotImplementedError
BM2 = BM3
BY2 = 32
# inner loop (32row 32num/16index)
bm2 = 32
by2 = 4
if (weight.shape[1] % BY3 != 0):
slice_k_idx = weight.shape[1] - weight.shape[1] % BY3
slice_weights = np.split(weight, [slice_k_idx], axis=1)
three_weight = slice_weights[0]
two_weight = slice_weights[1]
else:
three_weight = weight
final_weight = []
preprocess_three_weights_tl2(three_weight.shape[0],
three_weight.shape[1],
three_weight.shape[0] * three_weight.shape[1],
BM3,
BY3,
bm3,
by3,
three_weight,
final_weight)
if (weight.shape[1] % BY3 != 0):
preprocess_two_weights_tl2( two_weight.shape[0],
two_weight.shape[1],
two_weight.shape[0] * two_weight.shape[1],
BM2,
BY2,
bm2,
by2,
two_weight,
final_weight)
weight = np.array(final_weight, dtype=np.uint8)
return weight
@Model.register("BitnetForCausalLM")
class BitnetModel(Model):
model_arch = gguf.MODEL_ARCH.BITNET
params: str = ""
def set_params(self, params: str):
self.params = params
hp_config = model_config[self.params]
self.hparams["hidden_size"] = hp_config["hidden_size"]
self.hparams["intermediate_size"] = hp_config["intermediate_size"]
self.hparams["num_hidden_layers"] = hp_config["num_hidden_layers"]
self.hparams["num_attention_heads"] = hp_config["num_attention_heads"]
self.hparams["num_key_value_heads"] = hp_config["num_attention_heads"]
self.block_count = self.find_hparam(["n_layers", "num_hidden_layers", "n_layer"])
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
def set_vocab(self):
self._set_vocab_sentencepiece()
def set_gguf_parameters(self):
super().set_gguf_parameters()
self.gguf_writer.add_vocab_size(self.hparams["vocab_size"])
self.gguf_writer.add_rope_scaling_type(gguf.RopeScalingType.LINEAR)
self.gguf_writer.add_rope_scaling_factor(1.0)
def weight_quant(self, weight):
dtype = weight.dtype
weight = weight.float()
s = 1 / weight.abs().mean().clamp(min=1e-5)
result = (weight * s).round().clamp(-1, 1) / s
return result.type(dtype)
def transform_to_tl1(self, x: np.ndarray):
scale = np.max(np.abs(x))
# res = np.round(x / scale + 2).astype(np.uint8)
res = preprocess_weights_tl1(x)
return res, scale
def transform_to_tl2(self, x: np.ndarray):
scale = np.max(np.abs(x))
# res = np.round(x / scale + 2).astype(np.uint8)
res = preprocess_weights_tl2(x)
return res, scale
# generate dummy model
def generate_tensors(self) -> Iterator[tuple[str, np.ndarray]]:
hp_config = model_config[self.params]
hidden_size = hp_config["hidden_size"]
intermediate_size = hp_config["intermediate_size"]
num_hidden_layers = hp_config["num_hidden_layers"]
num_attention_heads = hp_config["num_attention_heads"]
# generate dummy tensors
tensor = torch.randn((32002, hidden_size), dtype=torch.float32)
yield ("model.embed_tokens.weight", tensor)
for i in range(num_hidden_layers):
yield f"model.layers.{i}.input_layernorm.weight", torch.randn((hidden_size,), dtype=torch.float32)
yield f"model.layers.{i}.mlp.down_proj.weight", torch.randn((hidden_size, intermediate_size), dtype=torch.float32)
yield f"model.layers.{i}.mlp.ffn_layernorm.weight", torch.randn((intermediate_size,), dtype=torch.float32)
yield f"model.layers.{i}.mlp.gate_proj.weight", torch.randn((intermediate_size, hidden_size), dtype=torch.float32)
yield f"model.layers.{i}.mlp.up_proj.weight", torch.randn((intermediate_size, hidden_size), dtype=torch.float32)
yield f"model.layers.{i}.post_attention_layernorm.weight", torch.randn((hidden_size), dtype=torch.float32)
yield f"model.layers.{i}.self_attn.inner_attn_ln.weight", torch.randn((hidden_size,), dtype=torch.float32)
yield f"model.layers.{i}.self_attn.k_proj.weight", torch.randn((hidden_size, hidden_size), dtype=torch.float32)
yield f"model.layers.{i}.self_attn.o_proj.weight", torch.randn((hidden_size, hidden_size), dtype=torch.float32)
yield f"model.layers.{i}.self_attn.q_proj.weight", torch.randn((hidden_size, hidden_size), dtype=torch.float32)
yield f"model.layers.{i}.self_attn.rotary_emb.inv_freq", torch.randn((hidden_size // (num_attention_heads * 2),), dtype=torch.float32)
yield f"model.layers.{i}.self_attn.v_proj.weight", torch.randn((hidden_size, hidden_size), dtype=torch.float32)
tensor = torch.randn((hidden_size,), dtype=torch.float32)
yield("model.norm.weight", tensor)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
# quant weight to i2 (in fp16)
if name.endswith(("q_proj.weight", "k_proj.weight", "v_proj.weight",
"down_proj.weight", "up_proj.weight", "gate_proj.weight",
"o_proj.weight")):
data_torch = self.weight_quant(data_torch)
return [(self.map_tensor_name(name), data_torch)]
def write_tensors(self):
max_name_len = max(len(s) for _, s in self.tensor_map.mapping.values()) + len(".weight,")
for name, data_torch in self.generate_tensors():
# we don't need these
if name.endswith((".attention.masked_bias", ".attention.bias", ".rotary_emb.inv_freq")):
continue
old_dtype = data_torch.dtype
# convert any unsupported data types to float32
if data_torch.dtype not in (torch.float16, torch.float32):
data_torch = data_torch.to(torch.float32)
# use the first number-like part of the tensor name as the block id
bid = None
for part in name.split("."):
if part.isdecimal():
bid = int(part)
break
for new_name, data in ((n, d.squeeze().numpy()) for n, d in self.modify_tensors(data_torch, name, bid)):
data: np.ndarray = data # type hint
data_shape = data.shape
n_dims = len(data.shape)
data_dtype = data.dtype
data_qtype: gguf.GGMLQuantizationType | None = None
# when both are True, f32 should win
# extra_f32 = self.extra_f32_tensors(name, new_name, bid, n_dims)
# extra_f16 = self.extra_f16_tensors(name, new_name, bid, n_dims)
extra_f32 = False
extra_f16 = False
# Most of the codebase that takes in 1D tensors or norms only handles F32 tensors
# Conditions should closely match those in llama_model_quantize_internal in llama.cpp
extra_f32 = any(cond for cond in (
extra_f32,
n_dims == 1,
new_name.endswith("_norm.weight"),
))
# Some tensor types are always in float32
extra_f32 = extra_f32 or any(self.match_model_tensor_name(new_name, key, bid) for key in (
gguf.MODEL_TENSOR.FFN_GATE_INP,
gguf.MODEL_TENSOR.POS_EMBD,
gguf.MODEL_TENSOR.TOKEN_TYPES,
# for debug / delete when inference
gguf.MODEL_TENSOR.TOKEN_EMBD,
))
# if f16 desired, convert any float32 2-dim weight tensors to float16
extra_f16 = any(cond for cond in (
extra_f16,
(name.endswith(".weight") and n_dims >= 2),
))
suit_i2 = True
if name.endswith('embed_tokens.weight') or name.endswith('norm.weight'):
suit_i2 = False
i2_scale = None
if self.ftype != gguf.GGMLQuantizationType.F32 and extra_f16 and not extra_f32:
if self.ftype == gguf.GGMLQuantizationType.TL1 and suit_i2:
data, i2_scale = self.transform_to_tl1(data)
assert data.dtype == np.uint8
assert i2_scale.dtype == np.float32
data_qtype = gguf.GGMLQuantizationType.TL1
elif self.ftype == gguf.GGMLQuantizationType.TL2 and suit_i2:
data, i2_scale = self.transform_to_tl2(data)
assert data.dtype == np.uint8
assert i2_scale.dtype == np.float32
data_qtype = gguf.GGMLQuantizationType.TL2
else: # default to float16 for quantized tensors
if data_dtype != np.float16:
data = data.astype(np.float16)
data_qtype = gguf.GGMLQuantizationType.F16
if data_qtype is None: # by default, convert to float32
if data_dtype != np.float32:
data = data.astype(np.float32)
data_qtype = gguf.GGMLQuantizationType.F32
shape = data_shape
# shape = gguf.quant_shape_from_byte_shape(data.shape, data_qtype) if data.dtype == np.uint8 else data.shape
# reverse shape to make it similar to the internal ggml dimension order
shape_str = f"{{{', '.join(str(n) for n in reversed(shape))}}}"
# n_dims is implicit in the shape
logger.info(f"{f'%-{max_name_len}s' % f'{new_name},'} {old_dtype} --> {data_qtype.name}, shape = {shape_str}")
self.gguf_writer.add_tensor(new_name, data, raw_shape=shape, raw_dtype=data_qtype)
if i2_scale is not None:
self.gguf_writer.add_tensor(new_name + "_scale", i2_scale, raw_dtype=gguf.GGMLQuantizationType.F32)
ftype_map = {
"f32": gguf.GGMLQuantizationType.F32,
"f16": gguf.GGMLQuantizationType.F16,
"tl1" : gguf.GGMLQuantizationType.TL1,
"tl2" : gguf.GGMLQuantizationType.TL2,
}
def main() -> None:
dir_model = args.model
fname_out = args.outfile
model_size = args.model_size
hparams = Model.load_hparams(dir_model)
with torch.inference_mode():
model_class = Model.from_model_architecture(hparams["architectures"][0])
model_instance = model_class(dir_model, ftype_map[args.outtype], fname_out, args.bigendian, args.use_temp_file)
model_instance.set_params(model_size)
logger.info("Set model parameters")
model_instance.set_gguf_parameters()
logger.info("Set model tokenizer")
model_instance.set_vocab()
if args.vocab_only:
logger.info(f"Exporting model vocab to '{fname_out}'")
model_instance.write_vocab()
else:
logger.info(f"Exporting model to '{fname_out}'")
model_instance.write()
logger.info(f"Model successfully exported to '{fname_out}'")
def read_gguf_file(gguf_file_path):
"""
Reads and prints key-value pairs and tensor information from a GGUF file in an improved format.
Parameters:
- gguf_file_path: Path to the GGUF file.
"""
reader = GGUFReader(gguf_file_path)
# List all key-value pairs in a columnized format
print("Key-Value Pairs:") # noqa: NP100
max_key_length = max(len(key) for key in reader.fields.keys())
for key, field in reader.fields.items():
value = field.parts[field.data[0]]
print(f"{key:{max_key_length}} : {value}") # noqa: NP100
print("----") # noqa: NP100
# List all tensors
print("Tensors:") # noqa: NP100
tensor_info_format = "{:<30} | Shape: {:<15} | Size: {:<12} | Quantization: {}"
print(tensor_info_format.format("Tensor Name", "Shape", "Size", "Quantization")) # noqa: NP100
print("-" * 80) # noqa: NP100
for tensor in reader.tensors:
shape_str = "x".join(map(str, tensor.shape))
size_str = str(tensor.n_elements)
quantization_str = tensor.tensor_type.name
print(tensor_info_format.format(tensor.name, shape_str, size_str, quantization_str)) # noqa: NP100
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Generate a dummy bitnet model with GGUF format")
parser.add_argument(
"--vocab-only", action="store_true",
help="extract only the vocab",
)
parser.add_argument(
"--outfile", type=Path,
help="path to write to; default: based on input",
)
parser.add_argument(
"--outtype", type=str, choices=ftype_map.keys(), default="f16",
help="output format - use f32 for float32, f16 for float16",
)
parser.add_argument("--bigendian", action="store_true", help="model is executed on big endian machine")
parser.add_argument(
"model", type=Path,
help="directory containing model file",
)
parser.add_argument("--use-temp-file", action="store_true", help="use the tempfile library while processing (helpful when running out of memory, process killed)")
parser.add_argument("--model-name", type=str, default=None, help="name of the model")
parser.add_argument("--model-size", type=str, default="7B", help="size of the model")
parser.add_argument("--verbose", action="store_true", help="increase output verbosity")
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
main()