安裝好 tp5 的 rabbitmq 擴(kuò)展后,在項(xiàng)目根目錄文件添加文件 rabbitmq.php 引導(dǎo)啟動(dòng) rabbitmq。
<?php define('APP_PATH', __DIR__ . '/application/'); define('BIND_MODULE','rabbitmq/Client'); // 加載框架引導(dǎo)文件 require __DIR__ . '/thinkphp/start.php';
生成者
??private?function?queueEvent($message) ????{ //????????error_log("n******"?.?date("His")?.?"********n"?.?print_r($message,?1)?.?"n*************n",?3,? 'messag_event.log'); ????????dump($message); ????????//設(shè)置你的連接 ????????$conn_args?=?array('host'?=>?'ip',?'port'?=>?'5672',?'login'?=>?'ymq',?'password'?=>?'123456', ????????'vhost'=>'/'); ? ? ????????$content?=?$message; //創(chuàng)建連接和channel ????????$conn?=?new?AMQPConnection($conn_args); ????????if?(!$conn->connect())?{ ????????????die("Cannot?connect?to?the?broker!n"); ????????} ????????$channel?=?new?AMQPChannel($conn); ? //創(chuàng)建交換機(jī) ????????$e_name?=?'MQTT_device_event';?//交換機(jī)名 ????????$ex?=?new?AMQPExchange($channel); ????????$ex->setName($e_name); //????????$ex->setType(AMQP_EX_TYPE_TOPIC);?//direct類(lèi)型 ????????$ex->setType(AMQP_EX_TYPE_DIRECT);?//direct類(lèi)型 ????????$ex->setFlags(AMQP_DURABLE);?//持久化 ????????$ex->declareExchange(); ????}
相關(guān)推薦:《ThinkPHP教程》
立即學(xué)習(xí)“PHP免費(fèi)學(xué)習(xí)筆記(深入)”;
運(yùn)行 php 目錄運(yùn)行生產(chǎn)者
消費(fèi)者
?public?function?index() ????{ ????????//連接RabbitMQ ????????$conn_args?=?array('host'?=>?'ip',?'port'?=>?'5672',?'login'?=>?'ymq',?'password'?=>?'123456',?'vhost'? ????????=>?'/'); ? ????????$e_name?=?'MQTT_device_event';?//交換機(jī)名 ????????$q_name?=?'q_event';?//隊(duì)列名 ????????$k_route?=?'key_event';?//路由key? //創(chuàng)建連接和channel ????????$conn?=?new?AMQPConnection($conn_args); ????????if?(!$conn->connect())?{ ????????????die("Cannot?connect?to?the?broker!n"); ????????} ????????$channel?=?new?AMQPChannel($conn);? //創(chuàng)建交換機(jī) ????????$ex?=?new?AMQPExchange($channel); ????????$ex->setName($e_name); ????????$ex->setType(AMQP_EX_TYPE_DIRECT);?//direct類(lèi)型 ????????$ex->setFlags(AMQP_DURABLE);?//持久化 ????????$ex->declareExchange(); //創(chuàng)建隊(duì)列 ????????$q?=?new?AMQPQueue($channel); ????????$q->setName($q_name); ????????$q->setFlags(AMQP_DURABLE);?//持久化 ????????$q->declareQueue();?????//最好隊(duì)列object在這里declare()下,否則如果是新的queue會(huì)報(bào)錯(cuò)? //綁定交換機(jī)與隊(duì)列,并指定路由鍵,可以多個(gè)路由鍵 ????????$q->bind($e_name,?$k_route); //$q->bind($e_name,?'key_33');?? //阻塞模式接收消息 ????????echo?"Message:n"; ????????while(True){ ????????????$q->consume(function($envelope,?$queue)?{ ????????????????$msg?=?$envelope->getBody(); ????????????????//處理數(shù)據(jù) ????????????????echo?$msg?.?PHP_EOL;?//處理消息 ????????????????$queue->ack($envelope->getDeliveryTag());?//手動(dòng)發(fā)送ACK應(yīng)答 ????????????}); ????????????//$q->consume('processMessage',?AMQP_AUTOACK);?//自動(dòng)ACK應(yīng)答 ????????} ? ????????$conn->disconnect();? ????}
執(zhí)行下命令 php rabbitmq
啟動(dòng)即可
查看隊(duì)列是否被消費(fèi)
登錄 http://127.0.0.115672/#/queues? 地址
? 版權(quán)聲明
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載。
THE END