Leetcode Problem 994- rotting oranges solution won’t work 🙁
So I’ve come across this problem while trying to enhance my skills. I watched the Neetcode solution video to it 2 days ago, where he goes over a detailed solution, and I decided to tackle it myself today since I didn’t have his code fresh on my mind, but I still had the intuition and hence, I thought I could maybe be able to solve it on my own, but oh how I was wrong. After SEVERAL errors thrown that I solved, I got stuck with this error RuntimeError: deque mutated during iteration which I don’t understand. I went through the logic and the solution makes sense to me & I don’t understand why it won’t budge. Watching a solution video and not being able to come to one myself due to some dumb runtime error makes me feel extremely inadequate about my skills and essential makes me feel like garbage. I put a lot of effort into making this solution, yet it just won’t damn run. If anyone could help me out, I would GREATLY appreciate that. Im very hopeless right now.
1 Answer 1
One refactoring keep as much as possible of posted code.
Code
Related
Hot Network Questions
Subscribe to RSS
To subscribe to this RSS feed, copy and paste this URL into your RSS reader.
Site design / logo © 2023 Stack Exchange Inc; user contributions licensed under CC BY-SA . rev 2023.1.10.43144
By clicking “Accept all cookies”, you agree Stack Exchange can store cookies on your device and disclose information in accordance with our Cookie Policy.
Источник
deque mutated during iteration #78
Comments
gbiddison commented Jul 21, 2016 •
I’m seeing this error infrequently (17.7.dev):
It looks like this stops the service advertisement completely.
The text was updated successfully, but these errors were encountered:
stephenrauch commented Jul 24, 2016
I love the fact that you are so hard on this code. Hopefully it will eventually get beaten into shape.
That exception is actually in the threading library. I don’t immediately know how to fix it expect to catch and ignore the exception. But that may also have some unintended consequences. I do not see how zeroconf’s usage of the condition variable is incorrect, but. I will spend some looking at this.
Thanks for reporting this.
stephenrauch commented Jul 24, 2016
Is this python bug biting you?
What version of 3.4 are you using?
stephenrauch commented Jul 24, 2016
Ubuntu 14.04 appears to be using python 3.4.0, so I am pretty sure issue 22185 is the problem. I have checked in a work around in my repo for the python library bug. Would be great if you could give it a try before I merge it.
gbiddison commented Jul 25, 2016
This could be it, my Ubuntu machines are running 3.4.3
I appreciate the feedback, wish I had more time to devote to this library, it’s been very helpful for us.
gbiddison commented Jul 26, 2016
Doing a more thorough audit of python versions here, we are running several versions on Ubuntu from 3.4.0 to 3.4.3
My strategy here is going to be to patch the python versions then wait & see, but looking at the commit history, this exception probably used to get logged & eaten by the general catch-all exception handler around the Engine.run function.
I am also seeing a second error from the same block:
If you like, I can open a separate issue since it’s a different error, but it seems related since it’s roughly the same code.
gbiddison commented Jul 26, 2016
And I am adding the work-around you suggested to my systems now; these race-bugs are pretty intermittent.
Источник
motion_detected fails with exception — «RuntimeError: deque mutated during iteration» #973
<>’s edit
I have a fairly «simple» loop on a Raspberry Pi Zero W that runs two applications: the first basically generates random «arrivals» of a simulated train and activates GPIO pins when those events occur; the second is a function that monitors the state of an external motion detector, and passes that state into a function which will return True if motion is detected.
The exception is as follows:
The offending function is this:
This function is run every 0.1 seconds in the main loop.
Full code can be found here.
I’m totally at a loss here, any tips would be appreciated.
EDIT: I have no idea why the line breaks all got lost in my code snippets.
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
Replies: 7 suggested answers · 2 replies
EDIT: I have no idea why the line breaks all got lost in my code snippets.
Use three backticks to fence your code, not one (I’ve edited it)
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
What version of gpiozero is this?
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
What version of gpiozero is this?
Version 1.6.2 running on Python 3.7.3.
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
It looks like you’re polling too fast for it to handle. You could try waiting a little longer.
Alternatively you could try another method of triggering changes from the sensor:
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
It looks like you’re polling too fast for it to handle. You could try waiting a little longer.
Alternatively you could try another method of triggering changes from the sensor:
I definitely don’t mind a longer wait. I currently wait for 0.1 seconds, and the code ran for about two days before the exception in that test.
I wasn’t really clear on the wait_for_motion() method; it seemed like it was a blocking call; i.e. my loop would pause until motion occurred.
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
I wasn’t really clear on the wait_for_motion() method; it seemed like it was a blocking call; i.e. my loop would pause until motion occurred.
Yes, it’s quite useful if that’s compatible with your program flow. There are 4 different ways of reacting to device changes: polling, blocking, event-driven and source/values. See the FAQ which covers this topic for buttons, and it’s much the same for other devices.
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
Looking at the line that caused the exception:
. I’m not sure if any of @bennuttall ‘s workarounds would actually «fix» the problem?
Because a MotionSensor has «noisy» / «jittery» output, GpioZero runs a background thread collecting (noisy) values into self.queue , and when you try to get the .value it returns the (smooth) average of the values in the queue. So the bug is that if you’re very unlucky, the background thread might be attempting to add a new value to the queue at exactly the same time as the average is being calculated, which is what then leads to the deque mutated during iteration error. (and the chances of this bug occurring will be greater on a PiZero than on a Pi4, because the slower CPU in the PiZero will take (slightly) longer to run the average function)
I think the fix is therefore for https://github.com/gpiozero/gpiozero/blob/master/gpiozero/mixins.py#L538 to implement some kind of locking mechanism, so that the fill function doesn’t try to append a value to the queue while the value function is calculating the average of the queue.
His code is normally so comprehensive, that there’s part of me that’s surprised that @waveform80 didn’t already add this locking 😉
Beta Was this translation helpful? Give feedback.
1 You must be logged in to vote
@lurch is absolutely right — the fact this is a deque mutation error from within gpiozero’s code pretty much screams this is a bug. However, in this case I know why this has suddenly appeared and it’s horribly subtle (and yes, a case of me cutting corners!).
Prior to 1.6.x, the default average routine for MotionSensor (or any derivative of SmoothedInputDevice ) was median. Now this never caused an issue because the first thing in the process of calculating the median is to sort the list. The process of sorting the list first duplicates it. Hang on! Doesn’t that involve iteration? Well, yes, but within an explicitly locked portion of the interpreter so it’ll never get interrupted by another Python thread. But in 1.6.x the default average for MotionSensor changed to mean (can’t seem to find the thread/ticket where that got hashed out, but it had to do with giving MotionSensor a less «binary» value as I recall). Anyway, mean (obviously) doesn’t need to sort and so merrily iterates over the deque (which may be mutated during said iteration).
The fix is (as @lurch suggests), throw a lock around the call to self.average (or pass an atomic (as far as Python is concerned) copy of the deque to self.average , although frankly I lean towards the lock as a lighter-weight alternative).
Anyway, I’ll re-open the original ticket and assign it to me in a mo.
Beta Was this translation helpful? Give feedback.
Источник
Queue.Queue vs. collections.deque
I need a queue which multiple threads can put stuff into, and multiple threads may read from.
Python has at least two queue classes, Queue.Queue and collections.deque , with the former seemingly using the latter internally. Both claim to be thread-safe in the documentation.
However, the Queue docs also state:
collections.deque is an alternative implementation of unbounded queues with fast atomic append() and popleft() operations that do not require locking.
Which I guess I don’t quite unterstand: Does this mean deque isn’t fully thread-safe after all?
If it is, I may not fully understand the difference between the two classes. I can see that Queue adds blocking functionality. On the other hand, it loses some deque features like support for the in-operator.
Accessing the internal deque object directly, is
Also, why does Queue employ a mutex for it’s operations when deque is thread-safe already?
7 Answers 7
Queue.Queue and collections.deque serve different purposes. Queue.Queue is intended for allowing different threads to communicate using queued messages/data, whereas collections.deque is simply intended as a datastructure. That’s why Queue.Queue has methods like put_nowait() , get_nowait() , and join() , whereas collections.deque doesn’t. Queue.Queue isn’t intended to be used as a collection, which is why it lacks the likes of the in operator.
It boils down to this: if you have multiple threads and you want them to be able to communicate without the need for locks, you’re looking for Queue.Queue ; if you just want a queue or a double-ended queue as a datastructure, use collections.deque .
Finally, accessing and manipulating the internal deque of a Queue.Queue is playing with fire — you really don’t want to be doing that.
If all you’re looking for is a thread-safe way to transfer objects between threads, then both would work (both for FIFO and LIFO). For FIFO:
- Other operations on deque might not be thread safe, I’m not sure.
- deque does not block on pop() or popleft() so you can’t base your consumer thread flow on blocking till a new item arrives.
However, it seems that deque has a significant efficiency advantage. Here are some benchmark results in seconds using CPython 2.7.3 for inserting and removing 100k items
Here’s the benchmark code:
For information there is a Python ticket referenced for deque thread-safety (https://bugs.python.org/issue15329). Title «clarify which deque methods are thread-safe»
The deque’s append(), appendleft(), pop(), popleft(), and len(d) operations are thread-safe in CPython. The append methods have a DECREF at the end (for cases where maxlen has been set), but this happens after all of the structure updates have been made and the invariants have been restored, so it is okay to treat these operations as atomic.
Anyway, if you are not 100% sure and you prefer reliability over performance, just put a like Lock 😉
All single-element methods on deque are atomic and thread-safe. All other methods are thread-safe too. Things like len(dq) , dq[4] yield momentary correct values. But think e.g. about dq.extend(mylist) : you don’t get a guarantee that all elements in mylist are filed in a row when other threads also append elements on the same side — but thats usually not a requirement in inter-thread communication and for the questioned task.
20x faster than Queue (which uses a deque under the hood) and unless you don’t need the «comfortable» synchronization API (blocking / timeout), the strict maxsize obeyance or the «Override these methods (_put, _get, ..) to implement other queue organizations» sub-classing behavior, or when you take care of such things yourself, then a bare deque is a good and efficient deal for high-speed inter-thread communication.
Источник
Добавить в деку повторяется в Python?
У меня есть deque в Python, который я повторяю. Иногда deque меняется во время моего общения, что приводит к RuntimeError: deque mutated during iteration .
Если бы это был список Python, а не deque, я бы просто перебрал копию списка (через слайс, такой как my_list[:] , но поскольку операции слайса не могут использоваться в запросах, мне интересно, что является наиболее питоническим способ справиться с этим?
Мое решение состоит в том, чтобы импортировать модуль копирования и затем выполнить итерацию по копии, как for item in copy(my_deque): , что хорошо, но так как я искал эту тему высоко и низко, я решил опубликовать здесь, чтобы спросить?
2 ответа
Вы можете «заморозить» его, создав список. Нет необходимости копировать его в новую деку. Список, конечно, достаточно хорош, так как он нужен только для итерации.
list(x) создает список из любого итерируемого x , включая deque, и в большинстве случаев это самый питонический способ сделать это.
Имейте в виду, что это решение действительно только в том случае, если deque изменяется в том же потоке (то есть внутри цикла). Иначе, имейте в виду, что list(my_deque) не является атомарным, а также выполняет итерации по deque. Это означает, что если другой поток изменяет деку во время работы, вы в конечном итоге с той же ошибкой. Если вы находитесь в многопоточной среде, используйте блокировку.
Несмотря на то, что вы можете создать список из очереди, for elem in list(deque) , это не всегда оптимально, если это часто используемая функция: это требует затрат на производительность, особенно. если в деке много элементов, и вы постоянно меняете его на array структуру.
Возможной альтернативой без создания списка является использование цикла while с некоторым логическим значением var для управления условиями. Это обеспечивает временную сложность O (1).
Источник
I’m trying to implement market simulation for algorithmic trading and I’ve found this code on github https://github.com/DrAshBooth/PyLOB.
The problem is when I’m running my code for small window, for example, 2 days, everything is fine and I get my expected results. But when I increase window to 20 days or more, I get «RuntimeError: deque mutated during iteration».
I’ve checked my code but I’ve never found anything that could mutate deque during my runs.
Below is the part of the code that produces the error:
self.tape = deque(maxlen=None)
.
.
.
def avg_volume_traded(self):
if self.tape != None and len(self.tape) > 0:
num = 0
total_volume = 0
for entry in self.tape:
total_volume = entry['qty'] + total_volume
num += 1
if num != 0 and total_volume != None:
return total_volume, num
else:
return None, None
This is the actual error message:
Exception in thread Thread-10986:
Traceback (most recent call last):
File "/home/hamid/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/hamid/anaconda3/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "exchange_envirnment.py", line 60, in _doit
self.func(self.args[0], self.args[1])
File "/home/hamid/dukto/src2/src_new/traders/market_maker_trader.py", line 46, in trade
self.type_three(lob_obj, reporter_obj)
File "/home/hamid/dukto/src2/src_new/traders/market_maker_trader.py", line 285, in type_three
max_volume = lob_obj.max_volume_traded()
File "/home/hamid/dukto/src2/src_new/PyLOB/orderbook.py", line 395, in max_volume_traded
for entry in self.tape:
RuntimeError: deque mutated during iteration
This is the main part that use threading in two parts(class Periodic and day_period):
class Periodic(object):
def __init__(self, object, compression_factor, args=[], kwargs={}):
self.compression_factor = compression_factor
self.object = object
self.func = object.trade
self.args = args
self.kwargs = kwargs
self.seppuku = Event()
def start(self):
self.seppuku.clear()
self.proc = Thread(target=self._doit)
self.proc.start()
def stop(self):
self.seppuku.set()
self.proc.join()
def _doit(self):
while True:
self.seppuku.wait(self.object.interval / self.compression_factor)
if self.seppuku.is_set():
break
self.func(self.args[0], self.args[1])
class day_period(object):
def __init__(self, object, compression_factor, args=[], kwargs={}):
self.period = (3600 * 4) / compression_factor
self.func = object.run
self.args = args
self.kwargs = kwargs
self.seppuku = Event()
def start(self):
self.seppuku.clear()
self.proc = Thread(target=self._doit)
self.proc.start()
def stop(self):
self.seppuku.set()
self.proc.join()
def _doit(self):
while True:
self.seppuku.wait(self.period)
if self.seppuku.is_set():
break
self.func(self.args)
class intra_day_traders_mng(object):
def __init__(self, simulation_config):
self.config = simulation_config
self.agents_list = []
self.agents_dict = {}
self.p_list = []
self.compression_factor = simulation_config['simulation_config']['compression_factor']
self.trader_creator()
self.first_time = True
self.day_of_simulation = simulation_config['simulation_config']['day_number']
def trader_creator(self):
for agent_name in self.config['agents']['intra_day']:
for config in self.config['agents']['intra_day'][agent_name]:
if agent_name == 'nonclassified_trader':
for k in range(config['n_traders']):
self.agents_list.append(NON_CLASSIFIED_TRADER_INTRADAY(config))
time.sleep(.1)
for agent_name in self.config['agents']['daily']:
for config in self.config['agents']['daily'][agent_name]:
if agent_name == 'nonclassified_trader':
for k in range(config['n_traders']):
self.agents_list.append(NON_CLASSIFIED_TRADER_DAILY(config))
time.sleep(0.1)
if agent_name == "market_maker_trader":
for k in range(config['n_traders']):
self.agents_list.append(MARKET_MAKER_TRADER_DAILY(config))
time.sleep(0.1)
for agent in self.agents_list:
self.agents_dict.update({agent.id: agent})
for agent in self.agents_list:
agent.set_trader_dict(self.agents_dict)
def random_initial(self):
agents_random_list = random.choices(self.agents_list, k=len(self.agents_list))
return agents_random_list
def run(self, args):
lob = args[0]
reporter_obj = args[1]
# when the trader running for first time
if self.first_time == True:
lob.time_obj.reset()
agents_random_list = self.random_initial()
for agent in agents_random_list:
self.p_list.append(Periodic(agent, self.compression_factor, args=(lob,reporter_obj)))
self.p_list[-1].start()
time.sleep(.1)
self.first_time = False
else:
for proc in self.p_list:
proc.stop()
for agent in self.agents_list:
agent.reset_trader(lob)
time_series = lob.ohcl()
if len(time_series) == self.day_of_simulation :
out = {'out':time_series}
with open('output.json', 'w') as outfile:
json.dump(out, outfile)
reporter_obj.save_as_csv()
trade_summary = lob.trade_summary()
with open('trade_report.csv', 'w') as csvFile:
writer = csv.writer(csvFile)
writer.writerows(trade_summary)
csvFile.close()
sys.exit()
print("***********************************************************************************")
print("day is:",lob.time_obj.day)
lob.time_obj.reset()
for proc in self.p_list:
proc.start()
time.sleep(.1)
if __name__ == '__main__':
with open('config.json', 'r') as f:
simulation_config = json.load(f)
intra_day_mng_obj = intra_day_traders_mng(simulation_config)
reporter_obj = REPORTER()
# for synchronization of time
time_obj = TIME_MANAGE(compression_factor=simulation_config['simulation_config']['compression_factor'])
lob = OrderBook(time_obj, tick_size=simulation_config['simulation_config']['tickSize'])
day_period(intra_day_mng_obj, simulation_config['simulation_config']['compression_factor'], args=(lob,reporter_obj)).start()
And finally the «OrderBook» that defines «self.tape» in the below code:
class OrderBook():
def __init__(self, time_obj, tick_size=0.0001):
self.tape = deque(maxlen=None) # Index [0] is most recent trade
self.bids = OrderTree()
self.asks = OrderTree()
self.lastTick = None
self.lastTimestamp = 0
self.tickSize = tick_size
self.time = 0
self.nextQuoteID = 0
self.time_series = []
self.time_obj = time_obj
def clipPrice(self, price):
return round(price, int(math.log10(1 / self.tickSize)))
def updateTime(self):
self.time = int(self.time_obj.now()['time'])
def processOrder(self, quote, fromData, verbose):
orderType = quote['type']
orderInBook = None
if fromData:
self.time = quote['timestamp']
else:
self.updateTime()
quote['timestamp'] = self.time
if quote['qty'] <= 0:
sys.exit('processLimitOrder() given order of qty <= 0')
if not fromData: self.nextQuoteID += 1
if orderType == 'market':
trades = self.processMarketOrder(quote, verbose)
elif orderType == 'limit':
quote['price'] = self.clipPrice(quote['price'])
trades, orderInBook = self.processLimitOrder(quote, fromData, verbose)
else:
sys.exit("processOrder() given neither 'market' nor 'limit'")
return trades, orderInBook
def processOrderList(self, side, orderlist,
qtyStillToTrade, quote, verbose):
trades = []
qtyToTrade = qtyStillToTrade
while len(orderlist) > 0 and qtyToTrade > 0:
headOrder = orderlist.getHeadOrder()
tradedPrice = headOrder.price
counterparty = headOrder.tid
if qtyToTrade < headOrder.qty:
tradedQty = qtyToTrade
newBookQty = headOrder.qty - qtyToTrade
headOrder.updateQty(newBookQty, headOrder.timestamp)
qtyToTrade = 0
elif qtyToTrade == headOrder.qty:
tradedQty = qtyToTrade
if side == 'bid':
self.bids.removeOrderById(headOrder.idNum)
else:
self.asks.removeOrderById(headOrder.idNum)
qtyToTrade = 0
else:
tradedQty = headOrder.qty
if side == 'bid':
self.bids.removeOrderById(headOrder.idNum)
else:
self.asks.removeOrderById(headOrder.idNum)
qtyToTrade -= tradedQty
if verbose: print('>>> TRADE nt=%d $%f n=%d p1=%d p2=%d' %
(self.time, tradedPrice, tradedQty,
counterparty, quote['tid']))
transactionRecord = {'timestamp': self.time,
'price': tradedPrice,
'qty': tradedQty,
'time': self.time,
'day': self.time_obj.now()['day']}
if side == 'bid':
transactionRecord['party1'] = [counterparty,
'bid',
headOrder.idNum]
transactionRecord['party2'] = [quote['tid'],
'ask',
None]
else:
transactionRecord['party1'] = [counterparty,
'ask',
headOrder.idNum]
transactionRecord['party2'] = [quote['tid'],
'bid',
None]
self.tape.append(transactionRecord)
trades.append(transactionRecord)
return qtyToTrade, trades
def processMarketOrder(self, quote, verbose):
trades = []
qtyToTrade = quote['qty']
side = quote['side']
if side == 'bid':
while qtyToTrade > 0 and self.asks:
bestPriceAsks = self.asks.minPriceList()
qtyToTrade, newTrades = self.processOrderList('ask',
bestPriceAsks,
qtyToTrade,
quote, verbose)
trades += newTrades
elif side == 'ask':
while qtyToTrade > 0 and self.bids:
bestPriceBids = self.bids.maxPriceList()
qtyToTrade, newTrades = self.processOrderList('bid',
bestPriceBids,
qtyToTrade,
quote, verbose)
trades += newTrades
else:
sys.exit('processMarketOrder() received neither "bid" nor "ask"')
return trades
def processLimitOrder(self, quote, fromData, verbose):
orderInBook = None
trades = []
qtyToTrade = quote['qty']
side = quote['side']
price = quote['price']
if side == 'bid':
while (self.asks and
price >= self.asks.minPrice() and
qtyToTrade > 0):
bestPriceAsks = self.asks.minPriceList()
qtyToTrade, newTrades = self.processOrderList('ask',
bestPriceAsks,
qtyToTrade,
quote, verbose)
trades += newTrades
if qtyToTrade > 0:
if not fromData:
quote['idNum'] = self.nextQuoteID
quote['qty'] = qtyToTrade
self.bids.insertOrder(quote)
orderInBook = quote
elif side == 'ask':
while (self.bids and
price <= self.bids.maxPrice() and
qtyToTrade > 0):
bestPriceBids = self.bids.maxPriceList()
qtyToTrade, newTrades = self.processOrderList('bid',
bestPriceBids,
qtyToTrade,
quote, verbose)
trades += newTrades
if qtyToTrade > 0:
if not fromData:
quote['idNum'] = self.nextQuoteID
quote['qty'] = qtyToTrade
self.asks.insertOrder(quote)
orderInBook = quote
else:
sys.exit('processLimitOrder() given neither bid nor ask')
return trades, orderInBook
def avg_volume_traded(self):
if self.tape != None and len(self.tape) > 0:
num = 0
total_volume = 0
for entry in self.tape:
total_volume = entry['qty'] + total_volume
num += 1
if num != 0 and total_volume != None:
return total_volume, num
else:
return None, None
Я пытаюсь реализовать рыночное моделирование для алгоритмической торговли, и я нашел этот код на github https: // github. ru / DrAshBooth / PyLOB. Проблема в том, что когда я запускаю свой код для небольшого окна, например, 2 дня, все в порядке, и я получаю ожидаемые результаты. Но когда я увеличиваю окно до 20 дней или более, я получаю «RuntimeError: deque mutated во время итерации». Я проверил свой код, но не нашел ничего, что могло бы изменить deque во время моих прогонов. Ниже приведена часть кода, вызывающая ошибку:
self.tape = deque(maxlen=None)
.
.
.
def avg_volume_traded(self):
if self.tape != None and len(self.tape) > 0:
num = 0
total_volume = 0
for entry in self.tape:
total_volume = entry['qty'] + total_volume
num += 1
if num != 0 and total_volume != None:
return total_volume, num
else:
return None, None
Это фактическое сообщение об ошибке:
Exception in thread Thread-10986:
Traceback (most recent call last):
File "/home/hamid/anaconda3/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/home/hamid/anaconda3/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "exchange_envirnment.py", line 60, in _doit
self.func(self.args[0], self.args[1])
File "/home/hamid/dukto/src2/src_new/traders/market_maker_trader.py", line 46, in trade
self.type_three(lob_obj, reporter_obj)
File "/home/hamid/dukto/src2/src_new/traders/market_maker_trader.py", line 285, in type_three
max_volume = lob_obj.max_volume_traded()
File "/home/hamid/dukto/src2/src_new/PyLOB/orderbook.py", line 395, in max_volume_traded
for entry in self.tape:
RuntimeError: deque mutated during iteration
Это основная часть, использующая потоки в двух частях (класс Periodic и day_period):
class Periodic(object):
def __init__(self, object, compression_factor, args=[], kwargs={}):
self.compression_factor = compression_factor
self.object = object
self.func = object.trade
self.args = args
self.kwargs = kwargs
self.seppuku = Event()
def start(self):
self.seppuku.clear()
self.proc = Thread(target=self._doit)
self.proc.start()
def stop(self):
self.seppuku.set()
self.proc.join()
def _doit(self):
while True:
self.seppuku.wait(self.object.interval / self.compression_factor)
if self.seppuku.is_set():
break
self.func(self.args[0], self.args[1])
class day_period(object):
def __init__(self, object, compression_factor, args=[], kwargs={}):
self.period = (3600 * 4) / compression_factor
self.func = object.run
self.args = args
self.kwargs = kwargs
self.seppuku = Event()
def start(self):
self.seppuku.clear()
self.proc = Thread(target=self._doit)
self.proc.start()
def stop(self):
self.seppuku.set()
self.proc.join()
def _doit(self):
while True:
self.seppuku.wait(self.period)
if self.seppuku.is_set():
break
self.func(self.args)
class intra_day_traders_mng(object):
def __init__(self, simulation_config):
self.config = simulation_config
self.agents_list = []
self.agents_dict = {}
self.p_list = []
self.compression_factor = simulation_config['simulation_config']['compression_factor']
self.trader_creator()
self.first_time = True
self.day_of_simulation = simulation_config['simulation_config']['day_number']
def trader_creator(self):
for agent_name in self.config['agents']['intra_day']:
for config in self.config['agents']['intra_day'][agent_name]:
if agent_name == 'nonclassified_trader':
for k in range(config['n_traders']):
self.agents_list.append(NON_CLASSIFIED_TRADER_INTRADAY(config))
time.sleep(.1)
for agent_name in self.config['agents']['daily']:
for config in self.config['agents']['daily'][agent_name]:
if agent_name == 'nonclassified_trader':
for k in range(config['n_traders']):
self.agents_list.append(NON_CLASSIFIED_TRADER_DAILY(config))
time.sleep(0.1)
if agent_name == "market_maker_trader":
for k in range(config['n_traders']):
self.agents_list.append(MARKET_MAKER_TRADER_DAILY(config))
time.sleep(0.1)
for agent in self.agents_list:
self.agents_dict.update({agent.id: agent})
for agent in self.agents_list:
agent.set_trader_dict(self.agents_dict)
def random_initial(self):
agents_random_list = random.choices(self.agents_list, k=len(self.agents_list))
return agents_random_list
def run(self, args):
lob = args[0]
reporter_obj = args[1]
# when the trader running for first time
if self.first_time == True:
lob.time_obj.reset()
agents_random_list = self.random_initial()
for agent in agents_random_list:
self.p_list.append(Periodic(agent, self.compression_factor, args=(lob,reporter_obj)))
self.p_list[-1].start()
time.sleep(.1)
self.first_time = False
else:
for proc in self.p_list:
proc.stop()
for agent in self.agents_list:
agent.reset_trader(lob)
time_series = lob.ohcl()
if len(time_series) == self.day_of_simulation :
out = {'out':time_series}
with open('output.json', 'w') as outfile:
json.dump(out, outfile)
reporter_obj.save_as_csv()
trade_summary = lob.trade_summary()
with open('trade_report.csv', 'w') as csvFile:
writer = csv.writer(csvFile)
writer.writerows(trade_summary)
csvFile.close()
sys.exit()
print("***********************************************************************************")
print("day is:",lob.time_obj.day)
lob.time_obj.reset()
for proc in self.p_list:
proc.start()
time.sleep(.1)
if __name__ == '__main__':
with open('config.json', 'r') as f:
simulation_config = json.load(f)
intra_day_mng_obj = intra_day_traders_mng(simulation_config)
reporter_obj = REPORTER()
# for synchronization of time
time_obj = TIME_MANAGE(compression_factor=simulation_config['simulation_config']['compression_factor'])
lob = OrderBook(time_obj, tick_size=simulation_config['simulation_config']['tickSize'])
day_period(intra_day_mng_obj, simulation_config['simulation_config']['compression_factor'], args=(lob,reporter_obj)).start()
И, наконец, «OrderBook», который определяет «self.tape» в приведенном ниже коде:
class OrderBook():
def __init__(self, time_obj, tick_size=0.0001):
self.tape = deque(maxlen=None) # Index [0] is most recent trade
self.bids = OrderTree()
self.asks = OrderTree()
self.lastTick = None
self.lastTimestamp = 0
self.tickSize = tick_size
self.time = 0
self.nextQuoteID = 0
self.time_series = []
self.time_obj = time_obj
def clipPrice(self, price):
return round(price, int(math.log10(1 / self.tickSize)))
def updateTime(self):
self.time = int(self.time_obj.now()['time'])
def processOrder(self, quote, fromData, verbose):
orderType = quote['type']
orderInBook = None
if fromData:
self.time = quote['timestamp']
else:
self.updateTime()
quote['timestamp'] = self.time
if quote['qty'] <= 0:
sys.exit('processLimitOrder() given order of qty <= 0')
if not fromData: self.nextQuoteID += 1
if orderType == 'market':
trades = self.processMarketOrder(quote, verbose)
elif orderType == 'limit':
quote['price'] = self.clipPrice(quote['price'])
trades, orderInBook = self.processLimitOrder(quote, fromData, verbose)
else:
sys.exit("processOrder() given neither 'market' nor 'limit'")
return trades, orderInBook
def processOrderList(self, side, orderlist,
qtyStillToTrade, quote, verbose):
trades = []
qtyToTrade = qtyStillToTrade
while len(orderlist) > 0 and qtyToTrade > 0:
headOrder = orderlist.getHeadOrder()
tradedPrice = headOrder.price
counterparty = headOrder.tid
if qtyToTrade < headOrder.qty:
tradedQty = qtyToTrade
newBookQty = headOrder.qty - qtyToTrade
headOrder.updateQty(newBookQty, headOrder.timestamp)
qtyToTrade = 0
elif qtyToTrade == headOrder.qty:
tradedQty = qtyToTrade
if side == 'bid':
self.bids.removeOrderById(headOrder.idNum)
else:
self.asks.removeOrderById(headOrder.idNum)
qtyToTrade = 0
else:
tradedQty = headOrder.qty
if side == 'bid':
self.bids.removeOrderById(headOrder.idNum)
else:
self.asks.removeOrderById(headOrder.idNum)
qtyToTrade -= tradedQty
if verbose: print('>>> TRADE nt=%d $%f n=%d p1=%d p2=%d' %
(self.time, tradedPrice, tradedQty,
counterparty, quote['tid']))
transactionRecord = {'timestamp': self.time,
'price': tradedPrice,
'qty': tradedQty,
'time': self.time,
'day': self.time_obj.now()['day']}
if side == 'bid':
transactionRecord['party1'] = [counterparty,
'bid',
headOrder.idNum]
transactionRecord['party2'] = [quote['tid'],
'ask',
None]
else:
transactionRecord['party1'] = [counterparty,
'ask',
headOrder.idNum]
transactionRecord['party2'] = [quote['tid'],
'bid',
None]
self.tape.append(transactionRecord)
trades.append(transactionRecord)
return qtyToTrade, trades
def processMarketOrder(self, quote, verbose):
trades = []
qtyToTrade = quote['qty']
side = quote['side']
if side == 'bid':
while qtyToTrade > 0 and self.asks:
bestPriceAsks = self.asks.minPriceList()
qtyToTrade, newTrades = self.processOrderList('ask',
bestPriceAsks,
qtyToTrade,
quote, verbose)
trades += newTrades
elif side == 'ask':
while qtyToTrade > 0 and self.bids:
bestPriceBids = self.bids.maxPriceList()
qtyToTrade, newTrades = self.processOrderList('bid',
bestPriceBids,
qtyToTrade,
quote, verbose)
trades += newTrades
else:
sys.exit('processMarketOrder() received neither "bid" nor "ask"')
return trades
def processLimitOrder(self, quote, fromData, verbose):
orderInBook = None
trades = []
qtyToTrade = quote['qty']
side = quote['side']
price = quote['price']
if side == 'bid':
while (self.asks and
price >= self.asks.minPrice() and
qtyToTrade > 0):
bestPriceAsks = self.asks.minPriceList()
qtyToTrade, newTrades = self.processOrderList('ask',
bestPriceAsks,
qtyToTrade,
quote, verbose)
trades += newTrades
if qtyToTrade > 0:
if not fromData:
quote['idNum'] = self.nextQuoteID
quote['qty'] = qtyToTrade
self.bids.insertOrder(quote)
orderInBook = quote
elif side == 'ask':
while (self.bids and
price <= self.bids.maxPrice() and
qtyToTrade > 0):
bestPriceBids = self.bids.maxPriceList()
qtyToTrade, newTrades = self.processOrderList('bid',
bestPriceBids,
qtyToTrade,
quote, verbose)
trades += newTrades
if qtyToTrade > 0:
if not fromData:
quote['idNum'] = self.nextQuoteID
quote['qty'] = qtyToTrade
self.asks.insertOrder(quote)
orderInBook = quote
else:
sys.exit('processLimitOrder() given neither bid nor ask')
return trades, orderInBook
def avg_volume_traded(self):
if self.tape != None and len(self.tape) > 0:
num = 0
total_volume = 0
for entry in self.tape:
total_volume = entry['qty'] + total_volume
num += 1
if num != 0 and total_volume != None:
return total_volume, num
else:
return None, None
Мне нужна очередь, в которую могут вставлять несколько потоков, и несколько потоков могут читать.
Python имеет как минимум два класса очереди Queue.Queue и collections.deque, причем первый, по-видимому, использует его внутри. Оба документа утверждают, что они являются потокобезопасными в документации.
Однако в документах очереди также указывается:
collections.deque — альтернатива реализация неограниченных очередей с быстрым атомом append() и операции popleft(), которые не требуют блокировки.
Я думаю, что я не совсем понимаю: означает ли это, что deque не полностью потокобезопасен?
Если это так, я могу не полностью понять разницу между этими двумя классами. Я вижу, что в Queue добавлены блокирующие функции. С другой стороны, он теряет некоторые функции deque, такие как поддержка для оператора.
Доступ к внутреннему объекту deque напрямую,
x в очереди(). deque
потокобезопасна?
Также, почему Queue использует мьютекс для его операций, когда deque уже потокобезопасен?
04 апр. 2009, в 16:59
Поделиться
Источник
6 ответов
Queue.Queue
и collections.deque
служат для разных целей. Queue.Queue предназначен для передачи различным потокам сообщений с использованием сообщений/данных в очереди, тогда как collections.deque
просто предназначен как структура данных. Поэтому Queue.Queue
имеет такие методы, как put_nowait()
, get_nowait()
и join()
, тогда как collections.deque
нет. Queue.Queue
не предназначен для использования в качестве коллекции, поэтому ему не хватает операторов in
.
Это сводится к следующему: если у вас несколько потоков, и вы хотите, чтобы они могли общаться без необходимости блокировки, вы ищете Queue.Queue
; если вам просто нужна очередь или двойная очередь в качестве структуры данных, используйте collections.deque
.
Наконец, доступ и управление внутренним deque Queue.Queue
играет с огнем — вы действительно не хотите этого делать.
Keith Gaughan
04 апр. 2009, в 15:43
Поделиться
Если все, что вы ищете, это потокобезопасный способ передачи объектов между потоками, то оба они будут работать (как для FIFO, так и для LIFO). Для FIFO:
-
Queue.put()
иQueue.get()
являются потокобезопасными -
deque.append()
иdeque.popleft()
являются потокобезопасными
Примечание:
- Другие операции над
deque
могут не быть потокобезопасными, я не уверен. -
deque
не блокируется наpop()
илиpopleft()
, поэтому вы не можете генерировать поток потребительских потоков при блокировке до появления нового элемента.
Однако, кажется, что deque имеет значительное преимущество в эффективности. Ниже приведены некоторые результаты тестов в секундах с использованием CPython 2.7.3 для вставки и удаления элементов 100k.
deque 0.0747888759791
Queue 1.60079066852
Вот эталонный код:
import time
import Queue
import collections
q = collections.deque()
t0 = time.clock()
for i in xrange(100000):
q.append(1)
for i in xrange(100000):
q.popleft()
print 'deque', time.clock() - t0
q = Queue.Queue(200000)
t0 = time.clock()
for i in xrange(100000):
q.put(1)
for i in xrange(100000):
q.get()
print 'Queue', time.clock() - t0
Jonathan
02 дек. 2013, в 15:22
Поделиться
Для информации имеется билет на Python, на который ссылаются на безопасность потока deque (https://bugs.python.org/issue15329).
Title «уточнить, какие методы deque являются потокобезопасными»
Нижняя строка здесь: https://bugs.python.org/issue15329#msg199368
Deque append(), appendleft(), pop(), popleft() и len (d) операции являются поточно-безопасными в CPython. Методы добавления имеют DECREF в конце (для случаев, когда maxlen установлен), но это происходит после того, как были сделаны все обновления структуры, и инварианты были восстановлены, так что нормально обрабатывать эти операции как атомный.
В любом случае, если вы не уверены на 100% и предпочитаете надёжность по производительности, просто поставьте как Lock;)
BadWolf
12 дек. 2015, в 12:51
Поделиться
deque
является потокобезопасным. «операции, которые не требуют блокировки» означает, что вам не нужно делать блокировку самостоятельно, deque
позаботится об этом.
Взглянув на источник Queue
, внутренний deque называется self.queue
и использует мьютекс для аксессуаров и мутаций, поэтому Queue().queue
не является потокобезопасным для использования.
Если вы ищете оператора «in», то дека или очередь, возможно, не самая подходящая структура данных для вашей проблемы.
brian-brazil
04 апр. 2009, в 14:51
Поделиться
Все одноэлементные методы на deque
являются атомарными и потокобезопасными. Все остальные методы также являются потокобезопасными. Такие вещи, как len(dq)
, dq[4]
, дают мгновенные правильные значения. Но подумайте, например. about dq.extend(mylist)
: вы не получаете гарантию, что все элементы в mylist
подаются в строку, когда другие потоки также добавляют элементы на одной стороне — но это обычно не является обязательным требованием в межпоточной связи и для опрошенных задача.
Итак, deque
~ 20 раз быстрее, чем Queue
(который использует deque
под капотом), и если вам не нужен «удобный» интерфейс синхронизации (блокировка/тайм-аут), строгий maxsize
подчинение или «Переопределить эти методы (_put, _get,..), чтобы реализовать другие подсистемы очередей», или когда вы сами заботитесь о таких вещах, то голый deque
является хорошим и эффективным решением для высокоскоростная межпоточная связь.
Фактически интенсивное использование дополнительного метода mutex и дополнительного метода ._get()
и т.д. в Queue.py
вызвано ограничениями обратной совместимости, прошлым чрезмерным дизайном и отсутствием ухаживания за предоставление эффективного решения этой важной скорости проблема узких мест в межпоточной коммуникации. Список использовался в более старых версиях Python, но даже list.append()/. Pop (0) был и является атомарным и потокобезопасным…
kxr
19 апр. 2017, в 17:52
Поделиться
(кажется, у меня нет репутации, чтобы комментировать…)
Вам нужно быть осторожным, какие методы детекса вы используете из разных потоков.
deque.get() кажется потокобезопасным, но я обнаружил, что выполнение
for item in a_deque:
process(item)
может завершиться ошибкой, если другой поток добавляет элементы одновременно.
Я получил исключение RuntimeException, которое жаловалось, что «deque мутировал во время итерации».
Отметьте collectionsmodule.c, чтобы увидеть, какие действия затронуты этим
Eliot Blennerhassett
28 июль 2015, в 11:22
Поделиться
Ещё вопросы
- 0Предпочтительный выбор и визуальный разделитель в Symfony 2
- 1Утвердить регулярное выражение водительских прав Великобритании в JavaScript
- 1Как убрать перекрестие с полей в d3.js?
- 1Прохождение комнат в текстовой игре на Java
- 1Схема URL-адреса Bing Map для Windows Phone 8.1
- 0Как заблокировать значок, установленный сервером?
- 0Как настроить Wampserver для подключения к серверу mssql с помощью PDO?
- 1Как я могу транслировать аудио Alaw?
- 1Как добавить внешние файлы JS в проект Zurb Foundation 6.4 (веб-пакет)?
- 0Как я могу сделать раскрывающееся меню при нажатии
- 1Хранение пользовательских данных в MSI
- 1Как нажать кнопку после успешного Ajax?
- 0Изменение размера DIV делает текстовое поле на новой строке
- 1TypeError: while_loop () получил неожиданный аргумент ключевого слова «Maximum_iterations» в Jupyter Azure
- 0Проблема в сборке выпуска Visual Studio 2012
- 1Как получить количество всех вариантов (скажем, из опроса) в виде столбцов
- 0MonoDevelop (Ubuntu) и MySql
- 0Изменение минимального и максимального значений нескольких ползунков JQueryUI на странице из Выбрать
- 0Можете ли вы добавить N-го ребенка в JS?
- 1Смешивание пользовательских и базовых сериализаций?
- 1Из JSON в XML и обратно в Java
- 0CSS div позиционируется не так, как ожидалось
- 0Опубликуйте угловое веб-приложение с кодомigniter Rest
- 1Как создать две взаимодействующие, но не блокирующие службы в Android?
- 0Сессия формы не работает, когда ее используют несколько пользователей
- 1Как отключить звук определенных источников звука в Unity3D?
- 0Как я могу использовать функции apcu apc_store (), apc_fetch () в PHPNG / PHP7?
- 0Как я могу использовать angularjs OrderBy и ng-repeat для последовательной итерации списка (10-секундные интервалы)?
- 1Неавторизовать и приложение из учетной записи доступа AccountManager
- 1Android: как удалить учетную запись из базы данных, которая отображается в разделе «учетные записи и синхронизация» из моего собственного приложения
- 1Как играть в автоматический звонок на номер в Android?
- 1Как я могу скачать Android 2.0 SDK сейчас
- 1PipEnv: как работать с локально установленными пакетами .whl
- 0умный указатель в функции
- 0Встроенные видео не отображаются
- 1Как я могу отправить функции Python 2 на основе типа данных, передаваемых в функцию?
- 1VerticalScrollBar для списка внутри Expander — WPF
- 1Помогает с появлением спрайта, столкновением спрайта и отображением очков
- 1При подключении в PyQt4 QMessageBox не может вызвать метод слота
- 1получить подстроку Python внутри элементов списка — Web Scraping
- 1Asyncio против Asyncore для пользовательского сервера Python
- 0Как передать данные контроллера для просмотра в codeigniter
- 0JQuery-UI меню не работает, как ожидалось
- 1Является ли MemcacheService принципиально единственным
- 0Получение неверного абсолютного значения в C ++ / C
- 1Как реализовать push-уведомления для нескольких приложений Android?
- 1Java — закрытый открытый член внутреннего класса
- 1Получать данные udp с системного сервера udp на android?
- 1Разница между C # dll, созданной на локальном компьютере, и сервером сборки TFS
- 1Несколько целочисленных входов в одной строке Python
I have a fairly «simple» loop on a Raspberry Pi Zero W that runs two applications: the first basically generates random «arrivals» of a simulated train and activates GPIO pins when those events occur; the second is a function that monitors the state of an external motion detector, and passes that state into a function which will return True if motion is detected.
The exception is as follows:
Traceback (most recent call last): File "TrainGenerator.py", line 107, in <module> detector_state = garage_motion(mqttc, topic_state, motion_sensor ) File "TrainGenerator.py", line 54, in garage_motion if sensor.motion_detected: File "/home/pi/TrainGenerator/venv/lib/python3.7/site-packages/gpi ozero/input_devices.py", line 350, in is_active return self.value > self.threshold File "/home/pi/TrainGenerator/venv/lib/python3.7/site-packages/gpi ozero/input_devices.py", line 604, in value return super(MotionSensor, self).value File "/home/pi/TrainGenerator/venv/lib/python3.7/site-packages/gpi ozero/input_devices.py", line 325, in value return self._queue.value File "/home/pi/TrainGenerator/venv/lib/python3.7/site-packages/gpi ozero/mixins.py", line 587, in value return self.average(self.queue) File "/usr/lib/python3.7/statistics.py", line 311, in mean T, total, count = _sum(data) File "/usr/lib/python3.7/statistics.py", line 147, in _sum for n,d in map(_exact_ratio, values): RuntimeError: deque mutated during iteration
The offending function is this:
def garage_motion(mqtt_client, topic, sensor): if sensor.motion_detected: mqtt_publish(mqtt_client, topic, 'ON') return True else: mqtt_publish(mqtt_client, topic, 'OFF') return False
This function is run every 0.1 seconds in the main loop.
Full code can be found here.
I’m totally at a loss here, any tips would be appreciated.
EDIT: I have no idea why the line breaks all got lost in my code snippets.
Created on 2014-08-11 23:47 by dougz, last changed 2022-04-11 14:58 by admin. This issue is now closed.
Files | |||
---|---|---|---|
File name | Uploaded | Description | Edit |
fix.diff |
dougz, 2014-08-11 23:47 |
move self._waiters modification inside lock |
review |
Messages (5) | ||
---|---|---|
msg225208 — (view) | Author: Doug Zongker (dougz) * | Date: 2014-08-11 23:47 |
Condition.wait() modifies self._waiters without holding the lock (when a wait with timeout times out without the condition being notified). If this happens to occur in between construction of the _islice and _deque objects in Condition.notify(): def notify(self, n=1): [...] all_waiters = self._waiters waiters_to_notify = _deque(_islice(all_waiters, n)) then the result is a RuntimeError exception: File "/usr/lib/python3.4/threading.py", line 358, in notify_all self.notify(len(self._waiters)) File "/usr/lib/python3.4/threading.py", line 341, in notify waiters_to_notify = _deque(_islice(all_waiters, n)) RuntimeError: deque mutated during iteration (I have a server which makes extensive use of conditions on which this happens about once a day.) This patch fixes this bug by moving wait()'s modification of self._waiters to be inside the lock, as suggested by Antoine Pitrou here: http://bugs.python.org/issue17385#msg183875 |
||
msg225213 — (view) | Author: Tim Peters (tim.peters) * |
Date: 2014-08-12 00:18 |
+1. I agree it's a bug, that the diagnosis is correct, and that the patch will fix it :-) |
||
msg226068 — (view) | Author: Doug Zongker (dougz) * | Date: 2014-08-29 17:18 |
So, what happens now? What do I need to do to make progress on this? |
||
msg226088 — (view) | Author: Roundup Robot (python-dev) |
Date: 2014-08-29 21:27 |
New changeset 4cce39cfe46c by Antoine Pitrou in branch '3.4': Issue #22185: Fix an occasional RuntimeError in threading.Condition.wait() caused by mutation of the waiters queue without holding the lock. http://hg.python.org/cpython/rev/4cce39cfe46c New changeset 78a38f8bd5d9 by Antoine Pitrou in branch 'default': Issue #22185: Fix an occasional RuntimeError in threading.Condition.wait() caused by mutation of the waiters queue without holding the lock. http://hg.python.org/cpython/rev/78a38f8bd5d9 |
||
msg226089 — (view) | Author: Antoine Pitrou (pitrou) * |
Date: 2014-08-29 21:28 |
It only needed someone to push your patch, which I just did. Thank you very much Doug, your contribution is appreciated! |
History | |||
---|---|---|---|
Date | User | Action | Args |
2022-04-11 14:58:06 | admin | set | github: 66381 |
2014-08-29 21:28:26 | pitrou | set | status: open -> closed resolution: fixed messages: + msg226089 stage: resolved |
2014-08-29 21:27:42 | python-dev | set | nosy: + python-dev messages: + msg226088 |
2014-08-29 17:18:44 | dougz | set | messages: + msg226068 |
2014-08-16 12:43:17 | vstinner | set | nosy: + vstinner |
2014-08-12 00:18:58 | tim.peters | set | messages: + msg225213 |
2014-08-11 23:51:44 | pitrou | set | nosy: + tim.peters, rhettinger |
2014-08-11 23:47:11 | dougz | create |