Then you're in luck! This article aims to answer the "how" question: we'll explore the implementation details of Event Driven Architecture by:
- implementing a simple (and inefficient) HTTP server
- adding multiple client handling to it by creating an Event Loop
- adding time control to it by creating a Scheduler
- solving the callback hell issue by creating Deferrer and Promise
- adding "non-blocking" capability to "blocking" calls by using a Thread Pool
A story of Input / Output
Input / Output (I/O) can refer to Client / Server communication through sockets, for example a HTTP server.
Compared to calculations, I/O is really slow! To understand how this latency can be a performance bottleneck for our applications, we're going to create a simple HTTP server implementation.
In order to do so, we need to make use of some system calls:
- create a new "internet" socket (there are other types of sockets, e.g. unix ones)
- bind this socket to a host and port
- start to listen by creating a connection queue
From this point clients can ask permission to connect to the socket: they're going to be queued up until the maximum given to `listen` is reached, at which point errors are going to be thrown everywhere.
To prevent this nightmare, our priority will be to keep this queue empty by calling `accept`: it's going to unqueue the first client and return a new socket dedicated to it, allowing the "server" socket to accept more clients.
At some point the client will send data through the socket: the HTTP Request. We'll need to call `read` to retrieve it. We usually need to parse the received string, for example to create a Request value object our HTTP application can understand.
The HTTP application can then return a Response value object that we'll need to convert back to a string and send to the client using `write`.
Finally, once done, we can call `close` to get rid of the client and start accepting more.
If we put everything in a loop that runs forever we can handle one client at a time. Here's an implementation example (written in pseudo language):
```
# Socket abstracts `socket`, `bind` and `listen`
http_server = new Socket(host, port, max_connections_to_queue)

while true:
    http_connection = http_server.accept()

    data = http_connection.read()
    request = http_request_parse(data)
    response = application(request)
    http_connection.write((string) response)

    http_connection.close()
```
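The pseudo-language above maps closely to real system calls. Here's a runnable sketch in Python, assuming its standard `socket` module (the `application` function and the `serve_one` helper are illustrative names, not part of the pseudo-code); it handles a single client so the example terminates:

```python
import socket

def application(request):
    # Placeholder HTTP application: always answer 200 OK.
    return "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"

def serve_one(host, port, max_connections_to_queue=5):
    # The `socket`, `bind` and `listen` system calls:
    http_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    http_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    http_server.bind((host, port))
    http_server.listen(max_connections_to_queue)

    # One iteration of the article's `while true` loop:
    http_connection, _ = http_server.accept()    # blocks until a client connects
    data = http_connection.recv(4096).decode()   # blocks until data arrives
    response = application(data)
    http_connection.sendall(response.encode())   # blocks until data is sent
    http_connection.close()
    http_server.close()
```

Each of the three commented calls blocks the whole process, which is exactly the bottleneck discussed next.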
In our loop, for each request we call 3 I/O operations:

- `accept`: this call will wait until a new connection is available
- `read`: this call will wait until some data is sent by the client
- `write`: this call will wait until the data is sent to the client
That's a lot of waiting! While we wait for data to be sent, more clients can try to connect, be queued and eventually reach the limit.
In other words, waiting is blocking. If only we could do something else while waiting...
Turns out we can, thanks to polling system calls:
- we first add the sockets we want to use to a collection
- we then call `poll` with the collection of sockets to watch
- `poll` will wait until it detects activity on those sockets, and return the ones that are ready
As goes the saying: "Blocking. If it's not solving all your problems, you simply aren't using enough of it".
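The polling flow above can be illustrated with Python's `select.select`, one concrete implementation of the generic `poll`: we watch two sockets, write to only one of them, and only that one is returned as ready.

```python
import select
import socket

# Two pairs of connected sockets; we only write to one pair.
a, b = socket.socketpair()
c, d = socket.socketpair()

b.sendall(b"ping")  # now `a` has pending data to read

# Watch both `a` and `c`; only the ready ones are returned.
ready, _, _ = select.select([a, c], [], [], 1.0)  # 1 second timeout
```

Since only `a` has data waiting, `ready` contains `a` but not `c`: the call told us exactly which socket we can `read` without blocking.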
Note: There are actually many polling system calls:

- `select`, a POSIX standard which takes 3 fixed-size bitmaps of sockets (read, write, error)
- `poll`, another POSIX standard which takes an array of sockets
- `epoll`, a stateful Linux-specific system call equivalent to `poll`
- `kqueue`, a stateful BSD-specific (that includes Mac OS) system call equivalent to `poll`
- `IOCP`, a Windows equivalent to `poll`

For more information about those, check epoll VS kqueue. In our article, `poll` will refer to polling in general, not to a specific implementation.
With this we can change the architecture of our HTTP server:

- create the HTTP server socket
- add it to the collection of sockets to watch
- start our infinite loop:
  - wait until some sockets are ready
  - if the socket is the HTTP server one:
    - `accept` it to get a HTTP client socket
    - add the HTTP client socket to the collection of sockets to watch
  - if the socket is a HTTP client one:
    - `read` it to get its data
    - convert the data into a HTTP Request
    - forward the HTTP Request to the application to get a HTTP Response
    - convert the HTTP Response to a string and `write` it
    - `close` the HTTP client socket
    - remove the HTTP client socket from the collection of sockets to watch
Let's change our HTTP server to use `poll`:
```
http_server = new Socket(host, port, max_connections_to_queue)

connections = new SocketCollection()
connections.append(http_server)

while true:
    connections_ready = poll(connections)
    for connection in connections_ready:
        if http_server == connection:
            http_connection = http_server.accept()
            connections.append(http_connection)
        else:
            data = connection.read()
            request = http_request_parse(data)
            response = application(request)
            connection.write((string) response)

            connection.close()
            connections.remove(connection)
```
Now we can see the advantage of polling: while waiting for data to be ready on client sockets, we can now accept more connections on the server socket.
Before we continue, let's refactor our code a bit to abstract away the polling logic:
```
class EventLoop:
    function append(connection, callback):
        key = (int) connection
        self._connections[key] = connection
        self._callbacks[key] = callback

    function remove(connection):
        key = (int) connection
        self._connections.pop(key)
        self._callbacks.pop(key)

    function run():
        while true:
            connections_ready = poll(self._connections)
            for connection in connections_ready:
                key = (int) connection
                self._callbacks[key](connection, self)
```
We've named the class `EventLoop`: every time something happens (an Event) in the Loop, we call the appropriate callback. Here's our HTTP server using the `EventLoop`:
```
function handle_http_request(http_connection, event_loop):
    data = http_connection.read()
    request = http_request_parse(data)
    response = application(request)
    http_connection.write((string) response)

    http_connection.close()
    event_loop.remove(http_connection)

function handle_http_connection(http_server, event_loop):
    http_connection = http_server.accept()
    event_loop.append(http_connection, handle_http_request)

http_server = new Socket(host, port, max_connections_to_queue)

event_loop = new EventLoop()
event_loop.append(http_server, handle_http_connection)
event_loop.run()
```
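As a sanity check, here's a reduced, runnable Python translation of the `EventLoop` sketch, assuming `select.select` as the poll and a `socketpair` instead of a real HTTP server; `run_once` is a single iteration of `run`, so the example terminates:

```python
import select
import socket

class EventLoop:
    def __init__(self):
        self._connections = {}
        self._callbacks = {}

    def append(self, connection, callback):
        key = connection.fileno()
        self._connections[key] = connection
        self._callbacks[key] = callback

    def remove(self, connection):
        key = connection.fileno()
        self._connections.pop(key)
        self._callbacks.pop(key)

    def run_once(self):
        # One iteration of the article's `while true` loop.
        ready, _, _ = select.select(list(self._connections.values()), [], [], 1.0)
        for connection in ready:
            self._callbacks[connection.fileno()](connection, self)

received = []

def handle_data(connection, event_loop):
    received.append(connection.recv(4096))
    event_loop.remove(connection)

a, b = socket.socketpair()
loop = EventLoop()
loop.append(a, handle_data)
b.sendall(b"ping")
loop.run_once()
```

The callback receives both the ready connection and the loop itself, so it can unregister the connection once it's done, exactly like `handle_http_request` above.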
In the previous implementation, we couldn't make a distinction between client sockets; with this refactoring we can split our application even further, for example by waiting for `write` to be ready (`poll` is usually able to distinguish between sockets ready to be read and sockets ready to be written).
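This read/write distinction can be seen with Python's `selectors` module (a thin wrapper over the platform's polling system call): a socket can be reported ready for reading, for writing, or for both.

```python
import selectors
import socket

sel = selectors.DefaultSelector()
a, b = socket.socketpair()
a.setblocking(False)

# Register interest in both read and write events on `a`:
sel.register(a, selectors.EVENT_READ | selectors.EVENT_WRITE)

b.sendall(b"hello")
events = sel.select(timeout=1.0)
key, mask = events[0]

readable = bool(mask & selectors.EVENT_READ)   # data is waiting to be read
writable = bool(mask & selectors.EVENT_WRITE)  # the send buffer has room
```

Here `a` is reported both readable (data from `b` is pending) and writable (its send buffer is empty), so an event loop could dispatch to two different callbacks.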
If we don't have any connections, our server will spend most of its time waiting. If only we could do something else while waiting...
Wait a second!
Polling system calls usually take a `timeout` argument: if nothing happens for the given time, they're going to return an empty collection.
By combining it with a `OneOffScheduler`, we can achieve interesting things.
Here's an implementation:
```
class OneOffScheduler:
    function append(interval, callback, arguments):
        self._callbacks[interval].append(callback)
        self._arguments[interval].append(arguments)

    function lowest_interval():
        return self._callbacks.keys().min()

    function tick():
        for interval, callbacks in self._callbacks:
            if time.now() % interval != 0:
                continue

            for id, callback in callbacks:
                arguments = self._arguments[interval][id]
                callback(arguments)

                self._callbacks[interval].pop(id)
                self._arguments[interval].pop(id)
```
By "ticking" the clock we check if any registered callback is due.
The `lowest_interval` method will allow us to set a smart timeout for `poll` (e.g. no callback will mean no timeout, a callback with a 5s interval will mean a 5s timeout, etc.).
Here's our `EventLoop` improved with the `OneOffScheduler`:
```
class EventLoop:
    function constructor():
        self.one_off_scheduler = new OneOffScheduler()

    function append(connection, callback):
        key = (int) connection
        self._connections[key] = connection
        self._callbacks[key] = callback

    function remove(connection):
        key = (int) connection
        self._connections.pop(key)
        self._callbacks.pop(key)

    function run():
        while true:
            timeout = self.one_off_scheduler.lowest_interval()
            connections_ready = poll(self._connections, timeout)
            for connection in connections_ready:
                key = (int) connection
                self._callbacks[key](connection, self)

            self.one_off_scheduler.tick()
```
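Here's a minimal runnable sketch of the timeout mechanism, assuming Python's `select.select` as the poll: the scheduler's deadline is used as the poll timeout, so a callback fires even though no socket activity ever happens.

```python
import select
import socket
import time

# A socket pair gives `select` something to watch; we never write to it,
# so the poll can only return through its timeout.
a, b = socket.socketpair()

due = []
deadline = time.monotonic() + 0.1  # a one-off callback due in 100ms

while not due:
    # Use the time left until the deadline as the poll timeout:
    timeout = max(0.0, deadline - time.monotonic())
    ready, _, _ = select.select([a], [], [], timeout)
    # Nothing was ready, but the deadline passed: the scheduler's `tick`
    # would run the due callback here.
    if time.monotonic() >= deadline:
        due.append("tick")
```

No data ever arrives on `a`, yet the loop still executes the scheduled action: that's the "do something else while waiting" we were after.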
There are many `Scheduler` variants possible:
- periodic ones: instead of removing a callback once it's called, we could keep it for the next interval
- idle ones: instead of calling callbacks every time, we could only call them if nothing is ready
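As an illustration, the periodic variant could look like this in Python (a sketch counting loop ticks rather than wall-clock time; all names are illustrative):

```python
class PeriodicScheduler:
    def __init__(self):
        self._callbacks = {}  # interval -> list of (callback, arguments)
        self._ticks = 0

    def append(self, interval, callback, arguments):
        self._callbacks.setdefault(interval, []).append((callback, arguments))

    def tick(self):
        self._ticks += 1
        for interval, callbacks in self._callbacks.items():
            if self._ticks % interval != 0:
                continue
            # Unlike the one-off variant, callbacks are kept after being
            # called, so they fire again at the next interval.
            for callback, arguments in callbacks:
                callback(arguments)
```

A callback registered with interval 2 fires on ticks 2, 4, 6, and so on, for as long as the loop runs.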
As goes the saying: "Scheduler. If it's not solving all your problems, you simply aren't using enough of it".
We're now able to execute actions even if no actual events happened. All we need is to register a callback in our `EventLoop`. And in this callback we can also register a new callback for our `EventLoop`. And in this callback...
Async what you did here...
That's a lot of nested callbacks! It might become hard to understand the "flow of execution" of our application: we're used to reading "synchronous" code, not "asynchronous" code.
What if I told you there's a way to make "asynchronous" code look like "synchronous" code? One of the ways to do this is to implement promises:
- create a `Deferrer`
- ask politely the `Deferrer` to create a `Promise`
  - when creating the `Promise`, the `Deferrer` injects into it a `resolver` callback
- register a `on_fulfilled` callback in the `Promise` by calling `then`
  - the `Promise` calls the injected `resolver` callback with the given `on_fulfilled` callback as argument
  - this sets the `on_fulfilled` callback as an attribute in the `Deferrer`
- tell the `Deferrer` that we finally got a value by calling `resolve`
  - the `Deferrer` calls each registered `on_fulfilled` callback with the value
As goes the saying: "Callback. If it's not solving all your problems, you simply aren't using enough of it".
Here's an implementation for the `Deferrer` and the `Promise`:
```
class Deferrer:
    function promise():
        return new Promise(self.resolver)

    function resolve(value):
        for on_fulfilled in self._on_fulfilled:
            on_fulfilled(value)

    function resolver(on_fulfilled):
        self._on_fulfilled.append(on_fulfilled)
```
```
class Promise:
    function constructor(resolver):
        self._resolver = resolver

    function then(on_fulfilled):
        self._resolver(on_fulfilled)

        return new Promise(self._resolver)
```
And finally here's a basic usage example:
```
function hello_world(name):
    print 'Hello ' + name + '!'

function welcome_world(name):
    print 'Welcome ' + name + '!'

deferrer = new Deferrer()
promise = deferrer.promise()
promise.then(hello_world).then(welcome_world)

deferrer.resolve('Igor') # prints `Hello Igor!` and `Welcome Igor!`
```
With this, we contain the complexity to two classes, the rest of the application becomes easier to read: instead of nesting callbacks we can chain them.
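To check the chaining behavior, here's a direct Python translation of the two classes along with the usage example (collecting messages in a list instead of printing):

```python
class Deferrer:
    def __init__(self):
        self._on_fulfilled = []

    def promise(self):
        return Promise(self.resolver)

    def resolve(self, value):
        for on_fulfilled in self._on_fulfilled:
            on_fulfilled(value)

    def resolver(self, on_fulfilled):
        self._on_fulfilled.append(on_fulfilled)

class Promise:
    def __init__(self, resolver):
        self._resolver = resolver

    def then(self, on_fulfilled):
        self._resolver(on_fulfilled)
        return Promise(self._resolver)

calls = []
deferrer = Deferrer()
promise = deferrer.promise()
promise.then(lambda name: calls.append("Hello " + name + "!")) \
       .then(lambda name: calls.append("Welcome " + name + "!"))
deferrer.resolve("Igor")
```

Both callbacks are registered before any value exists, and both run in registration order once `resolve` is called.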
`Promise` and `Deferrer` both look neat. But what's the link with our scheduled `EventLoop`? Turns out the link is the Filesystem: polling system calls report regular files as always ready, so filesystem operations remain "blocking" for our Event Loop.
Thankfully we've got a solution for that: wrapping "blocking" filesystem operations in a class that will simulate a "non-blocking" behavior:

- ask for the regular arguments (e.g. `filename`), and an additional callback
- execute the "blocking" operation in a thread pool, which acts as a `Deferrer` and returns a `Promise`
- set the `Promise` callback to add the given callback to the `EventLoop`, scheduled immediately
Here's an implementation example of such a wrapper:
```
class NonBlockingFilesystem:
    function constructor(event_loop, filesystem, thread_pool):
        self._event_loop = event_loop
        self._filesystem = filesystem
        self._thread_pool = thread_pool

    function open(file, on_opened):
        promise = self._thread_pool.map(self._filesystem.open, file)
        promise.then(lambda file_descriptor: self._on_file_opened(file_descriptor, on_opened))

    function _on_file_opened(file_descriptor, on_opened):
        self._event_loop.scheduler.append(1, on_opened, file_descriptor)

    function read(file_descriptor, on_read):
        promise = self._thread_pool.map(self._filesystem.read, file_descriptor)
        promise.then(lambda content: self._on_file_read(content, on_read))

    function _on_file_read(content, on_read):
        self._event_loop.scheduler.append(1, on_read, content)
```
By deferring actual filesystem operations to threads, our HTTP server can keep accepting connections and handling clients while the call completes. The thread pool is usually set up with 4 threads.
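Python's `concurrent.futures.ThreadPoolExecutor` provides this pattern out of the box: its `Future` plays the role of our `Promise`, and `add_done_callback` plays the role of `then`. A sketch (the `read_file` helper and the temporary file are illustrative):

```python
import concurrent.futures
import os
import tempfile

def read_file(path):
    with open(path) as f:  # "blocking" filesystem operation
        return f.read()

# Prepare a file to read.
fd, path = tempfile.mkstemp()
os.close(fd)
with open(path, "w") as f:
    f.write("hello")

results = []
pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)  # the usual 4 threads

# The blocking read runs in a worker thread; the main thread is free
# to keep polling sockets in the meantime.
future = pool.submit(read_file, path)
future.add_done_callback(lambda fut: results.append(fut.result()))

pool.shutdown(wait=True)
os.remove(path)
```

In a real event loop, the done-callback would not touch application state directly but schedule the user's callback on the loop, as `NonBlockingFilesystem` does above.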
As goes the saying: "Threading. If it's not solving all your problems, you simply aren't using enough of it... NOT!".
For once, limits are good. If we put too many threads in the pool, we'll soon reach another limit: the number of filesystem operations allowed by the kernel. If we increase this limit, we'll soon reach another limit: the number of filesystem operations physically allowed by the hardware (some people tried it; they ended up with burned disks).
Note: Our server and application are still single-threaded. The use of a `ThreadPool` is done in a decoupled way, isolating us from multi-threaded issues.
By "non-blocking I/O", Node.js means that it's using an Event Loop to make use of the network latency to handle multiple clients in parallel.
It's built on libuv, a low-level C library whose Event Loop embeds many types of Schedulers and a Thread Pool: this allows it to simulate "non-blocking" behavior by wrapping "blocking" calls (e.g. Filesystem ones).
Instead of implementing our server in a "sequential" way, like Apache2 does, we can implement it with "polling events" in mind: nginx uses this "Event-Driven Architecture", which allows it to outperform Apache.
Systems built in this way often use Promises, as they help us perceive our "asynchronous" code as "synchronous".
If you're interested to read more on the topic, here are some links:
- PHP - build your own server
- ReactPHP - components
- ReactPHP - reactive PHP events
- PHP - co-operative PHP multitasking
- Node.js - When is the Thread Pool used?
- Node.js - Thread Pool usage
- C - the C10K problem
- C - Handle multiple socket connections with fd_set and select on Linux
- C - Introduction to non-blocking I/O
- C - libev documentation
- C - libuv documentation
- C - libuv online book
Note: In the PHP landscape, there are many libraries that allow us to build Event Driven applications.
There's even a PHP Async Interop Group that has started researching how to create PHP Standard Recommendations (PSR) for Event Loops, Promises, etc.