今天就接續著昨天的結尾,繼續看 ZeroMQ 的各種通訊模式。
沿用 day6 的環境配置,第一個以 Request-Reply 來測試。
Server端
// req_rep_server.cpp
#include <zmq.hpp>
#include <iostream>
#include <string>
int main(int argc, char** argv) {
std::string bind = "tcp://*:5557";
if (argc > 1) bind = argv[1]; // 例如: tcp://*:5557
zmq::context_t ctx{1};
zmq::socket_t rep{ctx, zmq::socket_type::rep};
rep.bind(bind);
std::cerr << "REP listening on " << bind << "\n";
while (true) {
zmq::message_t req;
if (!rep.recv(req, zmq::recv_flags::none)) continue;
// 直接回覆同樣 payload
rep.send(req, zmq::send_flags::none);
}
}
// req_rep_client.cpp
#include <zmq.hpp>
#include <iostream>
#include <vector>
#include <algorithm>
#include <chrono>
#include <cstring>
using clk = std::chrono::high_resolution_clock;
/// Picks the sample at quantile `p` from an already ordered sample set.
///
/// The function only indexes into `ns`; it does not sort, so the caller must
/// pass the samples in ascending order for the result to be a percentile.
///
/// @param ns  Samples to index into.
/// @param p   Quantile as a fraction. Values outside [0, 1] are clamped and
///            NaN is treated as 0, so the index can never leave the vector.
/// @return    ns[floor((ns.size() - 1) * p)], or 0 when `ns` is empty.
static long percentile(const std::vector<long>& ns, double p) {
    if (ns.empty()) return 0;
    // `p >= 0.0` is false for NaN as well as for negatives, so both map to 0.
    const double fraction = (p >= 0.0) ? std::min(p, 1.0) : 0.0;
    const auto index =
        static_cast<std::size_t>(static_cast<double>(ns.size() - 1) * fraction);
    return ns[index];
}
int main(int argc, char** argv){
std::string connect = "tcp://127.0.0.1:5557";
int count = 10000, size = 32, warmup = 200;
if (argc > 1) connect = argv[1]; // tcp://127.0.0.1:5557
if (argc > 2) count = std::stoi(argv[2]); // 次數
if (argc > 3) size = std::stoi(argv[3]); // payload bytes
if (argc > 4) warmup = std::stoi(argv[4]); // 熱身
zmq::context_t ctx{1};
zmq::socket_t req{ctx, zmq::socket_type::req};
req.connect(connect);
std::vector<uint8_t> payload(size, 0x5A);
// warmup
for (int i=0;i<warmup;i++){
zmq::message_t msg(payload.data(), payload.size());
req.send(msg, zmq::send_flags::none);
zmq::message_t rep;
req.recv(rep, zmq::recv_flags::none);
}
std::vector<long> rtts; rtts.reserve(count);
auto t0_all = clk::now();
for(int i=0;i<count;i++){
auto t0 = clk::now();
req.send(zmq::buffer(payload), zmq::send_flags::none);
zmq::message_t rep;
req.recv(rep, zmq::recv_flags::none);
auto dt = std::chrono::duration_cast<std::chrono::nanoseconds>(clk::now()-t0).count();
rtts.push_back(dt);
}
auto dur_all_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(clk::now()-t0_all).count();
std::sort(rtts.begin(), rtts.end());
long sum=0; for(auto v: rtts) sum += v;
long avg = sum / (long)rtts.size();
std::cout << "count="<<count<<" size="<<size<<"B total="<< (dur_all_ns/1e6) <<"ms "
<< "avg="<< (avg/1e3) <<"us p50="<<(percentile(rtts,0.5)/1e3)<<"us "
<< "p95="<<(percentile(rtts,0.95)/1e3)<<"us p99="<<(percentile(rtts,0.99)/1e3)<<"us\n";
}
cmake_minimum_required(VERSION 3.15)
project(zmq_perf_cpp CXX)

# Request C++17 and make it a hard requirement, so CMake does not silently
# fall back to an older standard when the compiler lacks support.
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

find_package(Threads REQUIRED)

# Try to locate cppzmq (header-only); not finding it here is not fatal,
# the header may still be on the compiler's default include path.
find_path(CPPZMQ_INCLUDE_DIRS NAMES zmq.hpp)
if (CPPZMQ_INCLUDE_DIRS)
  include_directories(${CPPZMQ_INCLUDE_DIRS})
endif()

# Locate libzmq
find_library(ZMQ_LIB NAMES zmq libzmq)
if (NOT ZMQ_LIB)
  message(FATAL_ERROR "libzmq not found. Install zeromq / libzmq3-dev.")
endif()

# Request-Reply examples
add_executable(req_rep_server req_rep_server.cpp)
target_link_libraries(req_rep_server PRIVATE ${ZMQ_LIB} Threads::Threads)
add_executable(req_rep_client req_rep_client.cpp)
target_link_libraries(req_rep_client PRIVATE ${ZMQ_LIB} Threads::Threads)

# Publish-Subscribe examples (sub.cpp / pub.cpp). They are added only when the
# source file is present, so a tree holding just the Request-Reply files still
# configures.
foreach(example sub pub)
  if (EXISTS "${CMAKE_CURRENT_SOURCE_DIR}/${example}.cpp")
    add_executable(${example} ${example}.cpp)
    target_link_libraries(${example} PRIVATE ${ZMQ_LIB} Threads::Threads)
  endif()
endforeach()
# Switch to the build directory first
cd build
# First shell (the endpoint is quoted so the shell does not glob-expand '*')
./req_rep_server 'tcp://*:5557'
# Second shell
./req_rep_client tcp://127.0.0.1:5557 50000 32 500
REQ-REP 這邊送了 50000 筆請求,每筆請求的 payload 是 32 bytes。由於 REQ-REP 是一問一答的單筆處理(必須收到回覆後才能送出下一筆),所以總耗時約 12.4 秒,平均每筆往返約 250 微秒。
./req_rep_client tcp://127.0.0.1:5557 50000 32 500
count=50000 size=32B total=12389.5ms avg=247.393us p50=230.098us p95=336.878us p99=496.848us
// sub.cpp
#include <zmq.hpp>
#include <iostream>
#include <chrono>
/// PUB-SUB throughput subscriber.
///
/// Connects a SUB socket to the endpoint given as the first command-line
/// argument (default "tcp://127.0.0.1:5556"), subscribes to every message and
/// prints a statistics line whenever at least one second has passed since the
/// previous report. The check happens only after a message is received, so
/// nothing is printed while the stream is idle. The loop never exits on its own.
int main(int argc, char** argv){
    std::string connect = "tcp://127.0.0.1:5556";
    if (argc > 1) connect = argv[1];

    zmq::context_t ctx{1};
    zmq::socket_t sub{ctx, zmq::socket_type::sub};
    sub.set(zmq::sockopt::subscribe, "");  // empty prefix: subscribe to everything
    sub.connect(connect);
    std::cerr << "SUB connected to " << connect << "\n";

    long long total = 0, window = 0;  // messages overall / since the last report
    auto last = std::chrono::steady_clock::now();
    while (true) {
        zmq::message_t msg;
        if (!sub.recv(msg, zmq::recv_flags::none)) continue;
        ++total;
        ++window;

        const auto now = std::chrono::steady_clock::now();
        const auto elapsed = now - last;
        if (elapsed >= std::chrono::seconds(1)) {
            const auto sec = std::chrono::duration_cast<std::chrono::seconds>(elapsed).count();
            // Divide by the exact elapsed time; whole seconds would overstate
            // the rate whenever the window is noticeably longer than 1 s.
            const double rate =
                static_cast<double>(window) / std::chrono::duration<double>(elapsed).count();
            std::cout << "[+"<<sec<<"s] window="<<window<<" msgs rate="<< static_cast<long long>(rate) <<" msgs/s total="<<total<<"\n";
            window = 0;
            last = now;
        }
    }
}
// pub.cpp
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <vector>
#include <chrono>
/// PUB-SUB throughput publisher.
///
/// Usage: pub [endpoint] [count] [size] [linger_ms] [sleep_ms]
///   endpoint   address to bind (default tcp://*:5556)
///   count      number of messages to send (default 1'000'000, must be >= 0)
///   size       payload size in bytes (default 100, must be >= 0)
///   linger_ms  ZMQ_LINGER applied to the socket (default 0)
///   sleep_ms   pause after each message to throttle the rate (default 0)
///
/// Prints the message rate and MiB/s achieved by the send loop.
/// Returns 0 on success and 1 on invalid arguments.
int main(int argc, char** argv){
    std::string bind = "tcp://*:5556";
    int count = 1'000'000, size = 100, linger_ms = 0, sleep_ms = 0;
    try {
        if (argc > 1) bind = argv[1];
        if (argc > 2) count = std::stoi(argv[2]);
        if (argc > 3) size = std::stoi(argv[3]);
        if (argc > 4) linger_ms = std::stoi(argv[4]);  // linger on close
        if (argc > 5) sleep_ms = std::stoi(argv[5]);   // per-message interval (rate control)
    } catch (const std::exception& e) {
        // std::stoi throws on non-numeric or out-of-range input.
        std::cerr << "invalid numeric argument: " << e.what() << "\n";
        return 1;
    }
    // A negative size would become a huge unsigned length for the payload vector.
    if (count < 0 || size < 0) {
        std::cerr << "count and size must be >= 0\n";
        return 1;
    }

    zmq::context_t ctx{1};
    zmq::socket_t pub{ctx, zmq::socket_type::pub};
    pub.set(zmq::sockopt::sndhwm, 100000);
    // NOTE: with a linger of 0, messages still queued when the socket is
    // closed are discarded rather than flushed.
    pub.set(zmq::sockopt::linger, linger_ms);
    pub.bind(bind);
    std::cerr << "PUB bound at " << bind << " waiting 500ms for subs...\n";
    // Give subscribers time to connect before the first message goes out.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    const std::vector<uint8_t> payload(static_cast<std::size_t>(size), 0x42);
    const auto t0 = std::chrono::steady_clock::now();
    for (int i = 0; i < count; ++i) {
        pub.send(zmq::buffer(payload), zmq::send_flags::none);
        if (sleep_ms > 0) std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
    }
    const auto elapsed = std::chrono::steady_clock::now() - t0;
    const auto dt = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count();

    // Use fractional seconds and guard against a zero interval: dividing by a
    // duration that rounded down to 0 ms would give an infinite rate, and
    // converting that to an integer is undefined behaviour.
    const double secs = std::chrono::duration<double>(elapsed).count();
    const double mps = secs > 0.0 ? static_cast<double>(count) / secs : 0.0;
    const double mib = secs > 0.0
        ? (static_cast<double>(count) * size) / (1024.0 * 1024.0) / secs
        : 0.0;
    std::cout << "Sent " << count << " msgs(size="<<size<<") in "<<dt<<" ms => "<<static_cast<long long>(mps)<<" msgs/s, "<<mib<<" MiB/s\n";
    return 0;
}
# Switch to the build directory first
cd build
# First shell
./sub tcp://127.0.0.1:5556
# Second shell (the endpoint is quoted so the shell does not glob-expand '*')
./pub 'tcp://*:5556' 1000000 100 0
./pub tcp://*:5556 1000000 100 0
PUB bound at tcp://*:5556 waiting 500ms for subs...
Sent 1000000 msgs(size=100) in 654 ms => 1529051 msgs/s, 145.822 MiB/s
今天先介紹這兩個通訊模式,明天再接著介紹DEALER-ROUTER的模式。