[PATCH 01 of 23 cubicweb V2] [database/pool/fix] use thread safe counter

Laurent Peuch cortex at worlddomination.be
Fri Jan 17 14:36:58 CET 2020


# HG changeset patch
# User Laurent Peuch <cortex at worlddomination.be>
# Date 1579263387 -3600
#      Fri Jan 17 13:16:27 2020 +0100
# Node ID dfaeea73d730524c596a49c46cfe680bf8fcde48
# Parent  2c0b217709d3bff1654b0f4167815928dbd308ce
# Available At https://hg.logilab.org/users/lpeuch/cubicweb
#              hg pull https://hg.logilab.org/users/lpeuch/cubicweb -r dfaeea73d730
[database/pool/fix] use thread safe counter

diff --git a/cubicweb/server/repository.py b/cubicweb/server/repository.py
--- a/cubicweb/server/repository.py
+++ b/cubicweb/server/repository.py
@@ -30,6 +30,7 @@ from itertools import chain
 from contextlib import contextmanager
 from logging import getLogger
 import queue
+import threading
 
 from logilab.common.decorators import cached, clear_cache
 
@@ -143,6 +144,17 @@ class NullEventBus(object):
         pass
 
 
+# based on Julien Danjou's work https://julien.danjou.info/atomic-lock-free-counters-in-python/
+class FastReadCounter(object):
+    def __init__(self, value=0):
+        self.value = value
+        self._lock = threading.Lock()
+
+    def increment(self):
+        with self._lock:
+            self.value += 1
+
+
 class _CnxSetPool:
 
     def __init__(self, source, size, min_size=3, min_timeout=0.1, max_timeout=5):
@@ -160,7 +172,7 @@ class _CnxSetPool:
             if self.min_size > self.size:
                 self.min_size = max(1, int(self.size / 3))
 
-            self.current_size = self.min_size
+            self.current_size = FastReadCounter(value=self.min_size)
 
             for i in range(self.min_size):
                 cnxset = source.wrapped_connection()
@@ -188,16 +200,16 @@ class _CnxSetPool:
             except queue.Empty:
                 pass  # probably for some race condition we missed it
 
-        if self.current_size < self.size:
+        if self.current_size.value < self.size:
             try:
                 return self._queue.get(block=True, timeout=self.min_timeout)
             except queue.Empty:
                 # size could have increased during waiting
-                if self.current_size < self.size:
+                if self.current_size.value < self.size:
                     # we have load, open another connection
                     cnxset = self.source.wrapped_connection()
                     self._cnxsets.append(cnxset)
-                    self.current_size += 1
+                    self.current_size.increment()
                     return cnxset
 
         # here, we can't open new connections because we are full, just wait



More information about the cubicweb-devel mailing list