很久之前把c++标准库中提供的并发API都看了一遍。最近项目上写ts写的多,很久没写c++代码了,偶尔来排查下问题,居然得回去翻翻代码片段才能回想起细节,想想干脆整一些文章记录下来吧,加深下记忆。

future是c++标准库中提供的一个高级API,用于执行异步任务。其相关的API可以通过#include <future>来引入。

使用future创建并发任务

使用future的API可以很简单的创建一个异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>
#include <thread>
#include <chrono>
#include <random>
#include <exception>
#include <future>

int doSomething(char c)
{


std::default_random_engine dre(c);
std::uniform_int_distribution<int> id(10, 1000);
for (int i = 0; i < 5; i++)
{
auto sleep_time = std::chrono::milliseconds(id(dre));
std::this_thread::sleep_for(sleep_time);
std::cout.put(c).flush();
}
return c;
}

int func1()
{

return doSomething('.');
}

int func2()
{

return doSomething('+');
}

// 测试普通async异步
// 默认由系统调度合适开始异步任务,任何时间都有可能开启;当调用 .get()时,强制开启线程任务
// 此时会先阻塞执行完func2,再阻塞执行完func1,实际上是没有异步操作的
void TestAsync()
{

std::cout << "start func1 in background and func2 foreground" << std::endl;
std::future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result2;
std::cout << " final result :" << result << std::endl;
}

这里doSomething是一个耗时任务,它执行时会随机的 sleep 50~5000ms。
在上面的示例中,基于func1创建一个并发任务,程序会尽量尝试在独立线程中执行func1的任务,如果不行就在当前线程执行,实际上我在gcc4.8下运行这段程序,所有的计算都是在主线程中完成的。这里的要点是:

  • 调用get的时候,若已经跑完了,则直接拿到结果;若没跑完则会阻塞当前线程直到跑完
  • gcc4.8下如果不调用get,并发任务会一直不启动,主动调用get后,任务相当于还是串行执行的

launch策略

上面的基本用法中,任务实际上还是串行执行的,那是因为我们没有给应用launch策略,也就是并发任务的启动策略。

std::launch::async 异步启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//测试通过 async launch
//如果传入 std::launch::async,则会立即开始异步任务,不行则抛异常。此时多个任务是真正的并行执行的
void TestLaunchAsync()
{

//launch async可以强制直接异步启动,可看到两个符号交替打出
std::cout << "start func1 in background and func2 background, launch by async" << std::endl;
// 注意,这里如果不支持异步,会抛出一个system error,例如编译的时候没有连接 libpthread.a
std::future<int> result1(std::async(std::launch::async, func1));
std::future<int> result2(std::async(std::launch::async, func2));
//int result2 = func2();
//注意,当用launch async的时候,这里即使不get,生命周期结束时,异步函数也会被执行
std::cout << " before final " << std::endl;
int result = result1.get() + result2.get();
std::cout << " final result :" << result << std::endl;
}
  • 使用std::launch::async启动策略时,会立即强制在独立线程中启动并发任务,此时任务是真正并行的
  • 如果不支持异步,会抛出一个system error

std::launch::deferred 延迟启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 测试通过deferred launch
// 当用launch deferred的时候,调用get之前绝对不会执行异步任务
// 因为最终还是调用get时启动且阻塞当前线程的,所以这俩任务实际上也是串行,而不是异步并行的。
void TestLaunchDeferred()
{

std::cout << "start func1 in background and func2 background, launch by deferred" << std::endl;
std::future<int> result1(std::async(std::launch::deferred, func1));
std::future<int> result2(std::async(std::launch::deferred, func2));
//int result2 = func2();
//std::default_random_engine dre('x');
std::default_random_engine dre('x');
std::uniform_int_distribution<int> id(10, 1000);
int random = id(dre);
while (random < 500)
{
std::cout << " not to call get, and not start deferred task, random:" << random << std::endl;
random = id(dre);
}
std::cout << " random: " << random << std::endl;
int result = result1.get() + result2.get();
std::cout << " call get and start deferred task, final result :" << result << std::endl;
}
  • 调用get之前绝对不会执行异步任务
  • gcc4.8下此启动策略和默认的模式一样,都是在主线程中串行完成所有任务

wait、wait_for、wait_until等待、轮询

1
2
3
4
5
6
7
8
9
10
// 测试wait
void TestWait()
{

std::future<int> result1 = std::async(func1);
std::future<int> result2 = std::async(func2);
result1.wait();
result2.wait();
std::cout << " async task done " << std::endl;
std::cout << " wait会强制启动async任务:" << (result1.get()+result2.get()) << std::endl;
}
  • wait会强制启动一个async任务,wait时会阻塞当前线程,所以这两个wait的任务是同步串行的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    std::future<int> f;
    void TestWaitFor()
    {

    int result = 1;
    //不加launch,因为wait_for不会主动开启任务,这里任务都不会开始执行,怎么等都是timeout
    //std::future<int> f = std::async(func1);

    //加launch::async,这里根据等待时间不同,可能是ready或者timeout,但是有个问题,无论是哪种情况,函数析构的时候都会等待async的任务执行完毕
    //std::future<int> f = std::async(std::launch::async, func1);

    //这种写法,把future对象放外面,在此函数结束的时候,不会等待f执行结束。 不过此demo中,在程序退出前仍然会等待退出
    //f = std::async(std::launch::async, func1);

    //通过deferred,wait_for之后也总是timeout
    //TODO: std::future_status::deferred如何触发?
    f = std::async(std::launch::deferred, func1);

    //等一段时间,若异步的有结果了则返回异步的,否则返回别的
    std::future_status status = f.wait_for(std::chrono::milliseconds(500));
    switch (status)
    {
    case std::future_status::deferred:
    std::cout << "还未开启异步任务" << std::endl;
    break;
    case std::future_status::ready:
    std::cout << "异步任务已完成" << std::endl;
    break;
    case std::future_status::timeout:
    std::cout << "异步任务等待时间到了,还没结束" << std::endl;
    break;
    default:
    break;
    }
    }
    }
  • wait_for会阻塞当前线程,等待固定的时间,不会强制启动任务

  • wait_until也一样,只不过可以等待到某具体时间点
  • 等待超时后,当前线程会继续执行不再等待,而future线程中的任务并不会终止,还是会继续执行完
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 测试利用等待0时间来轮询
void TestPolling()
{

//这样会一直卡住,c++标准库中对不带launch选项的async的解释,是会自动在合适的时机拉起,但是这里gcc4.8中,看起来永远不会开启任务,一直卡在循环中
// std::future<int> result = std::async(func1);
// while(result.wait_for(std::chrono::seconds(0)) != std::future_status::ready){
// std::cout << "wait for 0, polling..." << (result.wait_for(std::chrono::seconds(0)) == std::future_status::timeout) << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// std::cout << "wait for polling done:" << result.get() << std::endl;

//这样也会一直卡住,或许是运行环境的原因,这里和《C++标准库》里的描述不同。实际运行时,这个wait_for(0)一直会返回timeout
//可能在某些编译器或者运行环境下,不带launch的async是会立即启动的
// std::future<int> result = std::async(func1);
// if(result.wait_for(std::chrono::seconds(0)) != std::future_status::deferred){
// while(result.wait_for(std::chrono::seconds(1)) != std::future_status::ready){
// std::cout << "wait for 0, polling..." << std::endl;
// std::this_thread::sleep_for(std::chrono::seconds(1));
// }
// }

// 貌似只有这样,主动用async 来launch,才能polling成功
std::future<int> result = std::async(std::launch::async, func1);
if (result.wait_for(std::chrono::seconds(0)) != std::future_status::deferred)
{
while (result.wait_for(std::chrono::milliseconds(100)) != std::future_status::ready)
{
std::cout << "wait for 100ms, polling..." << std::endl;
std::this_thread::yield();
}
}
std::cout << "wait for polling done:" << result.get() << std::endl;
}

可以通过不断的sleep和wait_for,来轮询等待,直到任务完成。

std::shared_future多次获取结果

默认情况下一个std::future只能get一次,多次get的话会抛出std::future_error异常。标准库提供了shared_future来供需要多次get的场景使用。

1
2
3
4
5
6
7
8
9
10
void TestSharedAsync()
{

std::cout << "main thread:" << std::this_thread::get_id() << std::endl;
std::cout << "start func1 in background and func2 foreground" << std::endl;
//std::future<int> result1(std::async(func1));
std::shared_future<int> result1(std::async(func1));
int result2 = func2();
int result = result1.get() + result1.get() + result2;
std::cout << " final result :" << result << std::endl;
}

std::future多次调用get会抛出std::future_error异常,换成std::shared_future就好了。

参数传递

通过future创建并发任务时,有时候需要给被调函数传入参数,下面的代码测试了通过 lamda值传递、函数值传递、lamda引用传递、函数引用传递 几种方式来传参:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void TestPassingArguments()
{

char a = 'a', b='b', c='c', d='d';
//lamda值传递
std::future<int> res1 = std::async(std::launch::async, [=]{return doSomething(a);});
//函数值传递
std::future<int> res2 = std::async(std::launch::async, doSomething, b);

//lamda引用传递
std::future<int> res3 = std::async(std::launch::async, [&]{return doSomething(c);});
//函数引用传递
std::future<int> res4 = std::async(std::launch::async, doSomething, std::ref(d));

std::cout << "res1:" << res1.get() << "res2:" << res2.get()<< "res3:" << res3.get()<< "res4:" << res4.get() << std::endl;
}

异常处理

如果异步任务中抛出了异常,程序应该怎么处理呢

1
2
3
4
5
6
7
8
9
10
11
12
13
//测试异常处理
void TestHandleException()
{

try
{
std::future<void> ex(std::async(func_ex));
ex.get();
}
catch (std::exception &e)
{
std::cout << "e:" << e.what() << std::endl;
}
}

  • 像普通的异常一样catch
  • 这个异常是在调用get后,才能得到并抛出的,就像get一个普通的结果一样

☞ 参与评论