Timeout error aiohttp

Word of notice: This is my first approach with asyncio, so I might have done something really stupid. Scenario is as follows: I need to "http-ping" a humongous list of urls to check if they

To answer your question — no you did nothing wrong. I can’t see anything wrong with your code in terms of http request/response/timeout handling.

If indeed all your requests are timing out to the host (http://192.168.59.37) I suspect the issues are you are experiencing are most likely down to how your network is resolving requests (or how your code is building the url).

You can confirm whether requests are independently succeeding/failing using a tool like curl, eg:

curl "http://192.168.59.37/abc.html"

I tested it locally by using

python3 -m http.server 8080

and placing an empty files ‘abc’ and ‘abc.html’ in the same directory, updating the base_url

base_url = "http://127.0.0.1:8080"

with my minor updates (code below) here’s the output.

http://127.0.0.1:8080/.bashrc.php
#404
http://127.0.0.1:8080/.bashrc
#404
http://127.0.0.1:8080/.bashrc.html
#404
http://127.0.0.1:8080/abc
#HIT   2020-11-03 12:57:33 200  http://127.0.0.1:8080/abc
http://127.0.0.1:8080/zt.php
#404
http://127.0.0.1:8080/zt.html
#404
http://127.0.0.1:8080/zt
#404
http://127.0.0.1:8080/abc.html
#HIT   2020-11-03 12:57:33 200  http://127.0.0.1:8080/abc.html
http://127.0.0.1:8080/abc.php
#404
DONE

My updates are mostly minor but it might help with further debugging.

  • For debug, print the url. Important to determine if the code was building the url correctly. This highlighted to me that ‘php’ extension is missing a «.», so it would be looking for abcphp, not abc.php.
  • Use response.ok to test a successful http response, your code wasn’t handling 500 errors (instead it was returning hit).
  • using python f-string for cleaner formatting
import asyncio
import aiohttp
import datetime


async def get_data_coroutine(session, url, follow_redirects, timeout_seconds, retries):
    try:
        async with session.get(
            url, allow_redirects=False, timeout=timeout_seconds
        ) as response:
            print(url)
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if response.ok:
                print(f"#HIT   {now} {response.status}  {url}")
            else:
                status = response.status
                if status == 404:
                    print("#404")
                elif 300 <= status and status < 400:
                    location = str(response).split("Location': '")[1].split("'")[0]
                    print(f"#HIT   {now}  {status} {url} ---> {location}")
                    if follow_redirects is True:
                        return await get_data_coroutine(
                            session, location, follow_redirects, timeout_seconds, retries
                        )
                else:
                    print("#ERROR ", response.status)
            return None
    except asyncio.TimeoutError as e:
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"#ERROR   {now} {url} TIMEOUT ", e)
        return None


async def main(loop):
    base_url = "http://127.0.0.1:8080"
    extensions = ["", ".html", ".php"]
    fd = open("/usr/share/wordlists/dirb/common.txt", "r")
    words_without_suffix = [x.strip() for x in fd.readlines()]
    words_with_suffix = [
        base_url + "/" + x + y for x in words_without_suffix for y in extensions
    ]
    follow = True
    total_timeout = aiohttp.ClientTimeout(total=60 * 60 * 24)
    timeout_seconds = 10
    retries = 1
    async with aiohttp.ClientSession(loop=loop, timeout=total_timeout) as session:
        tasks = [
            get_data_coroutine(session, url, follow, timeout_seconds, retries)
            for url in words_with_suffix
        ]
        await asyncio.gather(*tasks)
    print("DONE")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(main(loop))

To answer your question — no you did nothing wrong. I can’t see anything wrong with your code in terms of http request/response/timeout handling.

If indeed all your requests are timing out to the host (http://192.168.59.37) I suspect the issues are you are experiencing are most likely down to how your network is resolving requests (or how your code is building the url).

You can confirm whether requests are independently succeeding/failing using a tool like curl, eg:

curl "http://192.168.59.37/abc.html"

I tested it locally by using

python3 -m http.server 8080

and placing an empty files ‘abc’ and ‘abc.html’ in the same directory, updating the base_url

base_url = "http://127.0.0.1:8080"

with my minor updates (code below) here’s the output.

http://127.0.0.1:8080/.bashrc.php
#404
http://127.0.0.1:8080/.bashrc
#404
http://127.0.0.1:8080/.bashrc.html
#404
http://127.0.0.1:8080/abc
#HIT   2020-11-03 12:57:33 200  http://127.0.0.1:8080/abc
http://127.0.0.1:8080/zt.php
#404
http://127.0.0.1:8080/zt.html
#404
http://127.0.0.1:8080/zt
#404
http://127.0.0.1:8080/abc.html
#HIT   2020-11-03 12:57:33 200  http://127.0.0.1:8080/abc.html
http://127.0.0.1:8080/abc.php
#404
DONE

My updates are mostly minor but it might help with further debugging.

  • For debug, print the url. Important to determine if the code was building the url correctly. This highlighted to me that ‘php’ extension is missing a «.», so it would be looking for abcphp, not abc.php.
  • Use response.ok to test a successful http response, your code wasn’t handling 500 errors (instead it was returning hit).
  • using python f-string for cleaner formatting
import asyncio
import aiohttp
import datetime


async def get_data_coroutine(session, url, follow_redirects, timeout_seconds, retries):
    try:
        async with session.get(
            url, allow_redirects=False, timeout=timeout_seconds
        ) as response:
            print(url)
            now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if response.ok:
                print(f"#HIT   {now} {response.status}  {url}")
            else:
                status = response.status
                if status == 404:
                    print("#404")
                elif 300 <= status and status < 400:
                    location = str(response).split("Location': '")[1].split("'")[0]
                    print(f"#HIT   {now}  {status} {url} ---> {location}")
                    if follow_redirects is True:
                        return await get_data_coroutine(
                            session, location, follow_redirects, timeout_seconds, retries
                        )
                else:
                    print("#ERROR ", response.status)
            return None
    except asyncio.TimeoutError as e:
        now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"#ERROR   {now} {url} TIMEOUT ", e)
        return None


async def main(loop):
    base_url = "http://127.0.0.1:8080"
    extensions = ["", ".html", ".php"]
    fd = open("/usr/share/wordlists/dirb/common.txt", "r")
    words_without_suffix = [x.strip() for x in fd.readlines()]
    words_with_suffix = [
        base_url + "/" + x + y for x in words_without_suffix for y in extensions
    ]
    follow = True
    total_timeout = aiohttp.ClientTimeout(total=60 * 60 * 24)
    timeout_seconds = 10
    retries = 1
    async with aiohttp.ClientSession(loop=loop, timeout=total_timeout) as session:
        tasks = [
            get_data_coroutine(session, url, follow, timeout_seconds, retries)
            for url in words_with_suffix
        ]
        await asyncio.gather(*tasks)
    print("DONE")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(main(loop))

Long story short

Hello
I have a strange issue with aiohttp.client, sometimes when I try to read a response body, it crashes by timeout.

My function:

async def test(self, value: str) -> dict:
       timeout = aiohttp.ClientTimeout(total=30)
       async with aiohttp.ClientSession(timeout=self.timeout) as session:
            resp = await session.get(
                urljoin(url, "api/"), params={"value": value}
            )
       if resp.status == 200:
            response = await resp.json()
            return response
       else:
            err_message = await resp.text()
            raise MyException(resp.status, err_message)

Traceback:

---------------------------------------------------------------------------
TimeoutError                              Traceback (most recent call last)
<ipython-input-95-032af7fa0ee9> in <module>
----> 1 resp = asyncio.run(st.query("select * from sources * where source matches "groennieuws.nl" and author matches "Wesley Van Der Linde";"))

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/runners.py in run(main, debug)
     41         events.set_event_loop(loop)
     42         loop.set_debug(debug)
---> 43         return loop.run_until_complete(main)
     44     finally:
     45         try:

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py in run_until_complete(self, future)
    582             raise RuntimeError('Event loop stopped before Future completed.')
    583 
--> 584         return future.result()
    585 
    586     def stop(self):

~/scripts/consumer.py in query(self, yql_query)
     77         if resp.status == 200:
     78             response = await resp.json()
---> 79             return response
     80         else:
     81             err_message = await resp.text()

~/.pyenv/versions/3.7.3/envs/test/lib/python3.7/site-packages/aiohttp/client_reqrep.py in json(self, encoding, loads, content_type)
   1015         """Read and decodes JSON response."""
   1016         if self._body is None:
-> 1017             await self.read()
   1018 
   1019         if content_type:

~/.pyenv/versions/3.7.3/envs/test/lib/python3.7/site-packages/aiohttp/client_reqrep.py in read(self)
    967         if self._body is None:
    968             try:
--> 969                 self._body = await self.content.read()
    970                 for trace in self._traces:
    971                     await trace.send_response_chunk_received(self._body)

~/.pyenv/versions/3.7.3/envs/test/lib/python3.7/site-packages/aiohttp/streams.py in read(self, n)
    357             blocks = []
    358             while True:
--> 359                 block = await self.readany()
    360                 if not block:
    361                     break

~/.pyenv/versions/3.7.3/envs/test/lib/python3.7/site-packages/aiohttp/streams.py in readany(self)
    379         # without feeding any data
    380         while not self._buffer and not self._eof:
--> 381             await self._wait('readany')
    382 
    383         return self._read_nowait(-1)

~/.pyenv/versions/3.7.3/envs/test/lib/python3.7/site-packages/aiohttp/streams.py in _wait(self, func_name)
    295             if self._timer:
    296                 with self._timer:
--> 297                     await waiter
    298             else:
    299                 await waiter

~/.pyenv/versions/3.7.3/envs/test/lib/python3.7/site-packages/aiohttp/helpers.py in __exit__(self, exc_type, exc_val, exc_tb)
    583 
    584         if exc_type is asyncio.CancelledError and self._cancelled:
--> 585             raise asyncio.TimeoutError from None
    586         return None
    587 

TimeoutError:

Steps to reproduce

resp = asyncio.run(test("test"))

Your environment

aiohttp==3.5.4
python3.7.3

Issue

Word of notice: This is my first approach with asyncio, so I might have done something really stupid.

Scenario is as follows:

I need to «http-ping» a humongous list of urls to check if they respond 200 or any other value. I get timeouts for each and every request, though tools like gobuster report 200,403, etc.

My code is sth similar to this:

import asyncio,aiohttp
import datetime 
#-------------------------------------------------------------------------------------
async def get_data_coroutine(session,url,follow_redirects,timeout_seconds,retries):
    #print('#DEBUG '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+url)
    try:
        async with session.get(url,allow_redirects=False,timeout=timeout_seconds) as response:
            status  =   response.status
            #res     =   await response.text()
            if(  status==404):
                pass
            elif(300<=status and status<400):
                location = str(response).split("Location': '")[1].split("'")[0]
                print('#HIT   '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+str(status)+' '+url+' ---> '+location)
                if(follow_redirects==True):
                    return await get_data_coroutine(session,location,follow_redirects,timeout_seconds,retries)
            else:
                print('#HIT   '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+str(status)+' '+url)
            return None
    except asyncio.exceptions.TimeoutError as e:
        print('#ERROR '+datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')+' '+'   '+' '+url+' TIMEOUT '+str(e))
        return None
#---------------------------------------------------------------------------    
async def main(loop):
        base_url                =   'http://192.168.59.37'
        extensions              =   ['','.html','php']
        fd                      =   open('/usr/share/wordlists/dirb/common.txt','r')
        words_without_suffix    =   [x.strip() for x in fd.readlines()]#[-5:] #DEBUG!
        words_with_suffix       =   [base_url+'/'+x+y for x in words_without_suffix for y in extensions]
        follow                  =   True
        total_timeout           =   aiohttp.ClientTimeout(total=60*60*24)
        timeout_seconds         =   10
        retries                 =   1
        async with aiohttp.ClientSession(loop=loop,timeout=total_timeout) as session:
            tasks = [get_data_coroutine(session,url,follow,timeout_seconds,retries) for url in words_with_suffix]
            await asyncio.gather(*tasks)
        print('DONE')
#---------------------------------------------------------------------------    
if(__name__=='__main__'):
    loop    =   asyncio.get_event_loop()
    result  =   loop.run_until_complete(main(loop))
   

Did I do something really wrong?

Any word of advice?

Thank you SO much!

Solution

Actually, I ended up finding an open issue in aio-libs/aiohttp:
https://github.com/aio-libs/aiohttp/issues/3203

This way, they suggest a workaround that achieves my needs:

session_timeout =   aiohttp.ClientTimeout(total=None,sock_connect=timeout_seconds,sock_read=timeout_seconds)
async with aiohttp.ClientSession(timeout=session_timeout) as session:
    async with session.get(url,allow_redirects=False,timeout=1) as response:
       ...

Answered By — glezo

Перевод официальной документации aiohttp http client

Для Python3.5+

Переводил почти дословно. Перевод так себе, но так как информации на русском о aiohttp почти нету, думаю, будет полезно.

Создание запроса (Request)

Импортируем aiohttp модуль:

import aiohttp

Теперь давайте попробуем получить веб-страницу. Например получим публичную хронику гитхаба

r = await aiohttp.get('https://api.github.com/events')

Теперь мы имеем объект ClientResponse с именем r. Мы можем получить всю информацию с этого объекта. Обязательный параметр aiohttp.get это URL страницы.
Для того чтобы сделать POST запрос нужно использовать aiohttp.post:

r = await aiohttp.post('http://httpbin.org/post', data=b'data')

Другие HTTP методы:

r = await aiohttp.put('http://httpbin.org/put', data=b'data')
r = await aiohttp.delete('http://httpbin.org/delete')
r = await aiohttp.head('http://httpbin.org/get')
r = await aiohttp.options('http://httpbin.org/get')
r = await aiohttp.patch('http://httpbin.org/patch', data=b'data')

Передача параметров в УРЛах (URLs)

Часто приходится передавать данные в строке запроса URL (query string). Эти данные будут приведены в виде пар ключ=значение после знака вопроса [?], например, httpbin.org/get?key=val. aiohttp позволяет передавать эти аргументы с словаря, используя именованный аргумент params. Например, если Вы хотите передать key1=value1 и key2=value2 (отправить запрос на урл http://httpbin.org/get?key1=value1&key2=value2) можно использовать следующий код:

>>> import asyncio
>>> import aiohttp
>>> async def test():
...     payload = {'key1': 'value1', 'key2': 'value2'}
...     async with aiohttp.get('http://httpbin.org/get', params=payload) as r:
...         print(r.url)
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(test())
http://httpbin.org/get?key2=value2&key1=value1
>>> loop.close() 

Примечание
В официальной документации дан пример:

payload = [('key', 'value1'), ('key', 'value2')]
async with aiohttp.get('http://httpbin.org/get',
                       params=payload) as r:
    assert r.url == 'http://httpbin.org/get?key=value2&key=value1'

который неправильный, так как словарь отдает элементы не обязательно в таком порядке как они записаны в словаре, то есть вполне может быть такая ситуация:

>>> import asyncio
>>> import aiohttp
>>> async def test():
...     payload = {'key1': 'value1', 'key2': 'value2'}
...     async with aiohttp.get('http://httpbin.org/get', params=payload) as r:
...         print(r.url)
...         print(r.url == 'http://httpbin.org/get?key2=value2&key1=value1')
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(test())
http://httpbin.org/get?key1=value1&key2=value2
False
>>> loop.close()

Вы можете увидеть что УРЛ правильно сформирован. При этом данные будут закодированы.
Также можно передать список двухэлементных кортежей, при такой передачи можно указать несколько значений для одного ключа:

...
>>> async def test():
...     payload = [('key', 'value1'), ('key', 'value2')]
...     async with aiohttp.get('http://httpbin.org/get', params=payload) as r:
...         print(r.url)
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(test())
http://httpbin.org/get?key=value1&key=value2
…

Вы также можете передавать в параметр param строку, но данные не будут закодированы:

...
>>> async def test():
...     payload = {'key': 'value+1'}
...     async with aiohttp.get('http://httpbin.org/get', params=payload) as r:
...         print(r.url)
...     async with aiohttp.get('http://httpbin.org/get', params='key=value+1') as r:
...         print(r.url)
... 
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(test())
http://httpbin.org/get?key=value%2B1
http://httpbin.org/get?key=value+1
...

Содержимое ответа

Мы можем читать содержимое ответа. Рассмотрим хронику гитхаба:

r = await aiohttp.get('https://api.github.com/events')
print(await r.text())

и увидим что-то вроде:

'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{…

aiohttp автоматически декодирует данные полученные с сервера. Вы можете принудительно указать кодировку:

await r.text(encoding='windows-1251')

Двоичное содержимое ответа

Вы также можете получить ответ в байт-строке для нетекстовых запросов:

print(await r.read())
b'[{"created_at":"2015-06-12T14:06:22Z","public":true,"actor":{…

gzip и deflate компрессоры автоматически декодирует.

Ответ в формате JSON

Если Вы имеете дело с JSON данными, то также есть встроенный JSON декодер:

async with aiohttp.get('https://api.github.com/events') as r:
    print(await r.json())

В случае ошибки декодирования будет вызвано исключение. В метод также можно передавать кодировку и пользовательскую функцию для обработки JSON.

Потоковый ответ

Хоть методы read(), json() и text() удобны в использовании, но Вы должны использовать их осторожно, так как эти методы загружают все данные в память. Например, если Вы хотите загрузить файл размером в несколько гигабайт, то перед тем как с ним что-то делать эти методы сначала загрузят этот файл в оперативную память. Чтобы решить эту проблему Вы можете использовать атрибут content, он является экземпляром касса aiohttp.StreamReader. gzip и deflate автоматически декодируют данные:

async with aiohttp.get('https://api.github.com/events') as r:
    await r.content.read(10)

Для получения файла Вы можете использовать следующий паттерн:

with open(filename, 'wb') as fd:
    while True:
        chunk = await r.content.read(chunk_size)
        if not chunk:
            break
        fd.write(chunk)

После чтения с content Вы уже не можете использовать read(), json() и text()

Освобождение канала после получения ответа

Не забудьте освободить канал после получение ответа. Это будет гарантировать явное поведение и правильное пул соединений.
Самый простой способ это использование async with:

async with client.get(url) as resp:
    pass

Но также можно явно вызвать метод release():

await r.release()

Этого можно не делать если Вы используете read(), json() или text(), так как они это делает автоматически, но все же лучше перестраховываться.

Пользовательские заголовки (headers)

Если Вам нужно добавить свой HTTP заголовок в запрос, Вы можете передать словарь в именованный параметр headers. Например, Вы хотите указать тип содержимого:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}
headers = {'content-type': 'application/json'}
await aiohttp.post(url, data=json.dumps(payload), headers=headers) 

Пользовательские куки (cookies)

Для отправки своих куков при запросе, можно передать словарь в именованный параметр cookies. Например:

url = 'http://httpbin.org/cookies'
cookies = dict(cookies_are='working')

async with aiohttp.get(url, cookies=cookies) as r:
    assert await r.json() == {"cookies": {"cookies_are": "working"}}

Более сложные POST запросы

Возможно, вам требуется отправлять более сложные запросы, например — данные форм, как формы в HTML. Что бы сделать это — просто передайте словарь с данными именованному параметру data:

payload = {'key1': 'value1', 'key2': 'value2'}
async with aiohttp.post('http://httpbin.org/post',
                        data=payload) as r:
    print(await r.text())
{
  ...
  "form": {
    "key2": "value2",
    "key1": "value1"
  },
  ...
}

Есди нужно передать данные не в виде формы, для этого нужно передать строку вместо словаря и данные будут добавлены напрямую.
Например, GitHub API v3 принимает POST/PATCH в формате JSON:

import json
url = 'https://api.github.com/some/endpoint'
payload = {'some': 'data'}

r = await aiohttp.post(url, data=json.dumps(payload))

POST для файлов составной кодировки

Пример загрузки Multipart-encoded файлов:

url = 'http://httpbin.org/post'
files = {'file': open('report.xls', 'rb')}

await aiohttp.post(url, data=files)

Вы так же можете указать имя файла и тип контента явным образом:

url = 'http://httpbin.org/post'
data = FormData()
data.add_field('file',
               open('report.xls', 'rb'),
               filename='report.xls',
               content_type='application/vnd.ms-excel')

await aiohttp.post(url, data=data)

Если вы передаете объект файла в качестве параметра data, aiohttp будет автоматически отправлять его на сервер потоком. Проверяйте StreamReader на поддержку формата информации.
Потоковая загрузка файлов
aiohttp поддерживает несколько способов потоковой загрузки, которые позволяют отправлять большие файлы не загружая их целиком в память.
Простой способ это передать файловый объект в теле POST ответа:

with open('massive-body', 'rb') as f:
   await aiohttp.post('http://some.url/streamed', data=f)

Или вы можете использовать coroutine, которая будет отдавать байт-объекты

@asyncio.coroutine
def my_coroutine():
   chunk = yield from read_some_data_from_somewhere()
   if not chunk:
      return
   yield chunk

Предупреждение
yield запрещен внутри async def .
Примечание
Это не стандартная coroutine, поэтому здесь не можно использовать yield from my_coroutine(). aiohttp обрабатывает их внутри.

Также можно использовать StreamReader объект. Припустим мы хотим загрузить файл в другом запросе и вычислить хэш файла:

async def feed_stream(resp, stream):
    h = hashlib.sha256()

    while True:
        chunk = await resp.content.readany()
        if not chunk:
            break
        h.update(chunk)
        s.feed_data(chunk)

    return h.hexdigest()

resp = aiohttp.get('http://httpbin.org/post')
stream = StreamReader()
loop.create_task(aiohttp.post('http://httpbin.org/post', data=stream))

file_hash = await feed_stream(resp, stream)

Так как атрибут содержимого ответа является StreamReader, Вы можете объеденить get и post запросы вместе (aka HTTP pipelining):

r = await aiohttp.request('get', 'http://python.org')
await aiohttp.post('http://httpbin.org/post',
                   data=r.content)

Загрузка предварительно сжатых данных

Чтобы загрузить данные, которые уже сжатые, вызовите функцию запроса с compress=False, а в заголовке значение Content-Encoding должно указывать алгоритм сжатия (обычно это deflate или zlib):

@asyncio.coroutine
def my_coroutine( my_data):
    data = zlib.compress(my_data)
    headers = {'Content-Encoding': 'deflate'}
    yield from aiohttp.post(
        'http://httpbin.org/post', data=data, headers=headers,
        compress=False)

Keep-Alive, пулы соединений и совместное использоваение куки

Чтобы использовать куки в нескольких запросах нужно создать объект ClientSession:

session = aiohttp.ClientSession()
await session.post(
     'http://httpbin.org/cookies/set/my_cookie/my_value')
# получили куки и они сохранились в session
# и при следующем использовании session будут переданы ранее сохраненный куки
#
# читаем куки
async with session.get('http://httpbin.org/cookies') as r:
    json = await r.json()
    assert json['cookies']['my_cookie'] == 'my_value'

Вы можете установить значения заголовков куки по умолчанию, которые будут использоваться во всех запросах через session:

session = aiohttp.ClientSession(
    headers={"Authorization": "Basic bG9naW46cGFzcw=="})
async with s.get("http://httpbin.org/headers") as r:
    json = yield from r.json()
    assert json['headers']['Authorization'] == 'Basic bG9naW46cGFzcw=='

По-умолчанию aiohttp не использует пул соединения, то есть каждый вызов request() будет создавать новое соединение. Для того чтобы использовать пул, то есть не открывать каждый раз новое соединение, можно использовать ClientSession, объекты которого будут сами создавать пулы.

Коннекторы

Для настройки или изменения запросов на транспортном уровне Вы можете передать свой коннектор в aiohttp.request() и «семью» (get, post и т. д.). Например:

conn = aiohttp.TCPConnector()
r = await aiohttp.get('http://python.org', connector=conn)

В ClientSession также можно передавать свой коннектор:

session = aiohttp.ClientSession(connector=aiohttp.TCPConnector())

Ограничение количества соединений в пуле

Чтобы ограничить количество одновременных подключений к одной и той же конечной точки (уникальность конечной точки определяется тремя параметрами: host, port, is_ssl) Вы можете установить именованный параметр limit в коннекторе:

conn = aiohttp.TCPConnector(limit=30)

В этом примере максимальное количество одновременных соединений равно 30.

SSL проверка в TCP сокетах

Конструктор aiohttp.connector.TCPConnector принимает два взаимоисключающих параметра: verify_ssl и ssl_context. По умолчанию для протокола HTTPS коннектор проверяет ssl сертификат. Для отключения проверки нужно установить verify_ssl = False:

conn = aiohttp.TCPConnector(verify_ssl=False)
session = aiohttp.ClientSession(connector=conn)
r = await session.get('https://example.com')

Если Вы хотите использовать свои параметры SSL, например, использовать собственные файлы сертификации, то Вы можете создать экземпляр ssl.SSLContext и передать его в коннектор с помощью именованного параметра ssl_context:

sslcontext = ssl.create_default_context(cafile='/path/to/ca-bundle.crt')
conn = aiohttp.TCPConnector(ssl_context=sslcontext)
session = aiohttp.ClientSession(connector=conn)
r = await session.get('https://example.com')

Также можно проверять сертификаты через MD5, SHA1 или SHA256 fingerprint:

# Попытка подключится к https://www.python.org
# с фальшивым сертификатом
bad_md5 = b'xa2x06Gxadxaaxf5xd8\Jx99^by;x06='
conn = aiohttp.TCPConnector(fingerprint=bad_md5)
session = aiohttp.ClientSession(connector=conn)
exc = None
try:
    r = yield from session.get('https://www.python.org')
except FingerprintMismatch as e:
    exc = e
assert exc is not None
assert exc.expected == bad_md5

# www.python.org cert's actual md5
assert exc.got == b'xca;Ix9cuvx8esx138N$?x15xcaxcb'

Примечание. Этот пример с официальной документации, но при запуске у Вас будет вызвано исключение, так как не учли что сертификат может поменяться и сейчас:

. . .
...     print(exc.got)
. . .
b'xe6x85.78xdfxf3xf2x81gVxf7xf6$xd8xf3'

Сертификат передается в формате DER и если у Вас он в PEM, то Вам нужно конвертировать его в DER, например так:

openssl x509 -in crt.pem -inform PEM -outform DER > crt.der.

Совет: для конвертирования с шестнадцатеричного значения в двоичную байт-строку Вы можете использовать binascii.unhexlify:

>>> from binascii import unhexlify
>>> md5_hex = 'ca3b499c75768e7313384e243f15cacb'
>>> print(unhexlify(md5_hex) == b'xca;Ix9cuvx8esx138N$?x15xcaxcb')
True

Сокет домена UNIX (Unix domain sockets)

Если Ваш HTTP сервер использует сокет домена UNIX вы можете использовать aiohttp.connector.UnixConnector:

conn = aiohttp.UnixConnector(path='/path/to/socket')
r = await aiohttp.get('http://python.org', connector=conn)

Поддержка прокси

Для использования прокси Вы должны использовать aiohttp.connector.ProxyConnector:

conn = aiohttp.ProxyConnector(proxy="http://some.proxy.com")
r = await aiohttp.get('http://python.org',
                      connector=conn)

ProxyConnector также поддерживает прокси-авторизацию:

conn = aiohttp.ProxyConnector(
   proxy="http://some.proxy.com",
   proxy_auth=aiohttp.BasicAuth('user', 'pass'))
session = aiohttp.ClientSession(connector=conn)
async with session.get('http://python.org') as r:
    assert r.status == 200

Учетные данные могут быть переданы в урле:

conn = aiohttp.ProxyConnector(
    proxy="http://user:pass@some.proxy.com")
session = aiohttp.ClientSession(connector=conn)
async with session.get('http://python.org') as r:
    assert r.status == 200

Коды состояния ответов

Вы можете проверить код состояния ответа:

async with aiohttp.get('http://httpbin.org/get') as r:
    assert r.status == 200

Заголовки ответа

Вы можете посмотреть заголовки ответа сервера:

<_cimultidictproxy data-blogger-escaped-17:11:49="" data-blogger-escaped-19="" data-blogger-escaped-2015="" data-blogger-escaped-application="" data-blogger-escaped-at="" data-blogger-escaped-dec="" data-blogger-escaped-gmt="" data-blogger-escaped-json="" data-blogger-escaped-keep-alive="" data-blogger-escaped-nginx="" data-blogger-escaped-true="">

По RFC 7230, имена заголовков HTTP нечувствительны к регистру, поэтому Вы можете получить значение не в зависимости от регистра ключа:

>>> r.headers['Content-Type']
'application/json'
>>> r.headers.get('content-type')
'application/json'

Куки ответа

Если ответ содержит некоторые куки, Вы можете получить быстрый доступ к ним:

url = 'http://example.com/some/cookie/setting/url'
async with aiohttp.get(url) as r:
    print(r.cookies['example_cookie_name'])

Примечание:
Для того чтобы собрать все куки со всей цепочки перенаправлений нужно использовать aiohttp.ClientSession, иначе ответ будет содержать только куки с последнего элемента в цепочки перенаправлений.
История ответа
Если запрос был перенаправлен, Вы можете посмотреть историю ответов используя атрибут history в виде кортежа. Если перенаправлений не было или allow_redirects=False, то будет возвращена пустаой кортеж.

Таймауты

Вы должны использовать asyncio.wait_for() если Вы хотите ограничить время ожидание ответа:

>>> asyncio.wait_for(aiohttp.get('http://github.com'),
...                             0.001)
Traceback (most recent call last):
  File "", line 1, in 
asyncio.TimeoutError()
Или обернуть вызов клиента в контекстный менеджер Timeout:
with aiohttp.Timeout(0.001):
    async with aiohttp.get('https://github.com') as r:
        await r.text()

Примечание
Таймаут устанавливает ограничение не на загрузку ответа, а на ожидание получения
первого бита информации.

All public names from submodules
errors, multipart, parsers, protocol, utils,
websocket and wsgi are exported into aiohttp
namespace.

aiohttp.errors module¶

http related errors.

exception aiohttp.errors.DisconnectedError[source]

Bases: Exception

Disconnected.

exception aiohttp.errors.ClientDisconnectedError[source]

Bases: aiohttp.errors.DisconnectedError

Client disconnected.

exception aiohttp.errors.ServerDisconnectedError[source]

Bases: aiohttp.errors.DisconnectedError

Server disconnected.

exception aiohttp.errors.HttpProcessingError(*, code=None, message=», headers=None)[source]

Bases: Exception

Http error.

Shortcut for raising http errors with custom code, message and headers.

Parameters:
  • code (int) – HTTP Error code.
  • message (str) – (optional) Error message.
  • of [tuple] headers (list) – (optional) Headers to be sent in response.
code = 0
message = »
exception aiohttp.errors.BadHttpMessage(message, *, headers=None)[source]

Bases: aiohttp.errors.HttpProcessingError

code = 400
message = ‘Bad Request’
exception aiohttp.errors.HttpMethodNotAllowed(*, code=None, message=», headers=None)[source]

Bases: aiohttp.errors.HttpProcessingError

code = 405
message = ‘Method Not Allowed’
exception aiohttp.errors.HttpBadRequest(message, *, headers=None)[source]

Bases: aiohttp.errors.BadHttpMessage

code = 400
message = ‘Bad Request’
exception aiohttp.errors.HttpProxyError(*, code=None, message=», headers=None)[source]

Bases: aiohttp.errors.HttpProcessingError

Http proxy error.

Raised in aiohttp.connector.ProxyConnector if
proxy responds with status other than 200 OK
on CONNECT request.

exception aiohttp.errors.BadStatusLine(line=»)[source]

Bases: aiohttp.errors.BadHttpMessage

exception aiohttp.errors.LineTooLong(line, limit=’Unknown’)[source]

Bases: aiohttp.errors.BadHttpMessage

Bases: aiohttp.errors.BadHttpMessage

exception aiohttp.errors.ClientError[source]

Bases: Exception

Base class for client connection errors.

exception aiohttp.errors.ClientHttpProcessingError[source]

Bases: aiohttp.errors.ClientError

Base class for client http processing errors.

exception aiohttp.errors.ClientConnectionError[source]

Bases: aiohttp.errors.ClientError

Base class for client socket errors.

exception aiohttp.errors.ClientOSError[source]

Bases: aiohttp.errors.ClientConnectionError, OSError

OSError error.

exception aiohttp.errors.ClientTimeoutError[source]

Bases: aiohttp.errors.ClientConnectionError, concurrent.futures._base.TimeoutError

Client connection timeout error.

exception aiohttp.errors.ProxyConnectionError[source]

Bases: aiohttp.errors.ClientConnectionError

Proxy connection error.

Raised in aiohttp.connector.ProxyConnector if
connection to proxy can not be established.

exception aiohttp.errors.ClientRequestError[source]

Bases: aiohttp.errors.ClientHttpProcessingError

Connection error during sending request.

exception aiohttp.errors.ClientResponseError[source]

Bases: aiohttp.errors.ClientHttpProcessingError

Connection error during reading response.

exception aiohttp.errors.FingerprintMismatch(expected, got, host, port)[source]

Bases: aiohttp.errors.ClientConnectionError

SSL certificate does not match expected fingerprint.

exception aiohttp.errors.WSServerHandshakeError(*, code=None, message=», headers=None)[source]

Bases: aiohttp.errors.HttpProcessingError

websocket server handshake error.

exception aiohttp.errors.WSClientDisconnectedError[source]

Bases: aiohttp.errors.ClientDisconnectedError

Deprecated.

aiohttp.helpers module¶

Various helper functions

class aiohttp.helpers.FormData(fields=())[source]

Bases: object

Helper class for multipart/form-data and
application/x-www-form-urlencoded body generation.

add_field(name, value, *, content_type=None, filename=None, content_transfer_encoding=None)[source]
add_fields(*fields)[source]
content_type
is_multipart
aiohttp.helpers.parse_mimetype(mimetype)[source]

Parses a MIME type into its components.

Parameters: mimetype (str) – MIME type
Returns: 4 element tuple for MIME type, subtype, suffix and parameters
Return type: tuple

Example:

>>> parse_mimetype('text/html; charset=utf-8')
('text', 'html', '', {'charset': 'utf-8'})
class aiohttp.helpers.Timeout(timeout, *, loop=None)[source]

Bases: object

Timeout context manager.

Useful in cases when you want to apply timeout logic around block
of code or in cases when asyncio.wait_for is not suitable. For example:

>>> with aiohttp.Timeout(0.001):
...     async with aiohttp.get('https://github.com') as r:
...         await r.text()
Parameters:
  • timeout – timeout value in seconds
  • loop – asyncio compatible event loop

aiohttp.multipart module¶

class aiohttp.multipart.MultipartReader(headers, content)[source]

Bases: object

Multipart body reader.

at_eof()[source]

Returns True if the final boundary was reached or
False otherwise.

Return type: bool
fetch_next_part()[source]

Returns the next body part reader.

classmethod from_response(response)[source]

Constructs reader instance from HTTP response.

Parameters: responseClientResponse instance
multipart_reader_cls = None

Multipart reader class, used to handle multipart/* body parts.
None points to type(self)

next()[source]

Emits the next multipart body part.

part_reader_cls

Body part reader class for non multipart/* content types.

alias of BodyPartReader

release()[source]

Reads all the body parts to the void till the final boundary.

response_wrapper_cls

Response wrapper, used when multipart readers constructs from response.

alias of MultipartResponseWrapper

class aiohttp.multipart.MultipartWriter(subtype=’mixed’, boundary=None)[source]

Bases: object

Multipart body writer.

append(obj, headers=None)[source]

Adds a new body part to multipart writer.

append_form(obj, headers=None)[source]

Helper to append form urlencoded part.

append_json(obj, headers=None)[source]

Helper to append JSON part.

boundary
part_writer_cls

Body part reader class for non multipart/* content types.

alias of BodyPartWriter

serialize()[source]

Yields multipart byte chunks.

class aiohttp.multipart.BodyPartReader(boundary, headers, content)[source]

Bases: object

Multipart reader for single body part.

at_eof()[source]

Returns True if the boundary was reached or
False otherwise.

Return type: bool
chunk_size = 8192
decode(data)[source]

Decodes data according the specified Content-Encoding
or Content-Transfer-Encoding headers value.

Supports gzip, deflate and identity encodings for
Content-Encoding header.

Supports base64, quoted-printable encodings for
Content-Transfer-Encoding header.

Parameters: data (bytearray) – Data to decode.
Raises: RuntimeError — if encoding is unknown.
Return type: bytes
filename

Returns filename specified in Content-Disposition header or None
if missed or header is malformed.

form(*, encoding=None)[source]

Lke read(), but assumes that body parts contains form
urlencoded data.

Parameters: encoding (str) – Custom form encoding. Overrides specified
in charset param of Content-Type header
get_charset(default=None)[source]

Returns charset parameter from Content-Type header or default.

json(*, encoding=None)[source]

Lke read(), but assumes that body parts contains JSON data.

Parameters: encoding (str) – Custom JSON encoding. Overrides specified
in charset param of Content-Type header
next()[source]
read(*, decode=False)[source]

Reads body part data.

Parameters: decode (bool) – Decodes data following by encoding
method from Content-Encoding header. If it missed
data remains untouched
Return type: bytearray
read_chunk(size=8192)[source]

Reads body part content chunk of the specified size.

Parameters: size (int) – chunk size
Return type: bytearray
readline()[source]

Reads body part by line by line.

Return type: bytearray
release()[source]

Lke read(), but reads all the data to the void.

Return type: None
text(*, encoding=None)[source]

Lke read(), but assumes that body part contains text data.

Parameters: encoding (str) – Custom text encoding. Overrides specified
in charset param of Content-Type header
Return type: str
class aiohttp.multipart.BodyPartWriter(obj, headers=None, *, chunk_size=8192)[source]

Bases: object

Multipart writer for single body part.

filename

Returns filename specified in Content-Disposition header or None
if missed.

serialize()[source]

Yields byte chunks for body part.

set_content_disposition(disptype, **params)[source]

Sets Content-Disposition header.

Parameters:
  • disptype (str) – Disposition type: inline, attachment, form-data.
    Should be valid extension token (see RFC 2183)
  • params (dict) – Disposition params
exception aiohttp.multipart.BadContentDispositionHeader[source]

Bases: RuntimeWarning

exception aiohttp.multipart.BadContentDispositionParam[source]

Bases: RuntimeWarning

aiohttp.multipart.parse_content_disposition(header)[source]
aiohttp.multipart.content_disposition_filename(params)[source]

aiohttp.parsers module¶

Parser is a generator function (NOT coroutine).

Parser receives data with generator’s send() method and sends data to
destination DataQueue. Parser receives ParserBuffer and DataQueue objects
as a parameters of the parser call, all subsequent send() calls should
send bytes objects. Parser sends parsed term to destination buffer with
DataQueue.feed_data() method. DataQueue object should implement two methods.
feed_data() — parser uses this method to send parsed protocol data.
feed_eof() — parser uses this method for indication of end of parsing stream.
To indicate end of incoming data stream EofStream exception should be sent
into parser. Parser could throw exceptions.

There are three stages:

  • Data flow chain:

    1. Application creates StreamParser object for storing incoming data.

    2. StreamParser creates ParserBuffer as internal data buffer.

    3. Application create parser and set it into stream buffer:

      parser = HttpRequestParser()
      data_queue = stream.set_parser(parser)

    1. At this stage StreamParser creates DataQueue object and passes it
      and internal buffer into parser as an arguments.

      def set_parser(self, parser):

      output = DataQueue()
      self.p = parser(output, self._input)
      return output

    2. Application waits data on output.read()

      while True:

      msg = yield from output.read()

  • Data flow:

    1. asyncio’s transport reads data from socket and sends data to protocol
      with data_received() call.
    2. Protocol sends data to StreamParser with feed_data() call.
    3. StreamParser sends data into parser with generator’s send() method.
    4. Parser processes incoming data and sends parsed data
      to DataQueue with feed_data()
    5. Application received parsed data from DataQueue.read()
  • Eof:

    1. StreamParser receives eof with feed_eof() call.
    2. StreamParser throws EofStream exception into parser.
    3. Then it unsets parser.
_SocketSocketTransport ->
-> “protocol” -> StreamParser -> “parser” -> DataQueue <- “application”
exception aiohttp.parsers.EofStream[source]

Bases: Exception

eof stream indication.

class aiohttp.parsers.StreamParser(*, loop=None, buf=None, limit=65536, eof_exc_class=<class ‘RuntimeError’>, **kwargs)[source]

Bases: object

StreamParser manages incoming bytes stream and protocol parsers.

StreamParser uses ParserBuffer as internal buffer.

set_parser() sets current parser, it creates DataQueue object
and sends ParserBuffer and DataQueue into parser generator.

unset_parser() sends EofStream into parser and then removes it.

at_eof()[source]
exception()[source]
feed_data(data)[source]

send data to current parser or store in buffer.

feed_eof()[source]

send eof to all parsers, recursively.

output
set_exception(exc)[source]
set_parser(parser, output=None)[source]

set parser to stream. return parser’s DataQueue.

set_transport(transport)[source]
unset_parser()[source]

unset parser, send eof to the parser and then remove it.

class aiohttp.parsers.StreamProtocol(*, loop=None, disconnect_error=<class ‘RuntimeError’>, **kwargs)[source]

Bases: asyncio.streams.FlowControlMixin, asyncio.protocols.Protocol

Helper class to adapt between Protocol and StreamReader.

connection_lost(exc)[source]
connection_made(transport)[source]
data_received(data)[source]
eof_received()[source]
is_connected()[source]
class aiohttp.parsers.ParserBuffer(*args)[source]

Bases: object

ParserBuffer is NOT a bytearray extension anymore.

ParserBuffer provides helper methods for parsers.

exception()[source]
extend(data)[source]
feed_data(data)[source]
read(size)[source]

read() reads specified amount of bytes.

readsome(size=None)[source]

reads size of less amount of bytes.

readuntil(stop, limit=None)[source]
set_exception(exc)[source]
skip(size)[source]

skip() skips specified amount of bytes.

skipuntil(stop)[source]

skipuntil() reads until stop bytes sequence.

wait(size)[source]

wait() waits for specified amount of bytes
then returns data without changing internal buffer.

waituntil(stop, limit=None)[source]

waituntil() reads until stop bytes sequence.

class aiohttp.parsers.LinesParser(limit=65536)[source]

Bases: object

Lines parser.

Lines parser splits a bytes stream into a chunks of data, each chunk ends
with n symbol.

class aiohttp.parsers.ChunksParser(size=8192)[source]

Bases: object

Chunks parser.

Chunks parser splits a bytes stream into a specified
size chunks of data.

aiohttp.signals module¶

class aiohttp.signals.BaseSignal[source]

Bases: list

copy()
sort()
class aiohttp.signals.DebugSignal[source]

Bases: aiohttp.signals.BaseSignal

send(ordinal, name, *args, **kwargs)
class aiohttp.signals.PostSignal[source]

Bases: aiohttp.signals.DebugSignal

class aiohttp.signals.PreSignal[source]

Bases: aiohttp.signals.DebugSignal

ordinal()
class aiohttp.signals.Signal(app)[source]

Bases: aiohttp.signals.BaseSignal

Coroutine-based signal implementation.

To connect a callback to a signal, use any list method.

Signals are fired using the send() coroutine, which takes named
arguments.

send(*args, **kwargs)[source]

Sends data to all registered receivers.

aiohttp.streams module¶

exception aiohttp.streams.EofStream[source]

Bases: Exception

eof stream indication.

class aiohttp.streams.StreamReader(limit=65536, loop=None)[source]

Bases: asyncio.streams.StreamReader, aiohttp.streams.AsyncStreamReaderMixin

An enhancement of asyncio.StreamReader.

Supports asynchronous iteration by line, chunk or as available:

async for line in reader:
    ...
async for chunk in reader.iter_chunked(1024):
    ...
async for slice in reader.iter_any():
    ...
at_eof()[source]

Return True if the buffer is empty and ‘feed_eof’ was called.

exception()[source]
feed_data(data)[source]
feed_eof()[source]
is_eof()[source]

Return True if ‘feed_eof’ was called.

read(n=-1)[source]
read_nowait(n=None)[source]
readany()[source]
readexactly(n)[source]
readline()[source]
set_exception(exc)[source]
total_bytes = 0
unread_data(data)[source]

rollback reading some data from stream, inserting it to buffer head.

wait_eof()[source]
class aiohttp.streams.DataQueue(*, loop=None)[source]

Bases: object

DataQueue is a general-purpose blocking queue with one reader.

at_eof()[source]
exception()[source]
feed_data(data, size=0)[source]
feed_eof()[source]
is_eof()[source]
read()[source]
set_exception(exc)[source]
class aiohttp.streams.ChunksQueue(*, loop=None)[source]

Bases: aiohttp.streams.DataQueue

Like a DataQueue, but for binary chunked data transfer.

read()[source]
readany()
class aiohttp.streams.FlowControlStreamReader(stream, limit=65536, *args, **kwargs)[source]

Bases: aiohttp.streams.StreamReader

feed_data(data, size=0)
read(n=-1)
readany()
readexactly(n)
readline()
class aiohttp.streams.FlowControlDataQueue(stream, *, limit=65536, loop=None)[source]

Bases: aiohttp.streams.DataQueue

FlowControlDataQueue resumes and pauses an underlying stream.

It is a destination for parsed data.

feed_data(data, size)[source]
read()[source]
class aiohttp.streams.FlowControlChunksQueue(stream, *, limit=65536, loop=None)[source]

Bases: aiohttp.streams.FlowControlDataQueue

read()
readany()

aiohttp.websocket module¶

WebSocket protocol versions 13 and 8.

aiohttp.websocket.WebSocketParser(out, buf)[source]
class aiohttp.websocket.WebSocketWriter(writer, *, use_mask=False, random=<random.Random object at 0x347fdc8>)[source]

Bases: object

close(code=1000, message=b»)

Close the websocket, sending the specified code and message.

ping(message=b»)

Send ping message.

pong(message=b»)

Send pong message.

send(message, binary=False)

Send a frame over the websocket with message as its payload.

aiohttp.websocket.do_handshake(method, headers, transport, protocols=())[source]

Prepare WebSocket handshake.

It return http response code, response headers, websocket parser,
websocket writer. It does not perform any IO.

protocols is a sequence of known protocols. On successful handshake,
the returned response headers contain the first protocol in this list
which the server also knows.

class aiohttp.websocket.Message(tp, data, extra)

Bases: tuple

data

Alias for field number 1

Alias for field number 2

tp

Alias for field number 0

exception aiohttp.websocket.WebSocketError(code, message)[source]

Bases: Exception

WebSocket protocol parser error.

aiohttp.wsgi module¶

wsgi server.

TODO:
  • proxy protocol
  • x-forward security
  • wsgi file support (os.sendfile)
class aiohttp.wsgi.WSGIServerHttpProtocol(app, readpayload=False, is_ssl=False, *args, **kw)[source]

Bases: aiohttp.server.ServerHttpProtocol

HTTP Server that implements the Python WSGI protocol.

It uses ‘wsgi.async’ of ‘True’. ‘wsgi.input’ can behave differently
depends on ‘readpayload’ constructor parameter. If readpayload is set to
True, wsgi server reads all incoming data into BytesIO object and
sends it as ‘wsgi.input’ environ var. If readpayload is set to false
‘wsgi.input’ is a StreamReader and application should read incoming
data with “yield from environ[‘wsgi.input’].read()”. It defaults to False.

SCRIPT_NAME = »
create_wsgi_environ(message, payload)[source]
create_wsgi_response(message)[source]
handle_request(message, payload)[source]

Handle a single HTTP request

Я использую aiohttp с методом limited_as_completed для ускорения удаления (около 100 миллионов статических страниц веб-сайта). Однако код останавливается через несколько минут и возвращает ошибку TimeoutError. Я пробовал несколько вещей, но все равно не смог предотвратить поднять asyncio.TimeoutError. Могу я спросить, как я могу проигнорировать ошибку и продолжить?

Код, который я использую:

N=123
import html
from lxml import etree
import requests
import asyncio 
import aiohttp
from aiohttp import ClientSession, TCPConnector
import pandas as pd
import re 
import csv 
import time
from itertools import islice
import sys
from contextlib import suppress

start = time.time()
data = {}
data['name'] = []
filename = "C:\Users\xxxx"+ str(N) + ".csv"

def limited_as_completed(coros, limit):
    futures = [
        asyncio.ensure_future(c)
        for c in islice(coros, 0, limit)
    ]
    async def first_to_finish():
        while True:
            await asyncio.sleep(0)
            for f in futures:
                if f.done():
                    futures.remove(f)
                    try:
                        newf = next(coros)
                        futures.append(
                            asyncio.ensure_future(newf))
                    except StopIteration as e:
                        pass
                    return f.result()
    while len(futures) > 0:
        yield first_to_finish()

async def get_info_byid(i, url, session):
    async with session.get(url,timeout=20) as resp:
        print(url)
        with suppress(asyncio.TimeoutError):
            r = await resp.text()
            name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
            data['name'].append(name)
            dataframe = pd.DataFrame(data)
            dataframe.to_csv(filename, index=False, sep='|')

limit = 1000
async def print_when_done(tasks):
    for res in limited_as_completed(tasks, limit):
        await res

url = "http://xxx.{}.html"
loop = asyncio.get_event_loop()

async def main():
    connector = TCPConnector(limit=10)
    async with ClientSession(connector=connector,headers=headers,raise_for_status=False) as session:
        coros = (get_info_byid(i, url.format(i), session) for i in range(N,N+1000000))
        await print_when_done(coros)

loop.run_until_complete(main())
loop.close()
print("took", time.time() - start, "seconds.")

Журнал ошибок:

Traceback (most recent call last):
  File "C:Usersxxx.py", line 111, in <module>
    loop.run_until_complete(main())
  File "C:UsersxxAppDataLocalProgramsPythonPython37-32libasynciobase_events.py", line 573, in run_until_complete
    return future.result()
  File "C:Usersxxx.py", line 109, in main
    await print_when_done(coros)
  File "C:Usersxxx.py", line 98, in print_when_done
    await res
  File "C:Usersxxx.py", line 60, in first_to_finish
    return f.result()
  File "C:Usersxxx.py", line 65, in get_info_byid
    async with session.get(url,timeout=20) as resp:
  File "C:UsersxxAppDataLocalProgramsPythonPython37-32libsite-packagesaiohttpclient.py", line 855, in __aenter__
    self._resp = await self._coro
  File "C:UsersxxAppDataLocalProgramsPythonPython37-32libsite-packagesaiohttpclient.py", line 391, in _request
    await resp.start(conn)
  File "C:UsersxxAppDataLocalProgramsPythonPython37-32libsite-packagesaiohttpclient_reqrep.py", line 770, in start
    self._continue = None
  File "C:UsersxxAppDataLocalProgramsPythonPython37-32libsite-packagesaiohttphelpers.py", line 673, in __exit__
    raise asyncio.TimeoutError from None
concurrent.futures._base.TimeoutError

Я пытался
1) добавить expect asyncio.TimeoutError: pass. Не работает

async def get_info_byid(i, url, session):
    async with session.get(url,timeout=20) as resp:
        print(url)
        try:
            r = await resp.text()
            name = etree.HTML(r).xpath('//h2[starts-with(text(),"Customer Name")]/text()')
            data['name'].append(name)
            dataframe = pd.DataFrame(data)
            dataframe.to_csv(filename, index=False, sep='|')
        except asyncio.TimeoutError:
            pass

2) подавить (asyncio.TimeoutError), как показано выше. Не работает

Я только вчера изучил aiohttp, так что, может быть, в моем коде есть другие ошибки, которые вызывают ошибку тайм-аута только через несколько минут работы? Большое спасибо, если кто знает, как с этим бороться!

Понравилась статья? Поделить с друзьями:

Читайте также:

  • Timeflow семейные посиделки баг как исправить
  • Timed out waiting for response from mongoose steam vr needs to be restarted как исправить
  • Time warpers как изменить ник на пиратке
  • Time slice error
  • Time service ошибка 134

  • 0 0 голоса
    Рейтинг статьи
    Подписаться
    Уведомить о
    guest

    0 комментариев
    Старые
    Новые Популярные
    Межтекстовые Отзывы
    Посмотреть все комментарии