在 Python 和 NumPy 中量化正态分布的浮点数

Quantizing normally distributed floats in Python and NumPy

提问人:Stand with Gaza 提问时间:7/31/2023 最后编辑:Stand with Gaza 更新时间:8/7/2023 访问量:137

问:

让数组中的值从高斯采样 分配。我想用“代表”之一替换每个值,以便总量化误差为 最小 化。AAn_RR

下面是执行线性量化的 NumPy 代码:

n_A, n_R = 1_000_000, 256
mu, sig = 500, 250
A = np.random.normal(mu, sig, size = n_A)
lo, hi = np.min(A), np.max(A)
R = np.linspace(lo, hi, n_R)
I = np.round((A - lo) * (n_R - 1) / (hi - lo)).astype(np.uint32)

L = np.mean(np.abs(A - R[I]))
print('Linear loss:', L)
-> Linspace loss: 2.3303939600700603

虽然这可行,但量化误差很大。有没有更聪明的 怎么做?我认为人们可以利用 正态分布,或者可能使用迭代过程 最小化“损失”功能。A

更新在研究这个问题时,我发现了一个关于“加权”量化的相关问题。调整他们的方法有时会得到更好的量化结果:

from scipy.stats import norm

dist = norm(loc = mu, scale = sig)
bounds = dist.cdf([mu - 3*sig, mu + 3*sig])
pp = np.linspace(*bounds, n_R)
R = dist.ppf(pp)

# Find closest matches
lhits = np.clip(np.searchsorted(R, A, 'left'), 0, n_R - 1)
rhits = np.clip(np.searchsorted(R, A, 'right') - 1, 0, n_R - 1)

ldiff = R[lhits] - A
rdiff = A - R[rhits]
I = lhits
idx = np.where(rdiff < ldiff)[0]
I[idx] = rhits[idx]

L = np.mean(np.abs(A - R[I]))
print('Gaussian loss:', L)
-> Gaussian loss: 1.6521974945326285

K-means聚类可能更好,但似乎太慢了 在大型阵列上实用。

Python NumPy 浮点 k-means 量化

评论

1赞 Homer512 7/31/2023
听起来模糊不清,应该可以将其改写为整数线性规划 (ILP) 问题。
1赞 Stand with Gaza 7/31/2023
@Reinderien我更新了我的问题。 不保证是线性的。线性量化是我的基线,我试图弄清楚是否有更好的方法来量化正态分布的数据。R
0赞 user2357112 7/31/2023
您是否正在尝试计算最佳数组?如果没有,您正在尝试计算什么?R
1赞 Stand with Gaza 7/31/2023
完全。我正在尝试找到可以最大程度地减少“编码错误”的值。R

答:

3赞 Nick ODell 7/31/2023 #1

K-均值

K-means 聚类可能更好,但似乎太慢,在大型数组上不实用。

对于 1D 聚类情况,有些算法比 K 均值更快。查看 https://stats.stackexchange.com/questions/40454/determine-different-clusters-of-1d-data-from-database

我选择了其中一种算法,Jenks Natural Breaks,并在数据集的随机子样本上运行它:

A_samp = np.random.choice(A, size=10000)
breaks = np.array(jenkspy.jenks_breaks(A_samp, n_classes=n_R))
R = (breaks[:-1] + breaks[1:]) / 2

这是相当快的,并且整个数据集的量化损失约为 1.28。

为了直观地了解这些方法中的每一个正在做什么,我绘制了每个方法提出的中断的 cdf 与中断的 R 中的索引。

QQ plot

根据定义,高斯是一条直线。这意味着它在分布的每个百分位数处具有相等数量的中断。线性方法在分布的中间花费很少的中断,并且大部分中断在尾部使用。詹克斯在他们两人之间找到了折衷方案。

自动搜索更低的损耗

看着上面的图表,我有了一个想法:所有这些选择断点的方法都是在分位数域中绘制的各种 S 形曲线。(如果你把它想象成一个真正伸展的 S 形结肠,那么高斯有点适合。

我编写了一个函数,该函数使用单个变量 strength 对每条曲线进行参数化,这是 sigmoid 曲线的速度。一旦我有了它,我就习惯于自动搜索一条曲线,将损失降到最低。scipy.optimize.minimize

事实证明,如果你让 Scipy 优化它,它会选择一条非常接近 Jenks 的曲线强度,它找到的曲线比 Jenks 的曲线略差,损失约为 1.33。

可以在此处查看采用此失败方法的笔记本。

使用 2^16 个浮点数进行量化

在需要创建 2^16 个不同代表的情况下,使用 Jenks 在计算上是不可行的。但是,您可以做一些非常接近的事情:使用少量类加上线性插值的 Jenks。

下面是代码:

import itertools


def pairwise(iterable):
    "s -> (s0, s1), (s1, s2), (s2, s3), ..."
    a, b = itertools.tee(iterable)
    next(b, None)
    return zip(a, b)


def linspace_jenks(A, n_R, jenks_classes, dist_lo, dist_hi):
    assert n_R % jenks_classes == 0, "jenks_classes must be divisor of n_R"
    simplify_factor = n_R // jenks_classes
    assert jenks_classes ** 2 <= len(A), "Need more data to estimate"
    breaks = jenkspy.jenks_breaks(A, n_classes=jenks_classes)
    # Adjust lowest and highest break to match highest/lowest observed value
    breaks[0] = dist_lo
    breaks[-1] = dist_hi
    linspace_classes = []
    for lo, hi in pairwise(breaks):
        linspace_classes.append(np.linspace(lo, hi, simplify_factor, endpoint=False))
    linspace_classes = np.hstack(linspace_classes)
    assert len(linspace_classes) == n_R
    return linspace_classes

调用示例:

A_samp = np.random.choice(A, size = 2**16)
jenks_R = linspace_jenks(A_samp, n_R, 128, np.min(A), np.max(A))

性能与线性方法相比如何?在我的系统上,我得到 0.009421 的线性损失,n_R=2^16。下图显示了该方法针对每个值 jenks_classes 获取的损失。linspace_jenks

linspace jenks loss

只有 32 个 Jenks 类,其余部分用线性插值填充,损失下降到 0.005031。

评论

0赞 Stand with Gaza 7/31/2023
超级有趣!我必须对此进行更多调查。不过有一条评论。既然 Jenks 产生了类边界,你不应该像这样计算质心吗: ?(breaks[:-1] + breaks[1:]) / 2
0赞 Nick ODell 7/31/2023
@BjörnLindqvist嗯,你是对的。使用质心而不是断点似乎可以减少 ~0.015 的损失。
0赞 Stand with Gaza 8/6/2023
不幸的是,Jenks 算法仅在类数量较少时才可处理。我的用例是将浮点数量化为 16 位,这需要 65536 个类,对 Jenks 来说太多了。尽管如此,还是非常有趣的方法。
0赞 Nick ODell 8/7/2023
@BjörnLindqvist 参见编辑。 将 Jenks 与 32 个类一起使用就足以捕获分布的形状。
2赞 Reinderien 8/1/2023 #2

部分是为了新颖性,主要是为了完整性,我展示了@Homer512 正确建议是可能的 - MILP 实现。我希望它的准确性非常出色,其性能介于“差”和“可怕”之间。

我用一个非常小的问题大小进行演示,这样当你调试和查看约束矩阵时,它们就清晰易读,这样我的RAM就不会爆炸。

import time

import numpy as np
import scipy.sparse as sp
from numpy.random import default_rng
from scipy.optimize import milp, Bounds, LinearConstraint

n_A, n_R = 10, 4
rand = default_rng(seed=0)
A = rand.normal(loc=100, scale=50, size=n_A)
lo, hi = A.min(), A.max()

A_order = A.argsort()  # for use if you want to restore the original order later
# A = A[A_order]       # if you want to modify the algorithm to assume ordered input

'''
decision variables:
    nA*nR binary assignments (sparse)
    nA discretized values (dense)
    nR discretization levels (dense) aka. R
    nA errors (dense)
'''
c = np.zeros(n_A*n_R + n_A + n_R + n_A)
c[-n_A:] = 1  # minimize errors

integrality = np.ones(n_A*n_R + n_A + n_R + n_A, dtype=np.uint8)
integrality[n_A*n_R:] = 0  # only assignments are integral

lb = np.full_like(c, -np.inf)
ub = np.full_like(c, +np.inf)
lb[:n_A*n_R] = 0  # assignments are binary
ub[:n_A*n_R] = 1

# Big-M magnitude based on range of input data, without assuming signs
M = 2*max(abs(lo), abs(hi))

# I- Each input value must be assigned exactly one discretized value (Kronecker)
exclusive_assignment = LinearConstraint(
    A=sp.hstack((
        sp.kron(sp.eye(n_A), np.ones(n_R)),
        sp.csc_array((n_A, n_A)),
        sp.csc_array((n_A, n_R)),
        sp.csc_array((n_A, n_A)),
    )),
    lb=np.ones(n_A), ub=np.ones(n_A),
)

# II- If assigned, discretized output must be <= level (Kronecker)
# discretized_output <= level + (1-assigned)*M
# assigned*M + discretized_output - level <= M
# III- If assigned, discretized output must be >= level
# discretized_output >= level - (1-assigned)*M
# assigned*-M + discretized_output - level >= -M
discrete_sparse_to_level = LinearConstraint(
    A=sp.bmat((
        (
            sp.eye(n_A*n_R) * M,
            sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
            sp.csc_array(
                np.tile(-np.eye(n_R), (n_A, 1))
            ),
            sp.csc_array((n_A*n_R, n_A)),
        ),
        (
            sp.eye(n_A*n_R) * -M,
            sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
            sp.csc_array(
                np.tile(-np.eye(n_R), (n_A, 1))
            ),
            sp.csc_array((n_A*n_R, n_A)),
        ),
    ), format='csc'),
    lb=np.concatenate((np.full(n_A*n_R, -np.inf), np.full(n_A*n_R, -M))),
    ub=np.concatenate((np.full(n_A*n_R, M),       np.full(n_A*n_R, np.inf))),
)

# IV- error >= output - input (Kronecker)
# A.assign - output + error >= 0
# V- error >= input - output
# -A.assign + output + error >= 0
abs_error = LinearConstraint(
    A=sp.bmat((
        (
            sp.kron(sp.diags(A), np.ones(n_R)),
            -sp.eye(n_A),
            sp.csc_array((n_A, n_R)),
            np.eye(n_A),
        ),
        (
            sp.kron(sp.diags(-A), np.ones(n_R)),
            sp.eye(n_A),
            sp.csc_array((n_A, n_R)),
            np.eye(n_A),
        ),
    ), format='csc'),
    lb=np.concatenate((np.zeros(n_A),        np.zeros(n_A))),
    ub=np.concatenate((np.full(n_A, np.inf), np.full(n_A, np.inf))),
)

level_order = LinearConstraint(
    A=sp.hstack((
        sp.csc_array((n_R-1, n_A*n_R)),
        sp.csc_array((n_R-1, n_A)),
        sp.eye(n_R-1, n_R, k=1) - sp.eye(n_R-1, n_R),
        sp.csc_array((n_R-1, n_A)),
    )),
    lb=np.zeros(n_R-1),
    ub=np.full(n_R-1, np.inf),
)

start = time.perf_counter()
res = milp(
    c=c, integrality=integrality, bounds=Bounds(lb=lb, ub=ub),
    constraints=(
        exclusive_assignment,
        abs_error,
        discrete_sparse_to_level,
        # level_order,
    ),
)
stop = time.perf_counter()
print(f'Completed in {stop - start:.4f} s')
assert res.success

assign, discretized, levels, errors = np.split(
    res.x, (n_A*n_R, n_A*n_R + n_A, -n_A)
)
assign = assign.reshape((n_A, n_R))
print(levels)
print(np.stack((A, discretized, errors), axis=1))
print(assign)