消息队列的概念、原理、实现方式

概念

队列结构的一个中间件不需要立即消费消息由消费者或者订阅者进行按顺序消费

基本的流程图如下所示

流程

应用场景

冗余解耦流量削峰异步通信

实现方式

mysql:可靠、速度慢redis:速度快,对于大消息包处理较慢消息系统:可靠、专业性强

消息的触发机制

死循环的方式,故障时无法及时恢复定时任务:压力均分、但是处理量有上限守护进程的方式

解耦 (订单和配送系统)

架构设计1 采用定时任务的方式


(资料图片仅供参考)

使用配送处理系统进行处理时,将当前数据库里需要处理的订单状态更新为2,待处理完成后将状态设为1

可以每次指定更新多少条数据

流量削锋 (redis实现秒杀)

使用队列的数据结构

lpush/rpush 将数据放入列表中lpop/rpop 将数据移除列表并获取到移除的值ltrim 保留指定区间内的元素llen 获取列表长度lset 通过索引设置列表的值lindex 通过索引获取列表中的值lrange 获取指定范围的元素

图示如下

代码流程如下

秒杀程序将请求写入redis(uid,time)

检查redis列表存放的长度,超过10个直接舍弃

通过死循环读取redis数据,并存入数据库

// Spike.php 秒杀程序if(Redis::llen("lottery") < 10){    // 成功    Redis::lpush("lottery", $uid."%".microtime());}else{    // 失败}
// Warehousing.php 入库程序while(true){    $user = Redis::rpop("lottery");    if (!$user || $user == "nil") {        sleep(2);        continue;    }    $user_arr = explode($user, "%");    $insert_user = [        "uid" => $user_arr[0],        "time" => $user_arr[1]    ];    $res = DB::table("lottery_queue")->insert($insert_user);    if (!$res) {        Redis::lpush("lottery", $user);    }}

上述代码中假如并发过大的话会存在超卖的情况,此时可以使用文件锁或者redis分布式锁进行控制,先将商品放入redis list中 使用rpop进行取出,如果取不到则说明已经卖完

具体的思路及伪代码如下

// 先将商品放入redis中  $goods_id = 2;  $sql = select id,num from goods where id = $goods_id;  $res = DB::select($sql);  if (!empty($res)) {      // 也可以指定多少件      Redis::del("lottery_goods" . $goods_id);      for($i=0;$i<$res["num"];$i++){          Redis::lpush("lottery_goods . $goods_id", $i);      }      LOG::info("商品存入队列成功,数量:" . Redis::llen("lottery_goods . $goods_id"));  } else {      LOG::info($goods_id . "加入失败");  }
// 开始秒杀  $count = Redis::rpop("lottery_goods" . $goods_id);  if (!$count) {      // 商品已抢完      ...  }  // 用户抢购队列  $user_list = "user_goods_id_" . $goods_id;  $user_status = Redis::sismember($user_list, $user_id);  if ($user_status) {      // 已抢过      ...  }  // 将抢到的放到列表中  Redis::sadd($user_list, $uid);  $msg = "用户:" . $uid . "顺序" . $count;  Log::info($msg);  // 生成订单等  ...  // 减库存  $sql = update goods set num = num -1 where id = $goods_id and num > 0; // 防止超卖  DB::update($sql)  // 抢购成功

rabbitmq

架构及原理其中P代表生产者,X为交换机(channal),C代表消费者

简单使用

// Send.php  require_once __DIR__."/vendor/autoload.php";  use PhpAmqpLib\Connection\AMQPStreamConnection;  use PhpAmqpLib\Message\AMQPMessage;  $connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest");  // 创建通道  $channel = $connection->channel();  // 声明一个队列  $channel->queue_declare("user_email", false, false, false, false);  // 制作消息  $msg = new AMQPMessage("send email");  // 将消息推送到队列  $channel->basic_publish($msg, "", "user_email");  echo "[x] send email";  $channel->close();  $connection->close();
// Receive.php  require_once __DIR__."/vendor/autoload.php";  use PhpAmqpLib\Connection\AMQPStreamConnection;  use PhpAmqpLib\Message\AMQPMessage;  $connection = new AMQPStreamConnection("localhost", 5672, "guest", "guest");  //创建通道  $channel = $connection->channel();  $channel->queue_declare("user_email", false, false, false, false);  // 当收到消息时的回调函数  $callback = function($msg){      //发送邮件      echo "Received ".$msg->body."\n";  };  $channel->basic_consume("user_email", "", false, true, false, false, $callback);  // 保持监听状态  while($channel->is_open()){      $channel->wait();  }

以上就是详解PHP消息队列的实现以及运用(附流程图)的详细内容,更多请关注php中文网其它相关文章!

推荐内容