企业网站建设怎么策划,WordPress恶意扫描,著名食品包装设计的案例,北京做网站建设公司哪家好路由模式
在第三节中我们使用的 交换机的 fanout 把生产者的消息广播到了所有与它绑定的队列中处理#xff0c;但是我们能不能把特定的消息#xff0c;发送给指定的队列#xff0c;而不是广播给所有队列呢#xff1f; 如图#xff0c;交换机把 orange 类型的消息发送给了…路由模式
在第三节中我们使用的 交换机的 fanout 把生产者的消息广播到了所有与它绑定的队列中处理但是我们能不能把特定的消息发送给指定的队列而不是广播给所有队列呢 如图交换机把 orange 类型的消息发送给了 队列1处理 而带有 black 和 green标记的数据发送给了队列2来处理。 这时就要使用路由模式了
在路由模式中要使用交换机的类型需要是直联模式并且绑定的时候必须使用 route_key,而上节中使用的 fanout 模式会忽略这个值。
路由模式的使用方法很简单就是在交换机和队列绑定的时候提供第三个参数 $routing_key
$channel-queue_bind($queue_name, $exchange_name, $routing_key);代码和发布订阅模式的代码差不多主要是 exchange的模式要改成直联 然后在消费者的代码中binding 时指名 routing_key
生产者
?php
declare (strict_types 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class RoutingMQProduce extends Command
{protected function configure(){// 指令配置$this-setName(routingmqproduce)-setDescription(路由模式);}protected function execute(Input $input, Output $output){//获取连接$connection $this-getConnectRabbitMQ();//创建通道$channel $connection-channel();//创建交换机/*** params exchange 自定义交换机名称* params type 交换机的类型路由模式使用 直联 direct* params passive 是否消极声名* params durable 是否持久化* params auto_delete 是否自动删除* params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式* params nowait 相当于做一个异步版的声明,不等待返回就让程序继续执行*/$channel-exchange_declare(exchangeName,direct,false,false,false,false,false);//现在生产者只需要把消息发给交换机就可以了所以不用在生产者中创建队列了当然想创建也是可以的//在这里随机一个名称来做为 routing_keyfor ($i 0; $i 20; $i) {$routing_keys [orange,black,green];shuffle($routing_keys);$routing_key $routing_keys[0];$msgArr [namehaha.$routing_key, //这里把 routingkey 传过去验证age10,sexfemale.$i];$msg new AMQPMessage(json_encode($msgArr),[delivery_modeAMQPMessage::DELIVERY_MODE_PERSISTENT]);sleep(1);//这里发布时指定了 $routing_key$channel-basic_publish($msg,exchangeName,$routing_key);}$channel-close();$connection-close();}protected function getConnectRabbitMQ(){try{$connection new AMQPStreamConnection(192.168.3.228,5672,admin,123456);return $connection;}catch(Exception $e){throw new Exception(队列连接失败);}}
}
消费者代码
?php
declare (strict_types 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class RoutingMQConsumer extends Command
{protected function configure(){// 指令配置$this-setName(routingmqconsumer)-setDescription(路由模式的消费者);}protected function execute(Input $input, Output $output){$connection $this-connectRabbitMQ();$channel $connection-channel();//创建两个队列$channel-queue_declare(queueName1,false,false,false,false,false);$channel-queue_declare(queueName2,false,false,false,false,false);//绑定交换机和队列交换机的名称是在生产者中定义的$channel-queue_bind(queueName1,exchangeName,orange);$channel-queue_bind(queueName2,exchangeName,green);$channel-queue_bind(queueName2,exchangeName,black);//设置消息处理函数$callback1 function($msg){$msgArr json_decode($msg-body,true);echo 我是队列1我只处理 orange 标记的数据 .$msgArr[name].-11-.$msgArr[age].-11-.$msgArr[sex].PHP_EOL;$msg-delivery_info[channel]-basic_ack($msg-delivery_info[delivery_tag]); //这里让就是消息的应答了};$callback2 function($msg){$msgArr json_decode($msg-body,true);echo 我是队列2我处理 black和green 标记的数据 .$msgArr[name].-22-.$msgArr[age].-22-.$msgArr[sex].PHP_EOL;$msg-delivery_info[channel]-basic_ack($msg-delivery_info[delivery_tag]); //这里让就是消息的应答了};$channel-basic_consume(queueName1,,false,false,false,false,$callback1);$channel-basic_consume(queueName2,,false,false,false,false,$callback2);while(count($channel-callbacks)){$channel-wait();}}protected function connectRabbitMQ(){try{$connection new AMQPStreamConnection(192.168.3.228,5672,admin,123456);return $connection;}catch(Exception $e){throw new Exception(队列连接失败);}}
}
结果显示