Error asyncio task was destroyed but it is pending


I am working on a sample program that reads from a data source (csv or rdbms) in chunks, makes some transformations, and sends them via socket to a server.

But because the csv is very large, for testing purposes I want to stop reading after a few chunks.
Unfortunately something goes wrong and I do not know what or how to fix it. Probably I have to do some cancellation, but I am not sure where and how. I get the following error:

Task was destroyed but it is pending!
task: <Task pending coro=<<async_generator_athrow without __name__>()>>

The sample code is:

import asyncio
import json

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x): [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.001)
        yield chunk

async def send(row):
    j = json.dumps(row)
    print(f"to be sent: {j}")
    await asyncio.sleep(0.001)


async def main():
    i = 0
    async for chunk in readChunks():
        for k, v in chunk.items():
            await asyncio.gather(send({k:v}))
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")
    

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

asked Jul 1, 2020 at 19:38 by spyder

Many async resources, such as generators, need to be cleaned up with the help of an event loop. When an async for loop stops iterating an async generator via break, the generator is cleaned up by the garbage collector only. This means the task is pending (waits for the event loop) but gets destroyed (by the garbage collector).

The most straightforward fix is to aclose the generator explicitly:

async def main():
    i = 0
    aiter = readChunks()      # name iterator in order to ...
    try:
        async for chunk in aiter:
            ...
            i += 1
            if i > 5:
                break
    finally:
        await aiter.aclose()  # ... clean it up when done
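For reference, here is a self-contained version of this fix applied to the question's example (a sketch with renamed helpers, not the exact original code):

```python
import asyncio

async def read_chunks():
    # dummy stand-in for reading a csv in chunks, as in the question
    for x in range(10):
        await asyncio.sleep(0.001)
        yield {"chunk_" + str(x): list(range(10))}

async def main():
    chunks = []
    aiter = read_chunks()          # name the iterator so we can close it
    try:
        async for chunk in aiter:
            chunks.append(chunk)
            if len(chunks) >= 5:
                break              # early exit: generator not exhausted
    finally:
        await aiter.aclose()       # explicit cleanup, so no warning
    return chunks

chunks = asyncio.run(main())
print(len(chunks))
```

Because the generator is closed inside the coroutine while the event loop is still running, nothing is left for the garbage collector to destroy.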

These patterns can be simplified using asyncstdlib (disclaimer: I maintain this library). asyncstdlib.islice lets you take a fixed number of items before cleanly closing the generator:

import asyncstdlib as a

async def main():
    async for chunk in a.islice(readChunks(), 5):
        ...

If the break condition is dynamic, scoping the iterator guarantees cleanup in any case:

import asyncstdlib as a

async def main():
    async with a.scoped_iter(readChunks()) as aiter:
        async for idx, chunk in a.enumerate(aiter):
            ...
            if idx >= 5:
                break

answered Jul 1, 2020 at 20:01 by MisterMiyagi

This works…

import asyncio
import json
import logging

logging.basicConfig(format='%(asctime)s.%(msecs)03d %(message)s',
                    datefmt='%S')
root = logging.getLogger()
root.setLevel(logging.INFO)

async def readChunks():
    # this is basically a dummy alternative for reading csv in chunks
    df = [{"chunk_" + str(x): [r for r in range(10)]} for x in range(10)]
    for chunk in df:
        await asyncio.sleep(0.002)
        root.info('readChunks: next chunk coming')
        yield chunk

async def send(row):
    j = json.dumps(row)
    root.info(f"to be sent: {j}")
    await asyncio.sleep(0.002)


async def main():
    i = 0
    root.info('main: starting to read chunks')
    async for chunk in readChunks():
        for k, v in chunk.items():
            root.info(f'main: sending an item')
            #await asyncio.gather(send({k:v}))
            stuff = await send({k:v})
        i += 1
        if i > 5:
            break
        #print(f"item in main via async generator is {chunk}")

##loop = asyncio.get_event_loop()
##loop.run_until_complete(main())
##loop.close()

if __name__ == '__main__':

    asyncio.run(main())

… At least it runs and finishes.


The issue with stopping an async generator by breaking out of an async for loop is described in bugs.python.org/issue38013 and looks like it was fixed in 3.7.5.

However, using

loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(main())
loop.close()

I get a debug error but no Exception in Python 3.8.

Task was destroyed but it is pending!
task: <Task pending name='Task-8' coro=<<async_generator_athrow without __name__>()>>

Using the higher-level API asyncio.run(main()) with debugging on, I do not get the debug message. If you are going to upgrade to Python 3.7.5-9, you should probably still use asyncio.run().
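The reason asyncio.run() behaves better is that it runs loop.shutdown_asyncgens() before closing the loop. If you manage the loop manually, you can do the same yourself; a minimal sketch:

```python
import asyncio

closed = []

async def gen():
    try:
        for i in range(10):
            await asyncio.sleep(0)
            yield i
    finally:
        closed.append(True)   # runs when the generator is finalized

async def main():
    agen = gen()
    async for i in agen:
        if i >= 2:
            break             # abandon the generator without closing it
    return agen               # keep a reference so shutdown can find it

loop = asyncio.new_event_loop()
try:
    agen = loop.run_until_complete(main())
    # asyncio.run() does this step for you before closing the loop:
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

print(closed)
```

With the shutdown_asyncgens() call, the abandoned generator's finally block runs while the loop is still alive, so no "Task was destroyed but it is pending!" message appears.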

answered Jul 1, 2020 at 20:20 by wwii

The problem is simple: you exit the loop early, but the async generator is not exhausted yet (it is still pending):

...
if i > 5:
    break
...

answered Jul 1, 2020 at 19:53 by alex_noname

Your readChunks async generator is still running in your event loop, and you break out of the loop before it completes.

That's why it gives "asyncio task was destroyed but it is pending".

In short, the async task was doing its work in the background, but you killed it by breaking the loop (stopping the program).

answered Jul 1, 2020 at 20:02 by 7u5h4r

Issue

async def run_check(shell_command):
    p = await asyncio.create_subprocess_shell(shell_command,
                    stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    fut = p.communicate()
    try:
        pcap_run = await asyncio.wait_for(fut, timeout=5)
    except asyncio.TimeoutError:
        p.kill()
        await p.communicate()

def get_coros(pcap_list):
    for pcap_loc in pcap_list:
        for pcap_check in get_pcap_executables():
            tmp_coro = (run_check('{args}'
            .format(e=sys.executable, args=args)))
            if tmp_coro != False:
                coros.append(tmp_coro)
    return coros

async def main():
    pcap_list_gen = print_dir_cointent()
    for pcap_list in pcap_list_gen:
        p_coros = get_coros(pcap_list)
        for f in asyncio.as_completed(p_coros):
            res = await f

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

pcap_list_gen is a generator that yields lists of PCAPs.
I get OSError: Too many open files if I pass all pcaps in a single list, so I decided to group them into smaller lists and process them one at a time.

Each pcap_list is a list of multiple PCAPS.
I want the next iteration of the loop to only start once the first iteration is finished.

for pcap_list in pcap_list_gen:
    p_coros = get_coros(pcap_list)
    for f in asyncio.as_completed(p_coros):
        res = await f

As far as I can tell, the first iteration executes properly; as soon as the iteration moves to the next loop, it throws the error.

Traceback:

Exception ignored in: <generator object BaseSubprocessTransport._connect_pipes at 0x7f7b7c165830>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 188, in _connect_pipes
    waiter.set_exception(exc)
  File "/usr/lib/python3.5/asyncio/futures.py", line 349, in set_exception
    self._schedule_callbacks()
  File "/usr/lib/python3.5/asyncio/futures.py", line 242, in _schedule_callbacks
    self._loop.call_soon(callback, self)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
    handle = self._call_soon(callback, args)
  File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
    self._check_closed()
  File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[ERROR]  Task was destroyed but it is pending!
task: <Task pending coro=<BaseSubprocessTransport._connect_pipes() done, defined at /usr/lib/python3.5/asyncio/base_subprocess.py:156> wait_for=<Future pending cb=[Task._wakeup()]>>

More logs

RuntimeError: Event loop is closed
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=70435 running stdin=<_UnixWritePipeTransport closing fd=12 open>>>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
  File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
  File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed
[ERROR]  Task exception was never retrieved
future: <Task finished coro=<ClassificationCheck.run_check() done, defined at ./regression.py:159> exception=RuntimeError('cannot reuse already awaited coroutine',)>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine

Output of p_coros after each for-loop iteration in main():

P_COROS(1st iteration):  [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>]
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
awating in block 'finally'
P_COROS(2nd iteration):  [<coroutine object ClassificationCheck.run_check at 0x7f746d984ca8>, <coroutine object ClassificationCheck.run_check at 0x7f746d984db0>, <coroutine object ClassificationCheck.run_check at 0x7f746d984f68>, <coroutine object ClassificationCheck.run_check at 0x7f746d984fc0>, <coroutine object ClassificationCheck.run_check at 0x7f746d943048>, <coroutine object ClassificationCheck.run_check at 0x7f746d9430f8>]
Traceback (most recent call last):
  File "./regression.py", line 325, in <module>
    loop.run_until_complete(ClassCheck.main())
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "./regression.py", line 201, in main
    res = await f
  File "/usr/lib/python3.5/asyncio/tasks.py", line 492, in _wait_for_one
    return f.result()  # May raise f.exception().
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
RuntimeError: cannot reuse already awaited coroutine
Exception ignored in: <bound method BaseEventLoop.__del__ of <_UnixSelectorEventLoop running=False closed=True debug=False>>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/base_events.py", line 431, in __del__
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 58, in close
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 139, in remove_signal_handler
  File "/usr/lib/python3.5/signal.py", line 47, in signal
TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object
Exception ignored in: <bound method BaseSubprocessTransport.__del__ of <_UnixSubprocessTransport closed pid=89531 running stdin=<_UnixWritePipeTransport closing fd=8 open>>>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 126, in __del__
  File "/usr/lib/python3.5/asyncio/base_subprocess.py", line 101, in close
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 568, in close
  File "/usr/lib/python3.5/asyncio/unix_events.py", line 560, in write_eof
  File "/usr/lib/python3.5/asyncio/base_events.py", line 497, in call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 506, in _call_soon
  File "/usr/lib/python3.5/asyncio/base_events.py", line 334, in _check_closed
RuntimeError: Event loop is closed

Solution

The warning says that at the moment you call loop.close(), something related to the process is still running. You should wait() for its termination (see also the note at the link). Try this:

try:
    pcap_run = await asyncio.wait_for(fut, timeout=5)
except asyncio.TimeoutError:
    p.terminate()
finally:
    await p.wait()
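A self-contained sketch of this pattern (the echo command and timeout value are placeholders, not from the original code; assumes a POSIX shell):

```python
import asyncio
from asyncio.subprocess import PIPE, STDOUT

async def run_check(shell_command, timeout=5):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    try:
        out, _ = await asyncio.wait_for(p.communicate(), timeout=timeout)
        return out
    except asyncio.TimeoutError:
        p.terminate()
        return None
    finally:
        await p.wait()  # always reap the child before the loop closes

out = asyncio.run(run_check("echo hello"))
print(out)
```

Because p.wait() runs in the finally block, the subprocess transport is fully cleaned up before the event loop shuts down, in both the normal and the timeout paths.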

Update:

Oh, you probably used a global coros variable:

def get_coros(pcap_list):
    coros = []  # <--------------- create new list to fill
    for pcap_loc in pcap_list:
        for pcap_check in get_pcap_executables():
            tmp_coro = (run_check('{args}'
            .format(e=sys.executable, args=args)))
            if tmp_coro != False:
                coros.append(tmp_coro)
    return coros
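The "cannot reuse already awaited coroutine" error from the traceback is easy to reproduce in isolation; each call to an async function must produce a fresh coroutine object, which is why reusing a stale global list of coroutines fails:

```python
import asyncio

async def work():
    return 42

async def main():
    coro = work()
    first = await coro      # first await works fine
    try:
        await coro          # awaiting the same object again fails
        return first, None
    except RuntimeError as e:
        return first, str(e)

first, err = asyncio.run(main())
print(first, err)
```

With a fresh coros list per call, get_coros() only ever hands out coroutine objects that have not been awaited yet.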

Answered by Mikhail Gerasimov

Maybe this will help. Here is the task.print_stack() output:

Stack for <Task pending name='Task-12' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
    self._send(await self._send_queue.get())
Stack for <Task pending name='Task-63' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
    data = await self._recv()
Stack for <Task pending name='Task-6' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
    body = await self._connection.recv()
Stack for <Task pending name='Task-3' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
    self._send(await self._send_queue.get())
Stack for <Task pending name='Task-13' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
    data = await self._recv()
Stack for <Task pending name='Task-64' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
    batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-7' coro=<UpdateMethods._update_loop() running at /usr/local/lib/python3.8/site-packages/telethon/client/updates.py:328> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/client/updates.py", line 328, in _update_loop
    await asyncio.wait_for(
Stack for <Task pending name='Task-4' coro=<Connection._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:324> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 324, in _recv_loop
    data = await self._recv()
Stack for <Task pending name='Task-14' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
    batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-65' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
    body = await self._connection.recv()
Stack for <Task pending name='Task-62' coro=<Connection._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py:306> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/connection/connection.py", line 306, in _send_loop
    self._send(await self._send_queue.get())
Stack for <Task pending name='Task-5' coro=<MTProtoSender._send_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:442> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 442, in _send_loop
    batch, data = await self._send_queue.get()
Stack for <Task pending name='Task-15' coro=<MTProtoSender._recv_loop() running at /usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py:485> wait_for=<Future cancelled>> (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/telethon/network/mtprotosender.py", line 485, in _recv_loop
    body = await self._connection.recv()

This runtime warning can happen in many scenarios, but the cause is the same: a coroutine object is created by invoking an async function, but is never inserted into an EventLoop.

Consider the following async function foo():

If you want to call foo() as an asynchronous task and don't care about the result:

prepare_for_foo()
foo()                                           

remaining_work_not_depends_on_foo()

This is because invoking foo() doesn't actually run the function foo(); it creates a "coroutine object" instead.
This "coroutine object" will be executed when the current EventLoop gets a chance: when it is awaited (or yielded from), or when all previous tasks are finished.

To execute an asynchronous task without await, use loop.create_task() with loop.run_until_complete():

prepare_for_foo()
task = loop.create_task(foo())
remaining_work_not_depends_on_foo()
loop.run_until_complete(task)
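Since the original definition of foo() is elided, here is the same pattern as a runnable sketch with a hypothetical foo():

```python
import asyncio

async def foo():
    # hypothetical stand-in for the elided foo() in the answer
    await asyncio.sleep(0)
    return "foo result"

loop = asyncio.new_event_loop()
task = loop.create_task(foo())          # scheduled, but not executed yet
# ... remaining work that does not depend on foo() would go here ...
result = loop.run_until_complete(task)  # drive the loop until foo() finishes
loop.close()
print(result)
```

create_task() inserts the coroutine into the loop immediately, and run_until_complete() guarantees it finishes, so neither the "never awaited" nor the "destroyed but pending" warning can occur.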

If the coroutine object is created and inserted into an EventLoop, but is never finished, the next warning will appear.

The problem comes from closing the loop immediately after cancelling the tasks. As the cancel() docs state

"This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop."

Take this snippet of code:

import asyncio
import signal


async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()


async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()


def main():
    # asyncio.async() is gone in modern Python ("async" is now a keyword);
    # asyncio.ensure_future() is the equivalent
    asyncio.ensure_future(pending_doom())
    asyncio.ensure_future(looping_coro())

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)


if __name__ == '__main__':
    main()

Notice that ask_exit cancels the tasks but does not stop the loop; on the next cycle, looping_coro() stops it. The output if you cancel it is:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine

Notice how pending_doom cancels and stops the loop immediately after. If you let it run until the pending_doom coroutine awakes from the sleep, you can see the same warning you're getting:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>

The meaning of the issue is that the loop doesn't have time to finish all the tasks.

This arranges for a CancelledError to be thrown into the wrapped coroutine on the next cycle through the event loop.

There is no chance to do a "next cycle" of the loop in your approach. To do it properly, you should move the stop operation to a separate non-cyclic coroutine to give your loop a chance to finish.

The second significant thing is raising CancelledError.

Unlike Future.cancel(), this does not guarantee that the task will be cancelled: the exception might be caught and acted upon, delaying cancellation of the task or preventing cancellation completely. The task may also return a value or raise a different exception.

Immediately after this method is called, cancelled() will not return True (unless the task was already cancelled). A task will be marked as cancelled when the wrapped coroutine terminates with a CancelledError exception (even if cancel() was not called).

So after cleanup, your coroutine must raise CancelledError to be marked as cancelled.
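A small sketch of that requirement; the task only reports cancelled() == True because the except block re-raises:

```python
import asyncio

log = []

async def worker():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        log.append("cleaned up")   # cleanup work goes here
        raise                      # re-raise so the task is marked cancelled

async def main():
    t = asyncio.create_task(worker())
    await asyncio.sleep(0)  # let worker start and block on sleep()
    t.cancel()
    try:
        await t
    except asyncio.CancelledError:
        pass
    return t.cancelled()

cancelled = asyncio.run(main())
print(log, cancelled)
```

If worker() swallowed the CancelledError instead of re-raising, cancelled() would stay False and the loop could not tell the task apart from one that finished normally.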

Using an extra coroutine to stop the loop is not an issue because it is not cyclic and will be done immediately after execution.

def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(listen_to_ipc_channel_layer())

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)
    loop.run_forever()
    print("Close")
    loop.close()


@asyncio.coroutine
def listen_to_ipc_channel_layer():
    while True:
        try:
            print("Running")
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError as e:
            print("Break it out")
            raise e  # Raise a proper error


# Stop the loop concurrently
@asyncio.coroutine
def exit():
    loop = asyncio.get_event_loop()
    print("Stop")
    loop.stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()
    asyncio.ensure_future(exit())


if __name__ == "__main__":
    main()

The reason this happens is as explained by @Yeray Diaz Diaz.
In my case, I wanted to cancel all the tasks that were not done after the first one finished, so I ended up cancelling the extra jobs, then using loop._run_once() to run the loop a bit more and allow them to stop:

    loop = asyncio.get_event_loop()
    job = asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    tasks_finished, tasks_pending = loop.run_until_complete(job)
    tasks_done = [t for t in tasks_finished if t.exception() is None]
    if len(tasks_done) == 0:
        raise Exception("Failed for all tasks.")
    assert len(tasks_done) == 1
    data = tasks_done[0].result()
    for t in tasks_pending:
        t.cancel()
    # _run_once() is a private API, but it lets the cancellations propagate
    while not all(t.done() for t in tasks_pending):
        loop._run_once()
