[datafeed] attempt to acquire synchronization lock even when force is given

instead of the implementation in e717da3dc164, raise an error if the lock is already grabbed and catch this error in the caller.

See discussion on https://www.cubicweb.org/revision/10790765

Closes #10451635

authorSylvain Thénault <sylvain.thenault@logilab.fr>
changeset8bad8fd44e34
branchdefault
phasedraft
hiddenyes
parent revision#2dafcdd19c99 pep8 bits
child revision<not specified>
files modified by this revision
cubicweb/server/serverctl.py
cubicweb/server/sources/datafeed.py
# HG changeset patch
# User Sylvain Thénault <sylvain.thenault@logilab.fr>
# Date 1455814272 -3600
# Thu Feb 18 17:51:12 2016 +0100
# Node ID 8bad8fd44e344f8d278d3d8f7951f1901937bb66
# Parent 2dafcdd19c9918f6d0bde86b6afdcaf07c1722b3
[datafeed] attempt to acquire synchronization lock even when force is given

instead of the implementation in e717da3dc164, raise an error if the lock is
already grabbed and catch this error in the caller.

See discussion on https://www.cubicweb.org/revision/10790765

Closes #10451635

diff --git a/cubicweb/server/serverctl.py b/cubicweb/server/serverctl.py
@@ -33,11 +33,11 @@
1  from logilab.common.configuration import Configuration, merge_options
2  from logilab.common.shellutils import ASK, generate_password
3 
4  from logilab.database import get_db_helper, get_connection
5 
6 -from cubicweb import AuthenticationError, ExecutionError, ConfigurationError
7 +from cubicweb import AuthenticationError, ExecutionError, ConfigurationError, SourceException
8  from cubicweb.toolsutils import Command, CommandHandler, underline_title
9  from cubicweb.cwctl import CWCTL, check_options_consistency, ConfigureInstanceCommand
10  from cubicweb.server import SOURCE_TYPES
11  from cubicweb.server.serverconfig import (
12      USER_OPTIONS, ServerConfiguration, SourceConfiguration,
@@ -999,11 +999,14 @@
13              try:
14                  source = repo.sources_by_uri[args[1]]
15              except KeyError:
16                  raise ExecutionError('no source named %r' % args[1])
17              with repo.internal_cnx() as cnx:
18 -                stats = source.pull_data(cnx, force=True, raise_on_error=True)
19 +                try:
20 +                    stats = source.pull_data(cnx, force=True, raise_on_error=True)
21 +                except SourceException as exc:
22 +                    print("can't synchronize the source:", exc)
23          finally:
24              repo.shutdown()
25          for key, val in stats.items():
26              if val:
27                  print(key, ':', val)
diff --git a/cubicweb/server/sources/datafeed.py b/cubicweb/server/sources/datafeed.py
@@ -32,11 +32,11 @@
28  from pytz import utc
29  from lxml import etree
30 
31  from logilab.common.deprecation import deprecated
32 
33 -from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid
34 +from cubicweb import RegistryNotFound, ObjectNotFound, ValidationError, UnknownEid, SourceException
35  from cubicweb.server.repository import preprocess_inlined_relations
36  from cubicweb.server.sources import AbstractSource
37  from cubicweb.appobject import AppObject
38 
39 
@@ -169,18 +169,15 @@
40          self.latest_retrieval = datetime.now(tz=utc)
41          cnx.execute('SET X latest_retrieval %(date)s WHERE X eid %(x)s',
42                      {'x': self.eid, 'date': self.latest_retrieval})
43          cnx.commit()
44 
45 -    def acquire_synchronization_lock(self, cnx, force=False):
46 +    def acquire_synchronization_lock(self, cnx):
47          # XXX race condition until WHERE of SET queries is executed using
48          # 'SELECT FOR UPDATE'
49          now = datetime.now(tz=utc)
50 -        if force:
51 -            maxdt = now
52 -        else:
53 -            maxdt = now - self.max_lock_lifetime
54 +        maxdt = now - self.max_lock_lifetime
55          if not cnx.execute(
56                  'SET X in_synchronization %(now)s WHERE X eid %(x)s, '
57                  'X in_synchronization NULL OR X in_synchronization < %(maxdt)s',
58                  {'x': self.eid, 'now': now, 'maxdt': maxdt}):
59              self.error('concurrent synchronization detected, skip pull')
@@ -200,11 +197,13 @@
60          This method is responsible to handle commit/rollback on the given
61          connection.
62          """
63          if not force and self.fresh():
64              return {}
65 -        if not self.acquire_synchronization_lock(cnx, force):
66 +        if not self.acquire_synchronization_lock(cnx):
67 +            if force:
68 +                raise SourceException("a concurrent synchronization is already running")
69              return {}
70          try:
71              return self._pull_data(cnx, force, raise_on_error)
72          finally:
73              cnx.rollback() # rollback first in case there is some dirty