php使用swoole实现TCP服务

这里以 Yii 框架为例进行演示。

一:swoole配置TCP

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
'swoole' => [
    // Path of the log file (Yii alias)
    'log_file' => '@console/log/swoole.log',
    // Log level for swoole_server error logging, range 0-5; messages below log_level are not emitted
    'log_level' => 1,
    // File in which the process PID is stored (Yii alias)
    'pid_file' => '@console/log/swoole.server.pid',
 
    // HTTP protocol settings
    'http' => [
        'host' => '0.0.0.0',
        'port' => '8889',
 
        // Number of worker processes for asynchronous tasks
        'task_worker_num' => 4,
    ],
    // TCP protocol settings
    'tcp' => [
        'host' => '0.0.0.0',
        'port' => '14000',
 
        // Number of worker processes for asynchronous tasks
        'task_worker_num' => 4,
 
        // Enable TCP keepalive dead-connection detection
        'open_tcp_keepalive' => 1,
        // Seconds without any data on a connection before probing starts
        'tcp_keepidle' => 5 * 60,
        // Number of probes; the connection is closed once this count is exceeded
        'tcp_keepcount' => 3,
        // Interval between probes, in seconds
        'tcp_keepinterval' => 60,
 
        // Heartbeat check: how often, in seconds, connections are polled
        'heartbeat_check_interval' => 2 * 60,
        // Heartbeat check: maximum time a connection is allowed to stay idle
        'heartbeat_idle_time' => 5 * 60,
    ]
],

二:swoole实现TCP服务基类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
swoole = new swoole_http_server($httpHost, $httpPort);
        $this->swoole->set(ArrayHelper::merge($config, $httpConfig));
 
        $this->swoole->on('start', [$this, 'onStart']);
        $this->swoole->on('request', [$this, 'onRequest']);
        $this->swoole->on('WorkerStart', [$this, 'onWorkerStart']);
        $this->swoole->on('WorkerStop', [$this, 'onWorkerStop']);
        $this->swoole->on('task', [$this, 'onTask']);
        $this->swoole->on('finish', [$this, 'onTaskFinish']);
 
        $this->swoole->on('shutdown', [$this, 'onShutdown']);
 
        $tcpHost = ArrayHelper::remove($tcpConfig, 'host');
        $tcpPort = ArrayHelper::remove($tcpConfig, 'port');
        $tcpServer = $this->swoole->listen($tcpHost, $tcpPort, SWOOLE_SOCK_TCP);
        $tcpServer->set($tcpConfig);
        $tcpServer->on('connect', [$this, 'onConnect']);
        $tcpServer->on('receive', [$this, 'onReceive']);
        $tcpServer->on('close', [$this, 'onClose']);
    }
 
    /**
     * Starts the server by calling start() on the wrapped swoole server instance.
     */
    public function run()
    {
        $this->swoole->start();
    }
 
    /**
     * Server启动在主进程的主线程时的回调事件处理
     *
     * @param swoole_server $server
     */
    public function onStart(swoole_server $server)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Server Start**n", Console::FG_GREEN);
        $this->stdout("master_pid: ");
        $this->stdout("{$server->master_pid}n", Console::FG_BLUE);
 
        $this->onStartHandle($server);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * Callback invoked after a client has established a connection with the server.
     *
     * @param swoole_server $server
     * @param int $fd
     * @param int $reactorId
     */
    abstract public function onConnect(swoole_server $server, int $fd, int $reactorId);
 
    /**
     * Callback invoked when the server receives data from a client.
     *
     * @param swoole_server $server
     * @param int $fd
     * @param int $reactorId
     * @param string $data
     */
    abstract public function onReceive(swoole_server $server, int $fd, int $reactorId, string $data);
 
    /**
     * Callback invoked when the server receives an HTTP request from a client.
     *
     * @param swoole_http_request $request
     * @param swoole_http_response $response
     */
    abstract public function onRequest(swoole_http_request $request, swoole_http_response $response);
 
    /**
     * Fired when a worker process / task process starts.
     *
     * @param swoole_server $server
     * @param int $worker_id
     */
    abstract public function onWorkerStart(swoole_server $server, int $worker_id);
 
    /**
     * Fired when a worker process / task process terminates.
     *
     * @param swoole_server $server
     * @param int $worker_id
     */
    abstract public function onWorkerStop(swoole_server $server, int $worker_id);
 
    /**
     * Handles an asynchronous task.
     *
     * @param swoole_server $server
     * @param int $taskId
     * @param int $srcWorkerId
     * @param mixed $data
     */
    abstract public function onTask(swoole_server $server, int $taskId, int $srcWorkerId, mixed $data);
 
    /**
     * Invoked when an asynchronous task has finished.
     *
     * @param swoole_server $server
     * @param int $taskId
     * @param mixed $data
     */
    abstract public function onTaskFinish(swoole_server $server, int $taskId, mixed $data);
 
    /**
     * Callback invoked after a client has disconnected from the server.
     *
     * @param swoole_server $server
     * @param int $fd
     */
    abstract public function onClose(swoole_server $server, $fd);
 
    /**
     * Server正常结束时的回调事件处理
     *
     * @param swoole_server $server
     */
    public function onShutdown(swoole_server $server)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Server Stop**n", Console::FG_GREEN);
        $this->stdout("master_pid: ");
        $this->stdout("{$server->master_pid}n", Console::FG_BLUE);
 
        $this->onShutdownHandle($server);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * Custom hook run from onStart(); empty by default so subclasses may override it.
     *
     * @param swoole_server $server
     */
    protected function onStartHandle(swoole_server $server)
    {
 
    }
 
    /**
     * Custom hook run from onShutdown(); empty by default so subclasses may override it.
     *
     * @param swoole_server $server
     */
    protected function onShutdownHandle(swoole_server $server)
    {
 
    }
 
    /**
     * 获取请求路由
     *
     * @param swoole_http_request $request
     */
    protected function getRoute(swoole_http_request $request)
    {
        return ltrim($request->server['request_uri'], '/');
    }
 
    /**
     * Returns the GET parameters of an HTTP request.
     *
     * @param swoole_http_request $request
     * @return mixed the value of the request's `get` property, returned unchanged
     */
    protected function getParams(swoole_http_request $request)
    {
        return $request->get;
    }
 
    /**
     * 解析收到的数据
     *
     * @param string $data
     */
    protected function decodeData($data)
    {
        return json_decode($data, true);
    }
 
    /**
     * Before Exec
     */
    protected function beforeExec()
    {
        $startedAt = microtime(true);
        $this->stdout(date('Y-m-d H:i:s') . "n", Console::FG_YELLOW);
        return $startedAt;
    }
 
    /**
     * After Exec
     */
    protected function afterExec($startedAt)
    {
        $duration = number_format(round(microtime(true) - $startedAt, 3), 3);
        $this->stdout("{$duration} snn", Console::FG_YELLOW);
    }
 
    /**
     * Prints a string to STDOUT.
     */
    protected function stdout($string)
    {
        if (Console::streamSupportsAnsiColors(STDOUT)) {
            $args = func_get_args();
            array_shift($args);
            $string = Console::ansiFormat($string, $args);
        }
        return Console::stdout($string);
    }
}

三:swoole操作类(继承swoole基类)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
beforeExec();
 
        $this->stdout("**Connection Open**n", Console::FG_GREEN);
        $this->stdout("fd: ");
        $this->stdout("{$fd}n", Console::FG_BLUE);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * @inheritdoc
     */
    public function onReceive($server, $fd, $reactorId, $data)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Received Message**n", Console::FG_GREEN);
 
        $this->stdout("fd: ");
        $this->stdout("{$fd}n", Console::FG_BLUE);
 
        $this->stdout("data: ");//接收的数据
        $this->stdout("{$data}n", Console::FG_BLUE);
 
 
 
        $result = $server->send($fd, '回复消息');
 
 
 
        $this->afterExec($startedAt);
    }
 
 
    /**
     * @inheritdoc
     */
    public function onRequest($request, $response)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**HTTP Request**n", Console::FG_GREEN);
 
        $this->stdout("fd: ");
        $this->stdout("{$request->fd}n", Console::FG_BLUE);
 
        $response->status(200);
        $response->end('success');
 
        $this->afterExec($startedAt);
    }
 
    /**
     * @inheritdoc
     */
    public function onClose($server, $fd)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("**Connection Close**n", Console::FG_GREEN);
 
        $this->stdout("fd: ");
        $this->stdout("{$fd}n", Console::FG_BLUE);
 
        $this->afterExec($startedAt);
    }
 
 
 
    /**
     * @inheritdoc
     */
    public function onTask($server, $taskId, $srcWorkerId, $data)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("New AsyncTask: ");
        $this->stdout("{$taskId}n", Console::FG_BLUE);
        $this->stdout("{$data}n", Console::FG_BLUE);
 
        $server->finish($data);
 
        $this->afterExec($startedAt);
    }
 
    /**
     * Worker/task process stop hook. Currently a no-op: the only statement
     * (closing the Yii database connection) is commented out.
     *
     * @param swoole_server $server
     * @param int $worker_id
     */
    public function onWorkerStop($server, $worker_id)
    {
        // Yii::$app->db->close();
    }
    /**
     * Worker/task process start hook. Currently a no-op: the only statement
     * (opening the Yii database connection) is commented out.
     *
     * @param swoole_server $server
     * @param int $worker_id
     */
    public function onWorkerStart($server, $worker_id)
    {
        // Yii::$app->db->open();
    }
 
 
    /**
     * @inheritdoc
     */
    public function onTaskFinish($server, $taskId, $data)
    {
        $startedAt = $this->beforeExec();
 
        $this->stdout("AsyncTask finished: ");
        $this->stdout("{$taskId}n", Console::FG_BLUE);
 
        $this->afterExec($startedAt);
    }
}

四:操作TCP服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
params['swoole'])) {
                return false;
            }
            $this->_params = Yii::$app->params['swoole'];
            $this->_http_params = ArrayHelper::remove($this->_params, 'http');
            $this->_tcp_params = ArrayHelper::remove($this->_params, 'tcp');
 
            foreach ($this->_params as &$param) {
                if (strncmp($param, '@', 1) === 0) {
                    $param = Yii::getAlias($param);
                }
            }
 
            $this->_params = ArrayHelper::merge($this->_params, [
                'daemonize' => $this->daemon
            ]);
 
            return true;
        } else {
            return false;
        }
    }
    /**
     * 启动服务
     */
    public function actionStart()
    {
        if ($this->getPid() !== false) {
            $this->stdout("WebSocket Server is already started!n", Console::FG_RED);
            return self::EXIT_CODE_NORMAL;
        }
        $server = new Server($this->_http_params, $this->_tcp_params, $this->_params);
        $server->run();
    }
 
    /**
     * 停止服务
     */
    public function actionStop()
    {
        $pid = $this->getPid();
        if ($pid === false) {
            $this->stdout("Tcp Server is already stoped!n", Console::FG_RED);
            return self::EXIT_CODE_NORMAL;
        }
 
        swoole_process::kill($pid);
    }
 
    /**
     * 清理日志文件
     */
    public function actionClearLog()
    {
        $logFile = Yii::getAlias($this->_params['log_file']);
        FileHelper::unlink($logFile);
    }
 
    /**
     * 获取进程PID
     *
     * @return false|integer PID
     */
    private function getPid()
    {
        $pidFile = $this->_params['pid_file'];
        if (!file_exists($pidFile)) {
            return false;
        }
 
        $pid = file_get_contents($pidFile);
        if (empty($pid)) {
            return false;
        }
 
        $pid = intval($pid);
        if (swoole_process::kill($pid, 0)) {
            return $pid;
        } else {
            FileHelper::unlink($pidFile);
            return false;
        }
    }
 
    /**
     * @inheritdoc
     */
    public function options($actionID)
    {
        return ArrayHelper::merge(parent::options($actionID), [
            'daemon',
            'test'
        ]);
    }
 
    /**
     * @inheritdoc
     */
    public function optionAliases()
    {
        return ArrayHelper::merge(parent::optionAliases(), [
            'd' => 'daemon',
            't' => 'test',
        ]);
    }
}

以上就是php使用swoole实现TCP服务的详细内容,更多关于php swoole实现TCP服务的资料请关注IT俱乐部其它相关文章!

本文收集自网络,不代表IT俱乐部立场,转载请注明出处。https://www.2it.club/code/php/10730.html
上一篇
下一篇
联系我们

联系我们

在线咨询: QQ交谈

邮箱: 1120393934@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

返回顶部