基本概念
使用C语言接口完成官方的这个模型:
很有意思。感觉开源的东西真的好,不用自己去写C/C++服务端。
p为生产者不经过交换机,直接把数据传给消息队列,c为consumer用于消费。
这种结构在本科生的时候,经常自己写,现在用RabbitMQ来试试
代码与实例
这里有套Java的代码,其实C接口和Java接口差不多。因为本人C/C++写得多。个人觉得比Java好用,但资料很少。只能慢慢啃官方实例和教程。这一点没有Java快。
C语言版,程序运行如下:
其中RabbitMQ如下:
producter源码:
#define _CRT_SECURE_NO_WARNINGS
#include
#include
#include
#include
#include
using namespace std;
int main(int *argc, int *argv[]){
string hostName = '127.0.0.1';
int port = 5672;
amqp_socket_t *socket = nullptr;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if(!socket){
cout << 'create socket failed!';
exit(1);
}
if(amqp_socket_open(socket, hostName.c_str(), port)){
cout << 'opening TCP socket failed' << endl;
exit(1);
}
//登录
if(1 != amqp_login(conn, '/vhost_cff', 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, 'cff', '123').reply_type){
cout << 'login failed' << endl;
}
//管道
amqp_channel_open(conn, 1);
while(1){
char message[64] = {' '};
cout << 'please input the msg: ';
cin.getline(message, sizeof(message));
amqp_bytes_t message_bytes;
message_bytes.len = sizeof(message);
message_bytes.bytes = message;
amqp_basic_publish(conn, 1, amqp_cstring_bytes(''), amqp_cstring_bytes('test_simple_queue'), 0, 0, nullptr, message_bytes);
cout << 'send msg over!' << endl;
}
getchar();
return 0;
}
consumer源码:
#define _CRT_SECURE_NO_WARNINGS
#include
#include
#include
#include
#include
using namespace std;
int main(int *argc, int *argv[]){
string hostName = '127.0.0.1';
int port = 5672;
amqp_socket_t *socket = nullptr;
amqp_connection_state_t conn;
conn = amqp_new_connection();
socket = amqp_tcp_socket_new(conn);
if(!socket){
cout << 'create socket failed!';
exit(1);
}
if(amqp_socket_open(socket, hostName.c_str(), port)){
cout << 'opening TCP socket failed' << endl;
exit(1);
}
//登录
if(1 != amqp_login(conn, '/vhost_cff', 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, 'cff', '123').reply_type){
cout << 'login failed' << endl;
exit(1);
}
amqp_channel_open(conn, 1);
while(1){
amqp_basic_consume_ok_t *msg = amqp_basic_consume(conn, 1, amqp_cstring_bytes('test_simple_queue'), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_envelope_t envelope;
amqp_consume_message(conn, &envelope, NULL, 0);
cout << 'The result is : ' << (char *)envelope.message.body.bytes << endl;
}
getchar();
return 0;
}
这里只要把amqp_basic_xxxx中关于交换机和路由key设置为空,就可以直接发送到队列上,不需要经过交换机。也就是官方的这个模型
使用amqp_queue_declare这个函数可以声明一个队列,也就是当RabbitMQ没有队列的时候,会自动生成一个。在本实验中没有用到这个。
总体来说实现功能还是比较简单的,但写好一个程序还是有难度的。
源码打包下载:
文章为作者独立观点,不代表股票交易接口观点