云诺说 - 小程序开发 - 软件定制
当前位置: RabbitMQ > RabbitMQ四种Exchange类型之Headers(Erlang)

RabbitMQ四种Exchange类型之Headers(Erlang)

2019-08-01 08:28 分类:RabbitMQ 作者:云诺 阅读(3771)

版权声明:本文为博主原创文章,如果转载请给出原文链接:http://doofuu.com/article/4156163.html

    Headers 类型的Exchanges是不处理路由键的,而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则x-match有下列两种类型:

  • x-match = all   :表示所有的键值对都匹配才能接受到消息

  • x-match = any :表示只要有键值对匹配就能接受到消息

不过headers比较少用到,下面是headers的官方说明文档:

A headers exchange is designed to for routing on multiple attributes that are more easily expressed as message headers than a routing key. Headers exchanges ignore the routing key attribute. Instead, the attributes used for routing are taken from the headers attribute. A message is considered matching if the value of the header equals the value specified upon binding.

It is possible to bind a queue to a headers exchange using more than one header for matching. In this case, the broker needs one more piece of information from the application developer, namely, should it consider messages with any of the headers matching, or all of them? This is what the "x-match" binding argument is for. When the "x-match" argument is set to "any", just one matching header value is sufficient. Alternatively, setting "x-match" to "all" mandates that all the values must match.

Headers exchanges can be looked upon as "direct exchanges on steroids". Because they route based on header values, they can be used as direct exchanges where the routing key does not have to be a string; it could be an integer or a hash (dictionary) for example.

在Erlang里headers键值对表现的是一个三元组{Key,Type,Value} , 如下:

-type(amqp_field_type() ::
      'longstr' | 'signedint' | 'decimal' | 'timestamp' |
      'table' | 'byte' | 'double' | 'float' | 'long' |
      'short' | 'bool' | 'binary' | 'void' | 'array').
-type(amqp_property_type() ::
      'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
      'longlongint' | 'timestamp' | 'bit' | 'table').
 
-type(amqp_table() :: [{binary(), amqp_field_type(), amqp_value()}]).
-type(amqp_array() :: [{amqp_field_type(), amqp_value()}]).
-type(amqp_value() :: binary() |    % longstr
                      integer() |   % signedint
                      {non_neg_integer(), non_neg_integer()} | % decimal
                      amqp_table() |
                      amqp_array() |
                      byte() |      % byte
                      float() |     % double
                      integer() |   % long
                      integer() |   % short
                      boolean() |   % bool
                      binary() |    % binary
                      'undefined' | % void
                      non_neg_integer() % timestamp
     ).

代码实现!!!!

消费者:

-module(mod_headers_receive).
 
-behaviour(gen_server).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
 
-export([start_link/0]).
-include("common.hrl").
 
-record(state, {}).
 
start_link() ->
	gen_server:start_link({local,?MODULE}, ?MODULE, [], []).
 
init([]) ->
	start(),
    {ok, #state{}}.
 
handle_call(_Request, _From, State) ->
    Reply = ok,
    {reply, Reply, State}.
 
 
handle_cast(_Msg, State) ->
    {noreply, State}.
 
handle_info({'basic.consume_ok',_}, State) ->
	{noreply, State};
handle_info({#'basic.deliver'{},#amqp_msg{payload=Msg}}, State) ->
	io:format(" receive  messages is ~p~n",[Msg]),
	{noreply, State};
 
handle_info(Info, State) ->
	io:format(" unknown messages is ~p~n", [Info]),
    {noreply, State}.
 
terminate(_Reason, _State) ->
    ok.
 
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
 
start() ->
	Params = #amqp_params_network{host=?HOST,username=?USER_NAME,password=?PASSWORD},
	case amqp_connection:start(Params) of
		{ok,ConnectionPid} ->
			{ok, Channel} = amqp_connection:open_channel(ConnectionPid),
			%%生成队列名称
			Queue = lists:concat([fanout_queue,now_time()]),
			QueueName = erlang:list_to_binary(Queue),
			%%声明队列
			amqp_channel:call(Channel, #'queue.declare'{queue = QueueName,auto_delete=true}),
			
			%%声明exchange
			amqp_channel:call(Channel, #'exchange.declare'{auto_delete =true,exchange = <<"headers_test">>, type = ?EXCHANGE_TYPE_HEADERS}),
			%%队列绑定到exchange,并设置headers
			Headers = [{<<"x-match">>,longstr,<<"any">>},{<<"name">>,longstr,<<"jack">>},{<<"age">>,long,30}],
			amqp_channel:call(Channel, #'queue.bind'{queue = QueueName, exchange = <<"headers_test">>, arguments = Headers}),
			io:format("Waiting for messages......~n"),
			
			amqp_channel:subscribe(Channel, #'basic.consume'{queue = QueueName,no_ack = true}, self());
		{error,Resaon} ->
			io:format("connection rabbit error: ~p~n", [Resaon]),
			Resaon
	end.
 
now_time()->
	{A, B, _} = os:timestamp(),
    A * 1000000 + B.

生产者:

-module(mod_headers_send).
 
-export([send/1]).
-include("common.hrl").
send(Msg) ->
	Params = #amqp_params_network{host=?HOST,username=?USER_NAME,password=?PASSWORD},
	case amqp_connection:start(Params) of
		{ok,ConnectionPid} ->
			{ok, Channel} = amqp_connection:open_channel(ConnectionPid),
			Headers = [{<<"name">>,longstr,<<"jack">>},{<<"age">>,long,31}],
			amqp_channel:cast(Channel,
							  #'basic.publish'{ exchange = <<"headers_test">>},
							  #amqp_msg{props= #'P_basic'{headers=Headers},payload = Msg}),
			io:format("Sent '~p'~n",[Msg]),
			amqp_channel:close(Channel),
			amqp_connection:close(ConnectionPid);
		{error,Reason} ->Reason
	end.

Common头文件:

-include("amqp_client_internal.hrl").
 
-define(USER_NAME 	, <<"test">>).
-define(PASSWORD 	, <<"test">>).
-define(HOST	 	, "192.168.249.128").
-define(PORT	 	, 5672).
 
 
%%exchanges type
-define(EXCHANGE_TYPE_FANOUT	, <<"fanout">> ).
-define(EXCHANGE_TYPE_DIRECT	, <<"direct">> ).
-define(EXCHANGE_TYPE_TOPIC 	, <<"topic">>  ).
-define(EXCHANGE_TYPE_HEADERS	, <<"headers">>).

{<<"x-match">>,longstr,<<"any">>}运行结果如下:

QQ截图20190801201756.png

{<<"x-match">>,longstr,<<"all">>}运行结果如下:

QQ截图20190801201858.png

系统默认是{<<"x-match">>,longstr,<<"all">>}

「创作不易,你的支持是本站持续更新最大的动力!」

赞(0) 打赏

谢谢你请我喝奶茶*^_^*

支付宝
微信
1

谢谢你请我喝奶茶*^_^*

支付宝
微信

共有 0 条评论 - RabbitMQ四种Exchange类型之Headers(Erlang)

博客简介

云诺说是一个致力于分享互联网编程技术交流、小程序开发、小程序源码分享、软件服务定制和生活记录的技术服务型学习博客网站。

微信 :LGY178888

职业 :小程序开发、软件定制

现居 :广东省-广州市-天河区

最近更新

友情链接

欢迎与本博客交换友情链接,本博客对交换链接的网站没有要求。如果您是本博客的友情链接网站,在遇到网站运行问题时,可以随时联系,我们将免费提供技术类支持! 申请交换友链

站点统计

  • 文章总数:170 篇
  • 草稿数目:0 篇
  • 分类数目:14 个
  • 独立页面:180 个
  • 评论总数:0 条
  • 访问总量: 854086次
  • 最近更新:2024年12月10日