Discussion:
Better integration of multiprocessing with asyncio
Dan O'Reilly
2014-07-26 20:59:16 UTC
I think it would be helpful for folks using the asyncio module to be able
to make non-blocking calls to objects in the multiprocessing module more
easily. While some use-cases for using multiprocessing can be replaced with
ProcessPoolExecutor/run_in_executor, there are others that cannot; more
advanced usages of multiprocessing.Pool aren't supported by
ProcessPoolExecutor (initializer/initargs, contexts, etc.), and other
multiprocessing classes like Lock and Queue have blocking methods that
could be made into coroutines.
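
As a rough sketch of the Pool case: a pool's `apply_async` callbacks can be bridged onto an asyncio future with `loop.call_soon_threadsafe`. The names `init_worker`, `work` and `apply_in_pool` are hypothetical, and `multiprocessing.pool.ThreadPool` is used only as a stand-in so the sketch runs anywhere; it takes the same `initializer`/`initargs` arguments and has the same `apply_async` signature as `multiprocessing.Pool`. This is one possible shape, not an existing API.

```python
import asyncio
from multiprocessing.pool import ThreadPool  # stand-in for multiprocessing.Pool

_state = {}


def init_worker(prefix):
    # Runs once in every worker before it takes any task (initializer/initargs).
    _state["prefix"] = prefix


def work(n):
    return "{}{}".format(_state["prefix"], n * n)


async def apply_in_pool(pool, func, *args):
    """Hypothetical helper: await the result of pool.apply_async()."""
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def resolve(setter, payload):
        # Runs in the event loop thread; skip futures that were cancelled.
        if not fut.done():
            setter(payload)

    # The pool invokes these callbacks from its own helper thread, so they
    # must hand the result over with call_soon_threadsafe().
    pool.apply_async(
        func, args,
        callback=lambda value: loop.call_soon_threadsafe(
            resolve, fut.set_result, value),
        error_callback=lambda exc: loop.call_soon_threadsafe(
            resolve, fut.set_exception, exc),
    )
    return await fut


async def main():
    with ThreadPool(2, initializer=init_worker, initargs=("sq=",)) as pool:
        return await asyncio.gather(
            *(apply_in_pool(pool, work, n) for n in range(4)))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With a real `multiprocessing.Pool`, `work` and its arguments would additionally have to be picklable.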

Consider this (extremely contrived, but use your imagination) example of an
asyncio-friendly Queue:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def do_proc_work(q, val, val2):
    time.sleep(3)  # Imagine this is some expensive CPU work.
    ok = val + val2
    print("Passing {} to parent".format(ok))
    q.put(ok)  # The Queue can be used with the normal blocking API, too.
    item = q.get()
    print("got {} back from parent".format(item))

@asyncio.coroutine
def do_some_async_io_task():
    # Imagine there's some kind of asynchronous I/O
    # going on here that utilizes asyncio.
    yield from asyncio.sleep(5)

@asyncio.coroutine
def do_work(q):
    loop = asyncio.get_event_loop()
    loop.run_in_executor(ProcessPoolExecutor(),
                         do_proc_work, q, 1, 2)
    # Schedule the I/O task so it runs while we wait on the queue.
    io_task = loop.create_task(do_some_async_io_task())
    # Non-blocking get that won't affect our io_task
    item = yield from q.coro_get()
    print("Got {} from worker".format(item))
    item = item + 25
    yield from q.coro_put(item)
    yield from io_task


if __name__ == "__main__":
    # This is our new asyncio-friendly version of multiprocessing.Queue
    q = AsyncProcessQueue()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(do_work(q))

I have seen some rumblings about a desire to do this kind of integration on
the bug tracker (http://bugs.python.org/issue10037#msg162497 and
http://bugs.python.org/issue9248#msg221963) though that discussion is
specifically tied to merging the enhancements from the Billiard library
into multiprocessing.Pool. Are there still plans to do that? If so, should
asyncio integration with multiprocessing be rolled into those plans, or
does it make sense to pursue it separately?

Even more generally, do people think this kind of integration is a good
idea to begin with? I know using asyncio is primarily about *avoiding* the
headaches of concurrent threads/processes, but there are always going to be
cases where CPU-intensive work is going to be required in a primarily
I/O-bound application. The easier it is for developers to handle those
use-cases, the better, IMO.

Note that the same sort of integration could be done with the threading
module, though I think there's a fairly limited use-case for that; most
times you'd want to use threads over processes, you could probably just use
non-blocking I/O instead.

Thanks,
Dan
Guido van Rossum
2014-07-27 02:39:23 UTC
I actually know very little about multiprocessing (have never used it) but
I imagine the way you normally interact with multiprocessing is using
synchronous calls that talk to the subprocesses and their work queues and
so on, right?

In the asyncio world you would put that work in a thread and then use
run_in_executor() with a thread executor -- the thread would then be
managing the subprocesses and talking to them. While you are waiting for
that thread to complete your other coroutines will still work.
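
A minimal sketch of that pattern, assuming the blocking work is an ordinary synchronous function that starts a child process and waits for it. The names `blocking_child_call` and `heartbeat` are hypothetical, and `subprocess.run` stands in for whatever blocking multiprocessing calls the thread would really make.

```python
import asyncio
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor

CHILD_CODE = "import sys; print(int(sys.argv[1]) + int(sys.argv[2]))"


def blocking_child_call(a, b):
    # Plain synchronous code: start a child process, wait for it to exit,
    # and read its output. This blocks the calling thread.
    proc = subprocess.run(
        [sys.executable, "-c", CHILD_CODE, str(a), str(b)],
        capture_output=True, text=True, check=True,
    )
    return int(proc.stdout)


async def heartbeat(beats):
    # Stands in for the other coroutines that keep running meanwhile.
    for i in range(3):
        beats.append(i)
        await asyncio.sleep(0.01)


async def main():
    loop = asyncio.get_running_loop()
    beats = []
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The thread manages the child process; the event loop stays free.
        child_result = loop.run_in_executor(
            executor, blocking_child_call, 1, 2)
        await heartbeat(beats)
        total = await child_result
    return total, beats


if __name__ == "__main__":
    print(asyncio.run(main()))
```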

Unless you want to rewrite the communication and process management as
coroutines, but that sounds like a lot of work.
_______________________________________________
Python-ideas mailing list
https://mail.python.org/mailman/listinfo/python-ideas
Code of Conduct: http://python.org/psf/codeofconduct/
--
--Guido van Rossum (python.org/~guido)
Dan O'Reilly
2014-07-27 03:34:29 UTC
Right, this is the same approach I've used myself. For example, the
AsyncProcessQueue in my example above was implemented like this:

import asyncio
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Manager, cpu_count

def AsyncProcessQueue(maxsize=0):
    m = Manager()
    q = m.Queue(maxsize=maxsize)
    return _ProcQueue(q)

class _ProcQueue(object):
    def __init__(self, q):
        self._queue = q
        self._executor = self._get_executor()
        self._cancelled_join = False

    def __getstate__(self):
        # Executors can't be pickled, so drop ours from a copy of the
        # state; the original object keeps its own executor.
        self_dict = self.__dict__.copy()
        self_dict['_executor'] = None
        return self_dict

    def _get_executor(self):
        return ThreadPoolExecutor(max_workers=cpu_count())

    def __setstate__(self, self_dict):
        # Give the unpickled copy a fresh executor in the receiving process.
        self_dict['_executor'] = self._get_executor()
        self.__dict__.update(self_dict)

    def __getattr__(self, name):
        # Forward the regular blocking API to the wrapped queue.
        if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
                    'get', 'get_nowait', 'close']:
            return getattr(self._queue, name)
        else:
            raise AttributeError("'%s' object has no attribute '%s'" %
                                 (self.__class__.__name__, name))

    @asyncio.coroutine
    def coro_put(self, item):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.put,
                                                item))

    @asyncio.coroutine
    def coro_get(self):
        loop = asyncio.get_event_loop()
        return (yield from loop.run_in_executor(self._executor, self.get))

    def cancel_join_thread(self):
        self._cancelled_join = True
        self._queue.cancel_join_thread()

    def join_thread(self):
        self._queue.join_thread()
        if self._executor and not self._cancelled_join:
            self._executor.shutdown()

I'm wondering if a complete library providing this kind of behavior for all
or some subset of multiprocessing is worth adding to the asyncio
module, or if you prefer users to deal with this on their own (or perhaps
just distribute something that provides this behavior as a stand-alone
library). I suppose adding asyncio-friendly methods to the existing objects
in multiprocessing is also an option, but I doubt it's desirable to add
asyncio-specific code to modules other than asyncio.

It also sort of sounds like some of the work that's gone on in Billiard
would make the alternative, more complicated approach you mentioned a
realistic possibility, at least going by this comment by Ask Solem (from
"we have a version of multiprocessing.Pool using async IO and one pipe per
process that drastically improves performance and also avoids the
threads+forking issues (well, not the initial fork), but I have not yet
adapted it to use the new asyncio module in 3.4."
I don't know the details there, though. Hopefully someone more
familiar with Billiard/multiprocessing than I am can provide some
additional information.
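
For illustration only, here is a guess at what "async IO and one pipe per process" could look like on top of asyncio; it is not Billiard's implementation. The sketch assumes a Unix selector event loop (`loop.add_reader` is not available on the default Windows loop), `recv_async` is a hypothetical helper, and a thread stands in for the worker process that would hold the write end of the pipe.

```python
import asyncio
import multiprocessing
import threading


async def recv_async(conn):
    """Hypothetical helper: wait for `conn` to become readable, then recv()."""
    loop = asyncio.get_running_loop()
    readable = loop.create_future()

    def on_readable():
        if not readable.done():
            readable.set_result(None)

    # Let the event loop watch the pipe's file descriptor directly,
    # so no helper thread is needed for the wait.
    loop.add_reader(conn.fileno(), on_readable)
    try:
        await readable
    finally:
        loop.remove_reader(conn.fileno())
    # Data has started to arrive; recv() may still block briefly if a
    # large message is only partly written.
    return conn.recv()


async def main():
    reader, writer = multiprocessing.Pipe(duplex=False)
    # Stand-in for a worker process sending its result to the parent.
    worker = threading.Thread(target=writer.send, args=({"result": 42},))
    worker.start()
    message = await recv_async(reader)
    worker.join()
    reader.close()
    writer.close()
    return message


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Compared with the executor-based wrapper, this avoids one blocked thread per pending `get`, at the cost of handling the pipe protocol by hand.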
Guido van Rossum
2014-07-27 03:43:07 UTC
I'm going to go out on a limb here and say that it feels too early to me.
First someone has to actually solve this problem well as a 3rd party
package before we can talk about adding it to the asyncio package. It
doesn't actually sound like Billiards has adapted to asyncio yet (not that
I have any idea what Billiards is -- it sounds like a fork of
multiprocessing actually?).
--
--Guido van Rossum (python.org/~guido)
Ryan Hiebert
2014-07-27 03:48:02 UTC
Post by Guido van Rossum
I'm going to go out on a limb here and say that it feels too early to me.
First someone has to actually solve this problem well as a 3rd party
package before we can talk about adding it to the asyncio package. It
doesn't actually sound like Billiards has adapted to asyncio yet (not that
I have any idea what Billiards is -- it sounds like a fork of
multiprocessing actually?).
Yep, Billiard is a fork of multiprocessing: https://pypi.python.org/pypi/billiard
Nick Coghlan
2014-07-27 03:47:49 UTC
Post by Dan O'Reilly
I'm wondering if a complete library providing this kind of behavior for all
or some subset of multiprocessing is worth adding to the asyncio module,
or if you prefer users to deal with this on their own (or perhaps just
distribute something that provides this behavior as a stand-alone library).
I suppose adding asyncio-friendly methods to the existing objects in
multiprocessing is also an option, but I doubt it's desirable to add
asyncio-specific code to modules other than asyncio.
Actually, having asyncio act as a "nexus" for asynchronous IO backends
is one of the reasons for its existence. The asyncio event loop is
pluggable, so making multiprocessing asyncio friendly (whether
directly, or as an addon library that bridges the two) *also* has the
effect of making it compatible with all the other asynchronous event
loops that can be plugged into the asyncio framework.

I'm inclined to agree with Guido, though - while I think making
asyncio and multiprocessing play well together is a good idea in
principle, I think we're still in the "third party exploration phase"
of that integration. Once folks figure out good ways to do it, *then*
we can start talking about making that integration a default part of
Python 3.5 or 3.6+.

Cheers,
Nick.
--
Nick Coghlan | ncoghlan-***@public.gmane.org | Brisbane, Australia