中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

TP5.0.24+Workerman+定時任務

發布時間:2020-08-05 16:15:40 來源:網絡 閱讀:536 作者:qq5b8f70dcdd315 欄目:web開發

1.安裝 Workerman
安裝GatewayWorker內核文件(不包含start_gateway.php start_businessworker.php等啟動入口文件),直接上composer

composer require workerman/gateway-worker

2.創建 Workerman 啟動文件
創建一個自定義命令類文件來啟動 Socket 服務端,新建

application/push/command/Workerman.php
namespace app\push\command;

use Workerman\Worker;
use GatewayWorker\Register;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Workerman extends Command
{

    protected function configure()
    {
        $this->setName('workerman')
            ->addArgument('action', Argument::OPTIONAL, "action  start|stop|restart")
            ->addArgument('type', Argument::OPTIONAL, "d -d")
            ->setDescription('workerman chat');
    }

    protected function execute(Input $input, Output $output)
    {
        global $argv;
        $action = trim($input->getArgument('action'));
        $type   = trim($input->getArgument('type')) ? '-d' : '';

        $argv[0] = 'chat';
        $argv[1] = $action;
        $argv[2] = $type ? '-d' : '';

        $this->start();
    }

    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        Worker::runAll();
    }

    private function startBusinessWorker()
    {
        // bussinessWorker 進程
        $worker = new BusinessWorker();
        // worker名稱
        $worker->name = 'YourAppBusinessWorker';
        // bussinessWorker進程數量
        $worker->count = 4;
        //設置處理業務的類,此處制定Events的命名空間
        $worker->eventHandler= \app\push\controller\Events::class;
        // 服務注冊地址
        $worker->registerAddress = '127.0.0.1:1238';
    }

    private function startGateWay()
    {
        // gateway 進程,這里使用Text協議,可以用telnet測試
        $gateway = new Gateway("websocket://0.0.0.0:8282");
        // gateway名稱,status方便查看
        $gateway->name = 'YourAppGateway';
        // gateway進程數
        $gateway->count = 4;
        // 本機ip,分布式部署時使用內網ip
        $gateway->lanIp = '127.0.0.1';
        // 內部通訊起始端口,假如$gateway->count=4,起始端口為4000
        // 則一般會使用4000 4001 4002 4003 4個端口作為內部通訊端口
        $gateway->startPort = 20003;
        // 服務注冊地址
        $gateway->registerAddress = '127.0.0.1:1238';
        // 心跳間隔
        $gateway->pingInterval = 55;
        $gateway->pingNotResponseLimit = 1;
        // 心跳數據
        $gateway->pingData = '';
    }

    private function startRegister()
    {
        new Register('text://0.0.0.0:1238');
    }
}

配置 application/command.php 文件

return [
    'app\common\command\Workerman',
];

3.創建事件監聽文件
創建 application/push/controller/Events.php 文件來監聽處理 workerman 的各種事件。

<?php

namespace app\push\controller;

use GatewayWorker\Lib\Gateway;
use think\Hook;
use Workerman\Lib\Timer;

class Events
{

    //定時器間隔
    protected static $interval = 2;
    //定時器
    protected static $timer = null;
    //事件處理類
    protected static $evevtRunClass = \app\push\controller\EvevtRun::class;
    /*
     * 消息事件回調 class
     *
     * */
    protected static $eventClassName = \app\push\controller\Push::class;
    /**
     * 當客戶端發來消息時觸發
     * @param int $client_id 連接id
     * @param mixed $message 具體消息
     */
    public static function onMessage($client_id, $message)
    {
        $message_data = json_decode($message,true);
        if (!$message_data) return ;
        try{
            if(!isset($message_data['type'])) throw new \Exception('缺少消息參數類型');
            //消息回調處理
            $evevtName = self::$eventClassName.'::instance';
            if(is_callable($evevtName))
                $evevtName()->start($message_data['type'],$client_id,$message_data);
            else
                throw new \Exception('消息處理回調不存在。['+$evevtName+']');
        }catch (\Exception $e){
            var_dump([
                'file'=>$e->getFile(),
                'code'=>$e->getCode(),
                'msg'=>$e->getMessage(),
                'line'=>$e->getLine()
            ]);
        }
    }

    /**
     * 當用戶連接時觸發的方法
     * @param integer $client_id 連接的客戶端
     * @return void
     */
    public static function onConnect($client_id)
    {
        Gateway::sendToClient($client_id, json_encode(array(
            'type'      => 'init',
            'client_id' => $client_id
        )));
    }

    /**
     * 當用戶斷開連接時觸發的方法
     * @param integer $client_id 斷開連接的客戶端
     * @return void
     */
    public static function onClose($client_id)
    {
        Gateway::sendToClient($client_id,json_encode([
            'type'=>'logout',
            'message'=>"client[$client_id]"
        ]));
    }

    /**
     * 當進程啟動時
     * @param integer $businessWorker 進程實例
     */
    public static function onWorkerStart($worker)
    {
        //在進程1上開啟定時器 每self::$interval秒執行
        if($worker->id === 0){
            $last = time();
            $task = [6 => $last, 10 => $last, 30 => $last, 60 => $last, 180 => $last, 300 => $last];
            self::$timer = Timer::add(self::$interval, function() use(&$task) {
                try {
                    $now = time();
                    Hook::exec(self::$evevtRunClass);
                    foreach ($task as $sec => &$time) {
                        if (($now - $time) >= $sec) {
                            $time = $now;
                            Hook::exec(self::$evevtRunClass,'task_'.$sec);
                        }
                    }
                } catch (\Throwable $e) {}

            });
        }

    }

    /**
     * 當進程關閉時
     * @param integer $businessWorker 進程實例
     */
    public static function onWorkerStop($worker)
    {
        if($worker->id === 0) Timer::del(self::$timer);
    }
}

消息事件回調 class 方法里的處理根據自身情況編寫

<?php

namespace app\push\controller;

use app\wap\model\live\LiveUser;
use GatewayWorker\Lib\Gateway;
use app\wap\model\live\LiveHonouredGuest;
use app\wap\model\user\User;
use app\wap\model\live\LiveBarrage;

class Push
{

    /*
     * @var array 消息內容
     * */
    protected $message_data = [
        'type' => '',
        'message'=>'',
    ];
    /*
     * @var string 消息類型
     * */
    protected $message_type = '';
    /*
     * @var string $client_id
     * */
    protected $client_id    = '';
    /*
     * @var int 當前登陸用戶
     * */
    protected $uid = null;
    /*
     * @var null 本類實例化結果
     * */
    protected static $instance = null;
    /*
     *
     * */
    protected function __construct($message_data = [])
    {

    }
    /*
     * 實例化本類
     * */
    public static function instance()
    {
        if(is_null(self::$instance)) self::$instance = new static();
        return self::$instance;
    }

    /*
     * 檢測參數并返回
     * @param array || string $keyValue 需要提取的鍵值
     * @param null || bool $value
     * @return array;
     * */
    protected function checkValue($keyValue = null,$value = null)
    {
        if(is_null($keyValue))
            $message_data = $this->message_data;
        if(is_string($keyValue))
            $message_data = isset($this->message_data[$keyValue]) ? $this->message_data[$keyValue] : (is_null($value) ? '': $value);
        if(is_array($keyValue))
            $message_data = array_merge($keyValue,$this->message_data);
        if(is_bool($value) && $value === true && is_array($message_data) && is_array($keyValue)){
            $newData = [];
            foreach ($keyValue as $key => $item){
                $newData [] = $message_data[$key];
            }
            return $newData;
        }
        return $message_data;
    }

    /*
     * 開始設置回調
     * @param string $typeFnName 回調函數名
     * @param string $client_id
     * @param array $message_data
     *
     * */
    public function start($typeFnName,$client_id,$message_data)
    {

        $this->message_type = $typeFnName;

        $this->message_data = $message_data;

        $this->client_id    = $client_id;

        $this->uid = Gateway::getUidByClientId($client_id);
        //記錄用戶上線
        if($this->uid && Gateway::isOnline($client_id) && ($live_id = $this->checkValue('room')))
        {
            LiveUser::setLiveUserOnline($live_id,$this->uid,1);
        }

        if(method_exists($this,$typeFnName))
            call_user_func([$this,$typeFnName]);
        else
            throw new \Exception('缺少回調方法');
    }

    /*
     * 心跳檢測
     *
     * */
    protected  function ping()
    {
        return ;
    }

    /*
     * 綁定用戶相應客戶端
     * @param string $client_id
     * @param array $message_data
     * @return
     * */
    protected function handshake()
    {
        $message_data = $this->checkValue(['uid'=>0,'room'=>0]);
        if(!$message_data['uid'])  throw new \Exception("缺少用戶uid,無法綁定用戶");
        $new_message    = [
            'type'      => $this->message_type,
            'client_id' => $this->client_id,
            'time'      => date('H:i:s'),
            'msg'       => '綁定成功!'
        ];
        Gateway::bindUid($this->client_id,$message_data['uid']);

        //如果有群組id加入群組
        if($message_data['room']){
            // 加入某個群組(可調用多次加入多個群組) 將clientid加入roomid分組中
            Gateway::joinGroup($this->client_id, $message_data['room']);
        }

        Gateway::sendToClient($this->client_id, json_encode($new_message));
    }

    /*
     * 接受客戶端發送的消息
     * @param string $client_id 客戶端client_id
     * @param array $message_data 發送的數據
     * @return
     *
     * */
    protected function send()
    {
        list($toUid,$message,$room,$type) = $this->checkValue(['uid'=>0,'content'=>'','room'=>false,'ms_type' => 0],true);
        $client_id      = $this->client_id;
        if(!$this->uid) {
            //認證用戶信息失敗,關閉用戶鏈接
            Gateway::closeClient($client_id);
            throw new \Exception("缺少用戶uid");
        }
        $userInfo = User::get($this->uid);
        if(!$userInfo){
            //認證用戶信息失敗,關閉用戶鏈接
            Gateway::closeClient($client_id);
            throw new \Exception("用戶信息缺少");
        }
        if($room && Gateway::getClientIdCountByGroup($room)){
            $user_type = LiveHonouredGuest::where(['uid'=>$this->uid,'live_id'=>$room])->value('type');
            if(is_null($user_type)) $user_type = 2;
            $res = LiveBarrage::set([
                'live_id'=>$room,
                'uid'=>$this->uid,
                'type'=>$type,
                'barrage'=>$message,
                'add_time'=>time(),
                'is_show'=>1
            ]);
            if(!$res) throw new \Exception("寫入歷史記錄失敗");
            Gateway::sendToGroup($room,json_encode([
                'message'=>$message,
                'm_type'=>$type,
                'type'=>'message',
                'user_type'=>$user_type,
                'userInfo'=>$userInfo,
                'id'=>$res['id']
            ]));
        }else{
            $new_message    = [
                'type'      => 'reception',
                'content'   => $message,
                'time'      => date('H:i:s'),
                'timestamp' => time(),
            ];
            if(Gateway::isUidOnline($toUid)) return Gateway::sendToUid($toUid, json_encode($new_message));
        }
    }

    /*
     * 消息撤回
     * @param string $client_id
     * @param array $message_data
     * */
    protected function recall()
    {
        list($id,$room) = $this->checkValue(['id'=>0,'room'=>''],true);

        if(!$id)
            throw new \Exception('缺少撤回消息的id');

        if(!$room)
            throw new \Exception('缺少房間號');

        if(LiveBarrage::del($id)){
            Gateway::sendToGroup($room,json_encode([
                'type'=>'recall',
                'id'=>$id
            ]),Gateway::getClientIdByUid($this->uid));
        }
    }

}

定時任務事件處理類 按照自身情況編寫方法內邏輯

<?php

namespace app\push\controller;

use GatewayWorker\Lib\Gateway;

/*
 * 定時任務
 *
 * */

class EvevtRun
{

    /*
     * 默認定時器執行事件
     * */
    public function run()
    {

    }

    /*
     * 每隔6秒執行
     * */
    public function task_6()
    {

    }

    /*
     * 每隔10秒執行
     * */
    public function task_10()
    {

    }

    /*
     * 每隔30秒執行
     * */
    public function task_30()
    {

    }

    /*
     * 每隔60秒執行
     * */
    public function task_60()
    {

    }
    /*
     * 每隔180秒執行
     * */
    public function task_180()
    {

    }

    /*
     * 每隔300秒執行
     * */
    public function task_300()
    {

    }

}

4.啟動 Workerman 服務端
以debug(調試)方式啟動

以debug(調試)方式啟動
php think workerman start
//以daemon(守護進程)方式啟動
php think workerman start d
//停止
php think workerman stop
//重啟
php think workerman restart
//平滑重啟
php think workerman reload
//查看狀態
php think workerman status

//當你看到如下結果的時候,workerman已經啟動成功了。
Workerman[chat] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.5.11          PHP version:7.0.29
------------------------ WORKERS -------------------------------
user          worker          listen                    processes status
tegic         Gateway         websocket://0.0.0.0:8282   4         [OK]
tegic         BusinessWorker  none                       1         [OK]
tegic         Register        text://0.0.0.0:1236        4         [OK]
----------------------------------------------------------------
Press Ctrl+C to stop. Start success.

5.客戶端連接使用

socket.ws.send()調用可發送消息,socket.onmessage 內是處理消息類型,即可實現長鏈接

(function (global) {

    var socketDebug = window.socketDebug == undefined ? false : window.socketDebug;
    var socket = {
        ws:null,
        connect:function () {
            var that= this;
            that.ws = new WebSocket("ws://"+document.domain+":"+window.workermanConfig.port);//這里如果使用127.0.0.1或者localhost會出現連接失敗。當時為了方便以后的維護,這里在php的全局文件里定義了一個常量來定義ip,后來本地開發完提交到linux服務器環境之后發現鏈接失敗!按照此行代碼會有效連接~
            that.ws.onopen = this.onopen;
            that.ws.onmessage = this.onmessage;
            that.ws.onclose = function(e) {
                socketDebug && console.log("連接關閉,定時重連");
                that.connect();
            };
            that.ws.onerror = function(e) {
                socketDebug && console.log("出現錯誤");
            };
        },
        onopen:function () {
            var joint = '{"type":"handshake","role":"user","uid":'+window.uid+',"room":'+window.room+'}';
            socket.ws.send(joint);
            socket.heartCheck.start();
        },
        sendMsg:function(content,type,id){
            socket.ws.send("{content:'"+content+"',m_type:'"+type+"',room:"+id+",type:'send'}")
        },
        onmessage:function (e) {
            try {
                var data = JSON.parse(e.data);
                socketDebug && console.log(data)
                switch(data.type){
                    case 'init':

                        break;
                    // 服務端ping客戶端
                    case 'ping':

                        break;
                    // 登錄 更新用戶列表
                    case 'handshake':

                        break;
                    // 提醒
                    case 'reception':

                        break;
                    //直播進行中
                    case 'live_ing':

                        break;
                    //直播結束
                    case 'live_end':

                        break;
                    //消息提醒
                    case 'message':

                        break;
                    //消息撤回
                    case 'recall':

                        break;
                    case 'ban':

                        break;
                }
            }catch (e) {
                socketDebug && console.info(e);
            }

        },
        heartCheck:{
            timeout: 3000,
            timeoutObj: null,
            start: function(){
                this.timeoutObj = setInterval(function(){
                    socket.ws.send("{'type':'ping'}");
                }, this.timeout);
            }
        }
    };

    window.onload=function () {
        socket.connect();
    };

    global.socket = socket;

    return socket
}(this));

windows 版本無法啟動,已經在商城項目中使用

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

应城市| 禹城市| 广宗县| 井研县| 玉溪市| 通山县| 张北县| 宽甸| 西吉县| 尉氏县| 佛教| 新郑市| 苗栗县| 肇庆市| 白银市| 班戈县| 保德县| 奈曼旗| 临清市| 吴江市| 建阳市| 密山市| 肥西县| 合肥市| 尚义县| 安国市| 会泽县| 邵武市| 泊头市| 江川县| 林州市| 南宫市| 千阳县| 荆门市| 车致| 汕头市| 凌源市| 临泽县| 台山市| 肃宁县| 文登市|