In my previous post I showed how to create a multi-process socket server in PHP with pcntl_fork(). This time I’m going to extend that example and write a chat server.
Chat is far more challenging because you not only have to handle simultaneous connections but also allow communication between processes. Inter-process communication (IPC) has to be close to real time, synchronized and free of race conditions.
Before I continue, let me say that the example I’m going to show won’t work on Windows. It uses the POSIX extension (and pcntl), which are available only on Unix-like systems such as Linux.
As before, you can download the code from GitHub and try it.
$ git clone https://github.com/lukaszkujawa/php-multithreaded-socket-server.git socketserver
$ cd socketserver
$ php server-broadcast.php
Listening on 127.0.0.1:4444...
From different terminals
$ telnet 127.0.0.1 4444
To see how it works you need at least two telnet sessions. When a message is typed in one of them, it should be immediately broadcast to the others.
SocketServerBroadcast (which extends SocketServer) is the heart of the application and runs in the parent process. The parent is responsible for listening for new connections, maintaining the list of active connections and sending a broadcast when a child process requests one.
Client connections are handled by the onConnect() callback in server-broadcast.php. When data is received, an instance of SocketClientBroadcast wraps it into an array and sends it through a pipe to the parent process. The code which actually sends the data is inside SocketServerBroadcast.
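public function broadcast( array $msg ) {
    // Tag the message with the PID of the child that sends it.
    $msg['pid'] = posix_getpid();
    $message = serialize( $msg );
    $f = fopen(self::PIPENAME, 'w+');
    if( !$f ) {
        echo "ERROR: Can't open PIPE for writing\n";
        return;
    }
    // [HEADER][MESSAGE]: a 4-byte length followed by the serialized array.
    fwrite($f, $this->strlenInBytes($message) . $message );
    fclose($f);
    // Notify the parent that a message is waiting in the pipe.
    posix_kill($this->pid, SIGUSR1);
}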
To tell the parent which child sent a message, a pid key is added to the message array. A pipe works like a file, so the array has to be converted to a string, and serialize() is perfect for the job. The parent listening on the other end of the pipe cannot know in advance how long a message is going to be, so the child has to tell it. To do that, the first 4 bytes of every message represent an integer that carries the length of the message.
fwrite($f, $this->strlenInBytes($message) . $message );
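The bodies of strlenInBytes() and bytesToInt() are not shown in this post, so the following is only a minimal sketch of the same idea, assuming the header is a 32-bit big-endian integer built with pack(). The variable names and the sample message are made up for illustration.

```php
<?php
// Sketch only: one possible way to build a fixed 4-byte length header.
// The real strlenInBytes() in the repository may use a different encoding.

$payload = serialize(['type' => 'msg', 'data' => "hello\n", 'pid' => 1234]);

// 'N' packs an unsigned 32-bit big-endian integer, which is always 4 bytes.
$header = pack('N', strlen($payload));

// What would be written to the pipe: [HEADER][MESSAGE].
$frame = $header . $payload;

// The reader reverses the step: the first 4 bytes give the payload length.
$decodedLength = unpack('N', substr($frame, 0, 4))[1];
```

Because the header has a fixed size, the reader always knows how many bytes to read first, and the value inside tells it how many bytes follow.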
Finally, once the data is sent, the child has to inform the parent that there is a message waiting for it.
posix_kill($this->pid, SIGUSR1);
posix_kill() sends a SIGUSR1 signal to $this->pid, which holds the parent process ID.
SocketServerBroadcast registers a handler for SIGUSR1 in its beforeServerLoop() method.
protected function beforeServerLoop() {
    parent::beforeServerLoop();
    socket_set_nonblock( $this->sockServer );
    pcntl_signal(SIGUSR1, array($this, 'handleProcess'), true);
}
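To see the signal mechanism in isolation, here is a small sketch in which a process registers a SIGUSR1 handler and then signals itself. It assumes the pcntl and posix extensions are loaded; the $received array and the closure are hypothetical and are not part of the server code.

```php
<?php
// Sketch only: register a handler and deliver SIGUSR1 to the current process.

$received = [];

// The handler just records the signal number it was called with.
pcntl_signal(SIGUSR1, function (int $signo) use (&$received) {
    $received[] = $signo;
});

// In the chat server the child targets the parent's PID; here we target ourselves.
posix_kill(posix_getpid(), SIGUSR1);

// PHP queues the signal internally; this call runs the pending handlers.
pcntl_signal_dispatch();
```

Note that a handler registered with pcntl_signal() does not run by itself. PHP only invokes it from pcntl_signal_dispatch(), from ticks, or when pcntl_async_signals(true) is enabled.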
It also sets the server socket to non-blocking mode. By default socket_accept() blocks process execution until a new connection arrives. In non-blocking mode, socket_accept() only checks whether a connection is waiting at that moment. If there isn’t one, it emits a warning, returns false, and execution continues.
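The behaviour is easy to check on its own. The sketch below assumes the sockets extension is loaded and binds to port 0 so that the operating system picks a free port; both the port choice and the variable names are mine, not taken from the server code.

```php
<?php
// Sketch only: a non-blocking socket_accept() with no client waiting.

$server = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_bind($server, '127.0.0.1', 0);   // port 0: let the OS choose a free port
socket_listen($server);
socket_set_nonblock($server);

// Nobody has connected, so this returns false right away instead of blocking.
// The @ operator hides the warning PHP emits in that case.
$client = @socket_accept($server);
$noClientWaiting = ($client === false);

socket_close($server);
```

Without the call to socket_set_nonblock() the same socket_accept() would sit and wait until a client connected.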
protected function serverLoop() {
    while( $this->_listenLoop ) {
        if( ( $client = @socket_accept( $this->sockServer ) ) === false ) {
            // No new connection: wait up to 1 second for a child's signal.
            $info = array();
            if( pcntl_sigtimedwait(array(SIGUSR1),$info,1) > 0 ) {
                if( $info['signo'] == SIGUSR1 ) {
                    $this->handleProcess();
                }
            }
            continue;
        }
        // ... (the rest of the loop is not shown in this excerpt)
    }
}
In the main loop the server checks whether there is a new connection, and if not it waits up to 1 second for a SIGUSR1 signal. When the signal arrives, pcntl_sigtimedwait() returns immediately and $this->handleProcess() is executed.
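Here is a standalone sketch of waiting for a signal with a timeout. It assumes Linux with the pcntl and posix extensions. Unlike the server code above, it first blocks SIGUSR1 with pcntl_sigprocmask() so that the signal stays pending until pcntl_sigtimedwait() collects it. That step and the variable names are my additions for the demo.

```php
<?php
// Sketch only: collect a pending SIGUSR1 with pcntl_sigtimedwait().

// Keep SIGUSR1 pending instead of delivering it to a handler.
pcntl_sigprocmask(SIG_BLOCK, [SIGUSR1]);

// Simulate a child notifying the parent (here the process signals itself).
posix_kill(posix_getpid(), SIGUSR1);

// The signal is already pending, so this returns without waiting the full second.
$info = [];
$signo = pcntl_sigtimedwait([SIGUSR1], $info, 1);

// Nothing is pending any more, so this call gives up after 1 millisecond.
$info2 = [];
$again = pcntl_sigtimedwait([SIGUSR1], $info2, 0, 1000000);
```

On success the return value is the signal number, which is why the server loop tests for a result greater than zero. The value returned on timeout differs between PHP versions, but it is never a positive number.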
public function handleProcess() {
    // Read the 4-byte header first, then exactly that many bytes of payload.
    $header = fread($this->pipe, 4);
    $len = $this->bytesToInt( $header );
    $message = unserialize( fread( $this->pipe, $len ) );
    if( $message['type'] == 'msg' ) {
        $client = $this->connections[ $message['pid'] ];
        $msg = sprintf('[%s] (%d):%s', $client->getAddress(), $message['pid'], $message['data'] );
        printf( "Broadcast: %s", $msg );
        foreach( $this->connections as $pid => $conn ) {
            // Do not echo the message back to its sender.
            if( $pid == $message['pid'] ) {
                continue;
            }
            $conn->send( $msg );
        }
    }
    else if( $message['type'] == 'disc' ) {
        // The child disconnected: forget its connection.
        unset( $this->connections[ $message['pid'] ] );
    }
}
Before the parent reads a message from the pipe it has to know how long the message is. As you remember, the first 4 bytes hold the length.
$header = fread($this->pipe, 4); $len = $this->bytesToInt( $header );
The rest of the code is straightforward: read the actual message, unserialize it and handle it.
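The read side can be tried without any processes at all. The sketch below uses a php://memory stream as a stand-in for the pipe and assumes the same 4-byte big-endian header as pack('N'). The helpers readExactly() and readFrame() are hypothetical names, not functions from the repository.

```php
<?php
// Sketch only: read [HEADER][MESSAGE] frames from a stream.

// Read exactly $len bytes, or return null if the stream ends early.
function readExactly($stream, int $len): ?string
{
    $buf = '';
    while (strlen($buf) < $len) {
        $chunk = fread($stream, $len - strlen($buf));
        if ($chunk === false || $chunk === '') {
            return null;
        }
        $buf .= $chunk;
    }
    return $buf;
}

// Read one frame: a 4-byte length header followed by a serialized array.
function readFrame($stream): ?array
{
    $header = readExactly($stream, 4);
    if ($header === null) {
        return null;
    }
    $payload = readExactly($stream, unpack('N', $header)[1]);
    if ($payload === null) {
        return null;
    }
    $msg = unserialize($payload, ['allowed_classes' => false]);
    return is_array($msg) ? $msg : null;
}

// Stand-in for the pipe: write two frames, then read them back.
$pipe = fopen('php://memory', 'w+');
foreach ([['type' => 'msg', 'data' => 'hi'], ['type' => 'disc']] as $m) {
    $p = serialize($m);
    fwrite($pipe, pack('N', strlen($p)) . $p);
}
rewind($pipe);

$first  = readFrame($pipe);
$second = readFrame($pipe);
$third  = readFrame($pipe);   // nothing left to read
```

Looping until the requested number of bytes has arrived guards against short reads, which a single fread() call on a pipe does not rule out.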
That would be it. You can extend this example and create a much more complex application. Be aware that the pipe communication relies strictly on the [HEADER][MESSAGE] pattern. If for any reason a header value becomes incorrect, the application will not recover. For a real-life server I would suggest implementing a way to mitigate header corruption.
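One possible mitigation, which is not part of the repository, is to put a fixed marker in front of the length and to reject lengths that are implausibly large. The marker bytes, the size limit and the function names below are all assumptions chosen for this sketch.

```php
<?php
// Sketch only: a header with a magic marker and a sanity check on the length.

define('FRAME_MAGIC', "\xC4\xA7");   // hypothetical 2-byte marker
define('MAX_PAYLOAD', 65536);        // hypothetical upper bound in bytes

// Build [MAGIC][LENGTH][MESSAGE].
function encodeFrame(string $payload): string
{
    return FRAME_MAGIC . pack('N', strlen($payload)) . $payload;
}

// Return the payload length, or null if the 6-byte header looks corrupted.
function parseHeader(string $header): ?int
{
    if (strlen($header) !== 6 || substr($header, 0, 2) !== FRAME_MAGIC) {
        return null;
    }
    $len = unpack('N', substr($header, 2))[1];
    return $len <= MAX_PAYLOAD ? $len : null;
}

$good     = parseHeader(substr(encodeFrame('hi'), 0, 6));
$badMagic = parseHeader("XX" . pack('N', 2));
$tooLong  = parseHeader(FRAME_MAGIC . pack('N', 70000));
```

When parseHeader() returns null, the reader could skip forward until it finds the marker again, or close and reopen the pipe. Which recovery strategy fits depends on the application.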
It looks great, but without Windows support it’s not useful (even if Windows is not so fashionable right now). Do you have a solution for all sensible clients?
Thank you for your comment, Jon. Yes, it can be done with pthreads. As you probably know, pthreads is a better fit for this job and is far more convenient to program with. The extension is available for Windows. The only problem is that it is still experimental.