this problem was fixed in a recent PR to python
https://github.com/python/cpython/pull/10305
if you want, you can make this change locally to make it work for you right away, without waiting for a python and anaconda release.
The communication protocol between processes uses pickling, and the pickled data is prefixed with the size of the pickled data. For your method, all arguments together are pickled as one object.
You produced an object that when pickled is larger than fits in a i
struct formatter (a four-byte signed integer), which breaks the assumptions the code has made.
You could delegate reading of your dataframes to the child process instead, only sending across the metadata needed to load the dataframe. Their combined size is nearing 1GB, way too much data to share over a pipe between your processes.
Quoting from the Programming guidelines section:
Better to inherit than pickle/unpickle
When using the
spawn
orforkserver
start methods many types frommultiprocessing
need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.
If you are not running on Windows and use either the spawn
or forkserver
methods, you could load your dataframes as globals before starting your subprocesses, at which point the child processes will ‘inherit’ the data via the normal OS copy-on-write memory page sharing mechanisms.
Note that this limit was raised for non-Windows systems in Python 3.8, to an unsigned long long (8 bytes), and so you can now send and receive 4 EiB of data. See this commit, and Python issues #35152 and #17560.
If you can’t upgrade and you can’t make use of resource inheriting, and are not running on Windows, then use this patch:
import functools
import logging
import struct
import sys
logger = logging.getLogger()
def patch_mp_connection_bpo_17560():
"""Apply PR-10305 / bpo-17560 connection send/receive max size update
See the original issue at https://bugs.python.org/issue17560 and
https://github.com/python/cpython/pull/10305 for the pull request.
This only supports Python versions 3.3 - 3.7, this function
does nothing for Python versions outside of that range.
"""
patchname = "Multiprocessing connection patch for bpo-17560"
if not (3, 3) < sys.version_info < (3, 8):
logger.info(
patchname + " not applied, not an applicable Python version: %s",
sys.version
)
return
from multiprocessing.connection import Connection
orig_send_bytes = Connection._send_bytes
orig_recv_bytes = Connection._recv_bytes
if (
orig_send_bytes.__code__.co_filename == __file__
and orig_recv_bytes.__code__.co_filename == __file__
):
logger.info(patchname + " already applied, skipping")
return
@functools.wraps(orig_send_bytes)
def send_bytes(self, buf):
n = len(buf)
if n > 0x7fffffff:
pre_header = struct.pack("!i", -1)
header = struct.pack("!Q", n)
self._send(pre_header)
self._send(header)
self._send(buf)
else:
orig_send_bytes(self, buf)
@functools.wraps(orig_recv_bytes)
def recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if size == -1:
buf = self._recv(8)
size, = struct.unpack("!Q", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)
Connection._send_bytes = send_bytes
Connection._recv_bytes = recv_bytes
logger.info(patchname + " applied")
проблема
Я хочу сделать технику с использованием многопроцессорного модуля (multiprocessing.Pool.starmap()
. Однако он дает сообщение об ошибке следующим образом. Я думаю, это сообщение об ошибке имеет размер входов (2147483647 = 2 ^ 31 — 1?), так как тот же код работал плавно для фракции (frac=0.05)
входных данных (train_scala, test, ts). Я преобразовываю типы данных как можно меньше, но это не улучшается.
Версия anaconda — 4.3.30, а версия Python — 3,6 (64 бит). И размер памяти системы составляет более 128 ГБ с более чем 20 ядрами. Вы хотите предложить любой указатель или решение для решения этой проблемы? Если эта проблема вызвана большими данными для модуля многопроцессорности, сколько меньших данных следует использовать для использования модуля многопроцессорности на Python3?
Код:
from multiprocessing import Pool, cpu_count
from itertools import repeat
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)
Сообщение об ошибке:
Traceback (most recent call last):
File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
print('----Pool starmap start----')
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
put(task)
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Дополнительная информация
- historyCutoffs — это список целых чисел
- train_scala — это панда DataFrame (377 МБ)
- тест — это pandas DataFrame (15 МБ)
- ts — это панда DataFrame (547 МБ)
- ul_parts_path — это список каталогов (строка)
- is_train_seq — список булевых
Дополнительный код: метод multiprocess_FE
def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
train_dict = {}
ts_dict = {}
msno_dict = {}
ul_dict = {}
if is_train == True:
train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
else:
train_dict[historyCutoff] = test
msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt")) ##.sample(frac=0.01, replace=False)
ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
Проблема
Я хочу разработать функции, используя модуль многопроцессорности (multiprocessing.Pool.starmap()
. Однако он выдает следующее сообщение об ошибке. Я предполагаю, что это сообщение об ошибке связано с размером входных данных (2147483647 = 2 ^ 31-1?), Поскольку тот же код плавно работал для части (frac=0.05)
фреймов входных данных (train_scala, test, ts). Я конвертирую типы фрейма данных как можно меньшего размера, однако от этого не становится лучше.
Версия anaconda — 4.3.30, а версия Python — 3.6 (64 бит). И объем памяти системы превышает 128 ГБ с более чем 20 ядрами. Вы бы хотели предложить какой-либо указатель или решение для решения этой проблемы? Если эта проблема вызвана большим объемом данных для модуля многопроцессорности, насколько меньше данных я должен использовать для использования модуля многопроцессорности на Python3?
Код:
from multiprocessing import Pool, cpu_count
from itertools import repeat
p = Pool(8)
is_train_seq = [True]*len(historyCutoffs)+[False]
config_zip = zip(historyCutoffs, repeat(train_scala), repeat(test), repeat(ts), ul_parts_path, repeat(members), is_train_seq)
p.starmap(multiprocess_FE, config_zip)
Сообщение об ошибке:
Traceback (most recent call last):
File "main_1210_FE_scala_multiprocessing.py", line 705, in <module>
print('----Pool starmap start----')
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 274, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/pool.py", line 424, in _handle_tasks
put(task)
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/home/dmlab/ksedm1/anaconda3/envs/py36/lib/python3.6/multiprocessing/connection.py", line 393, in _send_bytes
header = struct.pack("!i", n)
struct.error: 'i' format requires -2147483648 <= number <= 2147483647
Дополнительная информация
- historyCutoffs — это список целых чисел
- train_scala — это фрейм данных pandas (377 МБ)
- test — это pandas DataFrame (15 МБ)
- ts — это фрейм данных pandas (547 МБ)
- ul_parts_path — это список каталогов (строка)
- is_train_seq — это список логических значений
Дополнительный код: метод multiprocess_FE
def multiprocess_FE(historyCutoff, train_scala, test, ts, ul_part_path, members, is_train):
train_dict = {}
ts_dict = {}
msno_dict = {}
ul_dict = {}
if is_train == True:
train_dict[historyCutoff] = train_scala[train_scala.historyCutoff == historyCutoff]
else:
train_dict[historyCutoff] = test
msno_dict[historyCutoff] = set(train_dict[historyCutoff].msno)
print('length of msno is {:d} in cutoff {:d}'.format(len(msno_dict[historyCutoff]), historyCutoff))
ts_dict[historyCutoff] = ts[(ts.transaction_date <= historyCutoff) & (ts.msno.isin(msno_dict[historyCutoff]))]
print('length of transaction is {:d} in cutoff {:d}'.format(len(ts_dict[historyCutoff]), historyCutoff))
ul_part = pd.read_csv(gzip.open(ul_part_path, mode="rt")) ##.sample(frac=0.01, replace=False)
ul_dict[historyCutoff] = ul_part[ul_part.msno.isin(msno_dict[historyCutoff])]
train_dict[historyCutoff] = enrich_by_features(historyCutoff, train_dict[historyCutoff], ts_dict[historyCutoff], ul_dict[historyCutoff], members, is_train)
2 ответа
Лучший ответ
Протокол обмена данными между процессами использует травление , и к маринованным данным добавляется префикс размера маринованных данных. Для вашего метода все аргументы вместе объединяются как один объект.
Вы создали объект, который в маринованном виде больше, чем помещается в средство форматирования структуры i
(четырехбайтовое целое число со знаком), что противоречит предположениям, сделанным в коде.
Вместо этого вы можете делегировать чтение фреймов данных дочернему процессу, отправляя только метаданные, необходимые для загрузки фрейма данных. Их общий размер приближается к 1 ГБ — слишком много данных для обмена между вашими процессами по каналу.
Цитата из раздела «Рекомендации по программированию » :
Лучше наследовать, чем мариновать / отрубать
При использовании методов запуска
spawn
илиforkserver
многие типы изmultiprocessing
должны быть выбираемыми, чтобы дочерние процессы могли их использовать. Однако обычно следует избегать отправки общих объектов другим процессам с помощью каналов или очередей. Вместо этого вам следует организовать программу так, чтобы процесс, которому требуется доступ к совместно используемому ресурсу, созданному где-то еще, мог унаследовать его от процесса-предка.
Если вы не работаете в Windows и используете методы spawn
или forkserver
, вы можете загружать фреймы данных как глобальные перед запуском подпроцессов, после чего дочерние процессы будут «наследовать» данные через обычные механизмы совместного использования страниц памяти «копирование при записи».
19
Martijn Pieters
9 Авг 2018 в 07:40
Эта проблема была исправлена в недавнем PR на python https://github.com/python/cpython/pull/10305
При желании вы можете внести это изменение локально, чтобы оно работало на вас сразу же, не дожидаясь выпуска python и anaconda.
11
Alex
19 Ноя 2018 в 04:04
If you keep getting this error when generating key as license for running IOL or IOS On Linux, (also called IOU or IOS On Unix) in EVE-NG
darwin@eve-ng:/$ cd /opt/unetlab/addons/iol/bin
darwin@eve-ng:/opt/unetlab/addons/iol/bin$ python keygen.py
*********************************************************************
Cisco IOU License Generator – Kal 2011, python port of 2006 C version
hostid=bada0c0f, hostname=eve-ng, ioukey=bada0e51
Traceback (most recent call last):
File “./keygen.py”, line 18, in <module>
md5input=iouPad1 + iouPad2 + struct.pack(‘!i’, ioukey) + iouPad1
struct.error: ‘i’ format requires -2147483648 <= number <= 2147483647
Step 1:
Solution: Modify the python script, change i to L
From: md5input=iouPad1 + iouPad2 + struct.pack(‘!i’, ioukey) + iouPad1
To: md5input=iouPad1 + iouPad2 + struct.pack(‘!L‘, ioukey) + iouPad1
Run again:
darwin@eve-ng:/opt/unetlab/addons/iol/bin$ python keygen.py
*********************************************************************
Cisco IOU License Generator – Kal 2011, python port of 2006 C version
hostid=bada0c0f, hostname=eve-ng, ioukey=bada0e51
Step 2: Add the following text to ~/.iourc:
[license]
eve-ng = f2630dfba88daedd;
You can disable the phone home feature with something like:
echo ‘127.0.0.127 xml.cisco.com’ >> /etc/hosts
Step 2: Input the license key in “iourc” file
vi iourc
[license]
eve-ng = f2630dfba88daedd;
Step 3: Test IOL
darwin@eve-ng:/opt/unetlab/addons/iol/bin$ LD_LIBRARY_PATH=/opt/unetlab/addons/iol/lib /opt/unetlab/addons/iol/bin/i86bi-linux-l3-adventerprisek9-15.4.2T.bin 1
***************************************************************
IOS On Unix – Cisco Systems confidential, internal use only
Under no circumstances is this software to be provided to any
non Cisco staff or customers. To do so is likely to result
in disciplinary action. Please refer to the IOU Usage policy at
wwwin-iou.cisco.com for more information.
***************************************************************
Restricted Rights Legend
Use, duplication, or disclosure by the Government is
subject to restrictions as set forth in subparagraph
(c) of the Commercial Computer Software – Restricted
Rights clause at FAR sec. 52.227-19 and subparagraph
(c) (1) (ii) of the Rights in Technical Data and Computer
Software clause at DFARS sec. 252.227-7013.
cisco Systems, Inc.
170 West Tasman Drive
San Jose, California 95134-1706
Cisco IOS Software, Linux Software (I86BI_LINUX-ADVENTERPRISEK9-M), Version 15.4(2)T, DEVELOPMENT TEST SOFTWARE
Technical Support: http://www.cisco.com/techsupport
Copyright (c) 1986-2014 by Cisco Systems, Inc.
Compiled Thu 27-Mar-14 01:08 by prod_rel_team
This product contains cryptographic features and is subject to United
–omitted lines —
The communication protocol between processes uses pickling , and the pickled data is prefixed with the size of the pickled data. В протоколе связи между процессами используется pickling , а перед маринованными данными стоит префикс размера маринованных данных. For your method, all arguments together are pickled as one object. Для вашего метода все аргументы вместе рассматриваются как один объект.
You produced an object that when pickled is larger than fits in a i
struct formatter (a four-byte signed integer), which breaks the assumptions the code has made. Вы создали объект, который при мариновании больше, чем помещается в средство форматирования структуры i
(четырехбайтовое целое число со знаком), что нарушает предположения, сделанные кодом.
You could delegate reading of your dataframes to the child process instead, only sending across the metadata needed to load the dataframe. Вместо этого вы можете делегировать чтение ваших фреймов данных дочернему процессу, отправляя только метаданные, необходимые для загрузки фрейма данных. Their combined size is nearing 1GB, way too much data to share over a pipe between your processes. Их общий размер приближается к 1 ГБ, слишком много данных для обмена по каналу между вашими процессами.
Quoting from the Programming guidelines section : Цитата из раздела « Руководство по программированию » :
Better to inherit than pickle/unpickle Лучше унаследовать, чем мариновать/рассолить
When using the
spawn
orforkserver
start methods many types frommultiprocessing
need to be picklable so that child processes can use them. При использовании методов запускаspawn
илиforkserver
многие типы изmultiprocessing
должны быть доступны для выбора, чтобы дочерние процессы могли их использовать. However, one should generally avoid sending shared objects to other processes using pipes or queues. Однако обычно следует избегать отправки общих объектов другим процессам с использованием каналов или очередей. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process. Вместо этого вы должны организовать программу так, чтобы процесс, которому требуется доступ к совместно используемому ресурсу, созданному в другом месте, мог наследовать его от процесса-предка.
If you are not running on Windows and use either the spawn
or forkserver
methods, you could load your dataframes as globals before starting your subprocesses, at which point the child processes will ‘inherit’ the data via the normal OS copy-on-write memory page sharing mechanisms. Если вы не работаете в Windows и используете методы spawn
или forkserver
, вы можете загрузить свои кадры данных как глобальные перед запуском подпроцессов, после чего дочерние процессы «наследуют» данные. через обычные механизмы совместного использования страниц памяти с копированием при записи ОС.
Note that this limit was raised for non-Windows systems in Python 3.8, to an unsigned long long (8 bytes), and so you can now send and receive 4 EiB of data. Обратите внимание, что это ограничение было увеличено для систем, отличных от Windows, в Python 3.8 до unsigned long long (8 байтов), поэтому теперь вы можете отправлять и получать 4 EiB данных. See this commit , and Python issues #35152 and #17560 . См. этот коммит и проблемы Python #35152 и #17560 .
If you can’t upgrade and you can’t make use of resource inheriting, and are not running on Windows, then use this patch: Если вы не можете выполнить обновление и не можете использовать наследование ресурсов и не работаете в Windows, используйте этот патч:
import functools
import logging
import struct
import sys
logger = logging.getLogger()
def patch_mp_connection_bpo_17560():
"""Apply PR-10305 / bpo-17560 connection send/receive max size update
See the original issue at https://bugs.python.org/issue17560 and
https://github.com/python/cpython/pull/10305 for the pull request.
This only supports Python versions 3.3 - 3.7, this function
does nothing for Python versions outside of that range.
"""
patchname = "Multiprocessing connection patch for bpo-17560"
if not (3, 3) < sys.version_info < (3, 8):
logger.info(
patchname + " not applied, not an applicable Python version: %s",
sys.version
)
return
from multiprocessing.connection import Connection
orig_send_bytes = Connection._send_bytes
orig_recv_bytes = Connection._recv_bytes
if (
orig_send_bytes.__code__.co_filename == __file__
and orig_recv_bytes.__code__.co_filename == __file__
):
logger.info(patchname + " already applied, skipping")
return
@functools.wraps(orig_send_bytes)
def send_bytes(self, buf):
n = len(buf)
if n > 0x7fffffff:
pre_header = struct.pack("!i", -1)
header = struct.pack("!Q", n)
self._send(pre_header)
self._send(header)
self._send(buf)
else:
orig_send_bytes(self, buf)
@functools.wraps(orig_recv_bytes)
def recv_bytes(self, maxsize=None):
buf = self._recv(4)
size, = struct.unpack("!i", buf.getvalue())
if size == -1:
buf = self._recv(8)
size, = struct.unpack("!Q", buf.getvalue())
if maxsize is not None and size > maxsize:
return None
return self._recv(size)
Connection._send_bytes = send_bytes
Connection._recv_bytes = recv_bytes
logger.info(patchname + " applied")
en
this problem was fixed in a recent PR to python https://github.com/python/cpython/pull/10305 эта проблема была исправлена в недавнем PR для python https://github.com/python/cpython/pull/10305
if you want, you can make this change locally to make it work for you right away, without waiting for a python and anaconda release. если вы хотите, вы можете внести это изменение локально, чтобы оно сразу заработало, не дожидаясь выпуска python и anaconda.
en