Files
BitNet/utils/convert-hf-to-gguf-bitnet.py
2024-12-09 16:45:31 +00:00

1166 lines
50 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import logging
import argparse
import contextlib
import json
import os
import re
import sys
from abc import ABC, abstractmethod
from enum import IntEnum
from pathlib import Path
from hashlib import sha256
from typing import TYPE_CHECKING, Any, Callable, ContextManager, Iterator, Sequence, TypeVar, cast
import configparser
import numpy as np
import torch
if TYPE_CHECKING:
from torch import Tensor
if 'NO_LOCAL_GGUF' not in os.environ:
sys.path.insert(1, str(Path(__file__).parent / 'gguf-py'))
import gguf
from convert import LlamaHfVocab, permute
logger = logging.getLogger("hf-to-gguf")
###### MODEL DEFINITIONS ######
class SentencePieceTokenTypes(IntEnum):
NORMAL = 1
UNKNOWN = 2
CONTROL = 3
USER_DEFINED = 4
UNUSED = 5
BYTE = 6
AnyModel = TypeVar("AnyModel", bound="type[Model]")
class Model(ABC):
_model_classes: dict[str, type[Model]] = {}
def __init__(self, dir_model: Path, ftype: int, fname_out: Path, is_big_endian: bool, use_temp_file: bool):
self.dir_model = dir_model
self.ftype = ftype
self.fname_out = fname_out
self.is_big_endian = is_big_endian
self.endianess = gguf.GGUFEndian.BIG if is_big_endian else gguf.GGUFEndian.LITTLE
self.use_temp_file = use_temp_file
self.is_safetensors = self._is_model_safetensors()
self.num_parts = Model.count_model_parts(self.dir_model, ".safetensors" if self.is_safetensors else ".bin")
self.part_names = self._get_part_names()
self.hparams = Model.load_hparams(self.dir_model)
self.gguf_writer = gguf.GGUFWriter(fname_out, gguf.MODEL_ARCH_NAMES[self.model_arch], endianess=self.endianess, use_temp_file=self.use_temp_file)
self.block_count = self.find_hparam(["n_layers", "num_hidden_layers", "n_layer"])
self.tensor_map = gguf.get_tensor_name_map(self.model_arch, self.block_count)
@property
@abstractmethod
def model_arch(self) -> gguf.MODEL_ARCH:
pass
def find_hparam(self, keys: Sequence[str], optional: bool = False) -> Any:
key = next((k for k in keys if k in self.hparams), None)
if key is not None:
return self.hparams[key]
if optional:
return None
raise KeyError(f"could not find any of: {keys}")
def set_vocab(self):
self._set_vocab_gpt2()
def get_tensors(self) -> Iterator[tuple[str, Tensor]]:
for part_name in self.part_names:
logger.info(f"gguf: loading model part '{part_name}'")
ctx: ContextManager[Any]
if self.is_safetensors:
from safetensors import safe_open
ctx = cast(ContextManager[Any], safe_open(self.dir_model / part_name, framework="pt", device="cpu"))
else:
ctx = contextlib.nullcontext(torch.load(str(self.dir_model / part_name), map_location="cpu", mmap=True, weights_only=True))
with ctx as model_part:
for name in model_part.keys():
data = model_part.get_tensor(name) if self.is_safetensors else model_part[name]
yield name, data
def match_model_tensor_name(self, name: str, key: gguf.MODEL_TENSOR, bid: int | None, suffix: str = ".weight") -> bool:
if key not in gguf.MODEL_TENSORS[self.model_arch]:
return False
key_name: str = gguf.TENSOR_NAMES[key]
if "{bid}" in key_name:
if bid is None:
return False
key_name = key_name.format(bid=bid)
else:
if bid is not None:
return False
return name == (key_name + suffix)
def map_tensor_name(self, name: str, try_suffixes: Sequence[str] = (".weight", ".bias")) -> str:
new_name = self.tensor_map.get_name(key=name, try_suffixes=try_suffixes)
if new_name is None:
raise ValueError(f"Can not map tensor {name!r}")
return new_name
def set_gguf_parameters(self):
self.gguf_writer.add_name(self.dir_model.name)
self.gguf_writer.add_block_count(self.block_count)
if (n_ctx := self.find_hparam(["max_position_embeddings", "n_ctx"], optional=True)) is not None:
self.gguf_writer.add_context_length(n_ctx)
logger.info(f"gguf: context length = {n_ctx}")
n_embd = self.find_hparam(["hidden_size", "n_embd"])
self.gguf_writer.add_embedding_length(n_embd)
logger.info(f"gguf: embedding length = {n_embd}")
if (n_ff := self.find_hparam(["intermediate_size", "n_inner"], optional=True)) is not None:
self.gguf_writer.add_feed_forward_length(n_ff)
logger.info(f"gguf: feed forward length = {n_ff}")
n_head = self.find_hparam(["num_attention_heads", "n_head"])
self.gguf_writer.add_head_count(n_head)
logger.info(f"gguf: head count = {n_head}")
if (n_head_kv := self.hparams.get("num_key_value_heads")) is not None:
self.gguf_writer.add_head_count_kv(n_head_kv)
logger.info(f"gguf: key-value head count = {n_head_kv}")
if (rope_theta := self.hparams.get("rope_theta")) is not None:
self.gguf_writer.add_rope_freq_base(rope_theta)
logger.info(f"gguf: rope theta = {rope_theta}")
if (f_rms_eps := self.hparams.get("rms_norm_eps")) is not None:
self.gguf_writer.add_layer_norm_rms_eps(f_rms_eps)
logger.info(f"gguf: rms norm epsilon = {f_rms_eps}")
if (f_norm_eps := self.find_hparam(["layer_norm_eps", "layer_norm_epsilon", "norm_epsilon"], optional=True)) is not None:
self.gguf_writer.add_layer_norm_eps(f_norm_eps)
logger.info(f"gguf: layer norm epsilon = {f_norm_eps}")
if (n_experts := self.hparams.get("num_local_experts")) is not None:
self.gguf_writer.add_expert_count(n_experts)
logger.info(f"gguf: expert count = {n_experts}")
if (n_experts_used := self.hparams.get("num_experts_per_tok")) is not None:
self.gguf_writer.add_expert_used_count(n_experts_used)
logger.info(f"gguf: experts used count = {n_experts_used}")
self.gguf_writer.add_file_type(self.ftype)
logger.info(f"gguf: file type = {self.ftype}")
def write_tensors(self):
block_count = self.hparams.get("n_layers", self.hparams.get("num_hidden_layers", self.hparams.get("n_layer")))
tensor_map = gguf.get_tensor_name_map(self.model_arch, block_count)
for name, data_torch in self.get_tensors():
# we don't need these
if name.endswith((".attention.masked_bias", ".attention.bias", ".attention.rotary_emb.inv_freq")):
continue
old_dtype = data_torch.dtype
# convert any unsupported data types to float32
if data_torch.dtype not in (torch.float16, torch.float32):
data_torch = data_torch.to(torch.float32)
data = data_torch.squeeze().numpy()
# map tensor names
new_name = tensor_map.get_name(name, try_suffixes=(".weight", ".bias"))
if new_name is None:
raise ValueError(f"Can not map tensor {name!r}")
n_dims = len(data.shape)
data_dtype = data.dtype
# if f32 desired, convert any float16 to float32
if self.ftype == 0 and data_dtype == np.float16:
data = data.astype(np.float32)
# TODO: Why cant we use these float16 as-is? There should be not reason to store float16 as float32
if self.ftype == 1 and data_dtype == np.float16 and (n_dims == 1 or new_name.endswith("_norm.weight")):
data = data.astype(np.float32)
# if f16 desired, convert any float32 2-dim weight tensors to float16
if self.ftype == 1 and data_dtype == np.float32 and name.endswith(".weight") and n_dims == 2:
data = data.astype(np.float16)
logger.info(f"{new_name}, n_dims = {n_dims}, {old_dtype} --> {data.dtype}")
self.gguf_writer.add_tensor(new_name, data)
def write(self):
self.write_tensors()
self.gguf_writer.write_header_to_file()
self.gguf_writer.write_kv_data_to_file()
self.gguf_writer.write_tensors_to_file()
self.gguf_writer.close()
def write_vocab(self):
self.gguf_writer.write_header_to_file()
self.gguf_writer.write_kv_data_to_file()
self.gguf_writer.close()
@staticmethod
def count_model_parts(dir_model: Path, prefix: str) -> int:
num_parts = 0
for filename in os.listdir(dir_model):
if filename.endswith(prefix):
num_parts += 1
return num_parts
@staticmethod
def load_hparams(dir_model):
with open(dir_model / "config.json", "r", encoding="utf-8") as f:
return json.load(f)
@classmethod
def register(cls, *names: str) -> Callable[[AnyModel], AnyModel]:
assert names
def func(modelcls: type[Model]):
for name in names:
cls._model_classes[name] = modelcls
return modelcls
return func
@classmethod
def from_model_architecture(cls, arch):
try:
return cls._model_classes[arch]
except KeyError:
raise NotImplementedError(f'Architecture {arch!r} not supported!') from None
def _is_model_safetensors(self) -> bool:
return Model.count_model_parts(self.dir_model, ".safetensors") > 0
def _get_part_names(self):
if self.is_safetensors:
if self.num_parts == 1: # there's only one .safetensors file
return ("model.safetensors",)
return (f"model-{n:05}-of-{self.num_parts:05}.safetensors" for n in range(1, self.num_parts + 1))
if self.num_parts == 1: # there's only one .bin file
return ("pytorch_model.bin",)
return (f"pytorch_model-{n:05}-of-{self.num_parts:05}.bin" for n in range(1, self.num_parts + 1))
# used for GPT-2 BPE and WordPiece vocabs
def get_vocab_base(self) -> tuple[list[str], list[int], str]:
tokens: list[str] = []
toktypes: list[int] = []
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(self.dir_model)
vocab_size = self.hparams.get("vocab_size", len(tokenizer.vocab))
assert max(tokenizer.vocab.values()) < vocab_size
tokpre = self.get_vocab_base_pre(tokenizer)
reverse_vocab = {id_: encoded_tok for encoded_tok, id_ in tokenizer.vocab.items()}
added_vocab = tokenizer.get_added_vocab()
for i in range(vocab_size):
if i not in reverse_vocab:
tokens.append(f"[PAD{i}]")
toktypes.append(gguf.TokenType.USER_DEFINED)
elif reverse_vocab[i] in added_vocab:
# We need to manually encode and decode the added tokens in case special characters
# used for `\n` / `\t` have been manually added in the added tokens
encoded_decoded_token = tokenizer.decode(tokenizer.encode(reverse_vocab[i]))
tokens.append(encoded_decoded_token)
if tokenizer.added_tokens_decoder[i].special:
toktypes.append(gguf.TokenType.CONTROL)
else:
toktypes.append(gguf.TokenType.USER_DEFINED)
else:
tokens.append(reverse_vocab[i])
toktypes.append(gguf.TokenType.NORMAL)
return tokens, toktypes, tokpre
# NOTE: this function is generated by convert-hf-to-gguf-update.py
# do not modify it manually!
# ref: https://github.com/ggerganov/llama.cpp/pull/6920
def get_vocab_base_pre(self, tokenizer) -> str:
# encoding this string and hashing the resulting tokens would (hopefully) give us a unique identifier that
# is specific for the BPE pre-tokenizer used by the model
# we will use this unique identifier to write a "tokenizer.ggml.pre" entry in the GGUF file which we can
# use in llama.cpp to implement the same pre-tokenizer
chktxt = '\n \n\n \n\n\n \t \t\t \t\n \n \n \n \n🚀 (normal) 😶\u200d🌫️ (multiple emojis concatenated) ✅ 🦙🦙 3 33 333 3333 33333 333333 3333333 33333333 3.3 3..3 3...3 កាន់តែពិសេសអាច😁 ?我想在apple工作1314151天~ ------======= нещо на Български \'\'\'\'\'\'```````""""......!!!!!!?????? I\'ve been \'told he\'s there, \'RE you sure? \'M not sure I\'ll make it, \'D you like some tea? We\'Ve a\'lL'
chktok = tokenizer.encode(chktxt)
chkhsh = sha256(str(chktok).encode()).hexdigest()
logger.debug(f"chktok: {chktok}")
logger.debug(f"chkhsh: {chkhsh}")
res = None
# NOTE: if you get an error here, you need to update the convert-hf-to-gguf-update.py script
# or pull the latest version of the model from Huggingface
# don't edit the hashes manually!
if chkhsh == "0ef9807a4087ebef797fc749390439009c3b9eda9ad1a097abbe738f486c01e5":
# ref: https://huggingface.co/meta-llama/Meta-Llama-3-8B
res = "llama-bpe"
if chkhsh == "049ecf7629871e3041641907f3de7c733e4dbfdc736f57d882ba0b0845599754":
# ref: https://huggingface.co/deepseek-ai/deepseek-llm-7b-base
res = "deepseek-llm"
if chkhsh == "347715f544604f9118bb75ed199f68779f423cabb20db6de6f31b908d04d7821":
# ref: https://huggingface.co/deepseek-ai/deepseek-coder-6.7b-base
res = "deepseek-coder"
if chkhsh == "8aeee3860c56296a157a1fe2fad249ec40aa59b1bb5709f4ade11c4e6fe652ed":
# ref: https://huggingface.co/tiiuae/falcon-7b
res = "falcon"
if chkhsh == "0876d13b50744004aa9aeae05e7b0647eac9d801b5ba4668afc01e709c15e19f":
# ref: https://huggingface.co/BAAI/bge-small-en-v1.5
res = "bert-bge"
if chkhsh == "b6dc8df998e1cfbdc4eac8243701a65afe638679230920b50d6f17d81c098166":
# ref: https://huggingface.co/mosaicml/mpt-7b
res = "mpt"
if chkhsh == "35d91631860c815f952d711435f48d356ebac988362536bed955d43bfa436e34":
# ref: https://huggingface.co/bigcode/starcoder2-3b
res = "starcoder"
if chkhsh == "3ce83efda5659b07b1ad37ca97ca5797ea4285d9b9ab0dc679e4a720c9da7454":
# ref: https://huggingface.co/openai-community/gpt2
res = "gpt-2"
if chkhsh == "6221ad2852e85ce96f791f476e0b390cf9b474c9e3d1362f53a24a06dc8220ff":
# ref: https://huggingface.co/smallcloudai/Refact-1_6-base
res = "refact"
if chkhsh == "9c2227e4dd922002fb81bde4fc02b0483ca4f12911410dee2255e4987644e3f8":
# ref: https://huggingface.co/CohereForAI/c4ai-command-r-v01
res = "command-r"
if chkhsh == "9d032fcbd5501f4a38150912590928bfb36091efb5df11b8e2124b0390e3fb1e":
res = "falcon3"
if res is None:
logger.warning("\n")
logger.warning("**************************************************************************************")
logger.warning("** WARNING: The BPE pre-tokenizer was not recognized!")
logger.warning("** There are 2 possible reasons for this:")
logger.warning("** - the model has not been added to convert-hf-to-gguf-update.py yet")
logger.warning("** - the pre-tokenization config has changed upstream")
logger.warning("** Check your model files and convert-hf-to-gguf-update.py and update them accordingly.")
logger.warning("** ref: https://github.com/ggerganov/llama.cpp/pull/6920")
logger.warning("**")
logger.warning(f"** chkhsh: {chkhsh}")
logger.warning("**************************************************************************************")
logger.warning("\n")
raise NotImplementedError("BPE pre-tokenizer was not recognized - update get_vocab_base_pre()")
logger.debug(f"tokenizer.ggml.pre: {repr(res)}")
logger.debug(f"chkhsh: {chkhsh}")
return res
def _set_vocab_gpt2(self) -> None:
tokens, toktypes, tokpre = self.get_vocab_base()
self.gguf_writer.add_tokenizer_model("gpt2")
self.gguf_writer.add_tokenizer_pre(tokpre)
self.gguf_writer.add_token_list(tokens)
self.gguf_writer.add_token_types(toktypes)
special_vocab = gguf.SpecialVocab(self.dir_model, load_merges=True)
special_vocab.add_to_gguf(self.gguf_writer)
def _set_vocab_sentencepiece(self):
from sentencepiece import SentencePieceProcessor
tokenizer_path = self.dir_model / 'tokenizer.model'
tokens: list[bytes] = []
scores: list[float] = []
toktypes: list[int] = []
if not tokenizer_path.is_file():
raise FileNotFoundError(f"File not found: {tokenizer_path}")
tokenizer = SentencePieceProcessor(str(tokenizer_path))
vocab_size = self.hparams.get('vocab_size', tokenizer.vocab_size())
for token_id in range(tokenizer.vocab_size()):
piece = tokenizer.id_to_piece(token_id)
text = piece.encode("utf-8")
score = tokenizer.get_score(token_id)
toktype = SentencePieceTokenTypes.NORMAL
if tokenizer.is_unknown(token_id):
toktype = SentencePieceTokenTypes.UNKNOWN
elif tokenizer.is_control(token_id):
toktype = SentencePieceTokenTypes.CONTROL
elif tokenizer.is_unused(token_id):
toktype = SentencePieceTokenTypes.UNUSED
elif tokenizer.is_byte(token_id):
toktype = SentencePieceTokenTypes.BYTE
tokens.append(text)
scores.append(score)
toktypes.append(toktype)
added_tokens_file = self.dir_model / 'added_tokens.json'
if added_tokens_file.is_file():
with open(added_tokens_file, "r", encoding="utf-8") as f:
added_tokens_json = json.load(f)
for key in added_tokens_json:
key = key.encode("utf-8")
if key not in tokens:
tokens.append(key)
scores.append(-1000.0)
toktypes.append(SentencePieceTokenTypes.USER_DEFINED)
if vocab_size > len(tokens):
pad_count = vocab_size - len(tokens)
logger.debug(f"Padding vocab with {pad_count} token(s) - [PAD1] through [PAD{pad_count}]")
for i in range(1, pad_count + 1):
tokens.append(f"[PAD{i}]")
scores.append(-1000.0)
toktypes.append(SentencePieceTokenTypes.UNUSED)
assert len(tokens) == vocab_size
self.gguf_writer.add_tokenizer_model("llama")
self.gguf_writer.add_tokenizer_pre("default")
self.gguf_writer.add_token_list(tokens)
self.gguf_writer.add_token_scores(scores)
self.gguf_writer.add_token_types(toktypes)
special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
special_vocab.add_to_gguf(self.gguf_writer)
def _set_vocab_llama_hf(self):
vocab = LlamaHfVocab(self.dir_model)
tokens = []
scores = []
toktypes = []
for text, score, toktype in vocab.all_tokens():
tokens.append(text)
scores.append(score)
toktypes.append(toktype)
assert len(tokens) == vocab.vocab_size
self.gguf_writer.add_tokenizer_model("llama")
self.gguf_writer.add_tokenizer_pre("default")
self.gguf_writer.add_token_list(tokens)
self.gguf_writer.add_token_scores(scores)
self.gguf_writer.add_token_types(toktypes)
special_vocab = gguf.SpecialVocab(self.dir_model, n_vocab=len(tokens))
special_vocab.add_to_gguf(self.gguf_writer)
# TL1
def process_tl1(weight, BM, BY, bm, by, M, K):
weight = weight.reshape((M, K // 2)).astype(np.uint8)
weight = weight.reshape((M // BM, BM, K // 2)).transpose(0, 2, 1)
weight = weight.reshape((M // BM, K // BY, BY // 2, BM)).transpose(0, 1, 3, 2)
weight = weight.reshape((M // BM, K // BY, BM // bm, bm, BY // 2)).transpose(0, 1, 2, 4, 3)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, by // 2, bm)).transpose(0, 1, 2, 3, 5, 4)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, bm // 16, 16, by // 2)).transpose(0, 1, 2, 3, 4, 6, 5)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, bm // 16, by // 4, 4 // 2, 16)).transpose(0, 1, 2, 3, 4, 5, 7, 6)
weight = weight.reshape((M * K // 16 // 4, 16, 4 // 2))
weight_0 = weight[:, :, 0] << 4
weight_1 = weight[:, :, 1]
weight = weight_0 + weight_1
return weight
def preprocess_weights_tl1(
w: np.ndarray,
bits = 2,
g = 4,
) -> Tuple[np.ndarray, np.ndarray]:
from configparser import ConfigParser
config = ConfigParser()
M, K = w.shape
weight = w
weight = np.where(np.abs(weight) < 1e-6, 0, weight).astype(np.float32)
weight = np.sign(weight)
weight_num = np.prod(weight.shape)
config.read('include/kernel_config.ini')
BM = -1
BY = -1
bm = -1
for kernel in config.sections():
if int(config.get(kernel, 'm')) == M and int(config.get(kernel, 'k')) == K:
BM = int(config.get(kernel, 'bm'))
BY = int(config.get(kernel, 'bk'))
bm = int(config.get(kernel, 'bmm'))
by = 256 // bm
break
if BM == -1:
raise NotImplementedError
weight = np.reshape(weight, (weight_num // 2, 2))
hi_weight = np.multiply(np.split(weight, 2, axis=1)[0], 3)
lo_weight = np.split(weight, 2, axis=1)[1]
weight = np.reshape((hi_weight + lo_weight), weight_num // 2)
weight = weight + 4
weight = np.reshape(weight, (M, K // 2)).astype(np.uint8)
weight = process_tl1(weight, BM, BY, bm, by, M, K)
return weight
def preprocess_two_weights_tl2(M, K, weight_num, BM, BY, bm, by, weight, final_weight):
weight = np.reshape(weight, (weight_num // 2, 2))
hi_weight = np.multiply(np.split(weight, 2, axis=1)[0], 3)
lo_weight = np.split(weight, 2, axis=1)[1]
weight = np.reshape((hi_weight + lo_weight), weight_num // 2)
weight = weight + 4
weight = np.reshape(weight, (M, K // 2)).astype(np.uint8)
weight = weight.reshape((M // BM, BM, K // 2)).transpose(0, 2, 1)
weight = weight.reshape((M // BM, K // BY, BY // 2, BM)).transpose(0, 1, 3, 2)
weight = weight.reshape((M // BM, K // BY, BM // bm, bm, BY // 2)).transpose(0, 1, 2, 4, 3)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, by // 2, bm)).transpose(0, 1, 2, 3, 5, 4)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, bm, by // 2))
weight_0 = weight[:, :, :, :, :, 0]
weight_1 = weight[:, :, :, :, :, 1]
weight_0 = weight_0 << 4
weight_1 = weight_1
weight = weight_0 + weight_1
weight = weight.reshape((M * K // bm // by, bm // 8, 8))
weight[:, [0, 1, 2, 3], :] = weight[:, [0, 2, 1, 3], :]
weight = weight.reshape(M * K // bm // by, bm)
for i in range(weight.shape[0]):
final_weight.append(weight[i, :])
def preprocess_three_weights_tl2(M, K, weight_num, BM, BY, bm, by, weight, final_weight):
weight = np.reshape(weight, (weight_num // 3, 3))
split_weights = np.split(weight, 3, axis=1)
first_weight = np.multiply(split_weights[0], 9)
second_weight = np.multiply(split_weights[1], 3)
third_weight = split_weights[2]
weight = np.reshape((first_weight + second_weight + third_weight), weight_num // 3)
sign_weight = np.sign(weight) + 2
sign_weight = np.where(sign_weight > 1, 0, sign_weight)
weight = np.abs(weight)
weight = np.reshape(weight, (M, K // 3)).astype(np.uint8)
sign_weight = np.reshape(sign_weight, (M, K // 3)).astype(np.uint8)
weight = weight.reshape((M // BM, BM, K // 3)).transpose(0, 2, 1)
weight = weight.reshape((M // BM, K // BY, BY // 3, BM)).transpose(0, 1, 3, 2)
weight = weight.reshape((M // BM, K // BY, BM // bm, bm, BY // 3)).transpose(0, 1, 2, 4, 3)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, by // 3, bm)).transpose(0, 1, 2, 3, 5, 4)
weight = weight.reshape((M // BM, K // BY, BM // bm, BY // by, bm, by // 3))
weight_0 = weight[:, :, :, :, :, 0]
weight_1 = weight[:, :, :, :, :, 1]
weight_0 = weight_0 << 4
weight_1 = weight_1
weight = weight_0 + weight_1
weight = weight.reshape((M * K // bm // by, bm // 8, 8))
weight[:, [0, 1, 2, 3], :] = weight[:, [0, 2, 1, 3], :]
weight = weight.reshape(M * K // bm // by, bm)
for i in range(weight.shape[0]):
final_weight.append(weight[i, :])
sign_weight = sign_weight.reshape((M // BM, BM, K // 3)).transpose(0, 2, 1)
sign_weight = sign_weight.reshape((M // BM, K // BY, BY // 3, BM)).transpose(0, 1, 3, 2)
sign_weight = sign_weight.reshape((M // BM, K // BY, BM // bm, bm, BY // 3)).transpose(0, 1, 2, 4, 3)
sign_weight = sign_weight.reshape((M // BM, K // BY, BM // bm, BY // (by * 4), by // 3 * 4, bm)).transpose(0, 1, 2, 3, 5, 4)
sign_weight = sign_weight.reshape((M // BM, K // BY, BM // bm, BY // (by * 4), bm, by // 3 * 4)).transpose(0, 1, 2, 3, 5, 4)
sign_weight = sign_weight.reshape((M // BM, K // BY, BM // bm, BY // (by * 4), by // 3 * 8, bm // 2)).astype(np.uint16)
combine_weight = np.zeros((M // BM, K // BY, BM // bm, BY // (by * 4), bm // 2), dtype=np.uint16)
for i in range(16):
temp_weight = sign_weight[:, :, :, :, i, :] << 15 - i
combine_weight += temp_weight
combine_weight = combine_weight.view(np.uint8)
combine_weight = combine_weight.reshape((M * K // bm // (by * 4)), bm)
for i in range(combine_weight.shape[0]):
final_weight.append(combine_weight[i, :])
def preprocess_weights_tl2(
w: np.ndarray,
bits = 2,
g = 4,
) -> Tuple[np.ndarray, np.ndarray]:
from configparser import ConfigParser
config = ConfigParser()
M, K = w.shape
weight = w
weight = np.where(np.abs(weight) < 1e-6, 0, weight).astype(np.float32)
weight = np.sign(weight)
weight_num = np.prod(weight.shape)
config.read('include/kernel_config.ini')
BM = -1
BY = -1
bm = -1
for kernel in config.sections():
if int(config.get(kernel, 'm')) == M and int(config.get(kernel, 'k')) == K:
BM = int(config.get(kernel, 'bm'))
BY = int(config.get(kernel, 'bk'))
bm = int(config.get(kernel, 'bmm'))
by = 192 // bm
break
if BM == -1:
raise NotImplementedError
if (weight.shape[1] % BY != 0):
slice_k_idx = weight.shape[1] - weight.shape[1] % BY
slice_weights = np.split(weight, [slice_k_idx], axis=1)
three_weight = slice_weights[0]
two_weight = slice_weights[1]
else:
three_weight = weight
final_weight = []
preprocess_three_weights_tl2(three_weight.shape[0],
three_weight.shape[1],
three_weight.shape[0] * three_weight.shape[1],
BM,
BY,
bm,
by,
three_weight,
final_weight)
if (weight.shape[1] % BY != 0):
preprocess_two_weights_tl2( two_weight.shape[0],
two_weight.shape[1],
two_weight.shape[0] * two_weight.shape[1],
BM,
32,
32,
4,
two_weight,
final_weight)
weight = np.array(final_weight, dtype=np.uint8).reshape(-1)
weight = np.pad(weight, (0, (K - 256) * M // 3 * 5 // 8 + 256 * M // 2 * 4 // 8 -
weight.shape[0]), mode='constant', constant_values=0)
return weight
def transform_to_tl1(x: np.ndarray):
scale = np.max(np.abs(x))
# res = np.round(x / scale + 2).astype(np.uint8)
res = preprocess_weights_tl1(x)
return res, scale
def transform_to_tl2(x: np.ndarray):
scale = np.max(np.abs(x))
# res = np.round(x / scale + 2).astype(np.uint8)
res = preprocess_weights_tl2(x)
return res, scale
def read_model_config(model_dir: str) -> dict[str, Any]:
config = os.path.join(model_dir, "config.json")
if not os.path.exists(config):
raise FileNotFoundError(f"Model config file not found: {config}")
with open(config, "r") as f:
return json.load(f)
@Model.register("LLaMAForCausalLM", "LlamaForCausalLM", "MistralForCausalLM", "MixtralForCausalLM")
class LlamaModel(Model):
model_arch = gguf.MODEL_ARCH.LLAMA
def set_vocab(self):
try:
self._set_vocab_sentencepiece()
except FileNotFoundError:
try:
self._set_vocab_llama_hf()
except (FileNotFoundError, TypeError):
# Llama 3
self._set_vocab_gpt2()
# Apply to CodeLlama only (and ignore for Llama 3 with a vocab size of 128256)
if self.hparams.get("vocab_size", 32000) == 32016:
special_vocab = gguf.SpecialVocab(
self.dir_model, load_merges=False,
special_token_types = ['prefix', 'suffix', 'middle', 'eot']
)
special_vocab._set_special_token("prefix", 32007)
special_vocab._set_special_token("suffix", 32008)
special_vocab._set_special_token("middle", 32009)
special_vocab._set_special_token("eot", 32010)
special_vocab.add_to_gguf(self.gguf_writer)
def write_tensors(self):
max_name_len = max(len(s) for _, s in self.tensor_map.mapping.values()) + len(".weight,")
scale_map = dict()
for name, data_torch in self.get_tensors():
if name.endswith(("weight_scale")):
data_torch = data_torch.to(torch.float32)
name = name.replace(".weight_scale", "")
scale_map[name] = data_torch
for name, data_torch in self.get_tensors():
if name.endswith(("weight_scale")):
continue
# we don't need these
if name.endswith((".attention.masked_bias", ".attention.bias", ".rotary_emb.inv_freq")):
continue
old_dtype = data_torch.dtype
# convert any unsupported data types to float32
if data_torch.dtype not in (torch.float16, torch.float32):
data_torch = data_torch.to(torch.float32)
if name.replace(".weight", "") in scale_map:
data_torch = data_torch.to(torch.uint8)
origin_shape = data_torch.shape
shift = torch.tensor([0, 2, 4, 6], dtype=torch.uint8).reshape((4, *(1 for _ in range(len(origin_shape)))))
data_torch = data_torch.unsqueeze(0).expand((4, *origin_shape)) >> shift
data_torch = data_torch & 3
data_torch = (data_torch.float() - 1).reshape((origin_shape[0] * 4, *origin_shape[1:]))
data_torch = data_torch / scale_map[name.replace(".weight", "")].float()
# use the first number-like part of the tensor name as the block id
bid = None
for part in name.split("."):
if part.isdecimal():
bid = int(part)
break
# old gguf bf16 not implenmented
# if data_torch.dtype == torch.bfloat16:
# for new_name, data in ((n, d) for n, d in self.modify_tensors(data_torch, name, bid)):
# shape_str = f"{{{', '.join(str(n) for n in reversed(data.shape))}}}"
# # n_dims is implicit in the shape
# logger.info(f"{f'%-{max_name_len}s' % f'{new_name},'} {old_dtype}, shape = {shape_str}")
# self.gguf_writer.add_tensor(new_name, data, raw_shape=data.shape, raw_dtype=data.dtype)
# continue
for new_name, data in ((n, d.squeeze().numpy()) for n, d in self.modify_tensors(data_torch, name, bid)):
data: np.ndarray = data # type hint
data_shape = data.shape
n_dims = len(data.shape)
data_dtype = data.dtype
data_qtype: gguf.GGMLQuantizationType | None = None
# when both are True, f32 should win
# extra_f32 = self.extra_f32_tensors(name, new_name, bid, n_dims)
# extra_f16 = self.extra_f16_tensors(name, new_name, bid, n_dims)
extra_f32 = False
extra_f16 = False
# Most of the codebase that takes in 1D tensors or norms only handles F32 tensors
# Conditions should closely match those in llama_model_quantize_internal in llama.cpp
extra_f32 = any(cond for cond in (
extra_f32,
n_dims == 1,
new_name.endswith("_norm.weight"),
))
# Some tensor types are always in float32
tensors_f32 = [
gguf.MODEL_TENSOR.FFN_GATE_INP,
gguf.MODEL_TENSOR.FFN_GATE_INP,
gguf.MODEL_TENSOR.POS_EMBD,
gguf.MODEL_TENSOR.TOKEN_TYPES,
]
if not args.quant_embd:
tensors_f32.append(gguf.MODEL_TENSOR.TOKEN_EMBD)
extra_f32 = extra_f32 or any(self.match_model_tensor_name(new_name, key, bid) for key in tensors_f32)
# if f16 desired, convert any float32 2-dim weight tensors to float16
extra_f16 = any(cond for cond in (
extra_f16,
(name.endswith(".weight") and n_dims >= 2),
))
suit_i2 = True
if name.endswith('lm_head.weight') or name.endswith('norm.weight') or name.endswith('embed_tokens.weight'):
suit_i2 = False
i2_scale = None
if self.ftype != gguf.GGMLQuantizationType.F32 and extra_f16 and not extra_f32:
if self.ftype == gguf.GGMLQuantizationType.TL1 and suit_i2:
data, i2_scale = transform_to_tl1(data)
assert data.dtype == np.uint8
assert i2_scale.dtype == np.float32
data_qtype = gguf.GGMLQuantizationType.TL1
elif self.ftype == gguf.GGMLQuantizationType.TL2 and suit_i2:
data, i2_scale = transform_to_tl2(data)
assert data.dtype == np.uint8
assert i2_scale.dtype == np.float32
data_qtype = gguf.GGMLQuantizationType.TL2
else: # default to float16 for quantized tensors
if data_dtype != np.float16:
data = data.astype(np.float16)
data_qtype = gguf.GGMLQuantizationType.F16
if data_qtype is None: # by default, convert to float32
if data_dtype != np.float32:
data = data.astype(np.float32)
data_qtype = gguf.GGMLQuantizationType.F32
shape = data_shape
# shape = gguf.quant_shape_from_byte_shape(data.shape, data_qtype) if data.dtype == np.uint8 else data.shape
# reverse shape to make it similar to the internal ggml dimension order
shape_str = f"{{{', '.join(str(n) for n in reversed(shape))}}}"
# n_dims is implicit in the shape
logger.info(f"{f'%-{max_name_len}s' % f'{new_name},'} {old_dtype} --> {data_qtype.name}, shape = {shape_str}")
self.gguf_writer.add_tensor(new_name, data, raw_shape=shape, raw_dtype=data_qtype)
if i2_scale is not None:
self.gguf_writer.add_tensor(new_name + "_scale", i2_scale, raw_dtype=gguf.GGMLQuantizationType.F32)
def set_gguf_parameters(self):
super().set_gguf_parameters()
hparams = self.hparams
self.gguf_writer.add_vocab_size(hparams["vocab_size"])
if "head_dim" in hparams:
rope_dim = hparams["head_dim"]
else:
rope_dim = hparams["hidden_size"] // hparams["num_attention_heads"]
self.gguf_writer.add_rope_dimension_count(rope_dim)
if self.hparams.get("rope_scaling") is not None and "factor" in self.hparams["rope_scaling"]:
if self.hparams["rope_scaling"].get("type") == "linear":
self.gguf_writer.add_rope_scaling_type(gguf.RopeScalingType.LINEAR)
self.gguf_writer.add_rope_scaling_factor(self.hparams["rope_scaling"]["factor"])
tokenizer_config_file = self.dir_model / 'tokenizer_config.json'
if tokenizer_config_file.is_file():
with open(tokenizer_config_file, "r", encoding="utf-8") as f:
tokenizer_config_json = json.load(f)
if "add_prefix_space" in tokenizer_config_json:
self.gguf_writer.add_add_space_prefix(tokenizer_config_json["add_prefix_space"])
# Apply to granite small models only
if self.hparams.get("vocab_size", 32000) == 49152:
self.gguf_writer.add_add_bos_token(False)
@staticmethod
def permute(weights: Tensor, n_head: int, n_head_kv: int | None):
if n_head_kv is not None and n_head != n_head_kv:
n_head = n_head_kv
return (weights.reshape(n_head, 2, weights.shape[0] // n_head // 2, *weights.shape[1:])
.swapaxes(1, 2)
.reshape(weights.shape))
_experts: list[dict[str, Tensor]] | None = None
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
n_head = self.hparams["num_attention_heads"]
n_kv_head = self.hparams.get("num_key_value_heads")
if name.endswith(("q_proj.weight", "q_proj.bias")):
data_torch = LlamaModel.permute(data_torch, n_head, n_head)
if name.endswith(("k_proj.weight", "k_proj.bias")):
data_torch = LlamaModel.permute(data_torch, n_head, n_kv_head)
# process the experts separately
if name.find("block_sparse_moe.experts") != -1:
n_experts = self.hparams["num_local_experts"]
assert bid is not None
if self._experts is None:
self._experts = [{} for _ in range(self.block_count)]
self._experts[bid][name] = data_torch
if len(self._experts[bid]) >= n_experts * 3:
tensors: list[tuple[str, Tensor]] = []
# merge the experts into a single 3d tensor
for wid in ["w1", "w2", "w3"]:
datas: list[Tensor] = []
for xid in range(n_experts):
ename = f"model.layers.{bid}.block_sparse_moe.experts.{xid}.{wid}.weight"
datas.append(self._experts[bid][ename])
del self._experts[bid][ename]
data_torch = torch.stack(datas, dim=0)
merged_name = f"layers.{bid}.feed_forward.experts.{wid}.weight"
new_name = self.map_tensor_name(merged_name)
tensors.append((new_name, data_torch))
return tensors
else:
return []
return [(self.map_tensor_name(name), data_torch)]
def generate_extra_tensors(self) -> Iterable[tuple[str, Tensor]]:
if rope_scaling := self.find_hparam(["rope_scaling"], optional=True):
if rope_scaling.get("rope_type", '').lower() == "llama3":
base = self.hparams.get("rope_theta", 10000.0)
dim = self.hparams.get("head_dim", self.hparams["hidden_size"] // self.hparams["num_attention_heads"])
freqs = 1.0 / (base ** (torch.arange(0, dim, 2, dtype=torch.float32) / dim))
factor = rope_scaling.get("factor", 8.0)
low_freq_factor = rope_scaling.get("low_freq_factor", 1.0)
high_freq_factor = rope_scaling.get("high_freq_factor", 4.0)
old_context_len = self.hparams.get("original_max_position_embeddings", 8192)
low_freq_wavelen = old_context_len / low_freq_factor
high_freq_wavelen = old_context_len / high_freq_factor
assert low_freq_wavelen != high_freq_wavelen
rope_factors = []
for freq in freqs:
wavelen = 2 * math.pi / freq
if wavelen < high_freq_wavelen:
rope_factors.append(1)
elif wavelen > low_freq_wavelen:
rope_factors.append(factor)
else:
smooth = (old_context_len / wavelen - low_freq_factor) / (high_freq_factor - low_freq_factor)
rope_factors.append(1 / ((1 - smooth) / factor + smooth))
yield (self.format_tensor_name(gguf.MODEL_TENSOR.ROPE_FREQS), torch.tensor(rope_factors, dtype=torch.float32))
def prepare_tensors(self):
super().prepare_tensors()
if self._experts is not None:
# flatten `list[dict[str, Tensor]]` into `list[str]`
experts = [k for d in self._experts for k in d.keys()]
if len(experts) > 0:
raise ValueError(f"Unprocessed experts: {experts}")
@Model.register("BitnetForCausalLM")
class BitnetModel(Model):
model_arch = gguf.MODEL_ARCH.BITNET
def set_vocab(self):
self._set_vocab_sentencepiece()
def set_gguf_parameters(self):
super().set_gguf_parameters()
self.gguf_writer.add_vocab_size(self.hparams["vocab_size"])
self.gguf_writer.add_rope_scaling_type(gguf.RopeScalingType.LINEAR)
self.gguf_writer.add_rope_scaling_factor(1.0)
def weight_quant(self, weight):
dtype = weight.dtype
weight = weight.float()
s = 1 / weight.abs().mean().clamp(min=1e-5)
result = (weight * s).round().clamp(-1, 1) / s
return result.type(dtype)
def modify_tensors(self, data_torch: Tensor, name: str, bid: int | None) -> Iterable[tuple[str, Tensor]]:
# quant weight to i2 (in fp16)
if name.endswith(("q_proj.weight", "k_proj.weight", "v_proj.weight",
"down_proj.weight", "up_proj.weight", "gate_proj.weight",
"o_proj.weight")):
data_torch = self.weight_quant(data_torch)
return [(self.map_tensor_name(name), data_torch)]
def write_tensors(self):
max_name_len = max(len(s) for _, s in self.tensor_map.mapping.values()) + len(".weight,")
for name, data_torch in self.get_tensors():
# we don't need these
if name.endswith((".attention.masked_bias", ".attention.bias", ".rotary_emb.inv_freq")):
continue
old_dtype = data_torch.dtype
# convert any unsupported data types to float32
if data_torch.dtype not in (torch.float16, torch.float32):
data_torch = data_torch.to(torch.float32)
# use the first number-like part of the tensor name as the block id
bid = None
for part in name.split("."):
if part.isdecimal():
bid = int(part)
break
for new_name, data in ((n, d.squeeze().numpy()) for n, d in self.modify_tensors(data_torch, name, bid)):
data: np.ndarray = data # type hint
data_shape = data.shape
n_dims = len(data.shape)
data_dtype = data.dtype
data_qtype: gguf.GGMLQuantizationType | None = None
# when both are True, f32 should win
# extra_f32 = self.extra_f32_tensors(name, new_name, bid, n_dims)
# extra_f16 = self.extra_f16_tensors(name, new_name, bid, n_dims)
extra_f32 = False
extra_f16 = False
# Most of the codebase that takes in 1D tensors or norms only handles F32 tensors
# Conditions should closely match those in llama_model_quantize_internal in llama.cpp
extra_f32 = any(cond for cond in (
extra_f32,
n_dims == 1,
new_name.endswith("_norm.weight"),
))
# Some tensor types are always in float32
tensors_f32 = [
gguf.MODEL_TENSOR.FFN_GATE_INP,
gguf.MODEL_TENSOR.FFN_GATE_INP,
gguf.MODEL_TENSOR.POS_EMBD,
gguf.MODEL_TENSOR.TOKEN_TYPES,
]
if not args.quant_embd:
tensors_f32.append(gguf.MODEL_TENSOR.TOKEN_EMBD)
extra_f32 = extra_f32 or any(self.match_model_tensor_name(new_name, key, bid) for key in tensors_f32)
# if f16 desired, convert any float32 2-dim weight tensors to float16
extra_f16 = any(cond for cond in (
extra_f16,
(name.endswith(".weight") and n_dims >= 2),
))
suit_i2 = True
if name.endswith('embed_tokens.weight') or name.endswith('norm.weight'):
suit_i2 = False
i2_scale = None
if self.ftype != gguf.GGMLQuantizationType.F32 and extra_f16 and not extra_f32:
if self.ftype == gguf.GGMLQuantizationType.TL1 and suit_i2:
data, i2_scale = transform_to_tl1(data)
assert data.dtype == np.uint8
assert i2_scale.dtype == np.float32
data_qtype = gguf.GGMLQuantizationType.TL1
elif self.ftype == gguf.GGMLQuantizationType.TL2 and suit_i2:
data, i2_scale = transform_to_tl2(data)
assert data.dtype == np.uint8
assert i2_scale.dtype == np.float32
data_qtype = gguf.GGMLQuantizationType.TL2
else: # default to float16 for quantized tensors
if data_dtype != np.float16:
data = data.astype(np.float16)
data_qtype = gguf.GGMLQuantizationType.F16
if data_qtype is None: # by default, convert to float32
if data_dtype != np.float32:
data = data.astype(np.float32)
data_qtype = gguf.GGMLQuantizationType.F32
shape = data_shape
# shape = gguf.quant_shape_from_byte_shape(data.shape, data_qtype) if data.dtype == np.uint8 else data.shape
# reverse shape to make it similar to the internal ggml dimension order
shape_str = f"{{{', '.join(str(n) for n in reversed(shape))}}}"
# n_dims is implicit in the shape
logger.info(f"{f'%-{max_name_len}s' % f'{new_name},'} {old_dtype} --> {data_qtype.name}, shape = {shape_str}")
self.gguf_writer.add_tensor(new_name, data, raw_shape=shape, raw_dtype=data_qtype)
if i2_scale is not None:
self.gguf_writer.add_tensor(new_name + "_scale", i2_scale, raw_dtype=gguf.GGMLQuantizationType.F32)
###### CONVERSION LOGIC ######
ftype_map = {
"f32": gguf.GGMLQuantizationType.F32,
"f16": gguf.GGMLQuantizationType.F16,
"tl1" : gguf.GGMLQuantizationType.TL1,
"tl2" : gguf.GGMLQuantizationType.TL2,
}
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Convert a huggingface model to a GGML compatible file")
parser.add_argument(
"--vocab-only", action="store_true",
help="extract only the vocab",
)
parser.add_argument(
"--awq-path", type=Path, default=None,
help="Path to scale awq cache file")
parser.add_argument(
"--outfile", type=Path,
help="path to write to; default: based on input",
)
parser.add_argument(
"--outtype", type=str, choices=ftype_map.keys(), default="f32",
help="output format - use f32 for float32, f16 for float16",
)
parser.add_argument("--bigendian", action="store_true", help="model is executed on big endian machine")
parser.add_argument(
"model", type=Path,
help="directory containing model file",
)
parser.add_argument("--use-temp-file", action="store_true", help="use the tempfile library while processing (helpful when running out of memory, process killed)")
parser.add_argument("--model-name", type=str, default=None, help="name of the model")
parser.add_argument("--verbose", action="store_true", help="increase output verbosity")
parser.add_argument("--quant-embd", action="store_true", help="quantize the embedding layer")
return parser.parse_args()
def main() -> None:
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)
dir_model = args.model
if not dir_model.is_dir():
logger.error(f'Error: {args.model} is not a directory')
sys.exit(1)
if args.outfile is not None:
fname_out = args.outfile
else:
# output in the same directory as the model by default
fname_out = dir_model / f'ggml-model-{args.outtype}.gguf'
logger.info(f"Loading model: {dir_model.name}")
hparams = Model.load_hparams(dir_model)
with torch.inference_mode():
model_class = Model.from_model_architecture(hparams["architectures"][0])
model_instance = model_class(dir_model, ftype_map[args.outtype], fname_out, args.bigendian, args.use_temp_file)
logger.info("Set model parameters")
model_instance.set_gguf_parameters()
logger.info("Set model tokenizer")
model_instance.set_vocab()
if args.vocab_only:
logger.info(f"Exporting model vocab to '{fname_out}'")
model_instance.write_vocab()
else:
logger.info(f"Exporting model to '{fname_out}'")
model_instance.write()
logger.info(f"Model successfully exported to '{fname_out}'")
if __name__ == '__main__':
args = parse_args()
main()