Cotamer I/O Programming Manual
Cotamer provides asynchronous I/O through reference-counted file descriptors, readiness events, and a coroutine-aware mutex. These components let you write non-blocking network code in a sequential style.
File descriptors
A cotamer::fd is a reference-counted wrapper around a Unix file descriptor.
Constructing a cotamer::fd from a raw descriptor transfers ownership of that descriptor:
when the last copy of the cotamer::fd is destroyed, the underlying file
descriptor is automatically closed.
cot::fd rfd(fildes); // takes ownership of raw fd
cot::fd rfd_copy = rfd; // shares the same underlying fd
assert(rfd.fileno() == rfd_copy.fileno());
| Member | Description |
|---|---|
| fd() | construct an invalid fd |
| fd(int fileno) | take ownership of a file descriptor |
| fileno() | return the file descriptor number |
| valid(), operator bool | true if the fd is open |
| close() | close fd |
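The sharing and automatic-close behavior described above can be modelled with std::shared_ptr. This is only a minimal sketch, not Cotamer's implementation: the class name shared_fd is hypothetical, and unlike cotamer::fd it has no event loop to notify when the descriptor closes.

```cpp
#include <cassert>
#include <memory>
#include <unistd.h>

// Hypothetical stand-in for cotamer::fd: copies share one descriptor, and the
// descriptor is closed when the last copy is destroyed.
class shared_fd {
public:
    shared_fd() = default;  // invalid fd

    explicit shared_fd(int fileno)
        : state_(new int(fileno), [](int* p) {
              if (*p >= 0) {
                  ::close(*p);  // last copy gone: release the descriptor
              }
              delete p;
          }) {
        assert(fileno >= 0);
    }

    int fileno() const { return state_ ? *state_ : -1; }
    bool valid() const { return fileno() >= 0; }
    explicit operator bool() const { return valid(); }

    // Closing through one copy invalidates every copy.
    void close() {
        if (valid()) {
            ::close(*state_);
            *state_ = -1;
        }
    }

private:
    std::shared_ptr<int> state_;  // shared descriptor number, -1 once closed
};
```

Because every copy points at the same shared integer, calling close() on one copy makes valid() return false on all of them, and the deleter never closes the descriptor a second time.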
Closing an fd, whether by calling close(), by destroying the last copy, or
by cancelling the owning task, triggers all outstanding readiness events on
that fd. This guarantees that coroutines waiting on a closed fd will always
wake up.
Readiness events
Three free functions create events tied to file descriptor readiness:
| Function | Triggers when... |
|---|---|
| cotamer::readable(f) | ::read(f.fileno()) wouldn’t block |
| cotamer::writable(f) | ::write(f.fileno()) wouldn’t block |
| cotamer::closed(f) | the fd encounters an error or closes |
Like all Cotamer events, these are one-shot: once triggered, they stay
triggered. To wait for readability again, call readable(f) again.
cot::fd rfd(fildes);
cot::set_nonblocking(rfd);
co_await cot::readable(rfd); // suspend until data available or EOF
char buf[256];
ssize_t n = ::read(rfd.fileno(), buf, sizeof(buf));
File descriptors must be in non-blocking mode for readiness events to work
correctly. I/O helpers that return a cotamer::fd (accept, tcp_listen,
tcp_connect, tcp_accept) automatically put that fd in non-blocking mode;
for file descriptors obtained in other ways, call
cotamer::set_nonblocking(fileno) or cotamer::set_nonblocking(fd).
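For reference, non-blocking mode on a raw descriptor is normally enabled with fcntl(2). The sketch below shows that standard technique; the helper names make_nonblocking and read_would_block are hypothetical, and it is an assumption that cotamer::set_nonblocking does something equivalent.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <system_error>
#include <unistd.h>

// Hypothetical helper: add O_NONBLOCK to the descriptor's status flags.
void make_nonblocking(int fileno) {
    assert(fileno >= 0);
    int flags = ::fcntl(fileno, F_GETFL);
    if (flags == -1) {
        throw std::system_error(errno, std::generic_category(), "fcntl(F_GETFL)");
    }
    if (::fcntl(fileno, F_SETFL, flags | O_NONBLOCK) == -1) {
        throw std::system_error(errno, std::generic_category(), "fcntl(F_SETFL)");
    }
}

// Hypothetical probe: true if a one-byte read fails immediately with
// EAGAIN/EWOULDBLOCK instead of blocking. Note that it consumes a byte when
// data is available, so it is only suitable for demonstrations.
bool read_would_block(int fileno) {
    char c;
    ssize_t n = ::read(fileno, &c, 1);
    return n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);
}
```

On an empty pipe whose read end has been passed to make_nonblocking, read_would_block returns true; without O_NONBLOCK the same read would block the whole thread, and every coroutine on it.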
When a file descriptor closes or experiences a serious error, Cotamer triggers
all three readiness events for that file descriptor. This means that most code
doesn’t need closed(). It can be useful for code that passively tracks files
without reading or writing them.
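The three conditions correspond closely to what poll(2) reports for a descriptor. The sketch below takes a zero-timeout snapshot of them; the readiness struct and both function names are hypothetical, and Cotamer's event loop may use a different mechanism (such as epoll or kqueue) internally.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

// Hypothetical snapshot of the conditions behind readable/writable/closed.
struct readiness {
    bool readable;
    bool writable;
    bool closed;
};

// Ask the kernel about the descriptor without waiting (timeout 0).
readiness poll_readiness(int fileno) {
    assert(fileno >= 0);
    pollfd pfd{};
    pfd.fd = fileno;
    pfd.events = POLLIN | POLLOUT;
    readiness r{false, false, false};
    if (::poll(&pfd, 1, 0) > 0) {
        r.readable = (pfd.revents & POLLIN) != 0;
        r.writable = (pfd.revents & POLLOUT) != 0;
        // Hang-up, error, and invalid-descriptor reports all count as closed.
        r.closed = (pfd.revents & (POLLHUP | POLLERR | POLLNVAL)) != 0;
    }
    return r;
}

// Example: a pipe's read end is idle at first and reports a hang-up once the
// write end has been closed.
bool writer_hangup_is_visible() {
    int p[2];
    if (::pipe(p) != 0) {
        return false;
    }
    bool idle = !poll_readiness(p[0]).readable;
    ::close(p[1]);
    bool hung_up = poll_readiness(p[0]).closed;
    ::close(p[0]);
    return idle && hung_up;
}
```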
Wrapper functions
The cotamer::read_once, cotamer::write_once, cotamer::read, and
cotamer::write coroutines are suspendable wrappers around ::read and
::write. The *_once versions retry only while the call would block and return
after the first successful transfer; the other versions retry until all
requested bytes are transferred (or end-of-file or error). These functions
return the number of bytes transferred (which might be zero at end-of-file).
Transient errors (EINTR, EAGAIN, and EWOULDBLOCK) cause these functions
to retry. On serious errors (anything else), the functions exit early. If any
data had been transferred, they return a short count; but if the error
occurred before data transfer, they throw a std::system_error with the
errno number as its code(). Recover the errno number with catch (const std::system_error& ex) { ... ex.code().value() ... }.
Since the functions throw exceptions instead of returning -1 on error, their
return type is task<size_t>, not task<ssize_t>.
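The retry and error rules above can be sketched with plain system calls. This is a blocking analogue, not Cotamer's code: the names read_fully and read_fully_errno are hypothetical, and a poll(2) call stands in for the point where the real coroutine would co_await readability.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <poll.h>
#include <system_error>
#include <unistd.h>

// Hypothetical blocking analogue of cotamer::read: keep reading until count
// bytes arrive, end-of-file is reached, or a serious error occurs.
std::size_t read_fully(int fileno, void* buf, std::size_t count) {
    assert(buf != nullptr || count == 0);
    char* p = static_cast<char*>(buf);
    std::size_t pos = 0;
    while (pos < count) {
        ssize_t n = ::read(fileno, p + pos, count - pos);
        if (n > 0) {
            pos += static_cast<std::size_t>(n);
        } else if (n == 0) {
            break;  // end-of-file: return what we have (possibly zero)
        } else if (errno == EINTR) {
            continue;  // transient: retry immediately
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // Transient: wait for readability, then retry. A coroutine
            // version would suspend here instead of blocking the thread.
            pollfd pfd{};
            pfd.fd = fileno;
            pfd.events = POLLIN;
            ::poll(&pfd, 1, -1);
        } else if (pos > 0) {
            break;  // serious error after some data: short count
        } else {
            // Serious error before any data: report errno as an exception.
            throw std::system_error(errno, std::generic_category(), "read");
        }
    }
    return pos;
}

// Returns 0 on success, or the errno value recovered from the exception.
int read_fully_errno(int fileno, void* buf, std::size_t count,
                     std::size_t& transferred) {
    try {
        transferred = read_fully(fileno, buf, count);
        return 0;
    } catch (const std::system_error& ex) {
        transferred = 0;
        return ex.code().value();  // the original errno number
    }
}
```

Because failure is signalled by an exception, the return value never needs a negative sentinel, which is why an unsigned count is enough.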
cotamer::connect and cotamer::accept perform asynchronous versions of the
corresponding socket system calls. connect connects a client socket to a
remote address; the coroutine suspends until the connection attempt succeeds or fails.
accept suspends until a new connection request arrives on lfd; when one
does, it is accepted, put into non-blocking mode, and returned as an fd
object. Again, these functions throw std::system_error on serious error.
Signatures:
| Function | Description |
|---|---|
| task<size_t> read_once(fd, void* buf, size_t count) | Read until first success |
| task<size_t> read(fd, void* buf, size_t count) | Read until completion |
| task<size_t> write_once(fd, const void* buf, size_t count) | Write until first success |
| task<size_t> write(fd, const void* buf, size_t count) | Write until completion |
| task<> connect(fd, const sockaddr*, socklen_t) | Connect client socket to address |
| task<fd> accept(fd) | Accept new nonblocking socket from listening fd |
Here is a complete pipe example:
cot::task<> pipe_echo() {
    int piperaw[2];
    if (pipe(piperaw) != 0) {
        throw std::system_error(errno, std::generic_category());
    }
    cot::fd rfd(piperaw[0]), wfd(piperaw[1]);
    cot::set_nonblocking(rfd);
    cot::set_nonblocking(wfd);
    // writer: runs as a detached task and sends five bytes
    auto writer = [&]() -> cot::task<> {
        co_await cot::write(wfd, "hello", 5);
    };
    writer().detach();
    // reader: suspends until at least one byte arrives
    char buf[64] = {};
    auto r = co_await cot::read_once(rfd, buf, sizeof(buf));
    std::print("read {} bytes: {}\n", r, std::string_view(buf, r));
}
TCP
Higher-level coroutines handle TCP connection setup:
| Function | Description |
|---|---|
| task<fd> tcp_listen(std::string address [, int backlog]) | bind and listen; returns a listening fd |
| task<fd> tcp_connect(std::string address) | connect to a remote address using TCP; returns a connected fd |
| task<fd> tcp_accept(fd listen_fd) | accept a connection on a TCP listening socket |
Addresses are strings like "127.0.0.1:8080", "www.google.com:80", or
":8080". DNS resolution uses the standard getaddrinfo(3) function, and
happens on a background thread to avoid blocking the event loop. tcp_connect
and tcp_accept both set the TCP_NODELAY socket option to disable Nagle’s
algorithm, which would otherwise delay small writes in order to coalesce them
into fewer packets. If DNS resolution
fails, the functions throw std::runtime_error with an error description; on
other failures, they throw std::system_error with errno as error code.
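Two of the steps described above are easy to show in isolation: splitting an address string into host and port, and turning on TCP_NODELAY. The sketch below is illustrative only. The helper names split_address, set_nodelay, and nodelay_enabled are hypothetical, Cotamer's own parser may accept other forms (for example bracketed IPv6 literals), and how an empty host such as ":8080" is interpreted is left to the caller here.

```cpp
#include <cassert>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdexcept>
#include <string>
#include <sys/socket.h>
#include <utility>

// Hypothetical helper: split "host:port" at the last colon. The host part may
// be empty, as in ":8080".
std::pair<std::string, std::string> split_address(const std::string& address) {
    auto colon = address.rfind(':');
    if (colon == std::string::npos || colon + 1 == address.size()) {
        throw std::invalid_argument("expected host:port, got \"" + address + "\"");
    }
    return {address.substr(0, colon), address.substr(colon + 1)};
}

// Disable Nagle's algorithm on a TCP socket; returns true on success.
bool set_nodelay(int fileno) {
    assert(fileno >= 0);
    int yes = 1;
    return ::setsockopt(fileno, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == 0;
}

// Report whether TCP_NODELAY is currently set on the socket.
bool nodelay_enabled(int fileno) {
    int value = 0;
    socklen_t len = sizeof(value);
    return ::getsockopt(fileno, IPPROTO_TCP, TCP_NODELAY, &value, &len) == 0
        && value != 0;
}
```

The host and port strings produced this way are in the form that getaddrinfo(3) takes as its node and service arguments.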
// declared first so that echo_server can call it
cot::task<> handle_connection(cot::fd f);

cot::task<> echo_server() {
    auto lfd = co_await cot::tcp_listen("127.0.0.1:9000");
    while (true) {
        auto cfd = co_await cot::tcp_accept(lfd);
        // serve each client in its own detached task
        handle_connection(std::move(cfd)).detach();
    }
}

cot::task<> handle_connection(cot::fd f) {
    char buf[4096];
    while (true) {
        auto n = co_await cot::read_once(f, buf, sizeof(buf));
        if (n == 0) {
            break;  // end-of-file: the peer closed the connection
        }
        co_await cot::write(f, buf, n);
    }
}
Mutual exclusion
When multiple coroutines share a file descriptor, a logical operation like
“write this entire message” might require several system calls, with co_await writable(f) suspensions in between. Without protection, another coroutine
could interleave its own writes during those suspensions, corrupting both
messages. A cotamer::mutex solves this: a coroutine can hold a mutex for the
duration of its logical operation, ensuring that each multi-call sequence
completes atomically with respect to other coroutines.
Unlike std::mutex, which blocks the calling thread, cotamer::mutex
suspends the calling coroutine. Other coroutines on the same thread
continue to run.
Mutex functions are:
| Function | Description |
|---|---|
| cotamer::mutex m; | construct mutex |
| co_await m.lock() | acquire exclusive access |
| co_await m | equivalent to co_await m.lock() |
| m.try_lock() | try to acquire exclusive access without suspending |
| m.unlock() | release exclusive access |
Cotamer mutexes are thread-safe, so multiple threads can call m.lock()
concurrently (only one thread will win). Lock attempts are serviced in FIFO
order: if coroutine A calls m.lock() before coroutine B does, then A will
always acquire access before B (unless A is destroyed before acquiring
access).
Since coroutines can be destroyed at any suspension point, explicitly calling
m.lock() and m.unlock() risks abandoning a mutex in the locked state.
Avoid this with cotamer::unique_lock, a guarded lock ownership class
mirroring std::unique_lock. Pass the unique_lock constructor the ownership
token returned by co_await m or co_await m.lock(). For instance:
struct lockable_fd {
    cot::fd f;
    cot::mutex mutex;
};
cot::task<> write_message(lockable_fd& lfd, std::string message) {
    cot::unique_lock guard(co_await lfd.mutex);
    // critical section: automatically unlocked when guard goes out of scope
    // or coroutine is destroyed
    size_t pos = 0;
    while (pos != message.size()) {
        auto rv = ::write(lfd.f.fileno(), message.data() + pos, message.size() - pos);
        if (rv > 0) {
            pos += rv;
        } else if (errno == EINTR) {
            // interrupted by a signal: retry the write immediately
        } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
            // suspension point: coroutine might be destroyed!
            co_await cot::writable(lfd.f);
        } else {
            throw std::system_error(errno, std::generic_category());
        }
    }
}
cotamer::unique_lock can also be explicitly locked or unlocked (with
co_await guard.lock() and guard.unlock()), and supports the standard
construction tags:
cot::unique_lock guard(m, std::defer_lock); // don't lock yet
co_await guard.lock(); // lock explicitly later

cot::unique_lock guard(m, std::try_to_lock); // non-blocking attempt
if (guard) {
    // acquired
}

co_await m.lock();
cot::unique_lock guard(m, std::adopt_lock); // take ownership of existing lock
Active attempts to lock a mutex may be cancelled by destroying the
corresponding task. For instance, here, if the timeout fires, the m.lock()
attempt is safely removed from the mutex’s queue:
auto result = co_await cot::attempt(m.lock(), cot::after(1s));
if (result) {
    cot::unique_lock guard(*result);
    // acquired within 1 second
} else {
    // timed out: lock was not acquired; this coroutine loses its place in line
}
Shared access
The exclusive lock mode is most useful in typical coroutine code, but
cotamer::mutex also supports a shared lock mode modelling readers–writer
locking. The shared lock mode uses *_shared mutex functions and the
cotamer::shared_lock guard.
| Function | Description |
|---|---|
| co_await m.lock_shared() | acquire shared access |
| m.try_lock_shared() | try to acquire shared access without suspending |
| m.unlock_shared() | release shared access |
The FIFO rule also applies to mixtures of shared and exclusive access. If
mutex m is acquired in shared mode, but an exclusive lock attempt has
already been enqueued, then another coroutine that calls m.lock_shared()
will suspend until the exclusive lock attempt has been serviced. On the other
hand, if no exclusive attempts are waiting, then m.lock_shared() will
proceed right away.
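The FIFO rule for mixed requests can be made concrete with a small single-threaded model. This is a toy written for illustration, not Cotamer's implementation: the class fifo_rw_model and its ticket-based interface are hypothetical, and it tracks only which requests currently hold the lock, with no coroutines involved.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Toy model of FIFO admission for shared and exclusive lock requests.
class fifo_rw_model {
public:
    enum class mode { shared, exclusive };

    // Enqueue a request and return its ticket. The request is granted at once
    // if nothing is queued ahead of it and it is compatible with the holders.
    std::size_t request(mode m) {
        std::size_t ticket = next_ticket_++;
        queue_.push_back(entry{ticket, m});
        grant();
        return ticket;
    }

    // True while the ticket's request is granted and not yet released.
    bool holds(std::size_t ticket) const {
        for (const entry& h : holders_) {
            if (h.ticket == ticket) {
                return true;
            }
        }
        return false;
    }

    // Release a granted request, then admit whoever is next in line.
    void release(std::size_t ticket) {
        assert(holds(ticket));
        for (auto it = holders_.begin(); it != holders_.end(); ++it) {
            if (it->ticket == ticket) {
                holders_.erase(it);
                break;
            }
        }
        grant();
    }

private:
    struct entry {
        std::size_t ticket;
        mode m;
    };

    // Only the request at the front of the queue is ever considered, so a
    // waiting exclusive request blocks every later shared request.
    void grant() {
        while (!queue_.empty()) {
            entry e = queue_.front();
            bool compatible = holders_.empty()
                || (e.m == mode::shared && holders_.front().m == mode::shared);
            if (!compatible) {
                break;
            }
            holders_.push_back(e);
            queue_.pop_front();
        }
    }

    std::deque<entry> queue_;    // waiting requests, oldest first
    std::deque<entry> holders_;  // all shared, or a single exclusive
    std::size_t next_ticket_ = 0;
};
```

In this model, a shared request made while a reader holds the lock and a writer is queued stays in the queue behind the writer; a shared request made with no writer waiting joins the current readers immediately.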