提问人:Dante 提问时间:6/5/2016 最后编辑:CommunityDante 更新时间:6/6/2016 访问量:4000
无法从线程类获取 100% 的 CPU 使用率
Cannot get 100% CPU usage from thread class
问:
我试图通过线程类从 C 程序问题中获取 100% CPU 使用率。这是我的代码
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
using namespace std;
static int primes = 0;
void prime(int a, int b);
mutex mtx;
int main()
{
unsigned int nthreads = thread::hardware_concurrency();
vector<thread> threads;
int limit = 1000000;
int intrvl = (int) limit / nthreads;
for (int i = 0; i < nthreads; i++)
{
threads.emplace_back(prime, i*intrvl+1, i*intrvl+intrvl);
}
cout << "Number of logical cores: " << nthreads << "\n";
cout << "Calculating number of primes less than " << limit << "... \n";
for (thread & t : threads) {
t.join();
}
cout << "There are " << primes << " prime numbers less than " << limit << ".\n";
return 0;
}
void prime(int a, int b)
{
for (a; a <= b; a++) {
int i = 2;
while(i <= a) {
if(a % i == 0)
break;
i++;
}
if(i == a) {
mtx.lock();
primes++;
mtx.unlock();
}
}
}
但是当我运行它时,我得到下图
那是正弦波。但是当我运行@Mysticial使用 openmp 的答案时,我得到了这个
我检查了这两个程序,它们都使用 8 个线程。为什么我得到这个不稳定的图表,我怎样才能得到与 openmp 对线程相同的结果?ps -eLf
答:
OpenMP 示例对 sum 变量使用“reduction”,这意味着每个任务都对自己的局部变量求和。
OpenMP 将 的线程本地副本 添加到并行部分的末尾,以获得总计。
这意味着它不需要锁定。
正如 @Sam 所说,如果线程无法获取互斥锁,它将进入睡眠状态。
因此,在您的情况下,线程将花费相当多的时间处于睡眠状态。
如果您不想使用 OpenMP,请尝试不需要互斥锁和解锁。primes
primes
primes
static std::atomic<int> primes = 0;
或者,您可以使用一个数组来模拟 OpenMP 缩减,其中线程求和到最后的总和。primes[numThreads]
i
primes[i]
primes[]
Mystical 的答案和你的代码之间存在一些根本差异。
区别 #1
您的代码为每个 CPU 创建一个工作块,并让它运行到完成。这意味着一旦一个线程完成,CPU 使用率将急剧下降,因为当其他线程运行完成时,CPU 将处于空闲状态。发生这种情况是因为日程安排并不总是公平的。一个线程的进度和完成速度可能比其他线程快得多。
OpenMP 解决方案通过声明 schedule(dynamic)
来解决这个问题,它告诉 OpenMP 在内部创建一个工作队列,所有线程都将从中消耗工作。当一大块工作完成时,代码中退出的线程会消耗另一块工作并忙于它。
最终,这变成了挑选足够大小的块的平衡行为。太大,并且 CPU 可能不会在任务结束时达到最大值。太小,可能会有很大的开销。
区别 #2
您正在写入一个变量,该变量在所有线程之间共享。
这有 2 个后果:primes
- 它需要同步以防止数据争用。
- 它使现代 CPU 上的缓存非常不愉快,因为在一个线程上的写入对另一个线程可见之前,需要进行缓存刷新。
OpenMP 解决方案通过减少 将每个线程的单个值的结果减少到最终结果中来解决这个问题。这就是 reduction(+ : primes)
的作用。operator+()
primes
通过了解 OpenMP 如何拆分、安排工作和合并结果,我们可以修改您的代码以使其具有类似的行为。
#include <iostream>
#include <thread>
#include <vector>
#include <utility>
#include <algorithm>
#include <functional>
#include <mutex>
#include <future>
using namespace std;
int prime(int a, int b)
{
int primes = 0;
for (a; a <= b; a++) {
int i = 2;
while (i <= a) {
if (a % i == 0)
break;
i++;
}
if (i == a) {
primes++;
}
}
return primes;
}
int workConsumingPrime(vector<pair<int, int>>& workQueue, mutex& workMutex)
{
int primes = 0;
unique_lock<mutex> workLock(workMutex);
while (!workQueue.empty()) {
pair<int, int> work = workQueue.back();
workQueue.pop_back();
workLock.unlock(); //< Don't hold the mutex while we do our work.
primes += prime(work.first, work.second);
workLock.lock();
}
return primes;
}
int main()
{
int nthreads = thread::hardware_concurrency();
int limit = 1000000;
// A place to put work to be consumed, and a synchronisation object to protect it.
vector<pair<int, int>> workQueue;
mutex workMutex;
// Put all of the ranges into a queue for the threads to consume.
int chunkSize = max(limit / (nthreads*16), 10); //< Handwaving came picking 16 and a good factor.
for (int i = 0; i < limit; i += chunkSize) {
workQueue.push_back(make_pair(i, min(limit, i + chunkSize)));
}
// Start the threads.
vector<future<int>> futures;
for (int i = 0; i < nthreads; ++i) {
packaged_task<int()> task(bind(workConsumingPrime, ref(workQueue), ref(workMutex)));
futures.push_back(task.get_future());
thread(move(task)).detach();
}
cout << "Number of logical cores: " << nthreads << "\n";
cout << "Calculating number of primes less than " << limit << "... \n";
// Sum up all the results.
int primes = 0;
for (future<int>& f : futures) {
primes += f.get();
}
cout << "There are " << primes << " prime numbers less than " << limit << ".\n";
}
这仍然不是 OpenMP 示例行为方式的完美再现。例如,这更接近 OpenMP 的时间表,因为工作块是固定大小的。此外,OpenMP 根本不使用工作队列。所以我可能撒了一点谎—— 称其为善意的谎言,因为我想更明确地展示被拆分的作品。它可能在幕后做的是存储下一个线程应该在可用时开始的迭代,以及下一个块大小的启发式。static
即使存在这些差异,我也能够在很长一段时间内最大化我所有的 CPU。
展望未来...
您可能注意到 OpenMP 版本更具可读性。这是因为它旨在解决这样的问题。因此,当我们试图在没有库或编译器扩展的情况下解决它们时,我们最终会重新发明轮子。幸运的是,为了将这种功能直接引入C++,已经做了很多工作。具体来说,并行 TS
可以帮助我们解决是否可以将其表示为标准 C++ 算法的问题。然后,我们可以告诉库将算法分布在它认为合适的所有 CPU 上,这样它就为我们完成了所有繁重的工作。
在 C++ 11 中,在 Boost 的一点帮助下,这个算法可以写成:
#include <iostream>
#include <iterator>
#include <algorithm>
#include <boost/range/irange.hpp>
using namespace std;
bool isPrime(int n)
{
if (n < 2)
return false;
for (int i = 2; i < n; ++i) {
if (n % i == 0)
return false;
}
return true;
}
int main()
{
auto range = boost::irange(0, 1000001);
auto numPrimes = count_if(begin(range), end(range), isPrime);
cout << "There are " << numPrimes << " prime numbers less than " << range.back() << ".\n";
}
要并行化算法,您只需要将 std::p ar
作为第一个参数传递给count_if
。#include <execution_policy>
auto numPrimes = count_if(par, begin(range), end(range), isPrime);
这就是让我乐于阅读的代码。
注意:绝对没有花时间优化这个算法。如果我们要进行任何优化,我会研究像埃拉托色尼筛子这样的东西,它使用以前的素数计算来帮助未来的计算。
首先,您需要意识到 OpenMP 通常有一个相当复杂的线程池,因此匹配它(确切)可能至少有些困难。
其次,在我看来,在优化线程之前,您应该尝试从至少一半的中规中矩的基本算法开始。在这种情况下,你实现的基本算法基本上是非常糟糕的。它正在检查数字是否是素数,但做了很多没有完成任何有用工作的工作。
- 它正在检查偶数是否为质数。除了 2 之外,它们都不是。 永远。
- 它检查奇数是否可以被偶数整除。同样,他们不是。
- 它检查数字是否可以被大于其平方根的数字整除。如果没有小于平方根的除数,则也不能有比平方根大的除数。
虽然它可能不影响速度,但我也发现有一个函数来检查单个数字是否为素数,并只返回 / 来指示结果,这比使用一些复杂的代码来确定前一个循环是运行到完成还是提前退出要容易得多。true
false
你可以通过消除更多的东西来优化算法,但在我看来,这并不是“优化”,而是简单地避免了完全不必要的悲观化。
至少在我看来,使用它(在这种情况下)来启动线程也更容易一些。这让我们可以很容易地从线程中返回一个值(我们想要返回的计数)。std::async
因此,让我们从基于这些观察结果的修复开始:prime
int prime(int a, int b)
{
int count = 0;
if (a == 2)
++count;
if (a % 2 == 0)
++a;
auto check = [](int i) -> bool {
for (int j = 3; j*j <= i; j += 2)
if (i % j == 0)
return false;
return true;
};
for (a; a <= b; a+=2) {
if (check(a))
++count;
}
return count;
}
现在,让我指出,这已经足够快了(甚至是单线程的),如果我们只是想让工作完成的速度比我们从完美的线程扩展中获得的快 4 倍(左右),我们就完成了,即使根本不使用线程。对于您给出的限制,这将在 1 秒内完成。
然而,为了论证,让我们假设我们想要获得更多,并且也使用多个内核。这里要意识到的一件事是,我们通常至少需要比内核多几个线程。问题相当简单:由于每个内核只有一个线程,我们没有什么可以弥补这样一个事实,即我们甚至没有真正在线程之间分配负载——处理最大数字的线程比处理最小数字的线程要做更多的工作——但是如果我们(例如)有一台 4 核机器, 一旦一个线程完成,我们只能使用 75% 的 CPU。然后,当另一个线程完成时,它会下降到 50%。然后是 25%,最后完成,只使用一个内核。
我们也许可以做一些计算来尝试更均匀地分配负载,但是将负载分成 6 到 8 倍于内核的线程要容易得多。这样,计算可以继续使用所有内核,直到只剩下三个线程1.
将所有这些放入代码中,我们最终可以得到如下内容:
int main() {
using namespace chrono;
int limit = 50000000;
unsigned int nthreads = 8 * thread::hardware_concurrency();
cout << "\nComputing multi-threaded:\n";
cout << "Number of threads: " << nthreads << "\n";
cout << "Calculating number of primes less than " << limit << "... \n";
auto start2 = high_resolution_clock::now();
vector<future<int>> threads;
int intrvl = limit / nthreads;
for (int i = 0; i < nthreads; i++)
threads.emplace_back(std::async(std::launch::async, prime, i*intrvl + 1, (i + 1)*intrvl));
int primes = 0;
for (auto &t : threads)
primes += t.get();
auto end2 = high_resolution_clock::now();
cout << "Primes: " << primes << ", Time: " << duration_cast<milliseconds>(end2 - start2).count() << "\n";
}
请注意以下几点:
- 这运行得足够快,以至于我已经将上限增加了一个相当大的因素,所以它会运行足够长的时间,我们至少可以看到它在完成2 之前使用 100% 的 CPU 时间几秒钟。
- 我添加了一些计时代码,以便更准确地了解它运行了多长时间。
至少当我运行它时,它似乎按照我们的预期/希望行事:它使用了 100% 的 CPU 时间,直到它非常接近尾声,当它在完成之前开始下降时(即,当我们要执行的线程少于我们执行它们的内核时)。
- 如果您想知道 OpenMP 如何避免这种情况:它通常使用线程池,因此将循环的一定数量的迭代作为任务调度到线程池。这使它能够生成大量任务,而无需同时争用大量线程来争用 CPU 时间。
- 根据您使用的上限,它在我的机器上大约需要 90 毫秒才能完成,这还不足以让它在 CPU 使用率图表上留下明显的闪光点。
上一个:IEEE 754 舍入或斩波
下一个:树的深度优先预排序遍历
评论
openmp