Skip to content

Commit

Permalink
Add keepalive and some minor fixes with disposables (#14)
Browse files Browse the repository at this point in the history
* Add keepalive and some minor fixes with disposables

* Shorter callables

* Correct ping to actually send the frame instead of nothing

* Remove old method that should have been removed a long time ago

* Mask ping if client
  • Loading branch information
mbonneau authored and davidwdan committed Oct 31, 2017
1 parent 06680bb commit 6ce121f
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 70 deletions.
8 changes: 6 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
},
"autoload-dev": {
"psr-4": {
"Rx\\Websocket\\Test\\": "tests/"
}
"Rx\\Websocket\\Test\\": "test/",
"Rx\\": "vendor/reactivex/rxphp/test/Rx"
},
"files": [
"vendor/reactivex/rxphp/test/helper-functions.php"
]
},
"require": {
"react/http": "^0.7.3",
Expand Down
7 changes: 5 additions & 2 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ class Client extends Observable
private $subProtocols;
private $loop;
private $connector;
private $keepAlive;

public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, ConnectorInterface $connector = null)
public function __construct(string $url, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, ConnectorInterface $connector = null, int $keepAlive = 60000)
{
$parsedUrl = parse_url($url);
if (!isset($parsedUrl['scheme']) || !in_array($parsedUrl['scheme'], ['wss', 'ws'])) {
Expand All @@ -46,6 +47,7 @@ public function __construct(string $url, bool $useMessageObject = false, array $
$this->subProtocols = $subProtocols;
$this->loop = $loop ?: \EventLoop\getLoop();
$this->connector = $connector;
$this->keepAlive = $keepAlive;
}

public function _subscribe(ObserverInterface $clientObserver): DisposableInterface
Expand Down Expand Up @@ -137,7 +139,8 @@ function () use ($request) {
$this->useMessageObject,
$subprotoHeader,
$nRequest,
$psr7Response
$psr7Response,
$this->keepAlive
));
});

Expand Down
76 changes: 54 additions & 22 deletions src/MessageSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
use Ratchet\RFC6455\Messaging\Message;
use Ratchet\RFC6455\Messaging\MessageBuffer;
use Ratchet\RFC6455\Messaging\MessageInterface;
use Rx\Disposable\CallbackDisposable;
use Rx\Disposable\CompositeDisposable;
use Rx\DisposableInterface;
use Rx\Exception\TimeoutException;
use Rx\Observable;
use Rx\ObserverInterface;
use Rx\Subject\Subject;
Expand All @@ -32,11 +36,12 @@ public function __construct(
bool $useMessageObject = false,
$subProtocol = "",
RequestInterface $request,
ResponseInterface $response
ResponseInterface $response,
int $keepAlive = 60000
) {
$this->request = $request;
$this->response = $response;
$this->rawDataIn = $rawDataIn;
$this->rawDataIn = $rawDataIn->share();
$this->rawDataOut = $rawDataOut;
$this->mask = $mask;
$this->subProtocol = $subProtocol;
Expand All @@ -46,7 +51,7 @@ public function __construct(
function (MessageInterface $msg) use ($useMessageObject) {
parent::onNext($useMessageObject ? $msg : $msg->getPayload());
},
function (FrameInterface $frame) use ($rawDataOut) {
function (FrameInterface $frame) {
switch ($frame->getOpcode()) {
case Frame::OP_PING:
$this->sendFrame(new Frame($frame->getPayload(), true, Frame::OP_PONG));
Expand All @@ -63,31 +68,63 @@ function (FrameInterface $frame) use ($rawDataOut) {
parent::onError($exception);
}

// complete output stream
$rawDataOut->onCompleted();
$this->rawDataOut->onCompleted();

// signal subscribers that we are done here
//parent::onCompleted();
parent::onCompleted();

$this->rawDataDisp->dispose();
return;
}
},
!$this->mask
);

$this->rawDataDisp = $this->rawDataIn->subscribe(
function ($data) use ($messageBuffer) {
$messageBuffer->onData($data);
},
function (\Exception $exception) {
parent::onError($exception);
},
function () {
parent::onCompleted();
});
// keepAlive
$keepAliveObs = Observable::empty();
if ($keepAlive > 0) {
$keepAliveObs = $this->rawDataIn
->startWith(0)
->throttle($keepAlive / 2)
->map(function () use ($keepAlive, $rawDataOut) {
return Observable::timer($keepAlive)
->do(function () use ($rawDataOut) {
$frame = new Frame('', true, Frame::OP_PING);
if ($this->mask) {
$frame->maskPayload();
}
$rawDataOut->onNext($frame->getContents());
})
->delay($keepAlive)
->do(function () use ($rawDataOut) {
$rawDataOut->onError(new TimeoutException());
});
})
->switch()
->flatMapTo(Observable::never());
}

$this->rawDataDisp = $this->rawDataIn
->merge($keepAliveObs)
->subscribe(
[$messageBuffer, 'onData'],
[$this, 'parent::onError'],
[$this, 'parent::onCompleted']
);

$this->subProtocol = $subProtocol;
}

protected function _subscribe(ObserverInterface $observer): DisposableInterface
{
$disposable = new CompositeDisposable([
parent::_subscribe($observer),
$this->rawDataDisp,
new CallbackDisposable([$this->rawDataOut, 'onCompleted'])
]);

return $disposable;
}

private function createCloseFrame(int $closeCode = Frame::CLOSE_NORMAL): Frame
{
$frame = new Frame(pack('n', $closeCode), true, Frame::OP_CLOSE);
Expand All @@ -112,11 +149,6 @@ public function sendFrame(Frame $frame)
$this->rawDataOut->onNext($frame->getContents());
}

public function getControlFrames(): Observable
{
return $this->controlFrames;
}

// The ObserverInterface is commandeered by this class. We will use the parent:: stuff ourselves for notifying
// subscribers
public function onNext($value)
Expand Down
7 changes: 5 additions & 2 deletions src/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ class Server extends Observable
private $useMessageObject;
private $subProtocols;
private $loop;
private $keepAlive;

public function __construct(string $bindAddressOrPort, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null)
public function __construct(string $bindAddressOrPort, bool $useMessageObject = false, array $subProtocols = [], LoopInterface $loop = null, int $keepAlive = 60000)
{
$this->bindAddress = $bindAddressOrPort;
$this->useMessageObject = $useMessageObject;
$this->subProtocols = $subProtocols;
$this->loop = $loop ?: \EventLoop\getLoop();
$this->keepAlive = $keepAlive;
}

public function _subscribe(ObserverInterface $observer): DisposableInterface
Expand Down Expand Up @@ -124,7 +126,8 @@ function () use ($responseStream) {
$this->useMessageObject,
$subProtocol,
$psrRequest,
$negotiatorResponse
$negotiatorResponse,
$this->keepAlive
);

$observer->onNext($messageSubject);
Expand Down
35 changes: 0 additions & 35 deletions test/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
namespace Rx\Websocket\Test;

use React\EventLoop\Factory;
use Rx\Websocket\Client;
use Rx\Websocket\MessageSubject;
use Rx\Websocket\Server;

class ClientTest extends \PHPUnit_Framework_TestCase
{
Expand All @@ -29,36 +26,4 @@ function ($err) use (&$errored) {

$this->assertTrue($errored);
}

public function testRequestEndOnDispose()
{
$this->markTestSkipped();
$loop = Factory::create();

$server = new Server('tcp://127.0.0.1:1234', false, [], $loop);
$serverDisp = $server->subscribe(function (MessageSubject $ms) {
$ms->map('strrev')->subscribe($ms);
});

$value = null;

$client = new Client('ws://127.0.0.1:1234/', false, [], $loop);
$client
->subscribe(function (MessageSubject $ms) use ($serverDisp) {
$ms->onNext('Hello');
$ms
->finally(function () use ($serverDisp) {
$serverDisp->dispose();
})
->take(1)
->subscribe(function ($x) use (&$value) {
$this->assertNull($value);
$value = $x;
});
});

$loop->run();

$this->assertEquals('olleH', $value);
}
}
135 changes: 134 additions & 1 deletion test/MessageSubjectTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Psr7\Response;
use Ratchet\RFC6455\Messaging\Frame;
use Rx\Exception\TimeoutException;
use Rx\Observer\CallbackObserver;
use Rx\Subject\Subject;
use Rx\Testing\MockObserver;
use Rx\Websocket\MessageSubject;
use Rx\Websocket\WebsocketErrorException;

class MessageSubjectTest extends \PHPUnit_Framework_TestCase
class MessageSubjectTest extends TestCase
{
public function testCloseCodeSentToOnError()
{
Expand Down Expand Up @@ -50,4 +52,135 @@ function () use (&$closeCode) {

$this->assertEquals(4000, $closeCode);
}

public function testPingPongTimeout()
{
$dataIn = $this->createHotObservable([
onNext(200, (new Frame('', true, Frame::OP_TEXT))->getContents()),
onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()),
]);

$dataOut = new Subject();

$ms = new MessageSubject(
$dataIn,
$dataOut,
true,
false,
'',
new Request('GET', '/ws'),
new Response(),
300
);

$result = $this->scheduler->startWithCreate(function () use ($dataOut) {
return $dataOut;
});

$this->assertMessages([
onNext(650, (new Frame('', true, Frame::OP_PING))->getContents()),
onError(950, new TimeoutException())
], $result->getMessages());
}

public function testPingPong()
{
$dataIn = $this->createHotObservable([
onNext(200, (new Frame('', true, Frame::OP_TEXT))->getContents()),
onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()),
onNext(651, (new Frame('', true, Frame::OP_PONG))->getContents())
]);

$dataOut = new Subject();

$ms = new MessageSubject(
$dataIn,
$dataOut,
true,
false,
'',
new Request('GET', '/ws'),
new Response(),
300
);

$result = $this->scheduler->startWithDispose(function () use ($dataOut) {
return $dataOut;
}, 2000);

$this->assertMessages([
onNext(650, (new Frame('', true, Frame::OP_PING))->getContents()),
onNext(951, (new Frame('', true, Frame::OP_PING))->getContents()),
onError(1251, new TimeoutException())
], $result->getMessages());
}

public function testPingPongDataSuppressesPing()
{
$dataIn = $this->createHotObservable([
onNext(201, (new Frame('', true, Frame::OP_TEXT))->getContents()),
onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()),
onNext(649, (new Frame('', true, Frame::OP_TEXT))->getContents())
]);

$dataOut = new Subject();

$ms = new MessageSubject(
$dataIn,
$dataOut,
true,
false,
'',
new Request('GET', '/ws'),
new Response(),
300
);

$result = $this->scheduler->startWithDispose(function () use ($dataOut) {
return $dataOut;
}, 2000);

$this->assertMessages([
onNext(949, (new Frame('', true, Frame::OP_PING))->getContents()),
onError(1249, new TimeoutException())
], $result->getMessages());
}

public function testDisposeOnMessageSubjectClosesConnection()
{
$dataIn = $this->createHotObservable([
onNext(201, (new Frame('', true, Frame::OP_TEXT))->getContents()),
onNext(205, (new Frame('', true, Frame::OP_TEXT))->getContents()),
]);

$dataOut = new MockObserver($this->scheduler);

$ms = new MessageSubject(
$dataIn,
$dataOut,
true,
false,
'',
new Request('GET', '/ws'),
new Response(),
300
);

$result = $this->scheduler->startWithDispose(function () use ($ms) {
return $ms;
}, 300);

$this->assertMessages([
onNext(201, ''),
onNext(205, ''),
], $result->getMessages());

$this->assertSubscriptions([
subscribe(0,300)
], $dataIn->getSubscriptions());

$this->assertMessages([
onCompleted(300)
], $dataOut->getMessages());
}
}
Loading

0 comments on commit 6ce121f

Please sign in to comment.