From 39c8afa223ae61f4fc88dec3ce29ab70593a19bb Mon Sep 17 00:00:00 2001 From: CismonX Date: Fri, 18 Aug 2017 23:26:46 +0800 Subject: [PATCH] archive --- .gitignore | 2 + LICENSE | 21 +++++ README.md | 61 ++++++++++++ composer.json | 14 +++ src/Amp.php | 153 +++++++++++++++++++++++++++++++ src/coroutine-compatiblity.patch | 123 +++++++++++++++++++++++++ 6 files changed, 374 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 README.md create mode 100644 composer.json create mode 100644 src/Amp.php create mode 100644 src/coroutine-compatiblity.patch diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..4f4acd3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +vendor/ +composer.lock \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..fab3f18 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2017 CismonX + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..e3fbc34 --- /dev/null +++ b/README.md @@ -0,0 +1,61 @@ +## Workerman-Amp + +### 概述 + +[Amp](http://amphp.org/) 是一个事件驱动的 PHP 框架,与 ReactPHP 类似。 + +本项目用于将 Amp 的 event-loop 应用于 Workerman,从而可以在 Workerman 中使用基于 Amp 的高性能组件,例如异步 MySQL,异步 Redis,异步 HTTP 客户端等。 + +笔者修改了本项目的源码,使其兼容原生 Amp。现在无需修改 Amp 的源码就可以在项目中使用其所有组件。以后笔者可能会发布一些 Demo。 + +### 使用说明 + +1. 使用 composer 将 `Workerman\Events\Amp` 加载到项目中。 + +```bash +composer require cismonx/workerman-amp +``` + +2. 将 Amp 设置为 Workerman 所使用的 event-loop。如下: + +```php +Worker::$eventLoopClass = '\\Workerman\\Events\\Amp'; +``` + +### 兼容 Amp 协程 + +在 Amp 中,协程是一个十分重要的特性。对 Workerman 的源码稍加修改,将 watcher 回调中的同步调用(`call_user_func`)改为异步调用(`yield \Amp\call()`),即可在 Workerman 的事件回调函数中直接使用 Amp 协程。 + +以下几点需要注意: + +1. 以上提到的对 Workerman 源码的修改在 src 目录下的 coroutine-compatibility.patch 中。这个补丁对 [Workerman-3.4.5-stable](https://github.com/walkor/Workerman/releases/tag/v3.4.5) 有效。 + +2. 目前只能在 `onConnect` `onSslHandshake` `onMessage` 这三个事件回调函数中使用 Amp 协程。 + +以下是使用 Amp 协程的示例(测试的时候同时多发几条请求,可以看出效果) + +```php +use Workerman\Worker; +Worker::$eventLoopClass = '\\Workerman\\Events\\Amp'; +$worker = new Worker('http://[::]:20080'); +function subtractToZero($init, $interval) { + $value = $init; + $emitter = new Amp\Emitter; + $id = \Workerman\Lib\Timer::add($interval, function () use ($emitter, &$value, &$id) { + if ($value > 0) + $emitter->emit(--$value); + else { + $emitter->complete(); + \Workerman\Lib\Timer::del($id); + } + }); + return $emitter->iterate(); +} +$worker->onMessage = function ($connection) { + $iterator = subtractToZero(10, 1); + while (yield $iterator->advance()) + var_dump($iterator->getCurrent()); + $connection->send('ok'); +}; +Worker::runAll(); +``` diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..f961be8 --- /dev/null +++ b/composer.json @@ -0,0 +1,14 @@ +{ + "name" : "cismonx/workerman-amp", + "description" : "Amp event loop for Workerman.", + "type" : "library", + "require" : { + "workerman/workerman": "^3.4", + "amphp/amp": "^2.0" + }, + "autoload" : { + "psr-4" : { + "Workerman\\Events\\" : "src/" + } + } +} diff --git a/src/Amp.php b/src/Amp.php new file mode 100644 index 0000000..711ba59 --- /dev/null +++ b/src/Amp.php @@ -0,0 +1,153 @@ +_allEvents[$fd_key][$flag] = $event; + return true; + case self::EV_WRITE: + $fd_key = intval($fd); + $event = Loop::onWritable($fd, function ($id, $socket) use ($func) { + //In Workerman the first parameter should be socket stream. + asyncCall($func, $socket); + }); + $this->_allEvents[$fd_key][$flag] = $event; + return true; + case self::EV_SIGNAL: + $fd_key = intval($fd); + $event = Loop::onSignal($fd, function ($id, $signal) use ($func) { + //In Workerman the first parameter should be signal. + asyncCall($func, $signal); + }); + $this->_eventSignal[$fd_key] = $event; + return true; + case self::EV_TIMER: + case self::EV_TIMER_ONCE: + $param = [$func, (array)$args, $flag, self::$_timerId]; + $event = Loop::repeat($fd * 1000, \Closure::bind(function () use ($param) { + $timer_id = $param[3]; + if ($param[2] === self::EV_TIMER_ONCE) { + //Loop::delay() can also do the trick. + Loop::cancel($this->_eventTimer[$timer_id]); + unset($this->_eventTimer[$timer_id]); + } + try { + asyncCall($param[0], ...$param[1]); + } catch (\Exception $e) { + Worker::log($e); + exit(250); + } catch (\Error $e) { + Worker::log($e); + exit(250); + } + }, $this, __CLASS__)); + $this->_eventTimer[self::$_timerId] = $event; + return self::$_timerId++; + default: + break; + } + return false; + } + + /** + * {@inheritdoc} + */ + public function del($fd, $flag) { + switch ($flag) { + case self::EV_READ: + case self::EV_WRITE: + $fd_key = intval($fd); + if (isset($this->_allEvents[$fd_key][$flag])) { + Loop::cancel($this->_allEvents[$fd_key][$flag]); + unset($this->_allEvents[$fd_key][$flag]); + } + if (empty($this->_allEvents[$fd_key])) + unset($this->_allEvents[$fd_key]); + break; + case self::EV_SIGNAL: + $fd_key = intval($fd); + if (isset($this->_eventSignal[$fd_key])) { + Loop::cancel($this->_eventSignal[$fd_key]); + unset($this->_eventSignal[$fd_key]); + } + break; + case self::EV_TIMER: + case self::EV_TIMER_ONCE: + if (isset($this->_eventTimer[$fd])) { + Loop::cancel($this->_eventTimer[$fd]); + unset($this->_eventTimer[$fd]); + } + break; + } + return true; + } + + /** + * {@inheritdoc} + */ + public function loop() { + Loop::run(); + } + + /** + * {@inheritdoc} + */ + public function clearAllTimer() { + foreach ($this->_eventTimer as $event) + Loop::cancel($event); + $this->_eventTimer = []; + } + + /** + * {@inheritdoc} + */ + public function destroy() { + foreach ($this->_eventSignal as $event) + Loop::cancel($event); + } + + /** + * {@inheritdoc} + */ + public function getTimerCount() { + return count($this->_eventTimer); + } +} \ No newline at end of file diff --git a/src/coroutine-compatiblity.patch b/src/coroutine-compatiblity.patch new file mode 100644 index 0000000..8376b8b --- /dev/null +++ b/src/coroutine-compatiblity.patch @@ -0,0 +1,123 @@ +Index: Connection/TcpConnection.php +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +--- Connection/TcpConnection.php (revision 212aa15bc0edc1226246de52a2fbece0db58414d) ++++ Connection/TcpConnection.php (revision ) +@@ -401,7 +401,7 @@ + } + if (isset($this->onSslHandshake)) { + try { +- call_user_func($this->onSslHandshake, $this); ++ yield \Amp\call($this->onSslHandshake, $this); + } catch (\Exception $e) { + Worker::log($e); + exit(250); +@@ -477,7 +477,7 @@ + } + try { + // Decode request buffer before Emitting onMessage callback. +- call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this)); ++ yield \Amp\call($this->onMessage, $this, $parser::decode($one_request_buffer, $this)); + } catch (\Exception $e) { + Worker::log($e); + exit(250); +@@ -500,7 +500,7 @@ + return; + } + try { +- call_user_func($this->onMessage, $this, $this->_recvBuffer); ++ yield \Amp\call($this->onMessage, $this, $this->_recvBuffer); + } catch (\Exception $e) { + Worker::log($e); + exit(250); +@@ -526,7 +526,7 @@ + // Try to emit onBufferDrain callback when the send buffer becomes empty. + if ($this->onBufferDrain) { + try { +- call_user_func($this->onBufferDrain, $this); ++ yield \Amp\call($this->onBufferDrain, $this); + } catch (\Exception $e) { + Worker::log($e); + exit(250); +Index: Connection/AsyncTcpConnection.php +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +--- Connection/AsyncTcpConnection.php (revision 212aa15bc0edc1226246de52a2fbece0db58414d) ++++ Connection/AsyncTcpConnection.php (revision ) +@@ -292,7 +292,7 @@ + // Try to emit onConnect callback. + if ($this->onConnect) { + try { +- call_user_func($this->onConnect, $this); ++ yield \Amp\call($this->onConnect, $this); + } catch (\Exception $e) { + Worker::log($e); + exit(250); +@@ -304,7 +304,7 @@ + // Try to emit protocol::onConnect + if (method_exists($this->protocol, 'onConnect')) { + try { +- call_user_func(array($this->protocol, 'onConnect'), $this); ++ yield \Amp\call(array($this->protocol, 'onConnect'), $this); + } catch (\Exception $e) { + Worker::log($e); + exit(250); +Index: .gitignore +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +--- .gitignore (revision 212aa15bc0edc1226246de52a2fbece0db58414d) ++++ .gitignore (revision ) +@@ -1,4 +1,5 @@ + logs ++vendor + .buildpath + .project + .settings +Index: composer.json +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +--- composer.json (revision 212aa15bc0edc1226246de52a2fbece0db58414d) ++++ composer.json (revision ) +@@ -24,7 +24,8 @@ + "source": "https://github.com/walkor/workerman" + }, + "require": { +- "php": ">=5.3" ++ "php": ">=7.0", ++ "amphp/amp": "^2.0@dev" + }, + "suggest": { + "ext-event": "For better performance. " +Index: Worker.php +IDEA additional info: +Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP +<+>UTF-8 +=================================================================== +--- Worker.php (revision 212aa15bc0edc1226246de52a2fbece0db58414d) ++++ Worker.php (revision ) +@@ -1625,7 +1625,7 @@ + // Try to emit onConnect callback. + if ($this->onConnect) { + try { +- call_user_func($this->onConnect, $connection); ++ yield \Amp\call($this->onConnect, $connection); + } catch (\Exception $e) { + self::log($e); + exit(250); +@@ -1661,7 +1661,7 @@ + } + ConnectionInterface::$statistics['total_request']++; + try { +- call_user_func($this->onMessage, $connection, $recv_buffer); ++ yield \Amp\call($this->onMessage, $connection, $recv_buffer); + } catch (\Exception $e) { + self::log($e); + exit(250);