介紹swoole異步群發(fā)模板消息

介紹swoole異步群發(fā)模板消息

1、用的是TP5.1的框架,swoole分成一個(gè)客戶端發(fā)送接收消息,一個(gè)服務(wù)器負(fù)責(zé)處理信息

  服務(wù)端代碼,服務(wù)器要先安裝swoole拓展,用 php server.php 啟動(dòng)進(jìn)程監(jiān)聽(tīng)

推薦(免費(fèi)):swoole

<?php namespace think; date_default_timezone_set('Asia/Shanghai'); // 加載基礎(chǔ)文件 require_once __DIR__ . '/thinkphp/base.php';  // 支持事先使用靜態(tài)方法設(shè)置Request對(duì)象和Config對(duì)象  // 執(zhí)行應(yīng)用并響應(yīng) //Container::get('app')->run()->send();  //require_once __DIR__ . '/../../../thinkphp/helper.php'; use thinkcachedriverRedis; //use thinkController; use thinkDb;   class Swoole {     const errcode = array(         43004 => '需要接收者關(guān)注',         40037 => '無(wú)效模板',         40003 => '需要接收者關(guān)注',         43005 => '需要好友關(guān)系',         43019 => '需要將接收者從黑名單中移除',         44001 => '多媒體文件為空',         44002 => 'POST 的數(shù)據(jù)包為空',         44003 => '圖文消息內(nèi)容為空',         44004 => '文本消息內(nèi)容為空',         45001 => '多媒體文件大小超過(guò)限制',         45002 => '消息內(nèi)容超過(guò)限制',         45003 => '標(biāo)題字段超過(guò)限制',         45004 => '描述字段超過(guò)限制',         45005 => '鏈接字段超過(guò)限制',         45006 => '圖片鏈接字段超過(guò)限制',         45007 => '語(yǔ)音播放時(shí)間超過(guò)限制',         45008 => '圖文消息超過(guò)限制',         45009 => '接口調(diào)用超過(guò)限制',         45010 => '創(chuàng)建菜單個(gè)數(shù)超過(guò)限制',         45011 => 'API 調(diào)用太頻繁,請(qǐng)稍候再試',     );     private $serv;     private $redis;     private $conn = [         // 數(shù)據(jù)庫(kù)類型         'type'            => 'mysql',         // 服務(wù)器地址         'hostname'        => '',         // 數(shù)據(jù)庫(kù)名         'database'        => '',         // 用戶名         'username'        => '',         // 密碼         'password'        => '',         // 端口         'hostport'        => '3306',         // 連接dsn         'dsn'             => '',         // 數(shù)據(jù)庫(kù)連接參數(shù)         'params'          => [],         // 數(shù)據(jù)庫(kù)編碼默認(rèn)采用utf8         'charset'         => 'utf8',         // 數(shù)據(jù)庫(kù)表前綴         'prefix'          => 'shd_',         // 數(shù)據(jù)庫(kù)調(diào)試模式         'debug'           => true,         // 數(shù)據(jù)集返回類型         'resultset_type'  => 'array',         // 自動(dòng)寫入時(shí)間戳字段         'auto_timestamp'  => false,         // 時(shí)間字段取出后的默認(rèn)時(shí)間格式         'datetime_format' => 'Y-m-d H:i:s',         // 是否需要進(jìn)行SQL性能分析         'sql_explain'     => false,         // Builder類         'builder'         => '',         // Query類         'query'           => 'thinkdbQuery',         // 是否需要斷線重連         'break_reconnect' => false,         // 斷線標(biāo)識(shí)字符串         'break_match_str' => [],     ];      //初始化配置,監(jiān)聽(tīng)端口     public function __construct()     {         //redis         $this->redis = new Redis();          $this->serv = new swoole_server("0.0.0.0", 9501);         $this->serv->set(array(             'worker_num' => 2, //一般設(shè)置為服務(wù)器CPU數(shù)的1-4倍             'daemonize' => 1, //以守護(hù)進(jìn)程執(zhí)行             'max_request' => 10000,             'dispatch_mode' => 2,             'task_worker_num' => 8, //task進(jìn)程的數(shù)量             "task_ipc_mode " => 3, //使用消息隊(duì)列通信,并設(shè)置為爭(zhēng)搶模式             "log_file" => "taskqueueu.log" ,//日志         ));         $this->serv->on('Receive', array($this, 'onReceive'));         // bind callback         $this->serv->on('Task', array($this, 'onTask'));         $this->serv->on('Finish', array($this, 'onFinish'));         $this->serv->start();     }         //接收客戶端的請(qǐng)求并響應(yīng)     public function onReceive(swoole_server $serv, $fd, $from_id, $data)     {         echo "Get Message From Client {$fd}:{$data} ";          $serv->send($fd, '發(fā)送任務(wù)已建立,正在發(fā)送,請(qǐng)稍后查看發(fā)送記錄');          // send a task to task worker.         $serv->task($data);//投遞任務(wù)     }      public function onTask($serv, $task_id, $from_id, $data)     {         echo "Task {$task_id} task ";         $array = json_decode($data, true);         $success = 0;         $fail = 0;         $log = '';          $access_token = $array['access_token'];          $openid_list = $this->redis->sMembers($array['appid'].'users');//從redis取出要批量發(fā)送的openid          $fields = json_decode($array['data'],true);         $send_data = array();                 $start = time();         //模板消息         foreach ($openid_list as $openid) {             $template = array(                 'touser' => $openid,                 'template_id' => $array['tem_id'],                 'url' => $array['url'],                 'topcolor' => "#000000",                 'data' => $send_data,             );               $url = "https://api.weixin.qq.com/cgi-bin/message/template/send?access_token=" . $access_token;             $res = $this->send_post($url, $template);             $res_arr = json_decode($res, true);             if ($res_arr['errcode'] == 0){                 ++ $success;             }else{                 ++ $fail;                 $log = self::errcode[$res_arr['errcode']];             }          }         $result = array('success'=>$success,'fail'=>$fail,'tem_id'=>$array['tem_id'],'uid'=>$array['uid'],'data'=>$array['data'],'url'=>$array['url'],'log'=>$log,'start'=>$start);         return json_encode($result);      }      //任務(wù)執(zhí)行完自動(dòng)回調(diào)結(jié)束方法     public function onFinish($serv, $task_id, $data)     {         $array = json_decode($data,true);         $fields = json_decode($array['data'],true);          //獲取當(dāng)前模板         $list =  Db::connect($this->conn)->name('wechat_template')->where('template_id',$array['tem_id'])->where('uid',$array['uid'])->find();          $new_field = $list['field'];                  $insert['template_id'] = $array['tem_id'];         $insert['success'] = $array['success'];         $insert['fail'] = $array['fail'];         $insert['url'] = $array['url'];         $insert['log'] = $array['log'];         $insert['create_time'] = date('Y-m-d H:i:s',$array['start']);         $insert['finish_time'] = date('Y-m-d H:i:s');          Db::connect($this->conn)->name('wechat_template_log')->insert($insert);         echo "Task{$data} {$task_id} finish ";      }      function send_post($url, $post_data) {         $postdata=json_encode($post_data,JSON_UNESCAPED_UNICODE);         $options = array(             'http' => array(                 'method' => 'POST',                 'header' => 'Content-type:application/x-www-form-urlencoded',                 'content' => $postdata, //            'protocol_version' => 1.1, //            'header' => [ //                'Connection: close', //            ],                 'timeout' => 2 // 超時(shí)時(shí)間(單位:s)             )         );         $context = stream_context_create($options);         $result = file_get_contents($url, false, $context);          return $result;     }    }  $server = new Swoole();

2、客戶端請(qǐng)求,可以通過(guò)api訪問(wèn)

function send_tem_to(){         $type = input('type'); // 0 按人頭算 1 按標(biāo)簽算 2 全部粉絲         $target = input('target/s');         $field = input('fields/s');         $tem_id = input('tem_id');//模板ID,字符串         $url = input('url','');          $client = new swoole_client(SWOOLE_SOCK_TCP);//創(chuàng)建同步TCP         if (!$client->connect('127.0.0.1', 9501, 0.5))//鏈接         {             exit("connect failed. Error: {$client->errCode} ");         }               $client->send(json_encode(array('appid'=>$this->appid,'uid'=>$this->uid,'tem_id'=>$tem_id,'data'=>$field))); //發(fā)送請(qǐng)求             $rec = $client->recv();//接收返回?cái)?shù)據(jù)             $client->close();//關(guān)閉鏈接              }

以上就是介紹

? 版權(quán)聲明
THE END
喜歡就支持一下吧
點(diǎn)贊11 分享