版权声明:本文为博主原创文章,如果转载请给出原文链接:http://doofuu.com/article/4156138.html
关于RabbitMQ四种Exchange类型的介绍请看这篇博客,这里只贴出用Erlang实现的代码。
消费者:
%% @doc Fanout consumer example.
%%
%% Each server started by this module opens its own AMQP connection,
%% declares a queue named after its own pid, binds that queue to the
%% <<"logs">> fanout exchange and prints every message delivered to it.
-module(mod_fanout_receive).
-behaviour(gen_server).

%% API
-export([start_link/1, start/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-include("common.hrl").

%% connection: pid of the AMQP connection opened in init/1, or
%%             `undefined' when the broker could not be reached.
-record(state, {connection}).

%% @doc Start Num consumers, locally registered as
%% mod_fanout_receive1 .. mod_fanout_receiveNum. Returns ok; the
%% individual start_link/1 results are not inspected.
start(Num) ->
    lists:foreach(
      fun(N) ->
              start_link(list_to_atom(lists:concat([?MODULE, N])))
      end,
      lists:seq(1, Num)).

%% @doc Start one consumer, locally registered under the atom Server.
start_link(Server) ->
    gen_server:start_link({local, Server}, ?MODULE, [], []).

%% Connects and subscribes while the server is starting.
init([]) ->
    %% Trap exits so terminate/2 also runs when the parent shuts this
    %% server down; otherwise the AMQP connection would never be closed.
    process_flag(trap_exit, true),
    {ok, #state{connection = connect()}}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% Subscription confirmation sent to the subscriber pid; nothing to do.
handle_info(#'basic.consume_ok'{}, State) ->
    {noreply, State};
%% A message routed to our queue: print its payload.
handle_info({#'basic.deliver'{}, #amqp_msg{payload = Msg}}, State) ->
    io:format(" [~p] receive messages is ~p~n", [self(), Msg]),
    {noreply, State};
%% Anything else is printed and dropped so the mailbox never fills up.
%% Because exits are trapped, 'EXIT' messages also end up here.
handle_info(Info, State) ->
    io:format("[~p] unknown messages is ~p~n", [self(), Info]),
    {noreply, State}.

%% Releases the broker connection that init/1 opened, if there is one.
%% The channel is not closed separately; per the amqp_client
%% documentation, closing a connection also closes its channels.
terminate(_Reason, #state{connection = undefined}) ->
    ok;
terminate(_Reason, #state{connection = Connection}) ->
    amqp_connection:close(Connection),
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

%% Opens the connection and subscribes this process to its own queue.
%% Returns the connection pid, or `undefined' when the broker cannot
%% be reached. In that case the error is printed and the server keeps
%% running without a subscription, as it always has.
connect() ->
    Params = #amqp_params_network{host = ?HOST,
                                  port = ?PORT,
                                  username = ?USER_NAME,
                                  password = ?PASSWORD},
    case amqp_connection:start(Params) of
        {ok, Connection} ->
            subscribe(open_channel(Connection)),
            Connection;
        {error, Reason} ->
            io:format("[~p] connection rabbit error: ~p~n", [self(), Reason]),
            undefined
    end.

%% Opens a channel on Connection. If that fails, the connection is
%% closed before the error is raised so it is not left dangling.
open_channel(Connection) ->
    case amqp_connection:open_channel(Connection) of
        {ok, Channel} ->
            Channel;
        Error ->
            amqp_connection:close(Connection),
            error({open_channel_failed, Error})
    end.

%% Declares the per-process queue and the exchange, binds them, and
%% registers the calling process as the consumer (no acknowledgements).
subscribe(Channel) ->
    %% Build the queue name from our pid, e.g. <<"fanout_queue<0.90.0>">>.
    %% NOTE(review): the queue is declared with default flags, so it is
    %% presumably left on the broker after this process is gone;
    %% consider exclusive/auto_delete if that is not wanted.
    QueueName = list_to_binary(lists:concat([fanout_queue, pid_to_list(self())])),
    %% Declare the queue.
    amqp_channel:call(Channel, #'queue.declare'{queue = QueueName}),
    %% Declare the exchange.
    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"logs">>,
                                                   type = ?EXCHANGE_TYPE_FANOUT}),
    %% Bind the queue to the exchange.
    amqp_channel:call(Channel, #'queue.bind'{queue = QueueName,
                                             exchange = <<"logs">>}),
    io:format(" [~p] Waiting for messages......~n", [self()]),
    amqp_channel:subscribe(Channel,
                           #'basic.consume'{queue = QueueName, no_ack = true},
                           self()).
生产者:
%% @doc Fanout producer example: publishes one message to the
%% <<"logs">> exchange over a short-lived connection.
-module(mod_fanout_send).

-export([send/1]).

-include("common.hrl").

%% @doc Publish Msg (used as the AMQP message payload) to the
%% <<"logs">> fanout exchange.
%%
%% Returns `ok' once the message has been handed to the channel and
%% the connection is closed. If the broker cannot be reached, the bare
%% error reason from amqp_connection:start/1 is returned; that shape is
%% kept for existing callers.
send(Msg) ->
    Params = #amqp_params_network{host = ?HOST,
                                  port = ?PORT,
                                  username = ?USER_NAME,
                                  password = ?PASSWORD},
    case amqp_connection:start(Params) of
        {ok, Connection} ->
            %% Close the connection even when publishing crashes, so a
            %% failed send does not leave a connection open on the broker.
            try
                publish(Connection, Msg)
            after
                amqp_connection:close(Connection)
            end;
        {error, Reason} ->
            Reason
    end.

%% Opens a channel, makes sure the exchange exists, publishes Msg and
%% closes the channel again.
publish(Connection, Msg) ->
    {ok, Channel} = amqp_connection:open_channel(Connection),
    %% Declare the exchange with the same settings the consumer uses,
    %% so sending also works before any consumer has created it.
    #'exchange.declare_ok'{} =
        amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"logs">>,
                                                       type = ?EXCHANGE_TYPE_FANOUT}),
    amqp_channel:cast(Channel,
                      #'basic.publish'{exchange = <<"logs">>},
                      #amqp_msg{payload = Msg}),
    io:format("Sent '~p'~n", [Msg]),
    amqp_channel:close(Channel),
    ok.
common头文件:
%% Shared definitions for the fanout producer and consumer examples.

%% Public header of the RabbitMQ Erlang client. It replaces the
%% library-private amqp_client_internal.hrl and is expected to sit in
%% the same include directory.
-include("amqp_client.hrl").

%% Broker credentials and address.
-define(USER_NAME, <<"test">>).
-define(PASSWORD,  <<"test">>).
-define(HOST,      "192.168.249.128").
-define(PORT,      5672).

%% Exchange types.
-define(EXCHANGE_TYPE_FANOUT, <<"fanout">>).
结果如下:
共有 0 条评论 - RabbitMQ四种Exchange类型之Fanout (Erlang)