# HG changeset patch
# User Sylvain Thénault <sylvain.thenault@logilab.fr>
# Date 1445437931 -7200
# Wed Oct 21 16:32:11 2015 +0200
# Node ID beeb70bb41defd7f311d4d758ec99bf5f463c0de
# Parent 3987a5e736d819bc0c843e9273430fd84ebc827c
[dataimport] implement new store API on massive store
and deprecate the old one.
Related to #5414760
@@ -22,10 +22,11 @@
 from collections import defaultdict
 from io import StringIO
 
 from six.moves import range
 
+from logilab.common.deprecation import deprecated
 from yams.constraints import SizeConstraint
 
 from psycopg2 import ProgrammingError
 
 from cubicweb.dataimport import stores, pgstore
@@ -154,11 +155,11 @@
         if self._eids_seq_start is not None:
             self._cnx.system_sql(self._cnx.repo.system_source.dbhelper.sql_restart_numrange(
                 'entities_id_seq', initial_value=self._eids_seq_start + 1))
             cnx.commit()
         self.get_next_eid = lambda g=self._get_eid_gen(): next(g)
-        # recreate then when self.cleanup() is called
+        # recreate then when self.finish() is called
         if not self.slave_mode and self.drop_index:
             self._drop_metatables_constraints()
         if source is None:
             source = cnx.repo.system_source
         self.source = source
@@ -435,12 +436,15 @@
         default_values = self.default_values[etype]
         missing_keys = set(default_values) - set(kwargs)
         kwargs.update((key, default_values[key]) for key in missing_keys)
         return kwargs
 
-    def create_entity(self, etype, **kwargs):
-        """ Create an entity
+    # store api ################################################################
+
+    def prepare_insert_entity(self, etype, **kwargs):
+        """Given an entity type, attributes and inlined relations, returns the inserted entity's
+        eid.
         """
         # Init the table if necessary
         self.init_etype_table(etype)
         # Add meta data if not given
         if 'modification_date' not in kwargs:
@@ -459,26 +463,89 @@
         kwargs['eid'] = self.get_next_eid()
         # Check size constraints
         kwargs = self.apply_size_constraints(etype, kwargs)
         # Apply default values
         kwargs = self.apply_default_values(etype, kwargs)
-        # Push data / Return entity
+        # Push data
         self._data_entities[etype].append(kwargs)
-        entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
-        entity.cw_attr_cache.update(kwargs)
-        if 'eid' in kwargs:
-            entity.eid = kwargs['eid']
-        return entity
+        # Return eid
+        return kwargs.get('eid')
 
-    ### RELATIONS CREATION ####################################################
-
-    def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
-        """ Compatibility with other stores
+    def prepare_insert_relation(self, eid_from, rtype, eid_to, **kwargs):
+        """Insert into the database a relation ``rtype`` between entities with eids ``eid_from``
+        and ``eid_to``.
         """
         # Init the table if necessary
         self.init_relation_table(rtype)
-        self._data_relations[rtype].append({'eid_from': subj_eid, 'eid_to': obj_eid})
+        self._data_relations[rtype].append({'eid_from': eid_from, 'eid_to': eid_to})
+
+    def flush(self):
+        """Flush the data"""
+        self.flush_entities()
+        self.flush_internal_relations()
+        self.flush_relations()
+
+    def commit(self):
+        """Commit the database transaction."""
+        self.on_commit()
+        super(MassiveObjectStore, self).commit()
+
+    def finish(self):
+        """Remove temporary tables and columns."""
+        self.logger.info("Start cleaning")
+        if self.slave_mode:
+            raise RuntimeError('Store cleanup is not allowed in slave mode')
+        self.logger.info("Start cleaning")
+        # Cleanup relations tables
+        for etype in self._initialized['init_uri_eid']:
+            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
+        # Remove relations tables
+        for rtype in self._initialized['uri_rtypes']:
+            if not self._cnx.repo.schema.rschema(rtype).inlined:
+                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
+            else:
+                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
+        self.commit()
+        # Get all the initialized etypes/rtypes
+        if self._dbh.table_exists('dataio_initialized'):
+            crs = self.sql('SELECT retype, type FROM dataio_initialized')
+            for retype, _type in crs.fetchall():
+                self.logger.info('Cleanup for %s' % retype)
+                if _type == 'etype':
+                    # Cleanup entities tables - Recreate indexes
+                    self._cleanup_entities(retype)
+                elif _type == 'rtype':
+                    # Cleanup relations tables
+                    self._cleanup_relations(retype)
+                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
+                         {'e': retype})
+        # Create meta constraints (entities, is_instance_of, ...)
+        self._create_metatables_constraints()
+        self.commit()
+        # Delete the meta data table
+        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
+            if self._dbh.table_exists(table_name):
+                self.sql('DROP TABLE %s' % table_name)
+        self.commit()
+
+    @deprecated('[3.22] use prepare_insert_entity instead')
+    def create_entity(self, etype, **kwargs):
+        """ Create an entity
+        """
+        eid = self.prepare_insert_entity(etype, **kwargs)
+        entity = self._cnx.vreg['etypes'].etype_class(etype)(self._cnx)
+        entity.cw_attr_cache.update(kwargs)
+        entity.eid = eid
+        return entity
+
+    @deprecated('[3.22] use prepare_insert_relation instead')
+    def relate(self, subj_eid, rtype, obj_eid, *args, **kwargs):
+        self.prepare_insert_relation(subj_eid, rtype, obj_eid, *args, **kwargs)
+
+    @deprecated('[3.22] use finish instead')
+    def cleanup(self):
+        self.finish()
 
 
     ### FLUSH #################################################################
 
     def on_commit(self):
@@ -490,21 +557,10 @@
                 self.on_rollback_callback(exc, etype, data)
                 self._cnx.rollback()
             else:
                 raise exc
 
-    def commit(self):
-        self.on_commit()
-        super(MassiveObjectStore, self).commit()
-
-    def flush(self):
-        """ Flush the data
-        """
-        self.flush_entities()
-        self.flush_internal_relations()
-        self.flush_relations()
-
     def flush_internal_relations(self):
         """ Flush the relations data
         """
         for rtype, data in self._data_relations.items():
             if not data:
@@ -619,49 +675,10 @@
         # Create indexes and constraints
         if self.drop_index:
             tablename = '%s_relation' % rtype.lower()
             self.reapply_constraint_index(tablename)
 
-    def cleanup(self):
-        """ Remove temporary tables and columns
-        """
-        self.logger.info("Start cleaning")
-        if self.slave_mode:
-            raise RuntimeError('Store cleanup is not allowed in slave mode')
-        self.logger.info("Start cleaning")
-        # Cleanup relations tables
-        for etype in self._initialized['init_uri_eid']:
-            self.sql('DROP TABLE uri_eid_%s' % etype.lower())
-        # Remove relations tables
-        for rtype in self._initialized['uri_rtypes']:
-            if not self._cnx.repo.schema.rschema(rtype).inlined:
-                self.sql('DROP TABLE %(r)s_relation_iid_tmp' % {'r': rtype})
-            else:
-                self.logger.warning("inlined relation %s: no cleanup to be done for it" % rtype)
-        self.commit()
-        # Get all the initialized etypes/rtypes
-        if self._dbh.table_exists('dataio_initialized'):
-            crs = self.sql('SELECT retype, type FROM dataio_initialized')
-            for retype, _type in crs.fetchall():
-                self.logger.info('Cleanup for %s' % retype)
-                if _type == 'etype':
-                    # Cleanup entities tables - Recreate indexes
-                    self._cleanup_entities(retype)
-                elif _type == 'rtype':
-                    # Cleanup relations tables
-                    self._cleanup_relations(retype)
-                self.sql('DELETE FROM dataio_initialized WHERE retype = %(e)s',
-                         {'e': retype})
-        # Create meta constraints (entities, is_instance_of, ...)
-        self._create_metatables_constraints()
-        self.commit()
-        # Delete the meta data table
-        for table_name in ('dataio_initialized', 'dataio_constraints', 'dataio_metadata'):
-            if self._dbh.table_exists(table_name):
-                self.sql('DROP TABLE %s' % table_name)
-        self.commit()
-
     def insert_massive_meta_data(self, etype):
         """ Massive insertion of meta data for a given etype, based on SQL statements.
         """
         # Push data - Use coalesce to avoid NULL (and get 0), if there is no
         # entities of this type in the entities table.
@@ -77,19 +77,19 @@
                  'population': population, 'elevation': elevation,
                  'gtopo30': gtopo, 'timezone': timezone_code.get(infos[17]),
                  'cwuri': u'http://sws.geonames.org/%s/' % int(infos[0]),
                  'geonameid': int(infos[0]),
                  }
-        store.create_entity('Location', **entity)
+        store.prepare_insert_entity('Location', **entity)
 
     def test_autoflush_metadata(self):
         with self.admin_access.repo_cnx() as cnx:
             crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
                                  {'t': 'Location'})
             self.assertEqual(len(crs.fetchall()), 0)
             store = MassiveObjectStore(cnx, autoflush_metadata=True)
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
             store.cleanup()
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
@@ -102,11 +102,11 @@
 #             crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
 #                                  {'t': 'Location'})
 #             self.assertEqual(len(crs.fetchall()), 0)
 #         with self.admin_access.repo_cnx() as cnx:
 #             store = MassiveObjectStore(cnx, autoflush_metadata=False)
-#             store.create_entity('Location', name=u'toto')
+#             store.prepare_insert_entity('Location', name=u'toto')
 #             store.flush()
 #             store.commit()
 #             crs = cnx.system_sql('SELECT * FROM entities WHERE type=%(t)s',
 #                                  {'t': 'Location'})
 #             self.assertEqual(len(crs.fetchall()), 0)
@@ -117,12 +117,12 @@
 #             store.cleanup()
 
     def test_massimport_etype_metadata(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx)
-            timezone = store.create_entity('TimeZone')
-            store.create_entity('Location', timezone=timezone.eid)
+            timezone_eid = store.prepare_insert_entity('TimeZone')
+            store.prepare_insert_entity('Location', timezone=timezone_eid)
             store.flush()
             store.commit()
             eid, etname = cnx.execute('Any X, TN WHERE X timezone TZ, X is T, '
                                       'T name TN')[0]
             self.assertEqual(cnx.entity_from_eid(eid).cw_etype, etname)
@@ -165,46 +165,45 @@
             self.assertIn('owned_by_relation_to_idx', indexes)
 
     def test_eids_seq_range(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
             crs = cnx.system_sql("SELECT * FROM entities_id_seq")
             self.assertGreater(crs.fetchone()[0], 50000)
 
     def test_eid_entity(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            entity = store.create_entity('Location', name=u'toto')
+            eid = store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
-            self.assertGreater(entity.eid, 50000)
+            self.assertGreater(eid, 50000)
 
     def test_eid_entity_2(self):
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, eids_seq_range=1000, eids_seq_start=50000)
-            entity = store.create_entity('Location', name=u'toto', eid=10000)
+            eid = store.prepare_insert_entity('Location', name=u'toto', eid=10000)
             store.flush()
-        with self.admin_access.repo_cnx() as cnx:
-            self.assertEqual(entity.eid, 10000)
+            self.assertEqual(eid, 10000)
 
     def test_on_commit_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_commit_callback=lambda:next(counter))
-            store.create_entity('Location', name=u'toto')
+            store.prepare_insert_entity('Location', name=u'toto')
             store.flush()
             store.commit()
         self.assertGreaterEqual(next(counter), 1)
 
     def test_on_rollback_callback(self):
         counter = itertools.count()
         with self.admin_access.repo_cnx() as cnx:
             store = MassiveObjectStore(cnx, on_rollback_callback=lambda *_: next(counter))
-            store.create_entity('Location', nm='toto')
+            store.prepare_insert_entity('Location', nm='toto')
             store.flush()
             store.commit()
         self.assertGreaterEqual(next(counter), 1)
 
     def test_slave_mode_indexes(self):