Cotamer I/O Programming Manual
Cotamer provides asynchronous I/O through reference-counted file descriptors, readiness events, and a coroutine-aware mutex. These components let you write non-blocking network code in a sequential style.
File descriptors
A cotamer::fd is a reference-counted wrapper around a Unix file descriptor.
Constructing a cotamer::fd transfers ownership over the corresponding file:
when the last copy of the cotamer::fd is destroyed, the underlying file
descriptor is automatically closed.
cot::fd rfd(fildes); // takes ownership of raw fd
cot::fd rfd_copy = rfd; // shares the same underlying fd
assert(rfd.fileno() == rfd_copy.fileno());
| Member | Description |
|---|---|
| `fd()` | construct an invalid fd |
| `fd(int fileno)` | take ownership of a file descriptor |
| `fileno()` | return the file descriptor number |
| `valid()`, `operator bool` | true if the fd is open |
| `close()` | close fd |
Closing an fd, whether by calling close(), by destroying the last copy, or
by cancelling the owning task, triggers all outstanding readiness events on
that fd. This guarantees that coroutines waiting on a closed fd will always
wake up.
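The ownership rule can be illustrated without Cotamer. The following is a minimal sketch, assuming a hypothetical `shared_fd` class built on `std::shared_ptr`; it is not how cotamer::fd is implemented, and it does not model readiness events. It shows only that copies share one descriptor and that the descriptor closes when the last copy goes away.

```cpp
#include <cassert>
#include <fcntl.h>
#include <memory>
#include <unistd.h>

// Hypothetical stand-in for a reference-counted descriptor; NOT cotamer::fd.
class shared_fd {
public:
    shared_fd() = default;                        // invalid fd
    explicit shared_fd(int fileno)                // takes ownership of fileno
        : state_(new int(fileno), [](int* p) {
              if (*p >= 0) {
                  ::close(*p);                    // last copy destroyed: close
              }
              delete p;
          }) {}

    int fileno() const { return state_ ? *state_ : -1; }
    bool valid() const { return fileno() >= 0; }
    explicit operator bool() const { return valid(); }

    // Close immediately; every copy then observes the closed state.
    void close() {
        if (valid()) {
            ::close(*state_);
            *state_ = -1;
        }
    }

private:
    std::shared_ptr<int> state_;                  // shared descriptor number
};

// Returns true if `fileno` still names an open descriptor in this process.
inline bool is_open(int fileno) {
    return ::fcntl(fileno, F_GETFD) != -1;
}
```

Storing the descriptor number behind the shared pointer, rather than in each copy, is what lets an explicit `close()` on one copy invalidate all of them.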
Readiness events
Three free functions create events tied to file descriptor readiness:
| Function | Triggers when... |
|---|---|
| `cotamer::readable(f)` | `::read(f.fileno())` wouldn’t block |
| `cotamer::writable(f)` | `::write(f.fileno())` wouldn’t block |
| `cotamer::closed(f)` | the fd encounters an error or closes |
Like all Cotamer events, these are one-shot: once triggered, they stay
triggered. To wait for readability again, call readable(f) again.
cot::fd rfd(fildes);
cot::set_nonblocking(rfd);
co_await cot::readable(rfd); // suspend until data available or EOF
char buf[256];
ssize_t n = ::read(rfd.fileno(), buf, sizeof(buf));
File descriptors must be in non-blocking mode for readiness events to work
correctly. I/O helpers that return a cotamer::fd (accept, tcp_listen,
tcp_connect, tcp_accept) automatically put that fd in non-blocking mode;
for file descriptors obtained in other ways, call cotamer::set_nonblocking,
which is overloaded for both a raw int fd and a cotamer::fd.
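For reference, here is a sketch of what putting a raw descriptor into non-blocking mode involves at the POSIX level. The helper names `make_nonblocking` and `read_would_block` are hypothetical, and whether cotamer::set_nonblocking is implemented this way is an assumption. The second helper shows the behavior readiness events rely on: a read that cannot proceed fails with EAGAIN instead of stalling the thread.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper in the spirit of set_nonblocking(int); not Cotamer's code.
// Returns 0 on success, -1 on failure (errno is set by fcntl).
inline int make_nonblocking(int fileno) {
    int flags = ::fcntl(fileno, F_GETFL);
    if (flags == -1) {
        return -1;
    }
    return ::fcntl(fileno, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

// Attempts a one-byte read. Returns true if the read would have blocked.
// Note that a successful read consumes the byte.
inline bool read_would_block(int fileno) {
    char byte;
    ssize_t n = ::read(fileno, &byte, 1);
    return n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);
}
```

On a blocking descriptor the same `::read` would simply not return until data arrived, which would stall every coroutine on the thread.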
When a file descriptor closes or experiences a serious error, Cotamer triggers
all three readiness events for that file descriptor. This means that most code
doesn’t need closed(). It can be useful nevertheless for coroutines that
passively track files without reading or writing them.
Byte transfer
The cotamer::read_once, cotamer::write_once, cotamer::read, and
cotamer::write coroutines are suspendable wrappers around ::read and
::write.
Transient errors (EINTR, EAGAIN, and EWOULDBLOCK) cause these functions to
retry. On serious errors (anything else), the functions exit early, returning
either a short count (if any data has been transferred) or an error indication
(if the error occurred before transfer). The once versions retry only while
blocked; the others loop until all bytes are transferred (or end-of-file or
error).
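The retry rules above can be sketched as an ordinary loop. This is a blocking analog, assuming a hypothetical `read_fully` function that uses `poll` where a coroutine would suspend on a readiness event; it is not the implementation of cotamer::read, and it reports errors with -1 rather than an ioresult.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <unistd.h>

// Blocking analog of a "read until completion" loop; NOT cotamer::read.
// Returns the number of bytes read (short on end-of-file or on an error
// after some data arrived), or -1 if a serious error occurred before any
// byte was transferred.
inline ssize_t read_fully(int fileno, void* buf, size_t count) {
    char* p = static_cast<char*>(buf);
    size_t done = 0;
    while (done < count) {
        ssize_t n = ::read(fileno, p + done, count - done);
        if (n > 0) {
            done += static_cast<size_t>(n);
        } else if (n == 0) {
            break;                                   // end-of-file
        } else if (errno == EINTR) {
            continue;                                // transient: retry at once
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            pollfd pfd{fileno, POLLIN, 0};           // transient: wait until readable
            ::poll(&pfd, 1, -1);                     // a coroutine would suspend here
        } else {
            return done > 0 ? static_cast<ssize_t>(done) : -1;  // serious error
        }
    }
    return static_cast<ssize_t>(done);
}
```

A "once" variant would return after the first successful `::read` instead of looping until `count` bytes have arrived.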
These coroutines return a std::expected object, which encapsulates either a
real value or an error code. Specifically, they return cotamer::ioresult, a
synonym for std::expected<size_t, std::error_code>. On success, this value
encapsulates the number of transferred bytes; you can access that count with
*result. On failure, it encapsulates the errno number (in a std::error_code
object).
Some example uses:
cot::ioresult result = cot::read(fd, buf, n);
if (result) {
// transfer succeeded (or end-of-file)
size_t nbytes = *result; // fetch number of bytes
} else {
// serious error
int errc = result.error().value();
}
if (!result || *result == 0) {
// no bytes transferred
}
size_t nb = result.value(); // throws std::bad_expected_access if `result` is an error
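Once you have the std::error_code from a failed result, the standard library can classify it. The sketch below uses hypothetical helpers `from_errno` and `peer_gone`; the use of `std::generic_category()` is an assumption about how the errno value is wrapped, not something the Cotamer documentation states.

```cpp
#include <cassert>
#include <cerrno>
#include <string>
#include <system_error>

// Wraps an errno value in a std::error_code.
// The category choice is an assumption, not confirmed Cotamer behavior.
inline std::error_code from_errno(int err) {
    return std::error_code(err, std::generic_category());
}

// Hypothetical classifier: did the operation fail because the peer went away?
inline bool peer_gone(const std::error_code& ec) {
    return ec == std::errc::broken_pipe || ec == std::errc::connection_reset;
}
```

Comparing against `std::errc` values keeps the calling code portable, and `ec.message()` yields a human-readable description for logging.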
Signatures:
| Function | Description |
|---|---|
| `task<ioresult> read_once(fd, void* buf, size_t count)` | Read until first success |
| `task<ioresult> read(fd, void* buf, size_t count)` | Read until completion |
| `task<ioresult> write_once(fd, const void* buf, size_t count)` | Write until first success |
| `task<ioresult> write(fd, const void* buf, size_t count)` | Write until completion |
| `task<ioresult> writev(fd, const iovec* iov, size_t iovcnt)` | Scatter-gather write until completion |
Here is a complete pipe example:
cot::task<> pipe_echo() {
int piperaw[2];
pipe(piperaw);
cot::fd rfd(piperaw[0]), wfd(piperaw[1]);
cot::set_nonblocking(rfd);
cot::set_nonblocking(wfd);
// writer
auto writer = [&]() -> cot::task<> {
co_await cot::write(wfd, "hello", 5);
};
writer().detach();
// reader
char buf[64] = {};
if (auto r = co_await cot::read_once(rfd, buf, sizeof(buf))) {
std::print("read {} bytes: {}\n", *r, std::string_view(buf, *r));
}
}
Sockets
cotamer::connect and cotamer::accept perform asynchronous versions of the
corresponding socket system calls. connect connects a client socket to a
remote address; the coroutine suspends until the connection attempt completes.
accept suspends until a new connection request arrives on lfd; when one
does, it is accepted, put into non-blocking mode, and returned as an fd
object. These functions throw std::system_error on serious error, rather
than returning an ioresult.
To read and write the resulting sockets, prefer cotamer::send and
cotamer::recv. Although write and read also work, the send and recv
versions supply arguments useful for typical socket use. For instance, if a
socket’s remote peer has shut down, write might kill the process via the
SIGPIPE signal, whereas send just returns an error.
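The SIGPIPE difference can be reproduced with plain system calls. This sketch assumes Linux, where the `MSG_NOSIGNAL` flag suppresses the signal; whether cotamer::send uses that flag internally is an assumption. The hypothetical function `send_after_peer_close` sends on a socket whose peer has already closed and returns the resulting errno.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/socket.h>
#include <unistd.h>

// Sends one byte on a socket whose peer is gone, without risking SIGPIPE.
// Returns the errno reported by send(), or 0 if the send succeeded.
// MSG_NOSIGNAL is Linux-specific.
inline int send_after_peer_close() {
    int sv[2];
    if (::socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) {
        return errno;
    }
    ::close(sv[1]);                                // the peer shuts down
    ssize_t n = ::send(sv[0], "x", 1, MSG_NOSIGNAL);
    int err = (n == -1) ? errno : 0;               // capture before close()
    ::close(sv[0]);
    return err;
}
```

The same transfer through `::write` would raise SIGPIPE, whose default action terminates the process unless the signal is ignored or handled.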
cotamer::send and cotamer::recv behave like the send and recv system
calls: they retry only while blocked, rather than looping until all data is
transferred. Use send_all or set MSG_WAITALL in flags to loop.
Signatures:
| Function | Description |
|---|---|
| `task<ioresult> send(fd, const void* buf, size_t count)` | Send to first success |
| `task<ioresult> send_all(fd, const void* buf, size_t count)` | Send to completion |
| `task<ioresult> sendv_all(fd, const iovec* iov, size_t iovcnt)` | Scatter-gather send to completion |
| `task<ioresult> sendto(fd, const void* buf, size_t count, const sockaddr* addr, socklen_t)` | Send datagram to explicit destination |
| `task<ioresult> sendmsg(fd, const msghdr* msg, int flags)` | Send message |
| `task<ioresult> recv(fd, void* buf, size_t count)` | Receive to first success |
| `task<ioresult> recvfrom(fd, void* buf, size_t count, sockaddr* addr, socklen_t*)` | Receive datagram and capture sender address |
| `task<ioresult> recvmsg(fd, msghdr* msg, int flags)` | Receive message |
| `task<> connect(fd, const sockaddr*, socklen_t)` | Connect client socket to address |
| `task<fd> accept(fd)` | Accept new nonblocking socket from listening fd |
TCP
Higher-level coroutines handle TCP connection setup:
| Function | Description |
|---|---|
| `task<fd> tcp_listen(std::string address [, int backlog])` | bind and listen; returns a listening fd |
| `task<fd> tcp_connect(std::string address)` | connect to a remote address using TCP; returns a connected fd |
| `task<fd> tcp_accept(fd listen_fd)` | accept a connection on a TCP listening socket |
Addresses are strings like "127.0.0.1:8080", "www.google.com:80", or
":8080". DNS resolution uses the standard getaddrinfo(3) function, and
happens on a background thread to avoid blocking the event loop. tcp_connect
and tcp_accept both set the TCP_NODELAY socket option to disable Nagle’s
algorithm, which would otherwise delay small writes. If DNS resolution
fails, the functions throw std::runtime_error with an error description; on
other failures, they throw std::system_error with errno as error code.
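To make the address format concrete, here is a sketch of splitting a "host:port" string and resolving it with getaddrinfo(3). The helpers `split_address` and `resolved_port` are hypothetical and simplified: they run synchronously rather than on a background thread, accept only numeric IPv4 hosts so that no DNS lookup occurs, and do not handle bracketed IPv6 literals. Cotamer's own parsing may differ.

```cpp
#include <arpa/inet.h>
#include <cassert>
#include <netdb.h>
#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
#include <utility>

// Hypothetical helper: split "host:port" at the last colon.
// Does not handle bracketed IPv6 literals such as "[::1]:80".
inline std::pair<std::string, std::string> split_address(const std::string& a) {
    std::string::size_type colon = a.rfind(':');
    if (colon == std::string::npos) {
        return {a, ""};
    }
    return {a.substr(0, colon), a.substr(colon + 1)};
}

// Resolves a numeric IPv4 "host:port" string and returns the port in host
// byte order, or -1 on failure. An empty host means "any local address".
inline int resolved_port(const std::string& address) {
    std::pair<std::string, std::string> hp = split_address(address);
    addrinfo hints{};
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV | AI_PASSIVE;
    addrinfo* res = nullptr;
    int r = ::getaddrinfo(hp.first.empty() ? nullptr : hp.first.c_str(),
                          hp.second.c_str(), &hints, &res);
    if (r != 0) {
        return -1;                                 // resolution failed
    }
    int port = ntohs(reinterpret_cast<sockaddr_in*>(res->ai_addr)->sin_port);
    ::freeaddrinfo(res);
    return port;
}
```

With `AI_PASSIVE` and a null host, getaddrinfo returns the wildcard address, which corresponds to the ":8080" form.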
cot::task<> handle_connection(cot::fd f); // defined below
cot::task<> echo_server() {
auto lfd = co_await cot::tcp_listen("127.0.0.1:9000");
while (true) {
auto cfd = co_await cot::tcp_accept(lfd);
handle_connection(std::move(cfd)).detach();
}
}
cot::task<> handle_connection(cot::fd f) {
while (true) {
char buf[4096];
auto n = co_await cot::recv(f, buf, sizeof(buf));
if (!n || *n == 0) {
break;
}
co_await cot::send_all(f, buf, *n);
}
}
cot::task<> echo_client(std::string address, std::string message) {
auto f = co_await cot::tcp_connect(address);
co_await cot::send_all(f, message.data(), message.size());
char buf[4096];
auto n = co_await cot::recv(f, buf, sizeof(buf));
if (n) {
std::print("echo: {}\n", std::string_view(buf, *n));
}
}
The tcp_listen_all(address) function can return multiple fds, supporting
addresses that correspond to multiple address families. For instance,
localhost has an IPv4 and an IPv6 version. Use it like this:
cot::task<> one_echo_server(cot::fd lfd) {
while (true) {
auto cfd = co_await cot::tcp_accept(lfd);
handle_connection(std::move(cfd)).detach();
}
}
cot::task<> all_echo_servers() {
for (auto& lfd : co_await cot::tcp_listen_all("localhost:9000")) {
one_echo_server(std::move(lfd)).detach();
}
}
UDP
UDP is a datagram protocol: a socket returned by cot::udp_listen exchanges
datagrams with any peer. Each recvfrom returns exactly one whole datagram
and reports the sender’s address; sendto directs a reply back.
| Function | Description |
|---|---|
| `task<fd> udp_listen(std::string address)` | bind a UDP socket to address |
| `task<fd> udp_connect(std::string address)` | create a UDP socket connected to address; plain send/recv go to that peer |
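The "one whole datagram per receive" property can be observed with plain system calls. This sketch uses an `AF_UNIX` datagram socket pair as a stand-in for UDP so that it runs without network access; the function name `datagram_sizes` is hypothetical.

```cpp
#include <cassert>
#include <sys/socket.h>
#include <unistd.h>
#include <utility>

// Sends two datagrams back to back, then receives twice.
// Returns the sizes reported by the two receives, or {-1, -1} if the
// socket pair could not be created.
inline std::pair<ssize_t, ssize_t> datagram_sizes() {
    int sv[2];
    if (::socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) != 0) {
        return {-1, -1};
    }
    ::send(sv[0], "abc", 3, 0);
    ::send(sv[0], "defgh", 5, 0);
    char buf[64];
    // Each receive returns one datagram, even though the buffer could hold both.
    ssize_t first = ::recv(sv[1], buf, sizeof(buf), 0);
    ssize_t second = ::recv(sv[1], buf, sizeof(buf), 0);
    ::close(sv[0]);
    ::close(sv[1]);
    return {first, second};
}
```

A stream socket given the same two sends could deliver all eight bytes in a single receive, which is why stream protocols need their own message framing.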
cot::task<> udp_echo_server() {
auto sock = co_await cot::udp_listen("127.0.0.1:9000");
while (true) {
sockaddr_storage src{};
socklen_t srclen = sizeof(src);
char buf[65536];
auto n = co_await cot::recvfrom(sock, buf, sizeof(buf),
reinterpret_cast<sockaddr*>(&src), &srclen);
if (!n) {
break;
}
co_await cot::sendto(sock, buf, *n,
reinterpret_cast<sockaddr*>(&src), srclen);
}
}
cot::task<> udp_echo_client(std::string address, std::string message) {
auto sock = co_await cot::udp_connect(address);
co_await cot::send(sock, message.data(), message.size());
char buf[65536];
auto n = co_await cot::recv(sock, buf, sizeof(buf));
if (n) {
std::print("echo: {}\n", std::string_view(buf, *n));
}
}
As with TCP, a udp_listen_all(address) function can return multiple fds.
Mutual exclusion
A logical operation like “write a message to a file descriptor” might require
several system calls, with co_await writable(f) suspensions in between. If
several coroutines share that file descriptor, another coroutine’s writes can
interleave during those suspensions, corrupting both messages. This is a
mutual exclusion problem.
The cotamer::mutex synchronization object can be locked by one coroutine to
obtain exclusive access. Any other coroutine that attempts to lock the mutex
will suspend until the mutex is unlocked. Unlike std::mutex, which blocks the
calling thread, cotamer::mutex suspends the calling coroutine; other
coroutines on the same thread continue to run.
Mutex functions are:
| Function | Description |
|---|---|
| `cotamer::mutex m;` | construct mutex |
| `co_await m.lock()` | acquire exclusive access |
| `co_await m` | equivalent to `co_await m.lock()` |
| `m.try_lock()` | try to acquire exclusive access without suspending |
| `m.unlock()` | release exclusive access |
Cotamer mutexes are thread-safe, so multiple threads can call m.lock()
concurrently (only one thread will win). Lock attempts are serviced in FIFO
order: if coroutine A calls m.lock() before coroutine B does, then A will
always acquire access before B (unless A is destroyed before acquiring
access).
Since coroutines can be destroyed at any suspension point, explicitly calling
m.lock() and m.unlock() risks abandoning a mutex in the locked state.
Avoid this with cotamer::unique_lock, a guarded lock ownership class
mirroring std::unique_lock. Pass the unique_lock constructor the ownership
token returned by co_await m or co_await m.lock(). For instance:
struct lockable_fd {
cot::fd f;
cot::mutex mutex;
};
cot::task<> write_message(lockable_fd& lfd, std::string message) {
cot::unique_lock guard(co_await lfd.mutex);
// Critical section. `cot::send_all` may suspend internally on
// `writable(lfd.f)`; the guard keeps interleaved writes from other
// coroutines from corrupting our message across those suspensions.
// The guard automatically unlocks the mutex when the coroutine exits.
co_await cot::send_all(lfd.f, message.data(), message.size());
}
cotamer::unique_lock can also be explicitly locked or unlocked (with
co_await guard.lock() and guard.unlock()), and supports the standard
construction tags:
cot::unique_lock guard(m, std::defer_lock); // don’t lock yet
co_await guard.lock(); // lock explicitly later
cot::unique_lock guard(m, std::try_to_lock); // non-blocking attempt
if (guard.owns_lock()) {
// acquired; `if (guard)` would also work
}
co_await m.lock();
cot::unique_lock guard(m, std::adopt_lock); // take ownership of existing lock
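Because cotamer::unique_lock mirrors std::unique_lock, the standard class can illustrate why a guard is safer than manual lock and unlock calls. This sketch uses a hypothetical single-threaded `flag_lock` type and leaves the critical section through an exception; destroying a suspended coroutine runs the destructors of its local variables in the same way, so the guard releases the lock in both cases. Nothing here uses Cotamer itself.

```cpp
#include <cassert>
#include <mutex>
#include <stdexcept>

// Toy single-threaded lockable, used so the example is deterministic.
struct flag_lock {
    bool locked = false;
    void lock() { locked = true; }
    void unlock() { locked = false; }
    bool try_lock() {
        if (locked) {
            return false;
        }
        locked = true;
        return true;
    }
};

// Locks through a guard, then leaves the critical section by exception.
// Returns whether the lock is still held afterwards.
inline bool abandoned_after_exception(flag_lock& m) {
    try {
        std::unique_lock<flag_lock> guard(m, std::defer_lock);  // don't lock yet
        guard.lock();                                           // lock explicitly
        throw std::runtime_error("leave critical section early");
    } catch (const std::runtime_error&) {
        // Stack unwinding destroyed `guard`, which unlocked `m`.
    }
    return m.locked;
}
```

With a bare `m.lock()` in place of the guard, the exception would skip the matching `m.unlock()` and the function would return true.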
unique_lock::lock() attempts are safe to cancel, so cot::first or
cot::race can add timeouts to them. For instance, here, if the timeout fires
first, the lock attempt is safely removed from the mutex’s queue. (Using
m.lock() in a race is inherently unsafe.)
cot::unique_lock guard(m, std::defer_lock);
co_await cot::first(guard.lock(), cot::after(1s));
if (guard.owns_lock()) {
// acquired within 1 second
} else {
// timed out — lock was not acquired
}
Shared access
Most coroutine code only needs exclusive access, but cotamer::mutex also
supports a shared lock mode modelling readers–writer locking. The shared lock
mode uses *_shared mutex functions and the cotamer::shared_lock guard.
| Function | Description |
|---|---|
| `co_await m.lock_shared()` | acquire shared access |
| `m.try_lock_shared()` | try to acquire shared access without suspending |
| `m.unlock_shared()` | release shared access |
Mixtures of shared and exclusive lock requests are serviced in FIFO order. If
mutex m is acquired in shared mode, but an exclusive lock attempt has already
been enqueued, then another coroutine that calls m.lock_shared() will suspend
until the exclusive lock attempt has been serviced. On the other hand, if no
exclusive attempts are waiting, then m.lock_shared() will proceed right away.
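The FIFO rule can be modelled in a few lines. The class below is a toy, single-threaded bookkeeping model, assuming a hypothetical `fifo_rw_model` type whose lock calls return true when granted immediately and false when queued. It tracks only who holds and who waits; it does not suspend anything and is not cotamer::mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model of FIFO servicing for mixed shared/exclusive requests.
class fifo_rw_model {
public:
    // Each request returns true if granted immediately, false if queued.
    bool lock_shared() { return request(false); }
    bool lock() { return request(true); }

    void unlock_shared() { --readers_; service(); }
    void unlock() { writer_ = false; service(); }

    int readers() const { return readers_; }
    bool writer() const { return writer_; }
    std::size_t waiting() const { return queue_.size(); }

private:
    bool can_grant(bool exclusive) const {
        return exclusive ? (!writer_ && readers_ == 0) : !writer_;
    }
    void grant(bool exclusive) {
        if (exclusive) {
            writer_ = true;
        } else {
            ++readers_;
        }
    }
    bool request(bool exclusive) {
        // Anything already queued goes first, even if this request would
        // be compatible with the current holders.
        if (queue_.empty() && can_grant(exclusive)) {
            grant(exclusive);
            return true;
        }
        queue_.push_back(exclusive);
        return false;
    }
    void service() {
        // Grant from the front until the head request has to wait.
        while (!queue_.empty() && can_grant(queue_.front())) {
            grant(queue_.front());
            queue_.pop_front();
        }
    }

    int readers_ = 0;
    bool writer_ = false;
    std::deque<bool> queue_;   // true = exclusive request
};
```

Making a late shared request wait behind a queued exclusive one is what keeps a steady stream of readers from starving a writer.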
HTTP
Cotamer integrates llhttp to parse and
build HTTP/1.x messages on top of a connected cotamer::fd.
cotamer::http_message represents a request or response in memory;
cotamer::http_parser wraps an fd and turns bytes on the wire into
messages and back.
The central class is cotamer::http_parser, which can read and write HTTP
messages. This example uses cot::http_parser to return the contents of
/robots.txt on a given HTTP host:
#include "cotamer/http.hh"
cot::task<std::string> fetch_robots_txt(std::string host) {
auto fd = co_await cot::tcp_connect(std::format("{}:80", host));
cot::http_parser hp(std::move(fd), cot::http_parser::client, host);
cot::http_message req(HTTP_GET, "/robots.txt");
auto ticket = co_await hp.send_request(std::move(req));
co_return (co_await hp.receive(std::move(ticket))).body();
}
(The third constructor argument tells http_parser to add a Host: header to
each outgoing request automatically.)
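For orientation, this is roughly what such a request looks like on the wire. The `format_get` helper is hypothetical and exists only to show the HTTP/1.1 framing (request line, Host header, blank line); it is not how cot::http_parser serializes messages, which may add further headers.

```cpp
#include <cassert>
#include <string>

// Hypothetical formatter showing the bytes of a minimal HTTP/1.1 GET request.
inline std::string format_get(const std::string& host, const std::string& target) {
    std::string out;
    out += "GET " + target + " HTTP/1.1\r\n";   // request line
    out += "Host: " + host + "\r\n";            // Host is required in HTTP/1.1
    out += "\r\n";                              // blank line ends the headers
    return out;
}
```

For example, `format_get("example.com", "/robots.txt")` yields a request line, one Host header, and the terminating blank line, each ended by CRLF.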
This uses cot::http_parser in server mode to serve HTTP on a given connection:
cot::task<> http_connection(cot::fd cfd) {
cot::http_parser hp(std::move(cfd), cot::http_parser::server);
do {
auto req = co_await hp.receive();
if (!hp.ok()) {
break; // peer closed or parse error
}
cot::http_message res;
res.status_code(200)
.header("Content-Type", "text/plain")
.body(std::format("you asked for {}\n", req.url()));
co_await hp.send(std::move(res));
} while (hp.should_keep_alive());
}
http_message
HTTP requests and responses are represented by the http_message type.
http_message uses “fluent” accessors to set parameters:
cot::http_message res;
res.status_code(200)
.header("Content-Type", "text/plain")
.body("hello\n");
| Member | Accesses | Message type | Mode |
|---|---|---|---|
| `method()`, `method_name()` | request method | Request | Read/write |
| `status_code()` | response status code (e.g. 200, 404) | Response | Read/write |
| `url()` | request URL | Request | Read/write |
| `body()` | message body | Any | Read/write |
| `header(name, value)` | header | Any | Write |
| `header(name)` | header by name | Any | Read |
| `path()` | URL path part (no `?...` or `#...`) | Request | Read |
| `header_begin/end()` | iterate over headers; each iterator has `.name()` and `.value()` | Any | Read |
| `search_param_begin/end()` | iterate over `?name=value` parameters | Request | Read |
| `search_param(name)` | search parameter by name | Request | Read |
If nlohmann::json is enabled at build time, http_message::body(const nlohmann::json&) is also available; it serializes the JSON, sets the body,
and adds a Content-Type: application/json header if none is already
present.
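The relationship between url(), path(), and the search parameters can be sketched with ordinary string handling. The helpers `url_path` and `url_search_params` are hypothetical, perform no percent-decoding, and may differ from what http_message does.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical helper: the URL with any "?..." query and "#..." fragment removed.
inline std::string url_path(const std::string& url) {
    return url.substr(0, url.find_first_of("?#"));
}

// Hypothetical helper: the "?name=value&..." parameters as a map.
inline std::map<std::string, std::string> url_search_params(const std::string& url) {
    std::map<std::string, std::string> params;
    std::string s = url.substr(0, url.find('#'));      // drop the fragment
    std::string::size_type q = s.find('?');
    if (q == std::string::npos) {
        return params;                                  // no query part
    }
    std::string query = s.substr(q + 1);
    std::size_t pos = 0;
    while (pos <= query.size()) {
        std::string::size_type amp = query.find('&', pos);
        if (amp == std::string::npos) {
            amp = query.size();
        }
        std::string item = query.substr(pos, amp - pos);
        std::string::size_type eq = item.find('=');
        if (!item.empty()) {
            // A name without '=' maps to an empty value.
            params[item.substr(0, eq)] =
                eq == std::string::npos ? std::string() : item.substr(eq + 1);
        }
        pos = amp + 1;
    }
    return params;
}
```

For the URL `/search?q=cat&n=10#top`, the path is `/search` and the parameters are `q` and `n`.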
Pipelining
HTTP/1.1 supports request pipelining: a client can pass multiple requests over
the connection without waiting for any responses; the server then returns their
responses in order. Cotamer http_parser also supports request pipelining, but
some care is required to ensure requests and responses match. The
http_parser::send_request coroutine therefore returns a ticket, the caller’s
reserved position in line. If you aren’t using pipelining, you can ignore the
ticket. If you are using pipelining, pass the ticket to receive() via
hp.receive(std::move(ticket)) to claim the matching response in line.
For fuller examples covering routing, JSON request and response bodies, error
responses, and a pipelined client, see examples/jsond.cc and
examples/jsond-client.cc.