提问人:Stand with Gaza 提问时间:7/31/2023 最后编辑:Stand with Gaza 更新时间:8/7/2023 访问量:137
在 Python 和 NumPy 中量化正态分布的浮点数
Quantizing normally distributed floats in Python and NumPy
问:
让数组中的值从高斯采样
分配。我想用“代表”之一替换每个值,以便总量化误差为
最小 化。A
A
n_R
R
下面是执行线性量化的 NumPy 代码:
n_A, n_R = 1_000_000, 256
mu, sig = 500, 250
A = np.random.normal(mu, sig, size = n_A)
lo, hi = np.min(A), np.max(A)
R = np.linspace(lo, hi, n_R)
I = np.round((A - lo) * (n_R - 1) / (hi - lo)).astype(np.uint32)
L = np.mean(np.abs(A - R[I]))
print('Linear loss:', L)
-> Linspace loss: 2.3303939600700603
虽然这可行,但量化误差很大。有没有更聪明的
怎么做?我认为人们可以利用
正态分布,或者可能使用迭代过程
最小化“损失”功能。A
更新在研究这个问题时,我发现了一个关于“加权”量化的相关问题。调整他们的方法有时会得到更好的量化结果:
from scipy.stats import norm
dist = norm(loc = mu, scale = sig)
bounds = dist.cdf([mu - 3*sig, mu + 3*sig])
pp = np.linspace(*bounds, n_R)
R = dist.ppf(pp)
# Find closest matches
lhits = np.clip(np.searchsorted(R, A, 'left'), 0, n_R - 1)
rhits = np.clip(np.searchsorted(R, A, 'right') - 1, 0, n_R - 1)
ldiff = R[lhits] - A
rdiff = A - R[rhits]
I = lhits
idx = np.where(rdiff < ldiff)[0]
I[idx] = rhits[idx]
L = np.mean(np.abs(A - R[I]))
print('Gaussian loss:', L)
-> Gaussian loss: 1.6521974945326285
K-means聚类可能更好,但似乎太慢了 在大型阵列上实用。
答:
K-均值
K-means 聚类可能更好,但似乎太慢,在大型数组上不实用。
对于 1D 聚类情况,有些算法比 K 均值更快。查看 https://stats.stackexchange.com/questions/40454/determine-different-clusters-of-1d-data-from-database
我选择了其中一种算法,Jenks Natural Breaks,并在数据集的随机子样本上运行它:
A_samp = np.random.choice(A, size=10000)
breaks = np.array(jenkspy.jenks_breaks(A_samp, n_classes=n_R))
R = (breaks[:-1] + breaks[1:]) / 2
这是相当快的,并且整个数据集的量化损失约为 1.28。
为了直观地了解这些方法中的每一个正在做什么,我绘制了每个方法提出的中断的 cdf 与中断的 R 中的索引。
根据定义,高斯是一条直线。这意味着它在分布的每个百分位数处具有相等数量的中断。线性方法在分布的中间花费很少的中断,并且大部分中断在尾部使用。詹克斯在他们两人之间找到了折衷方案。
自动搜索更低的损耗
看着上面的图表,我有了一个想法:所有这些选择断点的方法都是在分位数域中绘制的各种 S 形曲线。(如果你把它想象成一个真正伸展的 S 形结肠,那么高斯有点适合。
我编写了一个函数,该函数使用单个变量 strength 对每条曲线进行参数化,这是 sigmoid 曲线的速度。一旦我有了它,我就习惯于自动搜索一条曲线,将损失降到最低。scipy.optimize.minimize
事实证明,如果你让 Scipy 优化它,它会选择一条非常接近 Jenks 的曲线强度,它找到的曲线比 Jenks 的曲线略差,损失约为 1.33。
可以在此处查看采用此失败方法的笔记本。
使用 2^16 个浮点数进行量化
在需要创建 2^16 个不同代表的情况下,使用 Jenks 在计算上是不可行的。但是,您可以做一些非常接近的事情:使用少量类加上线性插值的 Jenks。
下面是代码:
import itertools
def pairwise(iterable):
"s -> (s0, s1), (s1, s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)
def linspace_jenks(A, n_R, jenks_classes, dist_lo, dist_hi):
assert n_R % jenks_classes == 0, "jenks_classes must be divisor of n_R"
simplify_factor = n_R // jenks_classes
assert jenks_classes ** 2 <= len(A), "Need more data to estimate"
breaks = jenkspy.jenks_breaks(A, n_classes=jenks_classes)
# Adjust lowest and highest break to match highest/lowest observed value
breaks[0] = dist_lo
breaks[-1] = dist_hi
linspace_classes = []
for lo, hi in pairwise(breaks):
linspace_classes.append(np.linspace(lo, hi, simplify_factor, endpoint=False))
linspace_classes = np.hstack(linspace_classes)
assert len(linspace_classes) == n_R
return linspace_classes
调用示例:
A_samp = np.random.choice(A, size = 2**16)
jenks_R = linspace_jenks(A_samp, n_R, 128, np.min(A), np.max(A))
性能与线性方法相比如何?在我的系统上,我得到 0.009421 的线性损失,n_R=2^16。下图显示了该方法针对每个值 jenks_classes 获取的损失。linspace_jenks
只有 32 个 Jenks 类,其余部分用线性插值填充,损失下降到 0.005031。
评论
(breaks[:-1] + breaks[1:]) / 2
部分是为了新颖性,主要是为了完整性,我展示了@Homer512 正确建议是可能的 - MILP 实现。我希望它的准确性非常出色,其性能介于“差”和“可怕”之间。
我用一个非常小的问题大小进行演示,这样当你调试和查看约束矩阵时,它们就清晰易读,这样我的RAM就不会爆炸。
import time
import numpy as np
import scipy.sparse as sp
from numpy.random import default_rng
from scipy.optimize import milp, Bounds, LinearConstraint
n_A, n_R = 10, 4
rand = default_rng(seed=0)
A = rand.normal(loc=100, scale=50, size=n_A)
lo, hi = A.min(), A.max()
A_order = A.argsort() # for use if you want to restore the original order later
# A = A[A_order] # if you want to modify the algorithm to assume ordered input
'''
decision variables:
nA*nR binary assignments (sparse)
nA discretized values (dense)
nR discretization levels (dense) aka. R
nA errors (dense)
'''
c = np.zeros(n_A*n_R + n_A + n_R + n_A)
c[-n_A:] = 1 # minimize errors
integrality = np.ones(n_A*n_R + n_A + n_R + n_A, dtype=np.uint8)
integrality[n_A*n_R:] = 0 # only assignments are integral
lb = np.full_like(c, -np.inf)
ub = np.full_like(c, +np.inf)
lb[:n_A*n_R] = 0 # assignments are binary
ub[:n_A*n_R] = 1
# Big-M magnitude based on range of input data, without assuming signs
M = 2*max(abs(lo), abs(hi))
# I- Each input value must be assigned exactly one discretized value (Kronecker)
exclusive_assignment = LinearConstraint(
A=sp.hstack((
sp.kron(sp.eye(n_A), np.ones(n_R)),
sp.csc_array((n_A, n_A)),
sp.csc_array((n_A, n_R)),
sp.csc_array((n_A, n_A)),
)),
lb=np.ones(n_A), ub=np.ones(n_A),
)
# II- If assigned, discretized output must be <= level (Kronecker)
# discretized_output <= level + (1-assigned)*M
# assigned*M + discretized_output - level <= M
# III- If assigned, discretized output must be >= level
# discretized_output >= level - (1-assigned)*M
# assigned*-M + discretized_output - level >= -M
discrete_sparse_to_level = LinearConstraint(
A=sp.bmat((
(
sp.eye(n_A*n_R) * M,
sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
sp.csc_array(
np.tile(-np.eye(n_R), (n_A, 1))
),
sp.csc_array((n_A*n_R, n_A)),
),
(
sp.eye(n_A*n_R) * -M,
sp.kron(sp.eye(n_A), np.ones((n_R, 1))),
sp.csc_array(
np.tile(-np.eye(n_R), (n_A, 1))
),
sp.csc_array((n_A*n_R, n_A)),
),
), format='csc'),
lb=np.concatenate((np.full(n_A*n_R, -np.inf), np.full(n_A*n_R, -M))),
ub=np.concatenate((np.full(n_A*n_R, M), np.full(n_A*n_R, np.inf))),
)
# IV- error >= output - input (Kronecker)
# A.assign - output + error >= 0
# V- error >= input - output
# -A.assign + output + error >= 0
abs_error = LinearConstraint(
A=sp.bmat((
(
sp.kron(sp.diags(A), np.ones(n_R)),
-sp.eye(n_A),
sp.csc_array((n_A, n_R)),
np.eye(n_A),
),
(
sp.kron(sp.diags(-A), np.ones(n_R)),
sp.eye(n_A),
sp.csc_array((n_A, n_R)),
np.eye(n_A),
),
), format='csc'),
lb=np.concatenate((np.zeros(n_A), np.zeros(n_A))),
ub=np.concatenate((np.full(n_A, np.inf), np.full(n_A, np.inf))),
)
level_order = LinearConstraint(
A=sp.hstack((
sp.csc_array((n_R-1, n_A*n_R)),
sp.csc_array((n_R-1, n_A)),
sp.eye(n_R-1, n_R, k=1) - sp.eye(n_R-1, n_R),
sp.csc_array((n_R-1, n_A)),
)),
lb=np.zeros(n_R-1),
ub=np.full(n_R-1, np.inf),
)
start = time.perf_counter()
res = milp(
c=c, integrality=integrality, bounds=Bounds(lb=lb, ub=ub),
constraints=(
exclusive_assignment,
abs_error,
discrete_sparse_to_level,
# level_order,
),
)
stop = time.perf_counter()
print(f'Completed in {stop - start:.4f} s')
assert res.success
assign, discretized, levels, errors = np.split(
res.x, (n_A*n_R, n_A*n_R + n_A, -n_A)
)
assign = assign.reshape((n_A, n_R))
print(levels)
print(np.stack((A, discretized, errors), axis=1))
print(assign)
评论
R
R
R