[zmq] Implement a ZMQ-based Repository (closes #2290125)

This Repository server behaves mainly like the Pyro-based repository.

authorDavid Douard <david.douard@logilab.fr>
changesete1c05bf6fdeb
branchdefault
phasepublic
hiddenno
parent revision#fdb796435d7b backport stable
child revision#02f4f01375e8 [repository] fire 'server_shutdown' hooks before waiting for theads
files modified by this revision
hooks/zmq.py
server/cwzmq.py
server/serverconfig.py
server/serverctl.py
# HG changeset patch
# User David Douard <david.douard@logilab.fr>
# Date 1333551069 -7200
# Wed Apr 04 16:51:09 2012 +0200
# Node ID e1c05bf6fdeb6aa0f615fd5a719697eb4d0d2047
# Parent fdb796435d7b6afb4e5b68fdfb97468cdb1edb19
[zmq] Implement a ZMQ-based Repository (closes #2290125)

This Repository server behaves mainly like the Pyro-based repository.

diff --git a/hooks/zmq.py b/hooks/zmq.py
@@ -44,5 +44,32 @@
1          self.repo.app_instances_bus.add_subscription('delete', clear_cache_callback)
2          for address in config.get('zmq-address-sub'):
3              self.repo.app_instances_bus.add_subscriber(address)
4          self.repo.app_instances_bus.start()
5 
6 +
7 +class ZMQRepositoryServerStopHook(hook.Hook):
8 +    __regid__ = 'zmqrepositoryserverstop'
9 +    events = ('server_shutdown',)
10 +
11 +    def __call__(self):
12 +        server = getattr(self.repo, 'zmq_repo_server', None)
13 +        if server:
14 +            self.repo.zmq_repo_server.quit()
15 +
16 +class ZMQRepositoryServerStartHook(hook.Hook):
17 +    __regid__ = 'zmqrepositoryserverstart'
18 +    events = ('server_startup',)
19 +
20 +    def __call__(self):
21 +        config = self.repo.config
22 +        if config.name == 'repository':
23 +            # start-repository command already starts a zmq repo
24 +            return
25 +        address = config.get('zmq-repository-address')
26 +        if not address:
27 +            return
28 +        from cubicweb.server import cwzmq
29 +        self.repo.zmq_repo_server = server = cwzmq.ZMQRepositoryServer(self.repo)
30 +        server.connect(address)
31 +        self.repo.threaded_task(server.run)
32 +
diff --git a/server/cwzmq.py b/server/cwzmq.py
@@ -16,16 +16,20 @@
33  #
34  # You should have received a copy of the GNU Lesser General Public License along
35  # with CubicWeb.  If not, see <http://www.gnu.org/licenses/>.
36 
37  from threading import Thread
38 +import cPickle
39 +import traceback
40 +
41  import zmq
42  from zmq.eventloop import ioloop
43  import zmq.eventloop.zmqstream
44 
45  from logging import getLogger
46  from cubicweb import set_log_methods
47 +from cubicweb.server.server import QuitEvent
48 
49  ctx = zmq.Context()
50 
51  class ZMQComm(object):
52      def __init__(self):
@@ -103,7 +107,135 @@
53      def subscribe(self, topic, callback):
54          self.dispatch_table[topic] = callback
55          self.ioloop.add_callback(lambda: self.stream.setsockopt(zmq.SUBSCRIBE, topic))
56 
57 
58 +class ZMQRepositoryServer(object):
59 +
60 +    def __init__(self, repository):
61 +        """make the repository available as a PyRO object"""
62 +        self.address = None
63 +        self.repo = repository
64 +        self.socket = None
65 +        self.stream = None
66 +        self.loop = None
67 +
68 +        # event queue
69 +        self.events = []
70 +
71 +    def connect(self, address):
72 +        self.address = address
73 +
74 +    def run(self):
75 +        """enter the service loop"""
76 +        # start repository looping tasks
77 +        self.socket = ctx.socket(zmq.REP)
78 +        self.loop = ioloop.IOLoop()
79 +        self.stream = zmq.eventloop.zmqstream.ZMQStream(self.socket, io_loop=self.loop)
80 +        self.stream.bind(self.address)
81 +        self.info('ZMQ server bound on: %s', self.address)
82 +
83 +        self.stream.on_recv(self.process_cmds)
84 +
85 +        try:
86 +            self.loop.start()
87 +        except zmq.ZMQError:
88 +            self.warning('ZMQ event loop killed')
89 +        self.quit()
90 +
91 +    def trigger_events(self):
92 +        """trigger ready events"""
93 +        for event in self.events[:]:
94 +            if event.is_ready():
95 +                self.info('starting event %s', event)
96 +                event.fire(self)
97 +                try:
98 +                    event.update()
99 +                except Finished:
100 +                    self.events.remove(event)
101 +
102 +    def process_cmd(self, cmd):
103 +        """Delegate the given command to the repository.
104 +
105 +        ``cmd`` is a list of (method_name, args, kwargs)
106 +        where ``args`` is a list of positional arguments
107 +        and ``kwargs`` is a dictionnary of named arguments.
108 +
109 +        >>> rset = delegate_to_repo(["execute", [sessionid], {'rql': rql}])
110 +
111 +        :note1: ``kwargs`` may be ommited
112 +
113 +            >>> rset = delegate_to_repo(["execute", [sessionid, rql]])
114 +
115 +        :note2: both ``args`` and ``kwargs`` may be omitted
116 +
117 +            >>> schema = delegate_to_repo(["get_schema"])
118 +            >>> schema = delegate_to_repo("get_schema") # also allowed
119 +
120 +        """
121 +        cmd = cPickle.loads(cmd)
122 +        if not cmd:
123 +            raise AttributeError('function name required')
124 +        if isinstance(cmd, basestring):
125 +            cmd = [cmd]
126 +        if len(cmd) < 2:
127 +            cmd.append(())
128 +        if len(cmd) < 3:
129 +            cmd.append({})
130 +        cmd  = list(cmd) + [(), {}]
131 +        funcname, args, kwargs = cmd[:3]
132 +        result = getattr(self.repo, funcname)(*args, **kwargs)
133 +        return result
134 +
135 +    def process_cmds(self, cmds):
136 +        """Callback intended to be used with ``on_recv``.
137 +
138 +        Call ``delegate_to_repo`` on each command and send a pickled of
139 +        each result recursively.
140 +
141 +        Any exception are catched, pickled and sent.
142 +        """
143 +        try:
144 +            for cmd in cmds:
145 +                result = self.process_cmd(cmd)
146 +                self.send_data(result)
147 +        except Exception, exc:
148 +            traceback.print_exc()
149 +            self.send_data(exc)
150 +
151 +    def send_data(self, data):
152 +        self.socket.send_pyobj(data)
153 +
154 +    def quit(self, shutdown_repo=False):
155 +        """stop the server"""
156 +        self.info('Quitting ZMQ server')
157 +        try:
158 +            self.loop.stop()
159 +            self.stream.on_recv(None)
160 +            self.stream.close()
161 +        except Exception, e:
162 +            print e
163 +            pass
164 +        if shutdown_repo and not self.repo.shutting_down:
165 +            event = QuitEvent()
166 +            event.fire(self)
167 +
168 +    # server utilities ########################################################
169 +
170 +    def install_sig_handlers(self):
171 +        """install signal handlers"""
172 +        import signal
173 +        self.info('installing signal handlers')
174 +        signal.signal(signal.SIGINT, lambda x, y, s=self: s.quit(shutdown_repo=True))
175 +        signal.signal(signal.SIGTERM, lambda x, y, s=self: s.quit(shutdown_repo=True))
176 +
177 +
178 +    # these are overridden by set_log_methods below
179 +    # only defining here to prevent pylint from complaining
180 +    @classmethod
181 +    def info(cls, msg, *a, **kw):
182 +        pass
183 +
184 +
185  set_log_methods(Publisher, getLogger('cubicweb.zmq.pub'))
186  set_log_methods(Subscriber, getLogger('cubicweb.zmq.sub'))
187 +set_log_methods(ZMQRepositoryServer, getLogger('cubicweb.zmq.repo'))
diff --git a/server/serverconfig.py b/server/serverconfig.py
@@ -202,11 +202,17 @@
188            'help': 'Pyro server host, if not detectable correctly through \
189  gethostname(). It may contains port information using <host>:<port> notation, \
190  and if not set, it will be choosen randomly',
191            'group': 'pyro', 'level': 3,
192            }),
193 -
194 +        # zmq services config
195 +        ('zmq-repository-address',
196 +         {'type' : 'string',
197 +          'default': None,
198 +          'help': 'ZMQ URI on which the repository will be bound to.',
199 +          'group': 'zmq', 'level': 3,
200 +          }),
201           ('zmq-address-sub',
202            {'type' : 'csv',
203             'default' : None,
204             'help': ('List of ZMQ addresses to subscribe to (requires pyzmq)'),
205             'group': 'zmq', 'level': 1,
diff --git a/server/serverctl.py b/server/serverctl.py
@@ -33,10 +33,11 @@
206 
207  from cubicweb import AuthenticationError, ExecutionError, ConfigurationError
208  from cubicweb.toolsutils import Command, CommandHandler, underline_title
209  from cubicweb.cwctl import CWCTL, check_options_consistency
210  from cubicweb.server import SOURCE_TYPES
211 +from cubicweb.server.repository import Repository
212  from cubicweb.server.serverconfig import (
213      USER_OPTIONS, ServerConfiguration, SourceConfiguration,
214      ask_source_config, generate_source_config)
215 
216  # utility functions ###########################################################
@@ -631,11 +632,11 @@
217 
218 
219  class StartRepositoryCommand(Command):
220      """Start a CubicWeb RQL server for a given instance.
221 
222 -    The server will be accessible through pyro
223 +    The server will be remotely accessible through pyro or ZMQ
224 
225      <instance>
226        the identifier of the instance to initialize.
227      """
228      name = 'start-repository'
@@ -648,26 +649,44 @@
229          ('loglevel',
230           {'short': 'l', 'type' : 'choice', 'metavar': '<log level>',
231            'default': None, 'choices': ('debug', 'info', 'warning', 'error'),
232            'help': 'debug if -D is set, error otherwise',
233            }),
234 +        ('address',
235 +         {'short': 'a', 'type': 'string', 'metavar': '<protocol>://<host>:<port>',
236 +          'default': '',
237 +          'help': ('specify a ZMQ URI on which to bind, or use "pyro://"'
238 +                   'to create a pyro-based repository'),
239 +          }),
240          )
241 
242 +    def create_repo(self, config):
243 +        address = self['address']
244 +        if not address:
245 +            address = config.get('zmq-repository-address', 'pyro://')
246 +        if address.startswith('pyro://'):
247 +            from cubicweb.server.server import RepositoryServer
248 +            return RepositoryServer(config), config['host']
249 +        else:
250 +            from cubicweb.server.utils import TasksManager
251 +            from cubicweb.server.cwzmq import ZMQRepositoryServer
252 +            repo = Repository(config, TasksManager())
253 +            return ZMQRepositoryServer(repo), address
254 +
255      def run(self, args):
256          from logilab.common.daemon import daemonize, setugid
257          from cubicweb.cwctl import init_cmdline_log_threshold
258 -        from cubicweb.server.server import RepositoryServer
259          appid = args[0]
260          debug = self['debug']
261          if sys.platform == 'win32' and not debug:
262              logger = logging.getLogger('cubicweb.ctl')
263              logger.info('Forcing debug mode on win32 platform')
264              debug = True
265          config = ServerConfiguration.config_for(appid, debugmode=debug)
266          init_cmdline_log_threshold(config, self['loglevel'])
267          # create the server
268 -        server = RepositoryServer(config)
269 +        server, address = self.create_repo(config)
270          # ensure the directory where the pid-file should be set exists (for
271          # instance /var/run/cubicweb may be deleted on computer restart)
272          pidfile = config['pid-file']
273          piddir = os.path.dirname(pidfile)
274          # go ! (don't daemonize in debug mode)
@@ -677,11 +696,11 @@
275              return
276          uid = config['uid']
277          if uid is not None:
278              setugid(uid)
279          server.install_sig_handlers()
280 -        server.connect(config['host'], 0)
281 +        server.connect(address)
282          server.run()
283 
284 
285  def _remote_dump(host, appid, output, sudo=False):
286      # XXX generate unique/portable file name