aiochannel - Closable Queues for asyncio
Years ago, when I still wrote a lot of Python and asyncio was fairly new, I built a library for closable queues inspired by Go’s channels.
I was recently cleaning up and reviewing my GitHub repos and realized that I have never actually written anything about aiochannel.
I can see that it still receives some downloads, so I guess this is as good a time as any to write about why I think it’s a useful library.
§aiochannel
Aiochannel is a small queue-like library built for asyncio that adds the ability to mark a queue as closed, so you can be sure that no further items can be added.
This is really useful when you have a finite amount of “stuff” to work on.
The main class that aiochannel introduces is the Channel.
Its implementation is similar to asyncio.Queue and mirrors its API.
Channel introduces two new methods, close() and closed().
close() marks the channel so that no further work can be put() onto it.
closed() simply returns True or False depending on whether the queue is closed and drained or not.
Both the Channel and the Queue have a concept of being “finished”, but there are some key differences in how it behaves:
In the case of a Queue, it is marked as “finished” when its count of unfinished items is exactly zero, that is, when task_done() has been called once for every item that was put().
This “finished” state is the condition for Queue’s join(), so once that count reaches 0, the queue joins. However, if work is later put() back onto the queue, the finished state is un-set again.
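To make the Queue side concrete, here is a small sketch using only the standard library. It relies on the documented behaviour that join() waits until task_done() has been called for every item that was put(); the function name queue_demo and the job names are just placeholders for illustration.

```python
import asyncio


async def queue_demo():
    events = []
    queue = asyncio.Queue()

    await queue.put("job-1")
    await queue.get()
    queue.task_done()  # every put() so far is now matched by a task_done()

    # Nothing is outstanding, so join() returns straight away.
    await asyncio.wait_for(queue.join(), timeout=1)
    events.append("joined")

    # New work un-sets the finished state, so join() blocks again.
    await queue.put("job-2")
    try:
        await asyncio.wait_for(queue.join(), timeout=0.1)
    except asyncio.TimeoutError:
        events.append("blocked again")
    return events


if __name__ == "__main__":
    print(asyncio.run(queue_demo()))
```

The second join() only times out because nobody has processed "job-2" yet. A Queue that has joined once gives you no guarantee that it stays that way.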
Channel also joins once it is marked as “finished”, but in contrast to Queue it is only marked “finished” when it is both drained and marked as closed, guaranteeing that no further work can appear on the channel.
A Channel’s “finished” state can never be un-set, and neither can its closed state.
This means that once a Channel joins, you know that you’ve done all the work that could ever appear on that Channel object.
In addition, a Channel does not have Queue’s task_done(). The signal for when all work is completed is that the Channel joins, meaning it is closed and there are zero items left in the channel.
Because of this behaviour, it’s pretty intuitive how an async for would behave over this object. The async for runs as long as there is possible work to do.
Once the Channel joins, the loop is done.
This means that building a worker over the channel is trivially easy, as you can just spawn the following code in a task:
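A minimal sketch of such a worker could look like the code below. To keep it runnable with nothing but the standard library, it uses a small stand-in class, ToyChannel, which is hypothetical and is not aiochannel’s implementation; it only mimics the closed-and-drained semantics described above. The names worker, main and results are likewise made up for the example. With aiochannel installed, the worker coroutine should in principle work the same way against a real Channel.

```python
import asyncio
from collections import deque


class ToyChannel:
    """Hypothetical stand-in for aiochannel's Channel (unbounded, simplified)."""

    def __init__(self):
        self._items = deque()
        self._closed = False
        self._wakeup = asyncio.Event()    # set whenever items arrive or we close
        self._finished = asyncio.Event()  # set once closed AND drained, never cleared

    async def put(self, item):
        if self._closed:
            # aiochannel raises its own exception type here; this toy uses RuntimeError.
            raise RuntimeError("cannot put() onto a closed channel")
        self._items.append(item)
        self._wakeup.set()

    def close(self):
        self._closed = True
        self._wakeup.set()  # wake consumers so they can notice the close
        if not self._items:
            self._finished.set()

    async def join(self):
        await self._finished.wait()

    def __aiter__(self):
        return self

    async def __anext__(self):
        while not self._items:
            if self._closed:
                raise StopAsyncIteration  # closed and drained: the loop ends
            self._wakeup.clear()
            await self._wakeup.wait()
        item = self._items.popleft()
        if self._closed and not self._items:
            self._finished.set()
        return item


async def worker(name, channel, results):
    # Runs for as long as work can still appear; no task_done(), no extra Event.
    async for item in channel:
        results.append((name, item))


async def main():
    channel = ToyChannel()
    results = []
    workers = [
        asyncio.create_task(worker(f"worker-{i}", channel, results))
        for i in range(2)
    ]
    for item in range(5):
        await channel.put(item)
    channel.close()                 # from here on, no new work can be added
    await channel.join()            # returns once the channel is closed and drained
    await asyncio.gather(*workers)  # the workers have left their loops on their own
    return sorted(item for _, item in results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The point of the sketch is the worker: it has no shutdown logic of its own, because closing the channel is the shutdown signal.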
This is, in my opinion, a much better abstraction for most of the use cases where I’d otherwise reach for asyncio.Queue and some Event signals.
With a Channel you can’t forget to mark work as done, and you can’t accidentally race into having a Queue that has joined and then later received more work.