[dataimport] implement new store API on massive store

and deprecate the old one.

Related to #5414760

author Sylvain Thénault <sylvain.thenault@logilab.fr>
changeset 8e1f6de61300
branch default
phase public
hidden no
parent revision #7357b1485795 [dataimport] .finish method was missing from (NoHook)RQLObjectStore
child revision #b7f4acf0473b [dataimport] methods that modify in-place shouldn't return value
files modified by this revision
dataimport/massive_store.py
dataimport/test/test_massive_store.py
# HG changeset patch
# User Sylvain Thénault <sylvain.thenault@logilab.fr>
# Date 1445437931 -7200
# Wed Oct 21 16:32:11 2015 +0200
# Node ID 8e1f6de61300b5032a7b615cd8274bb96d2ca173
# Parent 7357b14857951ea0e83ee32a983abd3c7f98129b
[dataimport] implement new store API on massive store

and deprecate the old one.

Related to #5414760

diff --git a/dataimport/massive_store.py b/dataimport/massive_store.py
@@ -22,10 +22,11 @@
1  from collections import defaultdict
2  from io import StringIO
3 
4  from six.moves import range
5 
6 +from logilab.common.deprecation import deprecated
7  from yams.constraints import SizeConstraint
8 
9  from psycopg2 import ProgrammingError
10 
11  from cubicweb.dataimport import stores, pgstore
@@ -154,11 +155,11 @@
12          if self._eids_seq_start is not None:
13              self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
14                  'entities_id_seq', initial_value=self._eids_seq_start + 1))
15              cnx.commit()
16          self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
17 -        # recreate then when self.cleanup() is called
18 +        # recreate then when self.finish() is called
19          if not self.slave_mode and self.drop_index:
20              self._drop_metatables_constraints()
21          if source is None:
22              source = cnx.repo.system_source
23          self.source = source
@@ -435,12 +436,15 @@
24          default_values = self.default_values[etype]
25          missing_keys = set(default_values) - set(kwargs)
26          kwargs.update((key, default_values[key]) for key in missing_keys)
27          return kwargs
28 
29 -    def create_entity(self, etype, **kwargs):
30 -        """ Create an entity
31 +    # store api ################################################################
32 +
33 +    def prepare_insert_entity(self, etype, **kwargs):
34 +        """Given an entity type, attributes and inlined relations, returns the inserted entity's
35 +        eid.
36          """
37          # Init the table if necessary
38          self.init_etype_table(etype)
39          # Add meta data if not given
40          if 'modification_date' not in kwargs:
@@ -459,26 +463,89 @@
41              kwargs['eid'] = self.get_next_eid()
42          # Check size constraints
43          kwargs = self.apply_size_constraints(etype, kwargs)
44          # Apply default values
45          kwargs = self.apply_default_values(etype, kwargs)
46 -        # Push data / Return entity
47 +        # Push data
48          self._data_entities[etype].append(kwargs)
49 -        entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
50 -        entity.cw_attr_cache.update(kwargs)
51 -        if 'eid' in kwargs:
52 -            entity.eid = kwargs['eid']
53 -        return entity
54 +        # Return eid
55 +        return kwargs.get('eid')
56 
57 -    ### RELATIONS CREATION ####################################################
58 -
59 -    def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
60 -        """ Compatibility with other stores
61 +    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
62 +        """Insert into the database a  relation ``rtype`` between entities with eids ``eid_from``
63 +        and ``eid_to``.
64          """
65          # Init the table if necessary
66          self.init_relation_table(rtype)
67 -        self._data_relations[rtype].append({'eid_from': subj_eid, 'eid_to': obj_eid})
68 +        self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
69 +
70 +    def flush(self):
71 +        """Flush the data"""
72 +        self.flush_entities()
73 +        self.flush_internal_relations()
74 +        self.flush_relations()
75 +
76 +    def commit(self):
77 +        """Commit the database transaction."""
78 +        self.on_commit()
79 +        super(MassiveObjectStore, self).commit()
80 +
81 +    def finish(self):
82 +        """Remove temporary tables and columns."""
83 +        self.logger.info("Start cleaning")
84 +        if self.slave_mode:
85 +            raise RuntimeError('Store cleanup is not allowed in slave mode')
86 +        self.logger.info("Start cleaning")
87 +        # Cleanup relations tables
88 +        for etype in self._initialized['init_uri_eid']:
89 +            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
90 +        # Remove relations tables
91 +        for rtype in self._initialized['uri_rtypes']:
92 +            if not self._cnx.repo.schema.rschema(rtype).inlined:
93 +                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
94 +            else:
95 +                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
96 +        self.commit()
97 +        # Get all the initialized etypes/rtypes
98 +        if self._dbh.table_exists('dataio_initialized'):
99 +            crs = self.sql('SELECT retype, type FROM dataio_initialized')
100 +            for retype, _type in crs.fetchall():
101 +                self.logger.info('Cleanup for %s' % retype)
102 +                if _type == 'etype':
103 +                    # Cleanup entities tables - Recreate indexes
104 +                    self._cleanup_entities(retype)
105 +                elif _type == 'rtype':
106 +                    # Cleanup relations tables
107 +                    self._cleanup_relations(retype)
108 +                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
109 +                         {'e': retype})
110 +        # Create meta constraints (entities, is_instance_of, ...)
111 +        self._create_metatables_constraints()
112 +        self.commit()
113 +        # Delete the meta data table
114 +        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
115 +            if self._dbh.table_exists(table_name):
116 +                self.sql('DROP TABLE %s' % table_name)
117 +        self.commit()
118 +
119 +    @deprecated('[3.22] use prepare_insert_entity instead')
120 +    def create_entity(self, etype, **kwargs):
121 +        """ Create an entity
122 +        """
123 +        eid = self.prepare_insert_entity(etype, **kwargs)
124 +        entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
125 +        entity.cw_attr_cache.update(kwargs)
126 +        entity.eid = eid
127 +        return entity
128 +
129 +    @deprecated('[3.22] use prepare_insert_relation instead')
130 +    def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
131 +        self.prepare_insert_relation(subj_eid, rtype, obj_eid, *args, **kwargs)
132 +
133 +    @deprecated('[3.22] use finish instead')
134 +    def cleanup(self):
135 +        self.finish()
136 
137 
138      ### FLUSH #################################################################
139 
140      def on_commit(self):
@@ -490,21 +557,10 @@
141              self.on_rollback_callback(exc, etype, data)
142              self._cnx.rollback()
143          else:
144              raise exc
145 
146 -    def commit(self):
147 -        self.on_commit()
148 -        super(MassiveObjectStore, self).commit()
149 -
150 -    def flush(self):
151 -        """ Flush the data
152 -        """
153 -        self.flush_entities()
154 -        self.flush_internal_relations()
155 -        self.flush_relations()
156 -
157      def flush_internal_relations(self):
158          """ Flush the relations data
159          """
160          for rtype, data in self._data_relations.items():
161              if not data:
@@ -619,49 +675,10 @@
162          # Create indexes and constraints
163          if self.drop_index:
164              tablename = '%s_relation' % rtype.lower()
165              self.reapply_constraint_index(tablename)
166 
167 -    def cleanup(self):
168 -        """ Remove temporary tables and columns
169 -        """
170 -        self.logger.info("Start cleaning")
171 -        if self.slave_mode:
172 -            raise RuntimeError('Store cleanup is not allowed in slave mode')
173 -        self.logger.info("Start cleaning")
174 -        # Cleanup relations tables
175 -        for etype in self._initialized['init_uri_eid']:
176 -            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
177 -        # Remove relations tables
178 -        for rtype in self._initialized['uri_rtypes']:
179 -            if not self._cnx.repo.schema.rschema(rtype).inlined:
180 -                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
181 -            else:
182 -                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
183 -        self.commit()
184 -        # Get all the initialized etypes/rtypes
185 -        if self._dbh.table_exists('dataio_initialized'):
186 -            crs = self.sql('SELECT retype, type FROM dataio_initialized')
187 -            for retype, _type in crs.fetchall():
188 -                self.logger.info('Cleanup for %s' % retype)
189 -                if _type == 'etype':
190 -                    # Cleanup entities tables - Recreate indexes
191 -                    self._cleanup_entities(retype)
192 -                elif _type == 'rtype':
193 -                    # Cleanup relations tables
194 -                    self._cleanup_relations(retype)
195 -                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
196 -                         {'e': retype})
197 -        # Create meta constraints (entities, is_instance_of, ...)
198 -        self._create_metatables_constraints()
199 -        self.commit()
200 -        # Delete the meta data table
201 -        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
202 -            if self._dbh.table_exists(table_name):
203 -                self.sql('DROP TABLE %s' % table_name)
204 -        self.commit()
205 -
206      def insert_massive_meta_data(self, etype):
207          """ Massive insertion of meta data for a given etype, based on SQL statements.
208          """
209          # Push data - Use coalesce to avoid NULL (and get 0), if there is no
210          # entities of this type in the entities table.
diff --git a/dataimport/test/test_massive_store.py b/dataimport/test/test_massive_store.py
@@ -77,19 +77,19 @@
211                        'population': population, 'elevation': elevation,
212                        'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
213                        'cwuri':  u'http://sws.geonames.org/%s/' % int(infos[0]),
214                        'geonameid': int(infos[0]),
215                        }
216 -            store.create_entity('Location', **entity)
217 +            store.prepare_insert_entity('Location', **entity)
218 
219      def test_autoflush_metadata(self):
220          with self.admin_access.repo_cnx() as cnx:
221              crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
222                                   {'t': 'Location'})
223              self.assertEqual(len(crs.fetchall()), 0)
224              store = MassiveObjectStore(cnx, autoflush_metadata=True)
225 -            store.create_entity('Location', name=u'toto')
226 +            store.prepare_insert_entity('Location', name=u'toto')
227              store.flush()
228              store.commit()
229              store.cleanup()
230              cnx.commit()
231          with self.admin_access.repo_cnx() as cnx:
@@ -102,11 +102,11 @@
232  #            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
233  #                                      {'t': 'Location'})
234  #            self.assertEqual(len(crs.fetchall()), 0)
235  #        with self.admin_access.repo_cnx() as cnx:
236  #            store = MassiveObjectStore(cnx, autoflush_metadata=False)
237 -#            store.create_entity('Location', name=u'toto')
238 +#            store.prepare_insert_entity('Location', name=u'toto')
239  #            store.flush()
240  #            store.commit()
241  #            crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
242  #                                 {'t': 'Location'})
243  #            self.assertEqual(len(crs.fetchall()), 0)
@@ -117,12 +117,12 @@
244  #            store.cleanup()
245 
246      def test_massimport_etype_metadata(self):
247          with self.admin_access.repo_cnx() as cnx:
248              store = MassiveObjectStore(cnx)
249 -            timezone = store.create_entity('TimeZone')
250 -            store.create_entity('Location', timezone=timezone.eid)
251 +            timezone_eid = store.prepare_insert_entity('TimeZone')
252 +            store.prepare_insert_entity('Location', timezone=timezone_eid)
253              store.flush()
254              store.commit()
255              eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
256                                        'T name TN')[0]
257              self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
@@ -165,46 +165,45 @@
258          self.assertIn('owned_by_relation_to_idx', indexes)
259 
260      def test_eids_seq_range(self):
261          with self.admin_access.repo_cnx() as cnx:
262              store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
263 -            store.create_entity('Location', name=u'toto')
264 +            store.prepare_insert_entity('Location', name=u'toto')
265              store.flush()
266              cnx.commit()
267          with self.admin_access.repo_cnx() as cnx:
268              crs = cnx.system_sql("SELECT * FROM entities_id_seq")
269              self.assertGreater(crs.fetchone()[0], 50000)
270 
271      def test_eid_entity(self):
272          with self.admin_access.repo_cnx() as cnx:
273              store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
274 -            entity = store.create_entity('Location', name=u'toto')
275 +            eid = store.prepare_insert_entity('Location', name=u'toto')
276              store.flush()
277 -            self.assertGreater(entity.eid, 50000)
278 +            self.assertGreater(eid, 50000)
279 
280      def test_eid_entity_2(self):
281          with self.admin_access.repo_cnx() as cnx:
282              store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
283 -            entity = store.create_entity('Location', name=u'toto', eid=10000)
284 +            eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
285              store.flush()
286 -        with self.admin_access.repo_cnx() as cnx:
287 -            self.assertEqual(entity.eid, 10000)
288 +        self.assertEqual(eid, 10000)
289 
290      def test_on_commit_callback(self):
291          counter = itertools.count()
292          with self.admin_access.repo_cnx() as cnx:
293              store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
294 -            store.create_entity('Location', name=u'toto')
295 +            store.prepare_insert_entity('Location', name=u'toto')
296              store.flush()
297              store.commit()
298          self.assertGreaterEqual(next(counter), 1)
299 
300      def test_on_rollback_callback(self):
301          counter = itertools.count()
302          with self.admin_access.repo_cnx() as cnx:
303              store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
304 -            store.create_entity('Location', nm='toto')
305 +            store.prepare_insert_entity('Location', nm='toto')
306              store.flush()
307              store.commit()
308          self.assertGreaterEqual(next(counter), 1)
309 
310      def test_slave_mode_indexes(self):