I/O Multiplexing: select/poll/epoll#
We can see from the TCP Socket programming model that a TCP Socket call is a simple one-to-one mechanism using synchronous blocking I/O. So how can we improve the I/O model to serve more users with the same resources?
Multi-process Model#
A more traditional approach is the multi-process model, which allocates one process to handle each client's requests.

The server's main process is responsible for listening for client connections. Once a connection with a client is established, the accept() function returns a "connected Socket". At that point a child process is created with the fork() function, which copies all relevant state from the parent process: file descriptors, memory address space, program counter, executing code, and so on. Immediately after the fork the two processes are almost identical; they are told apart by fork()'s return value, which is 0 in the child process and the child's PID (a positive integer) in the parent.

Because the child process copies the parent's file descriptors, it can use the "connected Socket" directly to communicate with the client. Notice that the child process does not need to care about the "listening Socket", only the "connected Socket"; conversely, the parent process delegates client service to the child, so it does not need to care about the "connected Socket", only the "listening Socket". The following diagram describes the process from connection request to connection establishment, with the parent process creating a child process to serve the client.
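Alongside the diagram, here is a minimal, hedged C sketch of this pattern; the port number and the echo-style handle_client() are placeholders for illustration, and error handling is omitted:

```c
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>

/* Hypothetical per-client work: echo back whatever the client sends. */
static void handle_client(int connfd) {
    char buf[1024];
    ssize_t n;
    while ((n = read(connfd, buf, sizeof(buf))) > 0)
        write(connfd, buf, n);
}

int main(void) {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);       /* listening Socket */
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(8080);                          /* example port */
    bind(listenfd, (struct sockaddr *)&addr, sizeof(addr));
    listen(listenfd, SOMAXCONN);

    for (;;) {
        int connfd = accept(listenfd, NULL, NULL);        /* connected Socket */
        if (fork() == 0) {           /* child: inherits the connected Socket */
            close(listenfd);         /* child ignores the listening Socket  */
            handle_client(connfd);
            close(connfd);
            exit(0);
        }
        close(connfd);               /* parent keeps only the listening Socket */
    }
}
```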
Additionally, when a "child process" exits, the kernel retains some information about it, which occupies memory. If it is not properly "reaped", it becomes a zombie process, and as zombie processes accumulate they gradually exhaust system resources, so the parent process must "clean up" after its children. How? There are two ways to reclaim a child's resources after it exits: calling wait() or calling waitpid() (a minimal sketch appears below).

This multi-process way of handling multiple clients is feasible for 100 clients, but once the client count reaches ten thousand it will certainly not hold up, because each process consumes a certain amount of system resources and the "burden" of context switching between processes is heavy, significantly impacting performance. A process context switch covers not only user-space resources such as virtual memory, the stack, and global variables, but also kernel-space resources such as the kernel stack and registers.
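As promised, one common "cleanup" pattern is to catch SIGCHLD in the parent and reap all exited children with waitpid() in a non-blocking loop; a minimal sketch (signal() registration is shown for brevity, though sigaction() is the more robust choice):

```c
#include <signal.h>
#include <sys/wait.h>

/* Reap every exited child without blocking, so none linger as zombies. */
static void sigchld_handler(int sig) {
    (void)sig;
    while (waitpid(-1, NULL, WNOHANG) > 0)
        ;
}

/* Registered once in the parent, before the accept loop:
 *     signal(SIGCHLD, sigchld_handler);                  */
```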
Multi-thread Model#
Since the "burden" of context switching between processes is heavy, we can use a lighter model to handle multiple user requests — the multi-thread model. A thread is a "logical flow" running within a process, and multiple threads can run within a single process. Threads within the same process can share some resources of the process, such as the file descriptor list, process space, code, global data, heap, shared libraries, etc. These shared resources do not need to be switched during context switching; only the private data and registers of the thread need to be switched. Therefore, the overhead of context switching between threads in the same process is much smaller than that of processes. After the server completes the TCP connection with the client, it creates a thread using the pthread_create() function, then passes the file descriptor of the "connected Socket" to the thread function, and then communicates with the client in the thread, achieving concurrent processing. If a thread is created for each incoming connection, after the thread finishes running, the operating system must destroy the thread. Although the overhead of thread switching is not large, if threads are frequently created and destroyed, the system overhead is also considerable. Therefore, we can use a thread pool to avoid the frequent creation and destruction of threads. A thread pool is a mechanism where several threads are created in advance. When a new connection is established, the connected Socket is placed in a queue, and the threads in the thread pool are responsible for taking the "connected Socket" from the queue for processing.
Returning to that thread pool: note that its queue is global and accessed by every thread, so to avoid races between threads, a thread must take a lock before operating on the queue (see the sketch at the end of this section).

The models above, whether based on processes or threads, still have a problem: each incoming TCP connection must be allocated a process or thread, so achieving C10K means one machine must maintain 10,000 connections, i.e., 10,000 processes/threads. The operating system simply cannot sustain that.
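Here is that promised sketch of the locked queue: a ring buffer of connected-Socket fds guarded by a pthread mutex, plus a condition variable so idle pool threads can sleep (capacity/overflow checks omitted for brevity):

```c
#include <pthread.h>

#define QUEUE_CAP 1024

/* A mutex-protected ring buffer of connected-Socket fds shared by the pool. */
static int queue[QUEUE_CAP];
static int head, tail, count;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;

void queue_push(int connfd) {
    pthread_mutex_lock(&mtx);          /* lock before touching the shared queue */
    queue[tail] = connfd;
    tail = (tail + 1) % QUEUE_CAP;
    count++;
    pthread_cond_signal(&not_empty);   /* wake one idle worker */
    pthread_mutex_unlock(&mtx);
}

int queue_pop(void) {
    pthread_mutex_lock(&mtx);
    while (count == 0)                 /* sleep until a connection arrives */
        pthread_cond_wait(&not_empty, &mtx);
    int connfd = queue[head];
    head = (head + 1) % QUEUE_CAP;
    count--;
    pthread_mutex_unlock(&mtx);
    return connfd;
}
```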
I/O Multiplexing#
Since allocating a process/thread per request is not suitable, can a single process maintain multiple Sockets? The answer is yes: this is I/O multiplexing.

A process can only handle one request at any given instant, but if each request is handled within, say, 1 millisecond, thousands of requests can be handled per second. Viewed over a longer window, multiple requests reuse one process; that is multiplexing. The idea is very similar to a CPU concurrently running multiple processes, so it is also called time-division multiplexing.

The familiar select/poll/epoll are exactly this: multiplexing system calls the kernel provides to user space, letting a process obtain multiple events from the kernel through a single system call. How do select/poll/epoll obtain network events? All connections (file descriptors) are first passed to the kernel, the kernel returns the connections on which events have occurred, and user space then processes the requests on those connections.

select/poll/epoll are three multiplexing interfaces; can they all achieve C10K? Let's discuss them one by one.
select/poll#
select implements multiplexing by placing all connected Sockets into a file descriptor set and calling the select function, which copies the set into the kernel so the kernel can check for network events. The check is quite brute-force: the kernel simply traverses the set. When an event is detected, the corresponding Socket is marked readable or writable, the whole set is copied back to user space, and user space must again traverse it to find the readable or writable Sockets and process them.

So select requires two traversals of the file descriptor set, once in kernel space and once in user space, and two copies of the set: from user space to kernel space, where the kernel modifies it, and then back to user space.

select uses a fixed-length BitsMap to represent the file descriptor set, so the number of descriptors it supports is limited. On Linux it is restricted by the kernel's FD_SETSIZE, whose default is 1024, so it can only monitor file descriptors 0 through 1023.

poll no longer uses a BitsMap to store the file descriptors of interest; it uses a dynamic array, organized as a linked list, breaking select's limit on the number of descriptors (though still subject to the system's file descriptor limits). However, poll and select have no essential difference: both store the set of Sockets the process cares about in a "linear structure", so both must traverse the set to find readable or writable Sockets, with O(n) time complexity, and both must copy the set between user space and kernel space. As concurrency rises, the performance overhead grows rapidly.
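A hedged sketch of one select round, making the two copies and two traversals visible; the fds[] bookkeeping, maxfd maintenance, and accept handling are assumed to live elsewhere:

```c
#include <sys/select.h>
#include <unistd.h>

/* One round of select: build the fd set, hand it to the kernel, then
 * scan it again in user space to find the ready Sockets. */
void select_round(int listenfd, int fds[], int nfds, int maxfd) {
    fd_set readset;
    FD_ZERO(&readset);
    FD_SET(listenfd, &readset);
    for (int i = 0; i < nfds; i++)
        FD_SET(fds[i], &readset);           /* first traversal: build the set */

    select(maxfd + 1, &readset, NULL, NULL, NULL);  /* set copied to the kernel */

    for (int i = 0; i < nfds; i++)          /* second traversal: find ready fds */
        if (FD_ISSET(fds[i], &readset)) {
            char buf[1024];
            read(fds[i], buf, sizeof(buf)); /* handle the readable Socket */
        }
    if (FD_ISSET(listenfd, &readset)) {
        /* accept a new connection and add it to fds[] (omitted) */
    }
}
```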
epoll#
epoll solves the problems of select/poll in two ways.

First, epoll uses a red-black tree in the kernel to track all file descriptors that need to be monitored; Sockets are added to the kernel's red-black tree via the epoll_ctl() function. A red-black tree is an efficient data structure, with O(log n) insertion, deletion, and lookup. select/poll have no comparable in-kernel structure holding all monitored descriptors, so they must pass the entire Socket set to the kernel on every call. Because epoll keeps the full set in the kernel's red-black tree, each epoll_ctl() call passes only the one file descriptor being changed, greatly reducing data copying and memory allocation between kernel and user space.

Second, epoll uses an event-driven mechanism: the kernel maintains a linked list of ready events, and when an event occurs on a Socket, a callback adds it to this ready list. When the user calls epoll_wait(), only the file descriptors with events are returned (along with their count), so there is no need to poll the entire Socket set as select/poll do, greatly improving detection efficiency.

From the diagram below, you can see the functions of the epoll-related interfaces:
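To complement the diagram, here is a minimal skeleton of those three interfaces; handle_fd() is a hypothetical stand-in for accepting or reading, and error handling is omitted:

```c
#include <sys/epoll.h>

#define MAX_EVENTS 64

static void handle_fd(int fd) { (void)fd; /* accept or read/send, per fd type */ }

/* Skeleton of the three epoll calls. */
void epoll_loop(int listenfd) {
    int epfd = epoll_create1(0);                    /* 1. create the epoll instance */

    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listenfd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);  /* 2. register one fd per call */

    struct epoll_event events[MAX_EVENTS];
    for (;;) {
        /* 3. only the ready events come back; no scan over every Socket */
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        for (int i = 0; i < n; i++)
            handle_fd(events[i].data.fd);
    }
}
```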
The efficiency of epoll does not significantly decrease even as the number of monitored Sockets increases, and it can monitor a very large number of Sockets, with the upper limit being the maximum number of file descriptors that the system defines for the process. Therefore, epoll is known as a powerful tool for solving the C10K problem.
Edge Triggering and Level Triggering#
Epoll supports two event triggering modes: edge-triggered (ET) and level-triggered (LT).
- In edge-triggered (ET) mode, when a monitored Socket has a readable event, the server is woken up from epoll_wait only once, even if the process never calls read to fetch the data from the kernel; our program must therefore make sure it reads all data from the kernel buffer in that one pass.
- In level-triggered (LT) mode, when a monitored Socket has a readable event, the server keeps being woken up from epoll_wait until the data in the kernel buffer has been completely consumed by read; the point is to keep telling us that there is data to be read.
In other words, level triggering means that as long as the condition holds, e.g. there is data to read in the kernel, the event keeps being delivered to the user; edge triggering means the event fires only when the condition first becomes true, and the same event is not delivered again afterward.

With level-triggered mode, when the kernel notifies us that a file descriptor is readable or writable, we can keep checking its status, so after receiving the notification there is no need to perform as many read/write operations as possible at once. With edge-triggered mode, an I/O event is reported only once and we do not know how much data can be read or written, so after the notification we should read and write as much as possible to avoid missing the opportunity; that is, we loop, reading and writing the file descriptor. If the descriptor were blocking and there were nothing left to read or write, the process would block inside read/write and the program could not continue. Edge-triggered mode is therefore generally paired with non-blocking I/O: the program keeps performing I/O until the system call (such as read or write) returns an error of type EAGAIN or EWOULDBLOCK.

Generally speaking, edge-triggered mode is more efficient than level-triggered mode because it reduces the number of epoll_wait system calls, and system calls carry some context-switching overhead. select/poll offer only level-triggered mode; epoll defaults to level-triggered but can be set to edge-triggered as the application scenario requires.

Additionally, when using I/O multiplexing it is best to pair it with non-blocking I/O anyway, because an event returned by the multiplexing API is not guaranteed to still be readable or writable; with blocking I/O, the subsequent read/write could then block the program, so non-blocking I/O guards against these rare corner cases.
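As a concrete illustration of pairing edge-triggered mode with non-blocking I/O, here is a minimal sketch that drains the kernel buffer until read reports EAGAIN/EWOULDBLOCK:

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Edge-triggered mode: drain the kernel buffer in one wake-up. The fd
 * must be non-blocking so the final read returns EAGAIN instead of blocking. */
void on_readable_et(int fd) {
    char buf[4096];
    for (;;) {
        ssize_t n = read(fd, buf, sizeof(buf));
        if (n > 0) {
            /* process n bytes (omitted) */
        } else if (n == 0) {           /* peer closed the connection */
            close(fd);
            return;
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            return;                    /* buffer drained; wait for the next edge */
        } else {
            close(fd);                 /* real error */
            return;
        }
    }
}

/* Registering with EPOLLET and making the fd non-blocking:
 *   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
 *   ev.events = EPOLLIN | EPOLLET;                          */
```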
High-Performance Network Model: Reactor and Proactor#
Reactor#
We just mentioned that high performance in networking can be achieved through I/O multiplexing, but writing network programs using I/O multiplexing interfaces is a procedural way of coding, which is not very efficient for development. Therefore, experts have encapsulated I/O multiplexing based on object-oriented ideas, allowing users to focus on writing application code without worrying about the details of the underlying network API. This underlying model is called the Reactor model. The Reactor model, also known as the Dispatcher model, listens for events through I/O multiplexing and, upon receiving an event, dispatches it to a specific process/thread based on the event type. The Reactor model consists of two core components: the Reactor and the processing resource pool, responsible for the following tasks:
- The Reactor is responsible for listening to and dispatching events, which include connection events and read/write events.
- The processing resource pool is responsible for handling events, such as read -> business logic -> send.
The Reactor model is flexible and can adapt to different business scenarios. Its flexibility lies in:
- The number of Reactors can be one or multiple.
- The processing resource pool can consist of a single process/thread or multiple processes/threads.
Next, we will introduce three classic Reactor solutions.
Single Reactor Single Process/Thread#
This solution contains three objects: Reactor, Acceptor, and Handler:
- The role of the Reactor object is to listen to and dispatch events.
- The role of the Acceptor object is to obtain connections.
- The role of the Handler object is to handle business logic.
The select, accept, read, and send in the objects are system call functions, while dispatch and "business processing" are operations that need to be completed, with dispatch being the event dispatch operation. Now, let’s introduce the "Single Reactor Single Process" solution:
- The Reactor object listens for events through select (I/O multiplexing interface). Upon receiving an event, it dispatches it, determining whether to dispatch it to the Acceptor object or the Handler object based on the event type.
- If it is a connection establishment event, it is handled by the Acceptor object, which will use the accept method to obtain the connection and create a Handler object to handle subsequent response events.
- If it is not a connection establishment event, it is handled by the current connection's corresponding Handler object.
The Handler object completes the entire business process through the read -> business processing -> send flow.
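To make the dispatch step concrete, here is a hypothetical sketch of a single-process Reactor loop built on epoll; acceptor and handler are stand-ins for the Acceptor and Handler objects, and this is an illustrative skeleton rather than any particular library's implementation:

```c
#include <sys/epoll.h>

/* Every fd is routed to a callback, so the Reactor only dispatches events. */
typedef void (*handler_fn)(int fd);

void reactor_run(int epfd, int listenfd, handler_fn acceptor, handler_fn handler) {
    struct epoll_event events[64];
    for (;;) {
        int n = epoll_wait(epfd, events, 64, -1);  /* the "select" step */
        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            if (fd == listenfd)
                acceptor(fd);   /* connection event -> Acceptor */
            else
                handler(fd);    /* other events -> Handler: read -> process -> send */
        }
    }
}
```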
The Single Reactor Single Process solution is relatively simple to implement because all work is done within the same process, eliminating the need to consider inter-process communication and concerns about multi-process competition. However, this solution has two drawbacks:
- The first drawback is that with only one process, the performance of multi-core CPUs cannot be fully utilized.
- The second drawback is that while the Handler object is processing business logic, the entire process cannot handle events from other connections. If business processing takes a long time, it will cause response delays.
Therefore, the Single Reactor Single Process solution is not suitable for CPU-intensive scenarios and is only applicable in scenarios where business processing is very fast. Redis, implemented in C, adopted the "Single Reactor Single Process" solution before version 6.0, as Redis's business processing mainly occurs in memory, making operations very fast, and the performance bottleneck is not in the CPU. Thus, Redis processes commands using a single-process solution.
Single Reactor Multi-threaded/Multi-process#
To overcome the drawbacks of the "Single Reactor Single Thread/Process" solution, we need to introduce multi-threading/multi-processing, leading to the Single Reactor Multi-threaded/Multi-process solution. As the name suggests, let’s look at the diagram of the "Single Reactor Multi-thread" solution:
Let’s detail this solution:
- The Reactor object listens for events through select (I/O multiplexing interface). Upon receiving an event, it dispatches it, determining whether to dispatch it to the Acceptor object or the Handler object based on the event type.
- If it is a connection establishment event, it is handled by the Acceptor object, which will use the accept method to obtain the connection and create a Handler object to handle subsequent response events.
- If it is not a connection establishment event, it is handled by the current connection's corresponding Handler object.
The above three steps are the same as in the Single Reactor Single Thread solution, but the next steps begin to differ:
- The Handler object is no longer responsible for business processing; it only handles data reception and sending. After reading data through read, the Handler object will pass the data to the Processor object in the child thread for business processing.
- The Processor object in the child thread will perform business processing and, after completion, send the result back to the Handler object in the main thread, which will then send the response result to the client using the send method.
The advantage of the Single Reactor Multi-threaded solution is that it can fully utilize multi-core CPU performance. However, introducing multi-threading also brings resource-competition issues.
For example, after a child thread completes business processing, it must pass the result back to the main thread's Handler for sending, which involves competition over shared data. To avoid inconsistencies caused by multiple threads racing on shared resources, a mutex lock must be acquired before operating on them, ensuring that only one thread touches the shared data at any moment; once that thread releases the mutex, other threads may operate on it.

Having covered the Single Reactor Multi-threaded solution, let's look at the Single Reactor Multi-process one. In fact, Single Reactor Multi-process is harder to implement than Single Reactor Multi-threaded, mainly because it must handle bidirectional communication between parent and child processes, and the parent must know which client the child intends to send data to. Threads, by contrast, share data; concurrency must still be handled, but that is far less complex than inter-process communication. Hence the Single Reactor Multi-process model is rarely seen in practice.

Additionally, the "Single Reactor" approach has another issue: because a single Reactor object handles all event listening and response and runs only in the main thread, it easily becomes a performance bottleneck under sudden high concurrency.
Multi Reactor Multi-process/Thread#
To solve the "Single Reactor" problem, we can implement "Single Reactor" as "Multi Reactor," leading to the Multi Reactor Multi-process/Thread solution. As usual, let’s look at the diagram of the Multi Reactor Multi-process/Thread solution (using threads as an example):
The detailed explanation of the solution is as follows:
- The MainReactor object in the main thread monitors connection establishment events through select. Upon receiving an event, it uses the accept method in the Acceptor object to obtain the connection and assigns the new connection to a child thread.
- The SubReactor object in the child thread adds the connection assigned by the MainReactor object to select for continued monitoring and creates a Handler to handle the response events of the connection.
- If new events occur, the SubReactor object will call the corresponding Handler object for the current connection to respond.
- The Handler object completes the entire business process through the read -> business processing -> send flow.
Although the Multi Reactor Multi-threaded solution may seem complex, it is actually much simpler to implement than the Single Reactor Multi-threaded solution for the following reasons:
- The main thread and child threads have clear divisions of labor; the main thread is only responsible for receiving new connections, while child threads handle subsequent business processing.
- The interaction between the main thread and child threads is straightforward; the main thread only needs to pass new connections to child threads, which do not need to return data but can directly send the processing results to the client.
The well-known open-source projects Netty and Memcached both adopt the "Multi Reactor Multi-threaded" solution, while Nginx adopts the "Multi Reactor Multi-process" solution, though its approach differs somewhat from the standard Multi Reactor Multi-process model. The specific difference is that the main process only initializes the socket and does not create a mainReactor to accept connections; instead, each child process's Reactor accepts connections, serialized by a lock so that only one child process accepts at a time (preventing the thundering herd problem). After a child process accepts a new connection, it handles it in its own Reactor without redistributing it to other child processes.
Proactor#
Reactor is a non-blocking synchronous network model, while Proactor is an asynchronous network model. Reactor can be understood as "when an event arrives, the operating system notifies the application process to handle it," while Proactor can be understood as "when an event arrives, the operating system handles it and then notifies the application process." Here, "events" refer to new connections, data to read, and data to write, while "handling" includes reading from the driver to the kernel and from the kernel to user space. The difference lies in that the Reactor model is based on "pending" I/O events, while the Proactor model is based on "completed" I/O events.
Let’s introduce the workflow of the Proactor model:
- The Proactor Initiator is responsible for creating Proactor and Handler objects and registering both with the kernel through the Asynchronous Operation Processor.
- The Asynchronous Operation Processor is responsible for handling registration requests and performing I/O operations.
- After the Asynchronous Operation Processor completes the I/O operation, it notifies the Proactor.
- The Proactor calls different Handlers for business processing based on different event types.
- The Handler completes the business processing.
Unfortunately, asynchronous I/O on Linux is underdeveloped. The aio family of functions is the asynchronous operation interface defined by POSIX, but it is not truly operating system-supported asynchrony: it is simulated in user space, and it only supports asynchronous operations on local files, not the Sockets used in network programming. This means high-performance network programs on Linux are primarily built on the Reactor model. Windows, by contrast, provides a complete socket-capable asynchronous programming interface, IOCP, which is asynchronous I/O implemented at the operating system level, so high-performance network programs on Windows can use the more efficient Proactor model.
Consistent Hashing#
How to Properly Distribute Requests in Distributed Systems?#
When we want to increase a system's capacity, we horizontally partition the data so that it is stored across different nodes. In a distributed KV (key-value) cache, for example, it must be unambiguous which node (or nodes) a given key lives on; it cannot be the case that any node, queried at random, returns the cached result. We therefore need a load-balancing algorithm suited to distributed systems.
Hash Algorithm#
The simplest hash-based approach is a modulo operation. For example, in a distributed system with 3 nodes, data is mapped to a node by the formula hash(key) % 3. However, there is a critical issue: if the number of nodes changes, i.e., the system is scaled up or down, all data whose mapping has changed must be migrated, otherwise it cannot be found when queried. In the worst case essentially all data moves, a migration scale of O(M), where M is the amount of data.
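A toy demonstration of why scaling breaks the mapping; djb2 is used here purely as an example hash, and the keys are made up:

```c
#include <stdio.h>
#include <stdint.h>

/* A toy string hash (djb2), just for illustration. */
static uint32_t hash(const char *key) {
    uint32_t h = 5381;
    while (*key)
        h = h * 33 + (uint8_t)*key++;
    return h;
}

int main(void) {
    const char *keys[] = {"user:1", "user:2", "user:3", "user:4"};
    for (int i = 0; i < 4; i++)
        /* Growing from 3 to 4 nodes changes most keys' node, forcing migration. */
        printf("%s: 3 nodes -> node %u, 4 nodes -> node %u\n",
               keys[i], hash(keys[i]) % 3, hash(keys[i]) % 4);
    return 0;
}
```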
Consistent Hashing Algorithm#
The consistent hashing algorithm also uses a modulo operation, but unlike the hash algorithm, which performs modulo on the number of nodes, the consistent hashing algorithm performs modulo on 2^32, which is a fixed value. We can visualize the results of the consistent hashing algorithm as a circular ring composed of 2^32 points, similar to a clock. This circular ring is called the hash ring, as shown in the diagram below:
Consistent hashing requires two steps of hashing:
- The first step: perform a hash calculation on the storage nodes, mapping the storage nodes, for example, based on their IP addresses.
- The second step: when storing or accessing data, perform a hash mapping on the data.
Thus, consistent hashing refers to mapping both "storage nodes" and "data" onto a circular hash ring that connects end to end. The question arises: how do we find the storage node for the data based on the hash mapping result? The answer is to find the first node in the clockwise direction from the mapped result, which is the node that stores the data.
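A minimal sketch of that clockwise lookup, assuming the node positions are kept in an array sorted by hash; a linear scan is used for clarity, though a real implementation would binary-search:

```c
#include <stdint.h>
#include <stddef.h>

/* One node's position on the 2^32 ring, plus which node it is. */
typedef struct { uint32_t hash; const char *name; } node_t;

/* The first node whose position is >= hash(key) stores the data;
 * if none is, wrap around to the first node on the ring. */
const char *lookup(const node_t ring[], size_t n, uint32_t key_hash) {
    for (size_t i = 0; i < n; i++)
        if (ring[i].hash >= key_hash)
            return ring[i].name;   /* first node clockwise from the key */
    return ring[0].name;           /* wrapped past the top of the ring */
}
```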
If we want to remove node D, we only need to migrate the data stored on node D to node B, its first successor in the clockwise direction. Therefore, in the consistent hashing algorithm, adding or removing a node affects only that node's clockwise successor; other data is not impacted.
However, the consistent hashing algorithm does not guarantee that nodes will be evenly distributed on the hash ring, which can lead to a problem where a large number of requests concentrate on a single node.
How to Improve Balance with Virtual Nodes?#
To solve the problem of uneven distribution of nodes on the hash ring, we need to have a large number of nodes. The more nodes there are, the more evenly distributed the nodes will be on the hash ring. Therefore, we introduce virtual nodes, which means creating multiple replicas for a real node.
The specific approach is to map virtual nodes onto the hash ring instead of the real nodes, and then map each virtual node to an actual node, giving a "two-layer" mapping relationship.

Virtual nodes improve not only the balance of the distribution but also the stability of the system: when nodes change, many different nodes share the resulting load. For example, when a node is removed, all of its virtual nodes are removed with it, and the next virtual node in the clockwise direction from each of them may belong to a different real node, so several real nodes share the pressure caused by the change. Moreover, virtual nodes let us weight nodes: a node with better hardware can be given more virtual nodes. The consistent hashing scheme with virtual nodes is therefore suitable both for clusters with heterogeneous hardware and for scenarios where the number of nodes changes.
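A hedged sketch of the virtual-node placement; the replica count, the "name#i" naming scheme, and the toy hash are all illustrative choices, not a prescribed design:

```c
#include <stdio.h>
#include <stdint.h>

#define REPLICAS 3   /* more replicas = higher weight for that real node */

/* Toy djb2 hash, for illustration only. */
static uint32_t hash(const char *key) {
    uint32_t h = 5381;
    while (*key)
        h = h * 33 + (uint8_t)*key++;
    return h;
}

/* Place several virtual replicas of one real node on the ring by hashing
 * "name#i"; a lookup resolves the virtual node back to its real node. */
void place_virtual_nodes(const char *real_node) {
    for (int i = 0; i < REPLICAS; i++) {
        char vname[64];
        snprintf(vname, sizeof(vname), "%s#%d", real_node, i);
        uint32_t pos = hash(vname);   /* this virtual node's ring position */
        printf("%s -> ring position %u (maps back to %s)\n",
               vname, pos, real_node);
        /* A real implementation would insert (pos -> real_node) into the
         * sorted ring structure used by the lookup sketched earlier. */
    }
}
```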