云诺说 - 小程序开发 - 软件定制
当前位置: RabbitMQ > RabbitMQ四种Exchange类型之Topic(Erlang)

RabbitMQ四种Exchange类型之Topic(Erlang)

2019-07-15 05:37 分类:RabbitMQ 作者:云诺 阅读(1957)

版权声明:本文为博主原创文章,如果转载请给出原文链接:http://doofuu.com/article/4156156.html

Topic类型的Exchange是要进行路由键匹配的。此时需要通过路由键将队列绑定要一个交换器上。规则如下:

  • 符号“#”匹配一个或多个词,例如:“logs.#”能够匹配到“logs.error”、“logs.info.toc”

  • 符号“*”只能匹配一个词。例如:“logs.*” 只能匹配到“logs.error”,不能匹配到“logs.info.toc” 。

如下图:

QQ截图20190715172543.png

下面就直接上代码吧!!

消费者:

-module(mod_topic_receive).

-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-export([start_link/1]).
-include("common.hrl").

-record(state, {routing_key = <<"">>}).


start_link(RoutingKey) ->
	Server1 = lists:concat([?MODULE,erlang:binary_to_list(RoutingKey)]),
	Server2 = erlang:list_to_atom(Server1),
	gen_server:start_link({local,Server2}, ?MODULE, [RoutingKey], []).


init([RoutingKey]) ->
	start(RoutingKey),
    {ok, #state{routing_key=RoutingKey}}.

handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.


handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info({'basic.consume_ok',_}, State) ->
	{noreply, State};
handle_info({#'basic.deliver'{},#amqp_msg{payload=Msg}}, State) ->
	io:format(" [routing_key = ~p] receive  messages is ~p~n",[State#state.routing_key,Msg]),
	{noreply, State};

handle_info(Info, State) ->
	io:format("[routing_key = ~p] unknown messages is ~p~n", [State#state.routing_key,Info]),
    {noreply, State}.


terminate(_Reason, _State) ->
    ok.

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

start(RoutingKey) ->
	Params = #amqp_params_network{host=?HOST,username=?USER_NAME,password=?PASSWORD},
	case amqp_connection:start(Params) of
		{ok,ConnectionPid} ->
			{ok, Channel} = amqp_connection:open_channel(ConnectionPid),
			%%生成队列名称
			Queue = lists:concat([fanout_queue,now_time()]),
			QueueName = erlang:list_to_binary(Queue),
			%%声明队列
			amqp_channel:call(Channel, #'queue.declare'{queue = QueueName,auto_delete=true}),
			%%声明exchange
			amqp_channel:call(Channel, #'exchange.declare'{ auto_delete =true,exchange = <<"topic">>, type = ?EXCHANGE_TYPE_TOPIC}),
			%%队列绑定到exchange
			amqp_channel:call(Channel, #'queue.bind'{queue = QueueName, exchange = <<"topic">>,routing_key = RoutingKey}),
			io:format(" [routing_key = ~p] Waiting for messages......~n",[RoutingKey]),
			amqp_channel:subscribe(Channel, #'basic.consume'{queue = QueueName,no_ack = true}, self());
		{error,Resaon} ->
			io:format("[routing_key = ~p] connection rabbit error: ~p~n", [RoutingKey,Resaon]),
			Resaon
	end.

now_time()->
	{A, B, _} = os:timestamp(),
    A * 1000000 + B.

生产者:

-module(mod_topic_send).

-export([send/2]).
-include("common.hrl").
send(Msg,RoutingKey) ->
	Params = #amqp_params_network{host=?HOST,username=?USER_NAME,password=?PASSWORD},
	case amqp_connection:start(Params) of
		{ok,ConnectionPid} ->
			{ok, Channel} = amqp_connection:open_channel(ConnectionPid),
			amqp_channel:cast(Channel,
							  #'basic.publish'{routing_key = RoutingKey, 
											   exchange = <<"topic">>},
							  #amqp_msg{payload = Msg}),
			io:format("Sent '~p'~n",[Msg]),
			amqp_channel:close(Channel),
			amqp_connection:close(ConnectionPid);
		{error,Reason} ->Reason
	end.

common头文件:

-include("amqp_client_internal.hrl").

-define(USER_NAME 	, <<"test">>).
-define(PASSWORD 	, <<"test">>).
-define(HOST	 	, "192.168.249.128").
-define(PORT	 	, 5672).

%%exchanges type
-define(EXCHANGE_TYPE_FANOUT	, <<"fanout">> ).
-define(EXCHANGE_TYPE_DIRECT	, <<"direct">> ).
-define(EXCHANGE_TYPE_TOPIC 	, <<"topic">>  ).

结果如下:

QQ截图20190715172607.png

「创作不易,你的支持是本站持续更新最大的动力!」

赞(0) 打赏

谢谢你请我喝奶茶*^_^*

支付宝
微信
1

谢谢你请我喝奶茶*^_^*

支付宝
微信

共有 0 条评论 - RabbitMQ四种Exchange类型之Topic(Erlang)

博客简介

云诺说是一个致力于分享互联网编程技术交流、小程序开发、小程序源码分享、软件服务定制和生活记录的技术服务型学习博客网站。

微信 :LGY178888

职业 :小程序开发、软件定制

现居 :广东省-广州市-天河区

最近更新

随机文章

友情链接

欢迎与本博客交换友情链接,本博客对交换链接的网站没有要求。如果您是本博客的友情链接网站,在遇到网站运行问题时,可以随时联系,我们将免费提供技术类支持! 申请交换友链

站点统计

  • 文章总数:170 篇
  • 草稿数目:0 篇
  • 分类数目:14 个
  • 独立页面:180 个
  • 评论总数:0 条
  • 访问总量: 884757次
  • 最近更新:2025年01月21日