前回にひきつづきストリーム処理を扱うReactiveXの話題です。
参考)Reactive Extensions for C++
https://github.com/ReactiveX/RxCpp
「Reactive Extensions for Unity」
以前はC#/Unityでイベントの時間軸での制御をテストしましたが、似たことをC++でもやってみました。やはりクロスプラットホームで実装することを考えるとC++はどうしても考慮に入れておく必要があります。
今回マーブル図というものを作ってみました。
タイムインターバルで生成される自然数の値とキーボードで入力されるコードをマージして、指定したタイミングで受け取ります。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
#include "rxcpp/rx.hpp" #include <iostream> #include <thread> #include <string> #include <termios.h> #include <fcntl.h> #include <time.h> #include <unistd.h> #include <sys/time.h> #include <sys/types.h> std::string get_pid() { std::stringstream s; s << std::this_thread::get_id(); return s.str(); } static long prev = 0; int main(){ struct termios save_settings; struct termios settings; fd_set readfs; tcgetattr(0,&save_settings); settings = save_settings; settings.c_lflag &= ~(ECHO|ICANON); settings.c_cc[VTIME] = 0; settings.c_cc[VMIN] = 1; tcsetattr(0,TCSANOW,&settings); fcntl(0,F_SETFL,O_NONBLOCK); FD_SET(0, &readfs); auto scheduler = rxcpp::identity_current_thread(); auto start = scheduler.now() + std::chrono::milliseconds(1); auto period = std::chrono::milliseconds(500); auto skip = std::chrono::milliseconds(500); auto v1 = rxcpp::observable<>::interval(std::chrono::milliseconds(100)). map([](long v){ printf("[thread %s] Interval: %ld\n", get_pid().c_str(), v); return v; }). take(100); auto v11 = rxcpp::observable<>::interval(std::chrono::milliseconds(100)). map([](long v){ char c = 0; if((c = getchar()) == EOF){ ; }else{ printf("[thread %s] INPUT: %c %d\n", get_pid().c_str(), c, c); } return (long)c; }); auto v3 = v1.merge(v11); auto v2 = v3.buffer_with_time(period, skip, rxcpp::observe_on_new_thread()); v2.observe_on(rxcpp::synchronize_new_thread()).as_blocking(). subscribe( [](std::vector<long> v){ printf("[thread %s] OnNext:", get_pid().c_str()); std::for_each(v.begin(), v.end(), [](long a){ if(prev != a){ printf("/%ld", a); } prev = a; }); printf("\n"); }, [](){printf("[thread %s] OnCompleted\n", get_pid().c_str());}); } |
ビルド
g++ -o bufftest bufftest.cpp -std=c++17 -lpthread
※-std=c++2aでも可 C++は3年ごとにバージョンアップ
[thread 139656622569280] Interval: 1
[thread 139656622569280] Interval: 2
[thread 139656622569280] Interval: 3
[thread 139656622569280] Interval: 4
[thread 139656622569280] Interval: 5
[thread 139656622569280] Interval: 6
[thread 139656622565120] OnNext:/1/-1/2/-1/3/-1/4/-1/5/-1
[thread 139656622569280] Interval: 7
[thread 139656622569280] Interval: 8
[thread 139656622569280] Interval: 9
[thread 139656622569280] Interval: 10
[thread 139656622569280] Interval: 11
[thread 139656622565120] OnNext:/6/-1/7/-1/8/-1/9/-1/10/-1
[thread 139656622569280] Interval: 12
[thread 139656622569280] Interval: 13
[thread 139656622569280] Interval: 14
[thread 139656622569280] Interval: 15
[thread 139656622569280] Interval: 16
[thread 139656622565120] OnNext:/11/-1/12/-1/13/-1/14/-1/15/-1/16/-1
[thread 139656622569280] Interval: 17
[thread 139656622569280] Interval: 18
[thread 139656622569280] Interval: 19
[thread 139656622569280] Interval: 20
[thread 139656622569280] INPUT: s 115
[thread 139656622569280] Interval: 21
[thread 139656622569280] INPUT: a 97
[thread 139656622565120] OnNext:/17/-1/18/-1/19/-1/20/115/21
[thread 139656622569280] Interval: 22
[thread 139656622569280] INPUT: d 100
[thread 139656622569280] Interval: 23
[thread 139656622569280] INPUT: f 102
[thread 139656622569280] Interval: 24
[thread 139656622569280] INPUT: a 97
[thread 139656622569280] Interval: 25
[thread 139656622569280] INPUT: s 115
[thread 139656622569280] Interval: 26
[thread 139656622569280] INPUT: d 100
[thread 139656622565120] OnNext:/97/22/100/23/102/24/97/25/115/26/100
[thread 139656622569280] Interval: 27
[thread 139656622569280] INPUT: f 102
[thread 139656622569280] Interval: 28
[thread 139656622569280] INPUT: a 97
[thread 139656622569280] Interval: 29
[thread 139656622569280] INPUT: s 115
[thread 139656622569280] Interval: 30
[thread 139656622569280] INPUT: d 100
[thread 139656622569280] Interval: 31
[thread 139656622569280] INPUT: f 102
[thread 139656622565120] OnNext:/27/102/28/97/29/115/30/100/31/102
[thread 139656622569280] Interval: 32
[thread 139656622569280] INPUT: q 113
[thread 139656622569280] Interval: 33
[thread 139656622569280] INPUT: w 119
[thread 139656622569280] Interval: 34
[thread 139656622569280] INPUT: e 101
[thread 139656622569280] Interval: 35
[thread 139656622569280] INPUT: f 102
[thread 139656622569280] Interval: 36
[thread 139656622569280] INPUT: a 97
[thread 139656622565120] OnNext:/32/113/33/119/34/101/35/102/36/97
[thread 139656622569280] Interval: 37
[thread 139656622569280] INPUT: s 115
[thread 139656622569280] Interval: 38
[thread 139656622569280] INPUT: d 100
[thread 139656622569280] Interval: 39
[thread 139656622569280] INPUT: f 102
[thread 139656622569280] Interval: 40
[thread 139656622569280] INPUT: a 97
[thread 139656622569280] Interval: 41
[thread 139656622569280] INPUT: s 115
[thread 139656622565120] OnNext:/37/115/38/100/39/102/40/97/41/115
[thread 139656622569280] Interval: 42
[thread 139656622569280] INPUT: d 100
[thread 139656622569280] Interval: 43
[thread 139656622569280] INPUT: f 102
[thread 139656622569280] Interval: 44
[thread 139656622569280] INPUT: w 119
[thread 139656622569280] Interval: 45
[thread 139656622569280] INPUT: e 101
[thread 139656622569280] Interval: 46
[thread 139656622569280] INPUT: f 102
[thread 139656622565120] OnNext:/42/100/43/102/44/119/45/101/46
[thread 139656622569280] Interval: 47
[thread 139656622569280] Interval: 48
[thread 139656622569280] INPUT: a 97
[thread 139656622569280] Interval: 49
[thread 139656622569280] INPUT: s 115
[thread 139656622569280] Interval: 50
[thread 139656622569280] INPUT: d 100
[thread 139656622569280] Interval: 51
[thread 139656622569280] INPUT: f 102
[thread 139656622565120] OnNext:/102/47/-1/48/97/49/115/50/100/51
[thread 139656622569280] Interval: 52
[thread 139656622569280] Interval: 53
[thread 139656622569280] Interval: 54
[thread 139656622569280] Interval: 55
[thread 139656622569280] Interval: 56
[thread 139656622565120] OnNext:/102/52/-1/53/-1/54/-1/55/-1/56/-1
[thread 139656622569280] Interval: 57
2つのinterval、period、skipの値をいろいろと変えてみると理解が深まります。