Runtime error release unlocked lock

Hi @ChristianKuehnel, This is a follow up on #8. The program flow somehow tries to release an unreleased lock. I'm not sure what the originating cause of this could be, as the lock must have be...

Hi @ChristianKuehnel,

This is a follow up on #8. The program flow somehow tries to release an unreleased lock. I’m not sure what the originating cause of this could be, as the lock must have been previously acquired successfully. It may be some kind of race condition or concurrency issue. We could potentially just test if the lock is actually acquired. What do you think?

Stack-trace of the exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/btlewrap/bluepy.py", line 26, in _func_wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/btlewrap/bluepy.py", line 55, in connect
    self._peripheral = Peripheral(mac, iface=iface)
  File "/usr/local/lib/python3.7/site-packages/bluepy/btle.py", line 391, in __init__
    self._connect(deviceAddr, addrType, iface)
  File "/usr/local/lib/python3.7/site-packages/bluepy/btle.py", line 439, in _connect
    raise BTLEDisconnectError("Failed to connect to peripheral %s, addr type: %s" % (addr, addrType), rsp)
bluepy.btle.BTLEDisconnectError: Failed to connect to peripheral C4:7C:8D:66:E3:CC, addr type: public

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/btlewrap/base.py", line 44, in __enter__
    self._backend.connect(self._mac)
  File "/usr/local/lib/python3.7/site-packages/btlewrap/bluepy.py", line 32, in _func_wrapper
    raise BluetoothBackendException() from last_error
btlewrap.base.BluetoothBackendException

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "gateway.py", line 57, in <module>
    raise e
  File "gateway.py", line 47, in <module>
    mqtt.publish(_WORKERS_QUEUE.get(timeout=10).execute())
  File "/bt-mqtt-gateway/workers_manager.py", line 32, in execute
    messages = self._callback(*self._args)
  File "/bt-mqtt-gateway/workers/miflora.py", line 30, in status_update
    ret += self.update_device_state(name, poller)
  File "/bt-mqtt-gateway/workers/miflora.py", line 39, in update_device_state
    ret.append(MqttMessage(topic=self.format_topic(name, attr), payload=poller.parameter_value(attr)))
  File "/usr/local/lib/python3.7/site-packages/miflora/miflora_poller.py", line 132, in parameter_value
    self.fill_cache()
  File "/usr/local/lib/python3.7/site-packages/miflora/miflora_poller.py", line 68, in fill_cache
    with self._bt_interface.connect(self._mac) as connection:
  File "/usr/local/lib/python3.7/site-packages/btlewrap/base.py", line 47, in __enter__
    self._lock.release()
RuntimeError: release unlocked lock

Created on 2018-08-24 14:02 by uosiu, last changed 2022-04-11 14:59 by admin.

Files
File name Uploaded Description Edit
start_threads.py uosiu,
2018-08-24 14:02
test.sh uosiu,
2018-08-24 14:03
cond_race.py vstinner,
2018-08-27 12:31
Messages (10)
msg323998 — (view) Author: uosiu (uosiu) * Date: 2018-08-24 14:02
Sometimes when thread is starting it raises "RuntimeError: release unlocked lock". That is when signal handler is invoked in the same time.

Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib/python3.6/threading.py", line 304, in wait
    self._acquire_restore(saved_state)
  File "start_threads.py", line 12, in sighandler
    raise MyException("got signal")
__main__.MyException: got signal

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "start_threads.py", line 28, in <module>
    thread.start()
  File "/usr/lib/python3.6/threading.py", line 851, in start
    self._started.wait()
  File "/usr/lib/python3.6/threading.py", line 552, in wait
    return signaled
  File "/usr/lib/python3.6/threading.py", line 243, in __exit__
    return self._lock.__exit__(*args)
RuntimeError: release unlocked lock

I have attached to files that reproduce the problem. First runs python and starts noop threads in the loop. It uses signal handler that raises exception. Second is a bash script to run python script and send signal to it. After some time, when signal is handled in unfortunate place, python script breaks due to unreleased lock.
msg323999 — (view) Author: uosiu (uosiu) * Date: 2018-08-24 14:03
This scripts runs start_threads.py and send a signal to it. After some number of loops problem can be reproduced.
msg324170 — (view) Author: STINNER Victor (vstinner) * (Python committer) Date: 2018-08-27 12:31
cond_race.py: simplified example using threading.Lock and threading.Condition: the issue seems to be related to threading.Event which doesn't handle properly signals.
msg324490 — (view) Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2018-09-03 03:47
It looks to me pure Python implementation could not handle such cases. Put aside other possible bugs, threading.Condition.wait re-acquire the lock using threading.Condition._acquire_restore which could be interrupted and not able to acquire the lock. So it seems in any way using Pure Python, it's not possible to guarantee the lock is acquired.
msg324655 — (view) Author: Vladimir Matveev (v2m) * Date: 2018-09-06 00:54
I agree. From code in threading.Condition.wait looks like if it is interrupted either after calling _release_save and before entering try block or in finally block before calling _acquire_restore - it will leave the lock in non-acquired state. 

First part in theory can be solved if _release_save is moved into try block and instead of returning saved_state as a result it will accept reference to saved_state local and set it in C code.

Second part looks more interesting ... :)
msg324785 — (view) Author: Serhiy Storchaka (serhiy.storchaka) * (Python committer) Date: 2018-09-07 19:02
This looks similar to to problems with the "with" statement: see issue29988 and issue34067. Except that there is no a "with" statement, and resolving that issues will not solve this issue.
msg324891 — (view) Author: hongweipeng (hongweipeng) * Date: 2018-09-09 16:35
It seems to me the problem is related to nested `finally` or `with`, which can't handle signals well.

class Event:
    ...
    def wait(self, timeout=None):
        self._cond.__enter__()
        signaled = self._flag
        if not signaled:
            signaled = self._cond.wait(timeout)
        self._cond.__exit__()
        return signaled


Reducing one layer of nesting can solve this issue,but it can't pass `test` module.
msg324893 — (view) Author: Xiang Zhang (xiang.zhang) * (Python committer) Date: 2018-09-09 17:11
It's not right to replace with statement by manually calling __enter__ and __exit__. If there is any exception after __enter__ and before __exit__, __exit__ method will be skipped. That's not what we want.
msg324896 — (view) Author: Vladimir Matveev (v2m) * Date: 2018-09-09 17:47
To bring in analogy: C# has lock statement that allow to run a block of code holding a mutual-exclusion lock on some object. 
```
lock(o) 
{  
}
```
is compiled as 
```
object _lock = o;
bool _lockTaken = false;
try
{
   System.Threading.Monitor.Enter(_lock, out _lockTaken);
   ...
}
finally 
{
   if (_lockTaken) 
   {
      System.Threading.Monitor.Exit(_lock);
   }
}
```
In C# System.ThreadAbortException can be raised in arbitrary point in code and yet lock statement needs to to enforce the invariant "if lock is taken it will be released". In order to do so:
- lock acquisition is performed inside the try block, as a side effect it sets the value of '_lockTaken' passed as out parameter - these two actions are performed atomically and cannot be interrupted by the asynchronous exception
- lock is released in finally block only if lock was previously taken. Also finally blocks in .NET has a property that they cannot be interrupted by asynchronous exceptions so call to Monitor.Exit is guaranteed to be run if control flow has entered matching try block.

I feel that something similar can be used to solve this issue as well. Discussions for issue29988 has already mentioned adding special semantic to __enter__/__exit__ methods or marking bytecode ranges as atomic to make sure that they are not interrupted. While the former case is specific to with statements, the latter one can probably be generalized to support finally blocks as well.
msg336070 — (view) Author: Radek (radek_kujawa) Date: 2019-02-20 11:11
Any progress on this topic? I think I've encountered this (or similar) issue:

>Traceback (most recent call last):
>  File "logging/__init__.py", line 1944, in shutdown
>  File "logging/__init__.py", line 813, in acquire
>  File "site-packages/utils/signals.py", line 58, in signal_wrapper
>  File "utils/utils.py", line 147, in sigterm_handler
>SystemExit: 0
>
>During handling of the above exception, another exception occurred:
>
>Traceback (most recent call last):
>  File "logging/__init__.py", line 1954, in shutdown
>  File "logging/__init__.py", line 821, in release
>RuntimeError: cannot release un-acquired lock
History
Date User Action Args
2022-04-11 14:59:05 admin set github: 78667
2022-02-03 17:42:54 maggyero set nosy:
+ maggyero
2019-02-20 11:11:51 radek_kujawa set nosy:
+ radek_kujawa
messages:
+ msg336070
2018-09-09 17:47:15 v2m set messages:
+ msg324896
2018-09-09 17:11:42 xiang.zhang set messages:
+ msg324893
2018-09-09 16:35:44 hongweipeng set nosy:
+ hongweipeng
messages:
+ msg324891
2018-09-07 19:02:32 serhiy.storchaka set messages:
+ msg324785
2018-09-06 00:54:33 v2m set nosy:
+ v2m
messages:
+ msg324655
2018-09-03 03:47:10 xiang.zhang set messages:
+ msg324490
2018-08-27 12:31:41 vstinner set files:
+ cond_race.py
nosy:
+ pitrou, serhiy.storchaka
messages:
+ msg324170
2018-08-25 07:26:44 xiang.zhang set nosy:
+ vstinner, xiang.zhang
2018-08-24 14:03:41 uosiu set files:
+ test.sh

messages:
+ msg323999

2018-08-24 14:02:35 uosiu create

Я вижу, что в журналах для моего приложения в Google App Engine (gqueues) появляется ошибка “Release unlocked lock” и не может понять, почему (полная трассировка стека ниже). Кто-нибудь знает, почему эта ошибка возникла?

Интересно, что приложение находится на python 2.7 с threadafe = NO, поэтому кажется странным, что вообще будет какая-либо блокировка или ошибки блокировки.

Предыстория: GQueues была на python2.5, а в начале апреля я перенес ее в 2.7 и установил threadafe = True. Все работало отлично в течение недели. Затем утром 9 апреля все приложение было недоступно, выплевывая 500 ошибок. В нем хранились новые экземпляры серверов, у которых были задержки более 30 секунд. Я заметил, что в своих журналах я увидел ошибку “Невозможно разблокировать блокировку блокировки”. Я не добавлял никаких механизмов блокировки в свой код во время миграции, поэтому я предположил, что это была блокировка механизма приложения, на которую ссылалась ошибка. Во всяком случае, я вернул свое приложение к предыдущей версии, которая все еще была на Python 2.5, и все снова заработало.

Позже в апреле AppEngine Downtime Notify сообщила, что 9 апреля возникла проблема, которая вызывала проблемы с приложениями python 2.7.
https://groups.google.com/d/topic/google-appengine-downtime-notify/QL8TmRn6Ay4/discussion

Я продолжил работу с Кристиной Илвенто из Google, и она подумала, что теперь лучше вернуться к Python 2.7 и включить threadafe = True. Я переместился обратно в 2.7, но оставил threadafe = False, думая, что я дам ему пару недель, чтобы убедиться, что все работает хорошо на 2.7, прежде чем снова включить threading (что, как я предполагал, вызвало проблему). Все работало нормально в течение нескольких недель, но теперь я вижу, что эта ошибка “разблокирована блокировка” даже при отключенном потоке. Кто-нибудь знает, что здесь происходит?

Кристина предложила мне опубликовать здесь, и команда App Engine будет следить за ней.

release unlocked lock
Traceback (most recent call last):
File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch
return method(*args, **kwargs)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/util.py", line 69, in check_login
handler_method(self, *args)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/main.py", line 297, in get
self.generate('main.html', template_values)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/baserequest.py", line 257, in generate
self.response.out.write(template.render(path, values))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/webapp/template.py", line 92, in render
return t.render(Context(template_dict))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/webapp/template.py", line 172, in wrap_render
return orig_render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 173, in render
return self._render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 167, in _render
return self.nodelist.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/loader_tags.py", line 139, in render
return self.template.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 173, in render
return self._render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 167, in _render
return self.nodelist.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/defaulttags.py", line 173, in render
nodelist.append(node.render(context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/defaulttags.py", line 257, in render
return self.nodelist_true.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/loader_tags.py", line 139, in render
return self.template.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 173, in render
return self._render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 167, in _render
return self.nodelist.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/loader_tags.py", line 139, in render
return self.template.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 173, in render
return self._render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 167, in _render
return self.nodelist.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/defaulttags.py", line 173, in render
nodelist.append(node.render(context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/loader_tags.py", line 139, in render
return self.template.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 173, in render
return self._render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 167, in _render
return self.nodelist.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/loader_tags.py", line 139, in render
return self.template.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 173, in render
return self._render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 167, in _render
return self.nodelist.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 794, in render
bits.append(self.render_node(node, context))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/__init__.py", line 807, in render_node
return node.render(context)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/_internal/django/template/defaulttags.py", line 136, in render
values = list(values)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 2312, in next
return self.__model_class.from_entity(self.__iterator.next())
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2809, in next
next_batch = self.__batcher.next()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2671, in next
return self.next_batch(self.AT_LEAST_ONE)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2708, in next_batch
batch = self.__next_batch.get_result()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 592, in get_result
return self.__get_result_hook(self)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2450, in __query_result_hook
self._batch_shared.conn.check_rpc_success(rpc)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py",

UPDATE:
Вот полная обратная трассировка для одной из ошибок

46.231.181.199 - sandra.martin [15/May/2012:03:28:44 -0700] "POST /items/ HTTP/1.1" 500 1306 "http://www.gqueues.com/main?q=ag1zfmdxdWV1ZXMtaHJkciILEgdBY2NvdW50GL-psgYMCxILU2hhcmVkUXVldWUYkU4M";;;; "Mozilla/5.0 (Windows NT 6.1; rv:12.0) Gecko/20100101 Firefox/12.0" "www.gqueues.com" ms=59858 cpu_ms=154948 api_cpu_ms=112433 cpm_usd=4.305925 instance=00c61b117cfbf1818fa4b0f779eacc7f4a9030
D 2012-05-15 05:27:45.121
postAction: reorder
E 2012-05-15 05:28:44.714
release unlocked lock
Traceback (most recent call last):
File "/base/python27_runtime/python27_lib/versions/third_party/webapp2-2.3/webapp2.py", line 545, in dispatch
return method(*args, **kwargs)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/util.py", line 139, in check_login
handler_method(self, *args)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/items.py", line 371, in post
self.reorder()
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/items.py", line 856, in reorder
QueueSyncHandler.syncQueueViewers(clientId, itemQueue)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/queuesync.py", line 197, in syncQueueViewers
jsonObj['items']        = util.getJSONItems(queueItems, filter, queueViewer.type, accountObj)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/util.py", line 1536, in getJSONItems
jsonItem['subitems'] = getJSONItems(subItems, filter, type, accountObj, assignmentCompleted=assignmentCompleted, accountEmail=accountEmail)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/util.py", line 1536, in getJSONItems
jsonItem['subitems'] = getJSONItems(subItems, filter, type, accountObj, assignmentCompleted=assignmentCompleted, accountEmail=accountEmail)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/util.py", line 1536, in getJSONItems
jsonItem['subitems'] = getJSONItems(subItems, filter, type, accountObj, assignmentCompleted=assignmentCompleted, accountEmail=accountEmail)
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/controllers/util.py", line 1481, in getJSONItems
if item.hasAssignments():
File "/base/data/home/apps/s~gqueues-hrd/gq3-7-5.358902352474055782/models.py", line 1512, in hasAssignments
itemAssignments = Assignment.all(keys_only=True).filter('item =', self).fetch(1)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/ext/db/__init__.py", line 2143, in fetch
return list(self.run(limit=limit, offset=offset, **kwargs))
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2809, in next
next_batch = self.__batcher.next()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2671, in next
return self.next_batch(self.AT_LEAST_ONE)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2708, in next_batch
batch = self.__next_batch.get_result()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 592, in get_result
return self.__get_result_hook(self)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_query.py", line 2450, in __query_result_hook
self._batch_shared.conn.check_rpc_success(rpc)
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/datastore/datastore_rpc.py", line 1206, in check_rpc_success
rpc.wait()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/api/apiproxy_stub_map.py", line 533, in wait
self.__rpc.Wait()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/api/apiproxy_rpc.py", line 119, in Wait
rpc_completed = self._WaitImpl()
File "/base/python27_runtime/python27_lib/versions/1/google/appengine/runtime/apiproxy.py", line 134, in _WaitImpl
self.__done_event.wait()
File "/base/python27_runtime/python27_dist/lib/python2.7/threading.py", line 407, in wait
self.__cond.release()
error: release unlocked lock

Дедлоки — распространенная проблема в многопоточном программировании. В больших приложениях вручную отслеживать порядок блокировок может быть достаточно сложно, причем эта проблема может не всплыть на этапе тестирования и случиться только в каких-то сложновоспроизводимых кейсах при реальном использовании. Существует много способов их избегания, но здесь мы рассмотрим только один — автоматическое выявление дедлоков на основе графа ожидания.

Эта статья — продолжение серии про многопоточное программирование на Python. Предыдущая была про хранение настроек в многопоточном приложении, но читать её для понимания этой не обязательно. Вся серия предназначена для программистов, знакомых с базовыми концепциями многопоточного программирования.

Примеры кода, которые вы здесь увидите, взяты из библиотеки locklib.

Как можно обнаружить дедлок?

Вообще говоря, существует два основных подхода к обнаружению дедлоков в программе:

  1. На этапе статического анализа исходного кода или компиляции.

  2. В рантайме.

К сожалению, статический анализ работает только в каких-то строго определённых случаях. Общего решения здесь не существует, поскольку оно было бы эквивалентно решению проблемы останова, а это считается невозможным. Кроме того, даже существующие способы статического обнаружения гораздо лучше работают в статически типизированных языках (например, в Go или C++). Для Python ни одного вменяемого решения при подготовке статьи я не нашёл.

Все способы обнаружения дедлоков в рантайме так или иначе усложняют или замедляют программу. Чем точнее способ, тем больше создаваемый им оверхед. К примеру, в том же Go используется очень неточный, но потому и малозатратный способ обнаружения дедлоков. Там дедлок будет обнаружен только если в программе не останется вообще ни одного «живого» потока. С другой стороны, способы на основе графа ожидания гораздо дороже, но могут обнаруживать любые дедлоки.

Что такое граф ожидания?

Обычно под капотом у мьютекса есть очередь, в которую за ним выстраиваются потоки. Когда один поток освобождает мьютекс, следующий в очереди разблокируется и делает свои дела. Таким образом, каждый из ожидающих потоков всегда либо ожидает какой-то конкретный другой поток, либо (если он первый в очереди) приступает к своим делам сразу. Получается, что мы можем построить граф ожидающих друг друга потоков, где нодами будут потоки, а рёбрами — объекты мьютексов. Дедлок будет выглядеть как цикл на таком графе.

Если мы хотим избежать дедлоков, нам нужно перед каждой операцией взятия лока проверять граф на наличие циклов обходом в глубину и в случае обнаружения — блокировку не давать. Альтернативный вариант (но здесь мы на нём подробно останавливаться не будем) заключается в том, чтобы блокировку дать сразу, но с таймаутом — и идти в граф только в случае его превышения.

Графы ожидания сегодня широко используются, к примеру, в СУБД (PostgreSQL таким образом ищет дедлоки в транзакциях, а в MySQL так делалось в старых версиях, сейчас же применяется похожий, но несколько модифицированный алгоритм), однако этот метод не лишён недостатков. Главный из них состоит в том, что получение каждой блокировки становится не просто дороже (поскольку включает в себя дополнительные действия), но и требует, по крайней мере на некоторое время, взять глобальную блокировку, которой, в свою очередь, защищён сам граф ожидания. Это приводит к увеличению доли последовательного кода в программе и, согласно закону Амдала, бьёт по выгоде от параллелизации вычислений вообще.

Приступим

Наша задача — написать нечто, что для пользователя будет работать примерно как обычный Lock из стандартной библиотеки (правда, без поддержки опциональных аргументов в данной конкретной реализации, то есть, например, без возможности указать продолжительность таймаута), но чтобы при попытке взять блокировку, приводящую к дедлоку, поднималось исключение. Внутри этой штуки, очевидно, где-то должен быть граф — и за него мы возьмёмся в первую очередь.

«Верхушка» класса, объектом которого является граф, выглядит так:

from threading import Lock
from collections import defaultdict

from locklib.errors import DeadLockError # Исключение, которое должно подниматься при обнаружении дедлока.


class LocksGraph:
    def __init__(self):
        self.links = defaultdict(set)
        self.lock = Lock()

Как видно, для защиты доступа к графу используется обычный Lock. Это необходимо, поскольку иначе разные потоки смогут редактировать его параллельно — и в отдельные моменты времени «посередине» он может оказаться неконсистентным. Кроме того, для хранения связей в графе мы используем словарь, в котором ключи — идентификаторы потоков, а значения — множества, заполненные теми же самыми идентификаторами.

По сути, наш граф должен поддерживать только две базовые операции:

  1. Создать связь между двумя потоками. При этом нужно проверить, не образуется ли от этого цикл, — и, если да, поднять DeadLockError.

  2. Удалить её.

Добавление связи выглядит так:

def add_link(self, _from, to):
    cycle_with = self.search_cycles(to, _from)
    if cycle_with is not None:
        cycle_with = ', '.join([str(x) for x in cycle_with])
        raise DeadLockError(f'A cycle between {_from}th and {to}th threads has been detected. The full path of the cycle: {cycle_with}.')
    self.links[_from].add(to)

А удаление — так:

def delete_link(self, _from, to):
    if _from in self.links:
        if to in self.links[_from]:
            self.links[_from].remove(to)
        if not self.links[_from]:
            del self.links[_from]

Мы могли бы здесь не делать лишних проверок — тогда методы выглядели бы ещё проще.

Ну и центральная часть — обход графа для обнаружения циклов — это всего лишь три коротких метода:

def get_links_from(self, _from):
    return self.links[_from]

def dfs(self, path, current_node, target):
    path.append(current_node)

    neighbors = self.get_links_from(current_node)

    if neighbors:
        for link in neighbors:
            if link == target:
                path.append(target)
                return path
            result_of_next_search = self.dfs(path, link, target)
            if result_of_next_search is not None:
                return result_of_next_search

    path.pop()

def search_cycles(self, _from, to):
    return self.dfs([], _from, to)

Входной точкой здесь служит метод search_cycles(), который по итогу возвращает None, если цикл не найден, либо список идентификаторов потоков, его образующих, если таки да. Список потом используется для составления человекочитаемого описания в поднимаемом исключении.

Итак, как видим, в графе нет ничего сложного. Из важного хотелось бы отметить, что внутри методов графа нигде не используется прикреплённая к нему блокировка. На самом деле она предназначена для использования в вызывающем коде, а не в самом графе. Собственно, к вызывающему коду мы и переходим.

«Верхушка» класса лока выглядит так:

from threading import Lock, get_native_id
from collections import deque

from locklib.locks.smart_lock.graph import LocksGraph


graph = LocksGraph()

class SmartLock:
    def __init__(self, local_graph=graph):
        self.graph = local_graph
        self.lock = Lock()
        self.deque = deque()
        self.local_locks = {}

Как видим, здесь используется глобальная переменная — экземпляр графа, который мы описали выше. Также мы видим экземпляр обычного лока — ведь наш является, по сути, обёрткой вокруг него.

self.deque и self.local_locks используются, как мы увидим дальше, для воспроизведения семантики обычного лока, внутри которого находится очередь ожидающих тредов. В словарь self.local_locks каждый поток кладёт экземпляр лока под ключом в виде собственного идентификатора. В очереди self.deque последовательно лежат идентификаторы потоков, которые попытались взять данную блокировку. Каждый новый поток добавляется в очередь, кладёт в словарь свой лок и берёт лок предыдущего потока. Получается что-то вроде этого:

Объект лока предлагает, по сути, два возможных действия: взять лок и отпустить его. В данном случае они представлены методами acquire() и release().

Начнём с acquire(). Алгоритм действий здесь следующий:

1. Взять лок внутри нашего объекта лока.
2. Взять лок на граф.
3. Если данный поток берёт эту блокировку первым (то есть он первый в очереди):
3.1. Положить id текущего потока в очередь.
3.2. Используя id потока в качестве ключа, положить в словарь self.local_locks объект обычного лока и сразу взять его.
4. Если очередь ожидающих потоков не пуста (то есть данный поток не первый, кто взял этот лок):
4.1. Создать связь в графе между идентификатором текущего потока и идентификатором последнего потока в очереди.
4.2. Положить id текущего потока в очередь.
4.3. Используя id потока в качестве ключа, положить в словарь self.local_locks объект обычного лока и сразу взять его.
4.4. Взять из очереди id предыдущего потока и по нему извлечь из словаря объект блокировки, который тот туда положил.
5. Отпустить лок на граф.
6. Отпустить лок внутри нашего лока.
7. Если очередь потоков была не пуста:
7.1. Взять блокировку, оставленную предыдущим потоком, которую мы успели извлечь из словаря на шаге 4.4.

В коде же это выглядит так:

def acquire(self):
    id = get_native_id()
    previous_element_lock = None

    with self.lock:
        with self.graph.lock:
            if not self.deque:
                self.deque.appendleft(id)
                self.local_locks[id] = Lock()
                self.local_locks[id].acquire()
            else:
                previous_element = self.deque[0]
                self.graph.add_link(id, previous_element)
                self.deque.appendleft(id)
                self.local_locks[id] = Lock()
                self.local_locks[id].acquire()
                previous_element_lock = self.local_locks[previous_element]

    if previous_element_lock is not None:
        previous_element_lock.acquire()

Теперь release(). Начнём снова с алгоритма:

1, 2. Взять локи внутри нашего объекта лока и внутри графа.
3. Удалить id текущего потока из очереди.
4. Достать из словаря self.local_locks объект лока по id текущего потока и сохранить его в переменную.
5. Удалить из словаря self.local_locks ключ, соответствующий id текущего потока.
6. Если за текущим потоком в очереди есть другие потоки:
6.1. Получить из очереди id потока, который попытался взять блокировку следующим.
6.2. Удалить из графа связь между текущим потоком и следующим.
7. Отпустить лок, извлечённый из словаря на шаге 4.
8, 9. Отпустить лок на граф и лок на лок.

И снова код:

def release(self):
    id = get_native_id()

    with self.lock:
        with self.graph.lock:
            if id not in self.local_locks:
                raise RuntimeError('Release unlocked lock.')

            self.deque.pop()
            lock = self.local_locks[id]
            del self.local_locks[id]

            if len(self.deque) != 0:
                next_element = self.deque[-1]
                self.graph.delete_link(next_element, id)

            lock.release()

Важным отличием от стандартного Lock() здесь является то, что отпустить наш лок может только взявший его поток, в то время как стандартный Lock() может отпустить и любой другой. Иначе пришлось бы сильно усложнить логику, связанную с перестройкой очереди и словаря с локами, для поддержания консистентности внутреннего состояния.

Что с производительностью?

Очевидно, что, поскольку наш лок является обёрткой над примитивом из стандартной библиотеки, добавляющей обязательные действия к каждой операции взятия/отпускания, он гарантированно будет медленнее. Вопрос только в том, насколько.

За основу возьмём скорость работы стандартного Lock, которую замерим с помощью следующего кода:

from time import perf_counter
from threading import Lock

lock = Lock()

t1 = perf_counter()

for _ in range(100000000):
    lock.acquire()
    lock.release()

print(perf_counter() - t1)

На моём компьютере (MacBook Air на M1) при первом запуске этого кода вышло 18,812158417 секунды на 100 000 000 итераций, то есть примерно 5 315 711 итераций в секунду.

Теперь протестируем наш лок (код отличается только во второй строчке):

from time import perf_counter
from locklib import SmartLock as Lock

lock = Lock()

t1 = perf_counter()

for _ in range(100000000):
    lock.acquire()
    lock.release()

print(perf_counter() - t1)

У меня при первом запуске вышло 216,22566979200002 секунды, то есть примерно 462 479 итераций в секунду. Это чуть более, чем в десять раз медленнее.

Я протестировал время работы в вырожденном случае, когда один поток постоянно берёт и отпускает блокировку, однако в реальности оно может варьироваться в широких пределах, в зависимости от размера и конкретной топологии графа ожидания.

Стоит оно того или нет — решать вам. Если время взятия/отпускания лока ничтожно по сравнению с прочими операциями, выполняемыми внутри потока, это может быть весьма выгодной сделкой, ведь взамен вы получаете безопасность вашего кода.

А это можно скачать бесплатно из интернета?

Библиотека доступна в PyPI:

$ pip install locklib

Пробуем импортировать из нее лок и спровоцировать состояние гонки:

from threading import Thread
from locklib import SmartLock


lock_1 = SmartLock()
lock_2 = SmartLock()

def function_1():
  while True:
    with lock_1:
      with lock_2:
        pass

def function_2():
  while True:
    with lock_2:
      with lock_1:
        pass

thread_1 = Thread(target=function_1)
thread_2 = Thread(target=function_2)
thread_1.start()
thread_2.start()

Получаем исключение!

...
locklib.errors.DeadLockError: A cycle between 1970256th and 1970257th threads has been detected.

Итак, мы разобрались:

  1. Какие вообще существуют способы обнаружения дедлоков и что такое граф ожидания.

  2. Как реализовать граф ожидания на Python.

  3. Как он влияет на производительность в сравнении с обычной блокировкой.

  4. Где можно скачать библиотеку с приведённым в статье кодом.

Теперь, когда у вас появилось новое знание, помните, что знание — сила, а большая сила — это большая ответственность.

Recommend Projects

  • React photo

    React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo

    Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo

    Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo

    TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo

    Django

    The Web framework for perfectionists with deadlines.

  • Laravel photo

    Laravel

    A PHP framework for web artisans

  • D3 photo

    D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Visualization

    Some thing interesting about visualization, use data art

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo

    Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo

    Microsoft

    Open source projects and samples from Microsoft.

  • Google photo

    Google

    Google ❤️ Open Source for everyone.

  • Alibaba photo

    Alibaba

    Alibaba Open Source for everyone

  • D3 photo

    D3

    Data-Driven Documents codes.

  • Tencent photo

    Tencent

    China tencent open source team.

Синхронизация потоков при помощи блокировок.

Синтаксис:

import threading

lock = threading.Lock()

Параметры:

  • нет.

Возвращаемое значение:

  • объект Lock.

Описание:

Класс Lock() модуля threading, реализует примитивные объекты блокировки. Как только поток получил блокировку, то последующие попытки получить его блокируются, пока поток не будет разблокирован. Любой поток может снять блокировку.

Обратите внимание, что класс threading.Lock() на самом деле является фабричным, который возвращает экземпляр наиболее эффективной версии конкретного класса threading.Lock(), поддерживаемого платформой.

Примитивная блокировка потока — это примитив синхронизации, который при блокировке не принадлежит конкретному потоку. В Python в настоящее время это самый низкий из доступных примитивов синхронизации.

Примитивная блокировка находится в одном из двух состояний: «locked» или «unlocked«. Экземпляр класса создается в разблокированном «unlocked» состоянии. У него есть два основных метода: Lock.acquire() и Lock.release().

Когда состояние «unlocked», то метод Lock.acquire() изменяет состояние на «locked» и немедленно возвращает результат экземпляру. Когда состояние «locked», то вызов метода Lock.acquire() блокируется до тех пор, пока вызов метода Lock.release() в другом потоке не изменит его на «unlocked», затем вызов Lock.acquire() сбрасывает его в «locked» и снова возвращает результат экземпляру.

Метод Lock.release() следует вызывать только когда экземпляр Lock находится в состоянии «locked». Этот метод меняет состояние экземпляра Lock на «unlocked» и немедленно возвращает результат экземпляру. Если будет сделана попытка вызвать Lock.release() для состояния «unlocked», то будет вызвана ошибка RuntimeError.

Когда в Lock.acquire() блокируется более одного потока, ожидающего перехода состояния в «unlocked», то после вызова Lock.release() только один поток переходит в состояние «unlocked». Какой из ожидающих потоков начинает работать, не определено и может варьироваться в разных реализациях.

Методы объекта threading.Lock.

Все методы выполняются атомарно.

  • Lock.acquire() устанавливает блокировку,
  • Lock.release() снимает блокировку,
  • Lock.locked() проверяет состояние блокировки,
  • Общий пример использования блокировки,
  • Применение блокировки при многопоточном объединении файлов.

Lock.acquire(blocking=True, timeout=-1):

Метод Lock.acquire() устанавливает блокировку, блокирующую или неблокирующую.

При вызове метода с аргументом blocking, установленным в True (по умолчанию) — блокирует потоки до тех пор, пока блокировка не будет снята, затем снова установит ее в состояние «locked» и вернет True.

При вызове метода с аргументом blocking, установленным в False, не ставит блокировку, а проверит, сможет ли метод с blocking=True поставить блокировку, если нет, то немедленно вернет False, в противном случае установит блокировку и возвратит True.

При вызове с аргументом timeout (значение может быть float), установленным в положительное значение, будет блокировать выполнение кода не более чем на количество секунд, заданное величиной timeout и до тех пор, пока блокировка не будет получена.

Аргумент тайм-аута timeout, равный -1, указывает на неограниченное ожидание. Запрещается указывать тайм-аут timeout при ложной блокировке.

Метод Lock.acquire() возвращает значение True, если блокировка получена успешно и False, если нет (например, если истекло время ожидания timeout).

Lock.release():

Метод Lock.release() снимает блокировку. Метод может быть вызван из любого потока, а не только из потока, который получил блокировку.

Когда блокировка включена, этот метод сбрасывает его до состояния «unlocked» и возвращает результат своему экземпляру. Если какие-либо другие потоки заблокированы, ожидая снятия блокировки, разрешает выполнение ровно одному из них.

Метод ничего не возвращает.

Lock.locked():

Метод Lock.locked() возвращает True, если блокировка получена.


Общие примеры использования блокировок threading.Lock().

Важно иметь возможность контролировать доступ к общим ресурсам для предотвращения повреждения или пропуска данных. Встроенные в Python структуры данных (списки, словари и т. д.) являются поточно-ориентированными, поскольку глобальная блокировка интерпретатора GIL, используемая для защиты внутренних структур данных Python, не снимается в середине обновления.

Другие структуры данных, реализованные в Python или более простые типы, такие как целые числа и числа с плавающей запятой, не имеют такой защиты. Для защиты от одновременного доступа к объекту нескольких потоков используйте объект threading.Lock().

В примере функция worker() увеличивает экземпляр счетчика, который управляет блокировкой, чтобы предотвратить одновременное изменение внутреннего состояния двух потоков. Если бы блокировка не использовалась, то была бы вероятность пропуска изменения атрибута value.

import threading, random, time

class Counter():

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        th_name = threading.current_thread().name
        print(f'Th: {th_name} - ждет блокировку')
        self.lock.acquire()
        try:
            print(f'Th: {th_name} - получил блокировку')
            self.value = self.value + 1
        finally:
            self.lock.release()


def worker(c):
    for i in range(2):
        pause = random.random()
        th_name = threading.current_thread().name
        print(f'Th: {th_name} - заснул на {pause:0.02f}')
        time.sleep(pause)
        c.increment()
    print(f'Th: {th_name} - сделано.')


counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

print('Ожидание рабочих потоков')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
print(f'Счетчик: {counter.value}')


# Th: Thread-1 - заснул на 0.34
# Th: Thread-2 - заснул на 0.26
# Ожидание рабочих потоков
# Th: Thread-2 - ждет блокировку
# Th: Thread-2 - получил блокировку
# Th: Thread-2 - заснул на 0.34
# Th: Thread-1 - ждет блокировку
# Th: Thread-1 - получил блокировку
# Th: Thread-1 - заснул на 0.33
# Th: Thread-2 - ждет блокировку
# Th: Thread-2 - получил блокировку
# Th: Thread-2 - сделано.
# Th: Thread-1 - ждет блокировку
# Th: Thread-1 - получил блокировку
# Th: Thread-1 - сделано.
# Счетчик: 4

Чтобы узнать, получил ли другой поток блокировку, не задерживая текущий поток, необходимо передать False для аргумента blocking в методе Lock.acquire(). В следующем примере worker() пытается получить блокировку три раза и подсчитывает, сколько для этого нужно сделать попыток. Между тем, lock_holder() циклически переключает блокировку между удержанием и снятием блокировки с короткими паузами в каждом состоянии, для имитации нагрузки.

Функции worker() требуется более трех итераций, чтобы получить блокировку три отдельных раза.

import logging, threading, time


def lock_holder(lock):
    logging.debug('Запуск')
    while True:
        lock.acquire()
        try:
            logging.debug('Нагрузка...')
            time.sleep(0.5)
        finally:
            logging.debug('Работа закончена')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('Запуск')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Попытка блокировки ресурса')
        have_it = lock.acquire(blocking=False)
        try:
            num_tries += 1
            if have_it:
                logging.debug(f'Попытка №{num_tries}: ПОЛУЧИЛОСЬ')
                num_acquires += 1
            else:
                logging.debug(f'Попытка №{num_tries}: НЕ ПОЛУЧИЛОСЬ')
        finally:
            if have_it:
                lock.release()
    logging.debug('Ресурс успешно блокирован 3 раза, после {num_tries} попыток...')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()


# (LockHolder) Запуск
# (LockHolder) Нагрузка...
# (Worker    ) Запуск
# (LockHolder) Работа закончена
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №1: ПОЛУЧИЛОСЬ
# (LockHolder) Нагрузка...
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №2: НЕ ПОЛУЧИЛОСЬ
# (LockHolder) Работа закончена
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №3: ПОЛУЧИЛОСЬ
# (LockHolder) Нагрузка...
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №4: НЕ ПОЛУЧИЛОСЬ
# (LockHolder) Работа закончена
# (Worker    ) Попытка блокировки ресурса
# (Worker    ) Попытка №5: ПОЛУЧИЛОСЬ
# (Worker    ) Ресурс успешно блокирован 3 раза, после 5 попыток...

Пример блокировки общего ресурса при многопоточном объединении файлов.

В этом примере выбираются все текстовые файлы из директории test_dir и объединяются в один multi-thead-file.txt. Программа читает и обрабатывает файлы из каталога test_dir и пишет в общий файл multi-thead-file.txt в несколько потоков.

Предупреждение

. Выполнение этого кода не даст прирост производительности по сравнению с однопоточным режимом, т.к. запись в общий ресурс блокируется и по сути программа становиться однопоточной. Этот пример приведен чисто в учебных целях, что бы понять как организовать доступ к общему ресурсу из разных потоков не нарушая его целостности и увидеть применение класса threading.Lock() на практическом примере.

Здесь блокировка threading.Lock() используется, для того, что бы предотвратить одновременный доступ из других потоков к файлу с общими данными multi-thead-file.txt. Если убрать блокировку, то в итоговом файле можно увидеть, что строки из разных файлов перемешаны.

Места в коде, где используется блокировка, помечены !!!!! с обоих концов комментария.

Что бы запустить данный пример, необходимо подготовить данные скриптом prepare-data.py, приведенным в обзорной статье к модулю threading или использовать свои текстовые файлы (измените переменную test_dir).

import pathlib, threading, time, queue

class Worker(threading.Thread):

    def __init__(self, que, write_file, lock):
        super().__init__()
        self.daemon = True
        self.files_queue = files_queue
        self.write_file = write_file
        self.lock = lock
        
    # переопределяем метод
    def run(self):
        while True:
            # !!!!! блокируем доступ к файлу из других потоков, 
            # !!!!! что бы строки не писались вперемешку из других
            # !!!!! открытых файлов. Для блокировки/разблокировки
            # !!!!! используем менеджер контекста
            with self.lock: 
                # Получаем задание (имя файла) из очереди
                job = self.files_queue.get()
                print(f'Th:{self.name} обработка {job.name}')
                # открываем файл `job` на чтение, а `write_file` на 
                # ДОБАВЛЕНИЕ 'a+' данных к файлу с общими данными 
                with open(job, 'r') as fread, open(self.write_file, 'a+') as fwrite:
                    # пишем имя открытого файла
                    fwrite.write(f'nnДанные из файла: {job.name}nn')
                    # читаем данные построчно (экономим память)
                    for line in fread:
                        # здесь обрабатываем строку, 
                        # например, заменим букву у на 0
                        line = line.replace('у', '0')
                        # потом пишем в файл
                        fwrite.write(line)

            # Сообщаем очереди, что задача выполнена
            self.files_queue.task_done()


path = pathlib.Path('.')
# каталог с файлами
test_dir = 'test_dir'
path_dir = path.joinpath(test_dir)
# список файлов
list_files = path_dir.glob('*.txt')

# создаем и заполняем очередь именами файлов
files_queue = queue.Queue()
for file in list_files:
    files_queue.put(file)

# общий файл данных
write_file = 'multi-thead-file.txt'

if files_queue.empty():
    print('НЕТ файлов для обработки.') 
else:
    # !!!!! создаем блокировщик !!!!! 
    lock = threading.Lock()
    # Создаем и запускаем, например 3 потока
    for _ in range(3):
        th = Worker(files_queue, write_file, lock)
        th.start()

    # Блокируем выполнение программы до тех пор пока
    # потоки не обслужат все элементы очереди
    files_queue.join()

Понравилась статья? Поделить с друзьями:
  • Runtime error regsvr32 r6034
  • Runtime error r6035
  • Runtime error r6034 gta san andreas
  • Runtime error r6034 an application
  • Runtime error r6030 crt not initialized как исправить