I am working a sample program that reads from a datasource (csv or rdbms) in chunks, makes some transformation and sends it via socket to a server.
But because the csv is very large, for testing purpose I want to break the reading after few chunks.
Unfortunately something goes wrong and I do not know what and how to fix it. Probably I have to do some cancellation, but now sure where and how. I get the following error:
Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>
The sample code is:
import asyncio
import json
async def readChunks():
# this is basically a dummy alternative for reading csv in chunks
df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
for chunk in df:
await asyncio.sleep(0.001)
yield chunk
async def send(row):
j = json.dumps(row)
print(f"to be sent: {j}")
await asyncio.sleep(0.001)
async def main():
i = 0
async for chunk in readChunks():
for k, v in chunk.items():
await asyncio.gather(send({k:v}))
i += 1
if i > 5:
break
#print(f"item in main via async generator is {chunk}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
MisterMiyagi
41.5k10 gold badges98 silver badges110 bronze badges
asked Jul 1, 2020 at 19:38
2
Many async
resources, such as generators, need to be cleaned up with the help of an event loop. When an async for
loop stops iterating an async generator via break
, the generator is cleaned up by the garbage collector only. This means the task is pending (waits for the event loop) but gets destroyed (by the garbage collector).
The most straightforward fix is to aclose
the generator explicitly:
async def main():
i = 0
aiter = readChunks() # name iterator in order to ...
try:
async for chunk in aiter:
...
i += 1
if i > 5:
break
finally:
await aiter.aclose() # ... clean it up when done
These patterns can be simplified using the asyncstdlib
(disclaimer: I maintain this library). asyncstdlib.islice
allows to take a fixed number of items before cleanly closing the generator:
import asyncstdlib as a
async def main():
async for chunk in a.islice(readChunks(), 5):
...
If the break
condition is dynamic, scoping the iterator guarantees cleanup in any case:
import asyncstdlib as a
async def main():
async with a.scoped_iter(readChunks()) as aiter:
async for idx, chunk in a.enumerate(aiter):
...
if idx >= 5:
break
answered Jul 1, 2020 at 20:01
MisterMiyagiMisterMiyagi
41.5k10 gold badges98 silver badges110 bronze badges
4
This works…
import asyncio
import json
import logging
logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)
async def readChunks():
# this is basically a dummy alternative for reading csv in chunks
df = [{"chunk_" + str(x) : [r for r in range(10)]} for x in range(10)]
for chunk in df:
await asyncio.sleep(0.002)
root.info('readChunks: next chunk coming')
yield chunk
async def send(row):
j = json.dumps(row)
root.info(f"to be sent: {j}")
await asyncio.sleep(0.002)
async def main():
i = 0
root.info('main: starting to read chunks')
async for chunk in readChunks():
for k, v in chunk.items():
root.info(f'main: sending an item')
#await asyncio.gather(send({k:v}))
stuff = await send({k:v})
i += 1
if i > 5:
break
#print(f"item in main via async generator is {chunk}")
##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()
if __name__ == '__main__':
asyncio.run(main())
… At least it runs and finishes.
The issue with stopping an async generator by reaking out of an async for
loop is described in bugs.python.org/issue38013 and looks like it was fixed in 3.7.5.
However, using
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()
I get a debug error but no Exception in Python 3.8.
Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>
Using the higher level API asyncio.run(main())
with debugging ON I do not get the debug message. If you are going to try and upgrade to Python 3.7.5-9 you probably should still use asyncio.run()
.
answered Jul 1, 2020 at 20:20
wwiiwwii
22.8k7 gold badges37 silver badges76 bronze badges
5
The problem is simple. You do early exit from loop, but async generator is not exhausted yet(its pending):
...
if i > 5:
break
...
answered Jul 1, 2020 at 19:53
alex_nonamealex_noname
23.1k3 gold badges60 silver badges77 bronze badges
1
Your readChunks
is running in async and your loop. and without completing the program you are breaking it.
That’s why it gives asyncio task was destroyed but it is pending
In short async task was doing its work in the background but you killed it by breaking the loop (stopping the program).
answered Jul 1, 2020 at 20:02
7u5h4r7u5h4r
4693 silver badges10 bronze badges
async def run_check(shell_command):
p = await asyncio.create_subprocess_shell(shell_command,
stdin=PIPE, stdout=PIPE, stderr=STDOUT)
fut = p.communicate()
try:
pcap_run = await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
p.kill()
await p.communicate()
def get_coros(pcap_list):
for pcap_loc in pcap_list:
for pcap_check in get_pcap_executables():
tmp_coro = (run_check('{args}'
.format(e=sys.executable, args=args)))
if tmp_coro != False:
coros.append(tmp_coro)
return coros
async def main():
pcap_list_gen = print_dir_cointent()
for pcap_list in pcap_list_gen:
p_coros = get_coros(pcap_list)
for f in asyncio.as_completed(p_coros):
res = await f
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
pcap_list_gen is a generator of PCAP lists which contain multile lists.
I get [OS: Error: To many open file] if I pass all pcaps in a single list, so decided to group them into list of smaller size and process one at a time.
Each pcap_list is a list of multiple PCAPS.
I want the next iteration of the loop to only start once the first iteration is finished.
for pcap_list in pcap_list_gen:
p_coros = get_coros(pcap_list)
for f in asyncio.as_completed(p_coros):
res = await f
As per my knowledge: The first loop is getting executed properly asa the iteration goes to next loop it throws error.
Traceback:
Exception ignored in: <generator object BaseSubprocessTransport._connect_pipes at 0x7f7b7c165830>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 188, in _connect_pipes
waiter.set_exception(exc)
File "/usr/lib/python3.5/asyncio/futures.py", line 349, in set_exception
self._schedule_callbacks()
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
self._loop.call_soon(callback, self)
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
handle = self._call_soon(callback, args)
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
self._check_closed()
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[ERROR] Task was destroyed but it is pending!
task: <Task pending coro=<BaseSubprocessTransport._connect_pipes() done, defined at /usr/lib/python3.5/asyncio/base_subprocess.py:156> wait_for=<Future pending cb=[Task._wakeup()]>>
More logs
RuntimeError: Event loop is closed
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=70435 running stdin=<_UnixWritePipeTransport closing fd=12 open>>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
[ERROR] Task exception was never retrieved
future: <Task finished coro=<ClassificationCheck.run_check() done, defined at ./regression.py:159> exception=RuntimeError('cannot reuse already awaited coroutine',)>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
O/P of _p_coros_ after for loop iteration in main() as
P_COROS(1st iteration): [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>]
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
P_COROS(2nd iteration): [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>, <coroutine object ClassificationCheck.run_check at 0x7f746d943048>, <coroutine object ClassificationCheck.run_check at 0x7f746d9430f8>]
Traceback (most recent call last):
File "./regression.py", line 325, in <module>
loop.run_until_complete(ClassCheck.main())
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "./regression.py", line 201, in main
res = await f
File "/usr/lib/python3.5/asyncio/tasks.py", line 492, in _wait_for_one
return f.result() # May raise f.exception().
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
Exception ignored in: <bound method BaseEventLoop.__del__ of <_UnixSelectorEventLoop running=False closed=True debug=False>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_events.py", line 431, in __del__
File "/usr/lib/python3.5/asyncio/unix_events.py", line 58, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 139, in remove_signal_handler
File "/usr/lib/python3.5/signal.py", line 47, in signal
TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=89531 running stdin=<_UnixWritePipeTransport closing fd=8 open>>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
Issue
async def run_check(shell_command):
p = await asyncio.create_subprocess_shell(shell_command,
stdin=PIPE, stdout=PIPE, stderr=STDOUT)
fut = p.communicate()
try:
pcap_run = await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
p.kill()
await p.communicate()
def get_coros(pcap_list):
for pcap_loc in pcap_list:
for pcap_check in get_pcap_executables():
tmp_coro = (run_check('{args}'
.format(e=sys.executable, args=args)))
if tmp_coro != False:
coros.append(tmp_coro)
return coros
async def main():
pcap_list_gen = print_dir_cointent()
for pcap_list in pcap_list_gen:
p_coros = get_coros(pcap_list)
for f in asyncio.as_completed(p_coros):
res = await f
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
pcap_list_gen is a generator of PCAP lists which contain multile lists.
I get [OS: Error: To many open file] if I pass all pcaps in a single list, so decided to group them into list of smaller size and process one at a time.
Each pcap_list is a list of multiple PCAPS.
I want the next iteration of the loop to only start once the first iteration is finished.
for pcap_list in pcap_list_gen:
p_coros = get_coros(pcap_list)
for f in asyncio.as_completed(p_coros):
res = await f
As per my knowledge: The first loop is getting executed properly asa the iteration goes to next loop it throws error.
Traceback:
Exception ignored in: <generator object BaseSubprocessTransport._connect_pipes at 0x7f7b7c165830>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 188, in _connect_pipes
waiter.set_exception(exc)
File "/usr/lib/python3.5/asyncio/futures.py", line 349, in set_exception
self._schedule_callbacks()
File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
self._loop.call_soon(callback, self)
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
handle = self._call_soon(callback, args)
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
self._check_closed()
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[ERROR] Task was destroyed but it is pending!
task: <Task pending coro=<BaseSubprocessTransport._connect_pipes() done, defined at /usr/lib/python3.5/asyncio/base_subprocess.py:156> wait_for=<Future pending cb=[Task._wakeup()]>>
More logs
RuntimeError: Event loop is closed
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=70435 running stdin=<_UnixWritePipeTransport closing fd=12 open>>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
[ERROR] Task exception was never retrieved
future: <Task finished coro=<ClassificationCheck.run_check() done, defined at ./regression.py:159> exception=RuntimeError('cannot reuse already awaited coroutine',)>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
O/P of _p_coros_ after for loop iteration in main() as
P_COROS(1st iteration): [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>]
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
P_COROS(2nd iteration): [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>, <coroutine object ClassificationCheck.run_check at 0x7f746d943048>, <coroutine object ClassificationCheck.run_check at 0x7f746d9430f8>]
Traceback (most recent call last):
File "./regression.py", line 325, in <module>
loop.run_until_complete(ClassCheck.main())
File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "./regression.py", line 201, in main
res = await f
File "/usr/lib/python3.5/asyncio/tasks.py", line 492, in _wait_for_one
return f.result() # May raise f.exception().
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
Exception ignored in: <bound method BaseEventLoop.__del__ of <_UnixSelectorEventLoop running=False closed=True debug=False>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_events.py", line 431, in __del__
File "/usr/lib/python3.5/asyncio/unix_events.py", line 58, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 139, in remove_signal_handler
File "/usr/lib/python3.5/signal.py", line 47, in signal
TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=89531 running stdin=<_UnixWritePipeTransport closing fd=8 open>>>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
Solution
Warning says that at the moment you call loop.close()
something related to process still running. I guess you should wait() for it’s termination (read also note at the link). Try this:
try:
pcap_run = await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
p.terminate()
finally:
await p.wait()
Upd:
Oh, you probably used global coros
variable:
def get_coros(pcap_list):
coros = [] # <--------------- create new list to fill
for pcap_loc in pcap_list:
for pcap_check in get_pcap_executables():
tmp_coro = (run_check('{args}'
.format(e=sys.executable, args=args)))
if tmp_coro != False:
coros.append(tmp_coro)
return coros
Answered By — Mikhail Gerasimov
Maybe it will help
task.print_stack() output
Stack for <Task pending name='Task-12' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
self._send(await self._send_queue.get())
Stack for <Task pending name='Task-63' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
data = await self._recv()
Stack for <Task pending name='Task-6' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
body = await self._connection.recv()
Stack for <Task pending name='Task-3' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
self._send(await self._send_queue.get())
Stack for <Task pending name='Task-13' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
data = await self._recv()
Stack for <Task pending name='Task-64' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-7' coro=<UpdateMethods._update_loop() running at /usr/local/lib/python3.8/site-packages/telethon/client/updates.py:328> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/client/updates.py", line 328, in _update_loop
await asyncio.wait_for(
Stack for <Task pending name='Task-4' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
data = await self._recv()
Stack for <Task pending name='Task-14' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-65' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
body = await self._connection.recv()
Stack for <Task pending name='Task-62' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
self._send(await self._send_queue.get())
Stack for <Task pending name='Task-5' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-15' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
body = await self._connection.recv()
This runtime warning can happen in many scenarios, but the cause are same:
A coroutine object is created by the invocation of an async
function, but is never inserted into an EventLoop
.
Consider following async
function foo()
:
If you want to call foo()
as an asynchronous task, and doesn’t care about the result:
prepare_for_foo() foo() remaining_work_not_depends_on_foo()
This is because invoking foo()
doesn’t actually runs the function foo()
, but created a «coroutine object» instead.
This «coroutine object» will be executed when current EventLoop
gets a chance: awaited/yield from
is called or all previous tasks are finished.
To execute an asynchronous task without await
, use loop.create_task()
with loop.run_until_complete()
:
prepare_for_foo()
task = loop.create_task(foo())
remaining_work_not_depends_on_foo()
loop.run_until_complete(task)
If the coroutine object is created and inserted into an `EventLoop`, but was never finished, the next warning will appear.
The problem comes from closing the loop immediately after cancelling the tasks. As the cancel() docs state
«This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop.»
Take this snippet of code:
import asyncio
import signal
async def pending_doom():
await asyncio.sleep(2)
print(">> Cancelling tasks now")
for task in asyncio.Task.all_tasks():
task.cancel()
print(">> Done cancelling tasks")
asyncio.get_event_loop().stop()
def ask_exit():
for task in asyncio.Task.all_tasks():
task.cancel()
async def looping_coro():
print("Executing coroutine")
while True:
try:
await asyncio.sleep(0.25)
except asyncio.CancelledError:
print("Got CancelledError")
break
print("Done waiting")
print("Done executing coroutine")
asyncio.get_event_loop().stop()
def main():
asyncio.async(pending_doom())
asyncio.async(looping_coro())
loop = asyncio.get_event_loop()
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, ask_exit)
loop.run_forever()
# I had to manually remove the handlers to
# avoid an exception on BaseEventLoop.__del__
for sig in (signal.SIGINT, signal.SIGTERM):
loop.remove_signal_handler(sig)
if __name__ == '__main__':
main()
Notice ask_exit
cancels the tasks but does not stop
the loop, on the next cycle looping_coro()
stops it. The output if you cancel it is:
Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine
Notice how pending_doom
cancels and stops the loop immediately after. If you let it run until the pending_doom
coroutines awakes from the sleep you can see the same warning you’re getting:
Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>
The meaning of the issue is that a loop doesn’t have time to finish all the tasks.
This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop.
There is no chance to do a «next cycle» of the loop in your approach. To make it properly you should move a stop operation to a separate non-cyclic coroutine to give your loop a chance to finish.
Second significant thing is CancelledError
raising.
Unlike Future.cancel(), this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.
Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).
So after cleanup your coroutine must raise CancelledError
to be marked as cancelled.
Using an extra coroutine to stop the loop is not an issue because it is not cyclic and be done immediately after execution.
def main():
loop = asyncio.get_event_loop()
asyncio.ensure_future(listen_to_ipc_channel_layer())
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, ask_exit)
loop.run_forever()
print("Close")
loop.close()
@asyncio.coroutine
def listen_to_ipc_channel_layer():
while True:
try:
print("Running")
yield from asyncio.sleep(0.1)
except asyncio.CancelledError as e:
print("Break it out")
raise e # Raise a proper error
# Stop the loop concurrently
@asyncio.coroutine
def exit():
loop = asyncio.get_event_loop()
print("Stop")
loop.stop()
def ask_exit():
for task in asyncio.Task.all_tasks():
task.cancel()
asyncio.ensure_future(exit())
if __name__ == "__main__":
main()
The reasons this happens is as explained by @Yeray Diaz Diaz
In my case, I wanted to cancel all the tasks that were not done after the first finished, so I ended up cancelling the extra jobs, then using loop._run_once()
to run the loop a bit more and allow them to stop:
loop = asyncio.get_event_loop()
job = asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
tasks_finished,tasks_pending, = loop.run_until_complete(job)
tasks_done = [t for t in tasks_finished if t.exception() is None]
if tasks_done == 0:
raise Exception("Failed for all tasks.")
assert len(tasks_done) == 1
data = tasks_done[0].result()
for t in tasks_pending:
t.cancel()
t.cancel()
while not all([t.done() for t in tasks_pending]):
loop._run_once()