This commit is contained in:
CismonX 2017-08-18 23:26:46 +08:00
commit 39c8afa223
Signed by: cismonx
GPG Key ID: 3094873E29A482FB
6 changed files with 374 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
vendor/
composer.lock

21
LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 CismonX
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

61
README.md Normal file
View File

@ -0,0 +1,61 @@
## Workerman-Amp
### 概述
[Amp](http://amphp.org/) 是一个事件驱动的 PHP 框架,与 ReactPHP 类似。
本项目用于将 Amp 的 event-loop 应用于 Workerman从而可以在 Workerman 中使用基于 Amp 的高性能组件,例如异步 MySQL异步 Redis异步 HTTP 客户端等。
笔者修改了本项目的源码,使其兼容原生 Amp。现在无需修改 Amp 的源码就可以在项目中使用其所有组件。以后笔者可能会发布一些 Demo。
### 使用说明
1. 使用 composer 将 `Workerman\Events\Amp` 加载到项目中。
```bash
composer require cismonx/workerman-amp
```
2. 将 Amp 设置为 Workerman 所使用的 event-loop。如下
```php
Worker::$eventLoopClass = '\\Workerman\\Events\\Amp';
```
### 兼容 Amp 协程
在 Amp 中,协程是一个十分重要的特性。对 Workerman 的源码稍加修改,将 watcher 回调中的同步调用(`call_user_func`)改为异步调用(`yield \Amp\call()`),即可在 Workerman 的事件回调函数中直接使用 Amp 协程。
以下几点需要注意:
1. 以上提到的对 Workerman 源码的修改在 src 目录下的 coroutine-compatibility.patch 中。这个补丁对 [Workerman-3.4.5-stable](https://github.com/walkor/Workerman/releases/tag/v3.4.5) 有效。
2. 目前只能在 `onConnect` `onSslHandshake` `onMessage` 这三个事件回调函数中使用 Amp 协程。
以下是使用 Amp 协程的示例(测试的时候同时多发几条请求,可以看出效果)
```php
use Workerman\Worker;
Worker::$eventLoopClass = '\\Workerman\\Events\\Amp';
$worker = new Worker('http://[::]:20080');
function subtractToZero($init, $interval) {
$value = $init;
$emitter = new Amp\Emitter;
$id = \Workerman\Lib\Timer::add($interval, function () use ($emitter, &$value, &$id) {
if ($value > 0)
$emitter->emit(--$value);
else {
$emitter->complete();
\Workerman\Lib\Timer::del($id);
}
});
return $emitter->iterate();
}
$worker->onMessage = function ($connection) {
$iterator = subtractToZero(10, 1);
while (yield $iterator->advance())
var_dump($iterator->getCurrent());
$connection->send('ok');
};
Worker::runAll();
```

14
composer.json Normal file
View File

@ -0,0 +1,14 @@
{
"name" : "cismonx/workerman-amp",
"description" : "Amp event loop for Workerman.",
"type" : "library",
"require" : {
"workerman/workerman": "^3.4",
"amphp/amp": "^2.0"
},
"autoload" : {
"psr-4" : {
"Workerman\\Events\\" : "src/"
}
}
}

153
src/Amp.php Normal file
View File

@ -0,0 +1,153 @@
<?php
namespace Workerman\Events;
use Amp\ {function asyncCall, Loop};
use Workerman\Worker;
class Amp implements EventInterface {
/**
* Socket onReadable/onWritable events.
* @var array
*/
protected $_allEvents = [];
/**
* Signals.
* @var array
*/
protected $_eventSignal = [];
/**
* Timers.
* @var array
*/
protected $_eventTimer = [];
/**
* Timer Id counter.
* @var int
*/
protected static $_timerId = 1;
/**
* {@inheritdoc}
*/
public function add($fd, $flag, $func, $args = null) {
switch ($flag) {
case self::EV_READ:
$fd_key = intval($fd);
$event = Loop::onReadable($fd, function ($id, $socket) use ($func) {
//In Workerman the first parameter should be socket stream.
asyncCall($func, $socket);
});
$this->_allEvents[$fd_key][$flag] = $event;
return true;
case self::EV_WRITE:
$fd_key = intval($fd);
$event = Loop::onWritable($fd, function ($id, $socket) use ($func) {
//In Workerman the first parameter should be socket stream.
asyncCall($func, $socket);
});
$this->_allEvents[$fd_key][$flag] = $event;
return true;
case self::EV_SIGNAL:
$fd_key = intval($fd);
$event = Loop::onSignal($fd, function ($id, $signal) use ($func) {
//In Workerman the first parameter should be signal.
asyncCall($func, $signal);
});
$this->_eventSignal[$fd_key] = $event;
return true;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
$param = [$func, (array)$args, $flag, self::$_timerId];
$event = Loop::repeat($fd * 1000, \Closure::bind(function () use ($param) {
$timer_id = $param[3];
if ($param[2] === self::EV_TIMER_ONCE) {
//Loop::delay() can also do the trick.
Loop::cancel($this->_eventTimer[$timer_id]);
unset($this->_eventTimer[$timer_id]);
}
try {
asyncCall($param[0], ...$param[1]);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
} catch (\Error $e) {
Worker::log($e);
exit(250);
}
}, $this, __CLASS__));
$this->_eventTimer[self::$_timerId] = $event;
return self::$_timerId++;
default:
break;
}
return false;
}
/**
* {@inheritdoc}
*/
public function del($fd, $flag) {
switch ($flag) {
case self::EV_READ:
case self::EV_WRITE:
$fd_key = intval($fd);
if (isset($this->_allEvents[$fd_key][$flag])) {
Loop::cancel($this->_allEvents[$fd_key][$flag]);
unset($this->_allEvents[$fd_key][$flag]);
}
if (empty($this->_allEvents[$fd_key]))
unset($this->_allEvents[$fd_key]);
break;
case self::EV_SIGNAL:
$fd_key = intval($fd);
if (isset($this->_eventSignal[$fd_key])) {
Loop::cancel($this->_eventSignal[$fd_key]);
unset($this->_eventSignal[$fd_key]);
}
break;
case self::EV_TIMER:
case self::EV_TIMER_ONCE:
if (isset($this->_eventTimer[$fd])) {
Loop::cancel($this->_eventTimer[$fd]);
unset($this->_eventTimer[$fd]);
}
break;
}
return true;
}
/**
* {@inheritdoc}
*/
public function loop() {
Loop::run();
}
/**
* {@inheritdoc}
*/
public function clearAllTimer() {
foreach ($this->_eventTimer as $event)
Loop::cancel($event);
$this->_eventTimer = [];
}
/**
* {@inheritdoc}
*/
public function destroy() {
foreach ($this->_eventSignal as $event)
Loop::cancel($event);
}
/**
* {@inheritdoc}
*/
public function getTimerCount() {
return count($this->_eventTimer);
}
}

View File

@ -0,0 +1,123 @@
Index: Connection/TcpConnection.php
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- Connection/TcpConnection.php (revision 212aa15bc0edc1226246de52a2fbece0db58414d)
+++ Connection/TcpConnection.php (revision )
@@ -401,7 +401,7 @@
}
if (isset($this->onSslHandshake)) {
try {
- call_user_func($this->onSslHandshake, $this);
+ yield \Amp\call($this->onSslHandshake, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
@@ -477,7 +477,7 @@
}
try {
// Decode request buffer before Emitting onMessage callback.
- call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
+ yield \Amp\call($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
} catch (\Exception $e) {
Worker::log($e);
exit(250);
@@ -500,7 +500,7 @@
return;
}
try {
- call_user_func($this->onMessage, $this, $this->_recvBuffer);
+ yield \Amp\call($this->onMessage, $this, $this->_recvBuffer);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
@@ -526,7 +526,7 @@
// Try to emit onBufferDrain callback when the send buffer becomes empty.
if ($this->onBufferDrain) {
try {
- call_user_func($this->onBufferDrain, $this);
+ yield \Amp\call($this->onBufferDrain, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
Index: Connection/AsyncTcpConnection.php
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- Connection/AsyncTcpConnection.php (revision 212aa15bc0edc1226246de52a2fbece0db58414d)
+++ Connection/AsyncTcpConnection.php (revision )
@@ -292,7 +292,7 @@
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
- call_user_func($this->onConnect, $this);
+ yield \Amp\call($this->onConnect, $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
@@ -304,7 +304,7 @@
// Try to emit protocol::onConnect
if (method_exists($this->protocol, 'onConnect')) {
try {
- call_user_func(array($this->protocol, 'onConnect'), $this);
+ yield \Amp\call(array($this->protocol, 'onConnect'), $this);
} catch (\Exception $e) {
Worker::log($e);
exit(250);
Index: .gitignore
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- .gitignore (revision 212aa15bc0edc1226246de52a2fbece0db58414d)
+++ .gitignore (revision )
@@ -1,4 +1,5 @@
logs
+vendor
.buildpath
.project
.settings
Index: composer.json
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- composer.json (revision 212aa15bc0edc1226246de52a2fbece0db58414d)
+++ composer.json (revision )
@@ -24,7 +24,8 @@
"source": "https://github.com/walkor/workerman"
},
"require": {
- "php": ">=5.3"
+ "php": ">=7.0",
+ "amphp/amp": "^2.0@dev"
},
"suggest": {
"ext-event": "For better performance. "
Index: Worker.php
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
--- Worker.php (revision 212aa15bc0edc1226246de52a2fbece0db58414d)
+++ Worker.php (revision )
@@ -1625,7 +1625,7 @@
// Try to emit onConnect callback.
if ($this->onConnect) {
try {
- call_user_func($this->onConnect, $connection);
+ yield \Amp\call($this->onConnect, $connection);
} catch (\Exception $e) {
self::log($e);
exit(250);
@@ -1661,7 +1661,7 @@
}
ConnectionInterface::$statistics['total_request']++;
try {
- call_user_func($this->onMessage, $connection, $recv_buffer);
+ yield \Amp\call($this->onMessage, $connection, $recv_buffer);
} catch (\Exception $e) {
self::log($e);
exit(250);