In this article you will find a short overview of I/O completion ports (IOCP), as well as a simple C++ implementation that copies a folder of files using asynchronous I/O requests. We hope this guide proves useful to anyone with basic C++ and Windows API experience and helps them learn the basics and some specifics of WinAPI IOCP programming.

Written by:
Andrew Timoshenko,
Software Designer of Apriorit

 

Overview

I/O completion ports are a flexible way of processing multiple I/O requests with a pre-allocated thread pool. They also help avoid the performance loss caused by context switching and by having too many or too few worker threads.

At the core of an IOCP lies a kernel queue object that stores I/O completion packets. Although packets are placed into the IOCP queue in FIFO order, they may be dequeued in a different order.

Creating an IOCP and associating it with file handles is done via the CreateIoCompletionPort API function:

HANDLE
WINAPI
CreateIoCompletionPort(
    _In_ HANDLE FileHandle,
    _In_opt_ HANDLE ExistingCompletionPort,
    _In_ ULONG_PTR CompletionKey,
    _In_ DWORD NumberOfConcurrentThreads
    );

The handle passed in the FileHandle parameter is associated with a new or existing completion port. Although the parameter is called FileHandle, it does not have to refer to a file: you can pass a handle to any object that supports overlapped I/O, such as a pipe or a socket.

If the ExistingCompletionPort parameter is NULL, a new completion port is created. The new port is associated with the process that created it. It can be shared by the threads of that process, but not by different processes.

Packet queuing

When an asynchronous I/O request for a file associated with an IOCP completes, the I/O manager creates an I/O completion packet and places it in the queue.

An application can also create a packet and place it in the queue with the PostQueuedCompletionStatus API function:

BOOL
WINAPI
PostQueuedCompletionStatus(
    _In_ HANDLE CompletionPort,
    _In_ DWORD dwNumberOfBytesTransferred,
    _In_ ULONG_PTR dwCompletionKey,
    _In_opt_ LPOVERLAPPED lpOverlapped
    ); 

This can be used to pass application-defined information to the worker threads. A common use is to tell the worker threads that the work is finished.

Packet dequeuing

Packets are extracted from the queue when a worker thread calls the GetQueuedCompletionStatus API function:

BOOL
WINAPI
GetQueuedCompletionStatus(
    _In_ HANDLE CompletionPort,
    _Out_ LPDWORD lpNumberOfBytesTransferred,
    _Out_ PULONG_PTR lpCompletionKey,
    _Out_ LPOVERLAPPED * lpOverlapped,
    _In_ DWORD dwMilliseconds
    );

The first time a thread calls this function, it becomes associated with the IOCP. A thread can be associated with only one IOCP at a time. The association is broken only when the thread passes a different handle in the CompletionPort parameter, finishes execution, or closes the IOCP handle.

A worker thread usually calls GetQueuedCompletionStatus to get an I/O completion packet to process. If the queue holds completion packets at that point and the number of active threads does not exceed the concurrency value, the thread takes a packet from the queue and becomes active. The number of active worker threads (i.e., threads processing packets simultaneously) is limited by the concurrency value, which is the NumberOfConcurrentThreads parameter of the CreateIoCompletionPort function.

If the queue has no packets or the maximum number of active threads has been reached, the calling thread is blocked. It is released and becomes active when a new packet arrives (if the concurrency value allows it) or when one of the active threads blocks (for example, on a synchronization object or an I/O operation). Because the previously active thread resumes packet processing once it is unblocked, the number of concurrently active threads can briefly exceed the concurrency value.

Threads blocked in GetQueuedCompletionStatus are released in LIFO order. As a result, a single active thread can keep taking and processing packets from the queue while the other threads stay blocked, which minimizes the cost of thread context switches.

The GetQueuedCompletionStatusEx function is also available, starting with Windows Vista. It can dequeue several packets at a time and lets the caller specify whether the thread performs an alertable wait while blocked.

Implementation

Create I/O completion pool

Let’s start by creating the I/O completion port and the worker threads. When the IOCP is no longer needed, the handle returned from the CreateIoCompletionPort function must be closed with a regular CloseHandle call. It is best to create more worker threads in the pool than there are processors in the system, because threads can block not only in the GetQueuedCompletionStatus function, but also while processing packets.

        // Create IOCP
        HANDLE iocp = ::CreateIoCompletionPort
            ( INVALID_HANDLE_VALUE // don't associate files now
            , nullptr // create new IOCP
            , 0 // completion key is ignored
            , concurrency);
        ...
        // Create threads: twice as many as the concurrency value
        for (DWORD i = 0; i < concurrency * 2; ++i)
        {
            const uintptr_t thread = ::_beginthreadex(nullptr, 0, &WorkThreadFunction, m_iocp.get(), 0, nullptr);
            ...
        }

After getting a completion packet from the queue, the GetQueuedCompletionStatus function sets the following out parameters for the worker thread:

  1. Number of bytes transferred by the completed I/O operation
  2. Completion key that was specified when the handle was associated with the IOCP
  3. Pointer to the OVERLAPPED structure used to start the operation

The function then returns a BOOL value that is non-zero if a completion packet for a successfully completed overlapped I/O operation was extracted from the queue. When it returns zero, different combinations of the out parameter values and the Last Error value make it possible to distinguish failed operations, closing of the IOCP handle, and so on. The documentation for this function describes all of this in full detail.

    unsigned __stdcall WorkThreadFunction(void* param)
    {
        ...
        const HANDLE iocp = param;
        for (;;)
        {
            // get completion
            LPOVERLAPPED ovl = nullptr;
            DWORD transferred = 0;
            ULONG_PTR completionKey = 0;
            const BOOL result = ::GetQueuedCompletionStatus(iocp, &transferred, &completionKey, &ovl, INFINITE);
            ...
        } // loop
    }
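The combinations of return value, out parameters, and Last Error mentioned above can be summarized in a small helper. The following is only a sketch in portable C++: the names DequeueOutcome, Classify, and ClassifyExamples are hypothetical and not part of the sample project, and the two constants simply mirror the numeric values of WAIT_TIMEOUT and ERROR_ABANDONED_WAIT_0 so that the snippet compiles without Windows headers.

```cpp
#include <cassert>
#include <cstdint>

// Numeric values mirror <winerror.h>; the names here are illustrative only.
constexpr std::uint32_t kWaitTimeout = 258;         // WAIT_TIMEOUT
constexpr std::uint32_t kErrorAbandonedWait0 = 735; // ERROR_ABANDONED_WAIT_0

enum class DequeueOutcome
{
    Completed,       // packet for a successful operation (or a posted packet)
    OperationFailed, // a packet was dequeued, but the I/O operation failed
    TimedOut,        // no packet arrived within dwMilliseconds
    PortClosed,      // the IOCP handle was closed while the thread was waiting
    WaitFailed       // no packet was dequeued for some other reason
};

// result:     return value of GetQueuedCompletionStatus, converted to bool
// overlapped: the pointer the function stored in *lpOverlapped
// lastError:  GetLastError() read immediately after the call
inline DequeueOutcome Classify(bool result, const void* overlapped, std::uint32_t lastError)
{
    if (result)
        return DequeueOutcome::Completed;
    if (overlapped != nullptr)
        return DequeueOutcome::OperationFailed; // lastError describes the failed I/O
    if (lastError == kWaitTimeout)
        return DequeueOutcome::TimedOut;
    if (lastError == kErrorAbandonedWait0)
        return DequeueOutcome::PortClosed;
    return DequeueOutcome::WaitFailed;
}

// Usage sketch: two of the cases a worker loop typically tells apart.
inline void ClassifyExamples()
{
    int packet = 0; // any non-null address stands in for an OVERLAPPED*
    assert(Classify(true, &packet, 0) == DequeueOutcome::Completed);
    assert(Classify(false, nullptr, kErrorAbandonedWait0) == DequeueOutcome::PortClosed);
}
```

With an INFINITE timeout, as in the worker loop above, the TimedOut case should not occur; it is listed only for completeness.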

To get information about the pool client, we use the OVERLAPPEDPLUS structure, which the client itself fills out before initiating the overlapped operation:

    struct OVERLAPPEDPLUS
    {
        OVERLAPPED m_ovl;
        IExecutor* m_poolClient;
	...
    };

Here m_ovl is the standard OVERLAPPED structure that the system uses for asynchronous I/O, and m_poolClient points to the object that handles the operation:

    unsigned __stdcall WorkThreadFunction(void* param)
    {
        ...
        io::OVERLAPPEDPLUS* ovlPlus = CONTAINING_RECORD(ovl, io::OVERLAPPEDPLUS, m_ovl);
        ...
        ovlPlus->m_poolClient->OnOperationCanceled(ovl);
        ...
        ovlPlus->m_poolClient->OnOperationCompleted(ovl, transferred);
        ...
    }
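To illustrate what CONTAINING_RECORD does in the snippet above, here is a minimal portable sketch of the same pointer arithmetic. FakeOverlapped, PoolClient, OverlappedPlus, and FromOverlapped are hypothetical stand-ins, not types from the sample project or the Windows SDK. The embedded member is deliberately not the first field, to show that the technique does not depend on it sitting at offset zero.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for the Windows OVERLAPPED structure (assumption: only its role
// as an embedded member matters here, not its real layout).
struct FakeOverlapped
{
    unsigned long offset;
    unsigned long offsetHigh;
};

// Hypothetical handler type playing the part of the pool client.
struct PoolClient
{
    int id;
};

// Enclosing structure: the "system" only ever sees a pointer to ovl.
struct OverlappedPlus
{
    PoolClient* client;
    FakeOverlapped ovl;
};

// Recover the enclosing structure from a pointer to its embedded member by
// stepping back by the member's offset, as CONTAINING_RECORD does.
inline OverlappedPlus* FromOverlapped(FakeOverlapped* ovl)
{
    assert(ovl != nullptr);
    char* raw = reinterpret_cast<char*>(ovl) - offsetof(OverlappedPlus, ovl);
    return reinterpret_cast<OverlappedPlus*>(raw);
}
```

The pointer passed in must really point into an OverlappedPlus instance; handing it any other FakeOverlapped yields an invalid pointer, which is why the pool relies on every client embedding the structure the same way.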

To stop the worker threads, we post a special completion packet to the IOCP queue for each of them. Its completion key is equal to ThreadExitKey:

        // send exit packets to the worker threads
        OVERLAPPEDPLUS destroyOvl;
        for (auto thread : m_threads)
            ::PostQueuedCompletionStatus(m_iocp.get(), 0, ThreadExitKey, &destroyOvl.m_ovl);
        ...
    unsigned __stdcall WorkThreadFunction(void* param)
    {
        ...
        if (completionKey == ThreadExitKey)
            return 0;
        ...
    }

Create or open files to work with

Files are opened or created with the regular CreateFile call. FILE_FLAG_OVERLAPPED should be used to inform the system that the file will be used for asynchronous I/O:

        HANDLE file = ::CreateFileW(fileName, access, shareMode, nullptr,
            disposition, FILE_FLAG_OVERLAPPED, nullptr);

Associate files with the IOCP

File handles are associated with the IOCP using the same CreateIoCompletionPort function. Associating a handle with an IOCP increments the port's reference count. The port is released only after its own handle and all associated handles are closed. Here we can also use the CompletionKey parameter: like the IOCP itself, the key is associated with the file and is returned from the GetQueuedCompletionStatus function. The completion key can be used to pass handle-specific information to the worker thread when a file operation completes. After a file handle is associated with an IOCP, it cannot be used with the ReadFileEx and WriteFileEx functions.

        HANDLE existingPort = ::CreateIoCompletionPort
            ( file
            , m_iocp.get()
            , 0 // don't use files' completion keys
            , 0); // concurrency is ignored with existing ports

After this, whenever an overlapped operation on this file completes, an I/O completion packet is created and placed into the IOCP queue. The only exception is an overlapped operation that fails immediately with an error. In addition, we can disable queuing of completion packets for operations that complete successfully right away by setting the FILE_SKIP_COMPLETION_PORT_ON_SUCCESS flag for the handle with the SetFileCompletionNotificationModes API function.

Overlapped file I/O

Let’s wrap the OVERLAPPEDPLUS structure defined by the pool in our own structure and add a function object, m_comleteHandler, that is responsible for processing the completion of a particular read or write operation:

    typedef std::function<void(DWORD transferred)> CompleteHandler_t;
	...
    struct OvlFile::FileIoOverlapped
    {
        OVERLAPPEDPLUS m_ovlPlus;
        CompleteHandler_t m_comleteHandler;
    };

Before starting an overlapped operation, we should prepare the OVERLAPPED structure for the system and fill in the m_poolClient field of the OVERLAPPEDPLUS structure, as it will be used by the pool worker threads after the operation completes. We fill the Offset and OffsetHigh fields of the OVERLAPPED structure with the low and high 32 bits of the file offset at which data will be read or written. The rest of the fields can be initialized with zeroes. For simplicity, we allocate a new FileIoOverlapped structure on the heap for each operation:

    std::unique_ptr<OvlFile::FileIoOverlapped>
        OvlFile::PrepareOverlapped(uint64_t fileOffset, CompleteHandler_t handler)
    {
        std::unique_ptr<FileIoOverlapped> fileOverlapped =
            std::make_unique<FileIoOverlapped>();
        fileOverlapped->m_ovlPlus.m_ovl.Offset = static_cast<DWORD>(fileOffset);
        fileOverlapped->m_ovlPlus.m_ovl.OffsetHigh = static_cast<DWORD>(fileOffset >> 32);
        fileOverlapped->m_ovlPlus.m_poolClient = this;
        fileOverlapped->m_comleteHandler = handler;
        return fileOverlapped;
    }
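The split of a 64-bit file offset into the Offset and OffsetHigh fields can be checked in isolation. This is a small portable sketch; OffsetParts, SplitOffset, JoinOffset, and OffsetRoundTripExample are hypothetical helper names used only for illustration, with std::uint32_t standing in for DWORD.

```cpp
#include <cassert>
#include <cstdint>

// The two 32-bit halves that go into OVERLAPPED::Offset and
// OVERLAPPED::OffsetHigh (std::uint32_t stands in for DWORD here).
struct OffsetParts
{
    std::uint32_t low;
    std::uint32_t high;
};

// Split a 64-bit file offset into its low and high 32-bit halves.
inline OffsetParts SplitOffset(std::uint64_t fileOffset)
{
    OffsetParts parts;
    parts.low = static_cast<std::uint32_t>(fileOffset & 0xFFFFFFFFu);
    parts.high = static_cast<std::uint32_t>(fileOffset >> 32);
    return parts;
}

// Reassemble the original offset, e.g. when logging a completed operation.
inline std::uint64_t JoinOffset(OffsetParts parts)
{
    return (static_cast<std::uint64_t>(parts.high) << 32) | parts.low;
}

// Usage sketch: an offset of 5 GiB does not fit into 32 bits.
inline void OffsetRoundTripExample()
{
    const std::uint64_t fiveGiB = 5ull * 1024 * 1024 * 1024; // 0x140000000
    const OffsetParts parts = SplitOffset(fiveGiB);
    assert(parts.low == 0x40000000u);
    assert(parts.high == 1u);
    assert(JoinOffset(parts) == fiveGiB);
}
```

Forgetting OffsetHigh typically goes unnoticed until a file grows past 4 GiB, at which point the operation silently wraps around to a low offset.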

If the file has been opened with the FILE_FLAG_OVERLAPPED flag, the lpOverlapped parameter of the ReadFile and WriteFile functions becomes mandatory. For this parameter, we pass a pointer to the system OVERLAPPED structure embedded in our own structure:

        const BOOL result = ::WriteFile
            ( m_file.get()
            , data
            , size
            , nullptr
            , &fileOverlapped->m_ovlPlus.m_ovl);

Now the operation can complete immediately, either successfully or with an error, or it can complete asynchronously. In the case of asynchronous completion, WriteFile returns FALSE and sets the Last Error value to ERROR_IO_PENDING. Here we check only for an immediate failure, because in all other cases a completion packet will be placed into the IOCP queue and processed by the worker threads of the pool:

        const DWORD lastError = ::GetLastError();
        if (!result && lastError != ERROR_IO_PENDING)
            throw ex::WinException("OvlFile. Write file", lastError);
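The three possible outcomes of starting an overlapped operation can be written down as a small decision helper. This is only a sketch in portable C++: StartOutcome, ClassifyStart, CompletionPacketExpected, and StartOutcomeExamples are hypothetical names, and the constant mirrors the numeric value of ERROR_IO_PENDING so that no Windows header is required. It also assumes that FILE_SKIP_COMPLETION_PORT_ON_SUCCESS has not been set for the handle.

```cpp
#include <cassert>
#include <cstdint>

constexpr std::uint32_t kErrorIoPending = 997; // ERROR_IO_PENDING in <winerror.h>

enum class StartOutcome
{
    CompletedImmediately, // the call returned TRUE
    Pending,              // the call returned FALSE with ERROR_IO_PENDING
    Failed                // the call returned FALSE with any other error
};

// result:    return value of ReadFile/WriteFile, converted to bool
// lastError: GetLastError() read right after the call (meaningful only
//            when result is false)
inline StartOutcome ClassifyStart(bool result, std::uint32_t lastError)
{
    if (result)
        return StartOutcome::CompletedImmediately;
    return lastError == kErrorIoPending ? StartOutcome::Pending : StartOutcome::Failed;
}

// A completion packet is queued unless the operation failed right away.
// Assumption: FILE_SKIP_COMPLETION_PORT_ON_SUCCESS is NOT set for the handle.
inline bool CompletionPacketExpected(StartOutcome outcome)
{
    return outcome != StartOutcome::Failed;
}

// Usage sketch: only an immediate failure is handled by the caller.
inline void StartOutcomeExamples()
{
    assert(CompletionPacketExpected(ClassifyStart(true, 0)));
    assert(CompletionPacketExpected(ClassifyStart(false, kErrorIoPending)));
    assert(!CompletionPacketExpected(ClassifyStart(false, 5))); // 5 = ERROR_ACCESS_DENIED
}
```

The value of CompletionPacketExpected is also what decides who owns the per-operation structure: if a packet is expected, a worker thread will eventually receive the pointer; otherwise the caller still owns it.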

While the operation is in flight, the memory occupied by the structure must stay valid for the system, so we release ownership from the smart pointer:

        fileOverlapped.release(); // delete it when operation completes

In the handlers of asynchronous operations executed in the pool worker threads, we call a higher level handler and release the memory allocated for the FileIoOverlapped structure:

            io::OVERLAPPEDPLUS* ovlPlus = CONTAINING_RECORD(ovl, io::OVERLAPPEDPLUS, m_ovl);
            FileIoOverlapped* fileOvl = CONTAINING_RECORD(ovlPlus, FileIoOverlapped, m_ovlPlus);
            std::unique_ptr<FileIoOverlapped> guard(fileOvl);
            if (fileOvl->m_comleteHandler)
                fileOvl->m_comleteHandler(transferred);
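The ownership handoff described above (release the smart pointer when the operation starts, adopt the raw pointer again when it completes) can be tried out without any I/O. The sketch below is a portable simulation; Request, Submit, and Complete are hypothetical names, and the destruction counter exists only to make the lifetime observable.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical stand-in for FileIoOverlapped: a per-operation heap object.
struct Request
{
    std::function<void(unsigned)> onComplete;

    ~Request() { ++Destroyed(); }

    // Counts destroyed requests so the lifetime can be observed.
    static int& Destroyed()
    {
        static int count = 0;
        return count;
    }
};

// "Start" side: once the operation is in flight, the smart pointer gives up
// ownership and only the raw pointer travels with the operation.
inline Request* Submit(std::unique_ptr<Request> request)
{
    return request.release();
}

// "Completion" side: adopt the raw pointer again, so the object is freed
// even if the handler throws.
inline void Complete(Request* raw, unsigned transferred)
{
    assert(raw != nullptr);
    std::unique_ptr<Request> guard(raw);
    if (guard->onComplete)
        guard->onComplete(transferred);
}
```

In the real code the release must happen only after the start call has been checked for an immediate failure; if the operation never starts, no completion arrives and the unique_ptr should be left to free the structure itself.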

File copying

When copying, we first read the data from the source file. Let’s create a function object for handling read completion. We store a shared pointer to ‘this’ in the function object, which increments the reference count; the function object itself is kept on the heap as part of the OvlFile::FileIoOverlapped structure. After this, we start the asynchronous read operation:

        void StartRead()
        {
            io::CompleteHandler_t completeHandler = std::bind
                ( std::mem_fn(&CopyFile::OnReadComplete)
                , shared_from_this()
                , std::placeholders::_1);
            m_srcFile.StartRead(m_copied, &m_buf[0],
                static_cast<DWORD>(m_buf.size()), completeHandler);
        }

When reading has completed, we create a similar completion handler, this time for writing. Then we start writing the data to the target file:

        void OnReadComplete(DWORD transferred)
        {
			...
                io::CompleteHandler_t completeHandler = std::bind
                    ( std::mem_fn(&CopyFile::OnWriteComplete)
                    , shared_from_this()
                    , std::placeholders::_1);
                m_dstFile.StartWrite(m_copied, &m_buf[0], transferred, completeHandler);
			...
        }
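Binding shared_from_this() into the completion handler, as in the two snippets above, is what keeps the copy object alive while an operation is pending. The following portable sketch shows that effect in isolation; CopyTask, MakeHandler, and Total are hypothetical names and the class does no real I/O.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical miniature of the copy object: it only counts bytes.
class CopyTask : public std::enable_shared_from_this<CopyTask>
{
public:
    // The returned handler holds its own shared_ptr to this object.
    // The object must already be owned by a shared_ptr when this is called.
    std::function<void(unsigned)> MakeHandler()
    {
        return std::bind(&CopyTask::OnDone, shared_from_this(), std::placeholders::_1);
    }

    unsigned Total() const { return m_total; }

private:
    void OnDone(unsigned transferred) { m_total += transferred; }

    unsigned m_total = 0;
};

// Usage sketch: the handler alone keeps the task alive.
inline void HandlerLifetimeExample()
{
    std::shared_ptr<CopyTask> task = std::make_shared<CopyTask>();
    std::weak_ptr<CopyTask> watcher = task;

    std::function<void(unsigned)> handler = task->MakeHandler();
    task.reset();               // the caller drops its reference
    assert(!watcher.expired()); // still alive: the handler owns a reference

    handler(512);               // "completion" arrives later
    assert(watcher.lock()->Total() == 512u);

    handler = nullptr;          // destroying the handler frees the task
    assert(watcher.expired());
}
```

This is why the per-operation structure must be destroyed after the handler runs: until the stored function object is destroyed, the copy object cannot be released.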

After writing has completed, if the total size of the written data is smaller than the source file, we start reading again, this time from the next offset.

If we have already read and written all the necessary data, the file has been copied successfully and we call the copy completion handler:

        void OnWriteComplete(DWORD transferred)
        {
			...
                m_copied += transferred;
                if (m_copied == m_srcSize)
                {
                    m_onComplete();
                    return;
                }
                StartRead();
			...
        }
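The read, write, read chain can be followed end to end with an in-memory simulation. The sketch below is not the sample project's CopyFile class: InMemoryCopy and its members are hypothetical, the "files" are vectors, and each completion handler is called directly instead of being delivered through the IOCP by a pool thread.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical in-memory model of the copy chain.
class InMemoryCopy
{
public:
    InMemoryCopy(const std::vector<char>& src, std::size_t bufSize)
        : m_src(src), m_buf(bufSize)
    {
        assert(bufSize > 0); // a zero-sized buffer could never make progress
    }

    // Runs the whole chain and returns the destination contents.
    const std::vector<char>& Run()
    {
        if (m_src.empty())
            m_done = true; // nothing to copy
        else
            StartRead();
        return m_dst;
    }

    bool Done() const { return m_done; }
    int Reads() const { return m_reads; }

private:
    void StartRead()
    {
        ++m_reads;
        // Read at most one buffer's worth of data from the current offset.
        const std::size_t n = std::min(m_buf.size(), m_src.size() - m_copied);
        std::copy_n(m_src.data() + m_copied, n, m_buf.data());
        OnReadComplete(n);
    }

    void OnReadComplete(std::size_t transferred)
    {
        // Write exactly the bytes that were read, at the same offset.
        m_dst.insert(m_dst.end(), m_buf.data(), m_buf.data() + transferred);
        OnWriteComplete(transferred);
    }

    void OnWriteComplete(std::size_t transferred)
    {
        m_copied += transferred;
        if (m_copied == m_src.size())
        {
            m_done = true; // plays the role of m_onComplete()
            return;
        }
        StartRead();
    }

    std::vector<char> m_src;
    std::vector<char> m_dst;
    std::vector<char> m_buf;
    std::size_t m_copied = 0;
    int m_reads = 0;
    bool m_done = false;
};
```

For example, copying 10 bytes through a 4-byte buffer takes three reads of 4, 4, and 2 bytes. Unlike this simulation, the real chain does not recurse: each step returns after starting the next operation, and the next handler runs later on a pool thread.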

Conclusion

In this tutorial, we took a look at asynchronous I/O in Windows, covering both general information and specific API functions of I/O completion ports. We also provided a Windows I/O completion port example that shows how to start asynchronous file operations and how to handle their completion.

Learn more about the Apriorit System Development services.

Get the sample project source code: ZIP, 12KB
