越努力,越幸运

教你如何在PHP中使用rabbitMQ

 1 year ago

1.前提

因为公司有一个接口遇到请求超时,查找原因知道是高并发造成的超时和请求失败,检查出是因为 insert 操作导致的高并发之一,想到的解决办法:1.异步 2.使用消息队列,先尝试使用消息队列解决高并发的原因。

在这里我会将如何通过php来操作 rabbitMQ 生产者和如何做到监听 rabbitMQ 消费者,至于 rabbitMQ 的安装、配置和amqp扩展类需要自己动手搞定。

2.rabbitMQ生产者

<?php
/**
 * @param $data array 要加入到消息队列的数据
 */
function rabbitMQSend($data)
{
    $r_array = json_encode($data);
    require('rabbitMQCommand.php');

    $configs = array('host'=>'ip','port'=>5672,'username'=>'guest','password'=>'guest','vhost'=>'/');
    $exchange = 'demo';
    $queue = 'demo';
    $route_key = 'demo';
    $ra = new rabbitMQCommand($configs, $exchange, $queue, $route_key);
    $ra->send($r_array);
}

3.rabbitMQ消费者

<?php
error_reporting(0);
include_once('rabbitMQCommand.php');

$configs = array('host'=>'ip','port'=>5672,'username'=>'guest','password'=>'guest','vhost'=>'/');
$exchange = 'demo';
$queue = 'demo';
$route_key = 'demo';
$ra = new rabbitMQCommand($configs, $exchange, $queue, $route_key);
class A{
    function processMessage($envelope, $queue) {
        $msg = $envelope->getBody();
        $envelopeID = $envelope->getDeliveryTag();
        $pid = posix_getpid();
        file_put_contents("log/log{$pid}.log", $msg.'|'.$envelopeID.''."\r\n",FILE_APPEND);
        $queue->ack($envelopeID);
    }
}
$a = new A();

$s = $ra->run(array($a,'processMessage'),false);

到此生产者和消费者都大致OK,不过还有个问题需要考虑,那就是如何做到监听消息队列从而消费这个步骤?答案就是使用 php 命令行模式来运行这个文件,从而达到监听队列消费这种效果。

/usr/local/php/bin/php rabbitMQReceive.php 2>&1 &

/usr/local/php/bin/php rabbitMQReceive.php 是以php运行指定的文件,2>&1 & 其中的 2>&1 不是很了解,有兴趣的可以去了解学习一下,最后面的 & 是以进程的方式运行。


经过本人的测试这个实例还是比较稳定的,这个实例是通过查找 CSDN 的一位大神分享的博客,链接是:http://blog.csdn.net/nuli888/article/details/51864338,如果对生产者和消费者整个实现过程比较有兴趣的,可以通过翻阅 rabbitMQ 官网或者研究 rabbitMQCommand.php 的代码。如果上面的消费者不稳定也可以使用本人测试过的比较稳定的实例,如下:

<?php
$routingkey='demo';
//设置你的连接
$conn_args = array('host' => 'ip', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => '/');
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    echo "Cannot connect to the broker \n ";exit;
}
//创建channel
$channel = new AMQPChannel($conn);
//创建队列
$q = new AMQPQueue($channel);
$q->setName('demo');
// 通过 curl 来调用接口来做些事情
$curl = "http://ip/demo/doSomeThingOper";
while(true){
    if($messages = $q->get(AMQP_AUTOACK)) {
        $message = $messages->getBody();
        $data = json_decode($message, true);
        curl($curl, 'post');
    } else {
        sleep(1);
    }
}
$conn->disconnnect();
function curl($curl, $method='get'){
    $ch = curl_init();
    curl_setopt( $ch, CURLOPT_URL, $curl );
    curl_setopt( $ch, CURLOPT_HEADER, false );
    curl_setopt( $ch, CURLOPT_RETURNTRANSFER, true );
    curl_setopt( $ch, CURLOPT_TIMEOUT, 0 );
    curl_setopt( $ch, CURLOPT_PROXY, null);
    if($method == 'post') {
        curl_setopt( $ch, CURLOPT_POST, t;
        curl_setopt( $ch, CURLOPT_POSTFIELDS, $data );
    }
    $str = curl_exec( $ch );
    if($error=curl_error($ch)){
        die($error);
    }
    curl_close( $ch );
}

到此消息队列搞定,希望可以帮助到大家!!!