wip

author          Julien Cristau <julien.cristau@logilab.fr>
changeset       31216de022ec
branch          default
phase           draft
hidden          yes
parent revision #c4faa7e919fb [dataimport/test] start our own pg cluster instead of polluting the system one
child revision  <not specified>
files modified by this revision
dataimport/massive_store.py
dataimport/test/test_massive_store.py
# HG changeset patch
# User Julien Cristau <julien.cristau@logilab.fr>
# Date 1445434662 -7200
# Wed Oct 21 15:37:42 2015 +0200
# Node ID 31216de022ec793446bed713fec81c9af88dc27e
# Parent c4faa7e919fb89ad8ce575e23da7752ef5b5b36a
wip

diff --git a/dataimport/massive_store.py b/dataimport/massive_store.py
@@ -319,12 +319,10 @@
         indexes, constraints = self._dbh.application_indexes_constraints(tablename)
         for name, query in constraints.iteritems():
             sql = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
             self.sql(sql, {'e': tablename, 'c': query, 't': 'constraint'})
             sql = 'ALTER TABLE %s DROP CONSTRAINT %s CASCADE' % (tablename, name)
-            if name == 'entities_pkey':
-                continue
             self.sql(sql)
         for name, query in indexes.iteritems():
             sql = 'INSERT INTO dataio_constraints VALUES (%(e)s, %(c)s, %(t)s)'
             self.sql(sql, {'e': tablename, 'c': query, 't': 'index'})
             sql = 'DROP INDEX %s' % name
@@ -526,14 +524,10 @@
                 self.commit()

     def flush_entities(self):
         """ Flush the entities data
         """
-        # insert data into the 'entities' table first as it's needed to satisfy foreign key
-        # constraints
-        if self.autoflush_metadata:
-            self.flush_meta_data()
         for etype, data in self._data_entities.iteritems():
             if not data:
                 # There is no data for these etype for this flush round.
                 continue
             # XXX It may be interresting to directly infer the columns'
@@ -562,10 +556,12 @@
                 cursor.copy_from(buf, 'cw_%s' % etype.lower(), null='NULL', columns=columns)
             except Exception as exc:
                 self.on_rollback(exc, etype, data)
             # Clear data cache
             self._data_entities[etype] = []
+        if self.autoflush_metadata:
+            self.flush_meta_data()
         # Commit if asked
         if self.commit_at_flush:
             self.commit()

     def flush_meta_data(self):
@@ -665,21 +661,21 @@
     def insert_massive_meta_data(self, etype):
         """ Massive insertion of meta data for a given etype, based on SQL statements.
         """
         # Push data - Use coalesce to avoid NULL (and get 0), if there is no
         # entities of this type in the entities table.
-        sql = ("INSERT INTO entities (eid, type, asource, extid) "
-               "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
-               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
-               % (etype, etype.lower()))
-        self.sql(sql)
         # Meta data relations
         self.metagen_push_relation(etype, self._cnx.user.eid, 'created_by_relation')
         self.metagen_push_relation(etype, self._cnx.user.eid, 'owned_by_relation')
         self.metagen_push_relation(etype, self.source.eid, 'cw_source_relation')
         self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_relation')
         self.metagen_push_relation(etype, self._etype_eid_idx[etype], 'is_instance_of_relation')
+        sql = ("INSERT INTO entities (eid, type, asource, extid) "
+               "SELECT cw_eid, '%s', 'system', NULL FROM cw_%s "
+               "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
+               % (etype, etype.lower()))
+        self.sql(sql)

     def metagen_push_relation(self, etype, eid_to, rtype):
         sql = ("INSERT INTO %s (eid_from, eid_to) SELECT cw_eid, %s FROM cw_%s "
                "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)"
                % (rtype, eid_to, etype.lower()))
diff --git a/dataimport/test/test_massive_store.py b/dataimport/test/test_massive_store.py
@@ -132,11 +132,11 @@
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
             indexes = [r[0] for r in crs.fetchall()]
         self.assertIn('entities_pkey', indexes)
-        self.assertIn('entities_extid_idx', indexes)
+        self.assertIn('unique_entities_extid_idx', indexes)
         self.assertIn('owned_by_relation_p_key', indexes)
         self.assertIn('owned_by_relation_to_idx', indexes)

     def test_drop_index(self):
         with self.admin_access.repo_cnx() as cnx:
@@ -144,11 +144,11 @@
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
             indexes = [r[0] for r in crs.fetchall()]
         self.assertNotIn('entities_pkey', indexes)
-        self.assertNotIn('entities_extid_idx', indexes)
+        self.assertNotIn('unique_entities_extid_idx', indexes)
         self.assertNotIn('owned_by_relation_pkey', indexes)
         self.assertNotIn('owned_by_relation_to_idx', indexes)

     def test_drop_index_recreation(self):
         with self.admin_access.repo_cnx() as cnx:
@@ -157,11 +157,11 @@
             cnx.commit()
         with self.admin_access.repo_cnx() as cnx:
             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
             indexes = [r[0] for r in crs.fetchall()]
         self.assertIn('entities_pkey', indexes)
-        self.assertIn('entities_extid_idx', indexes)
+        self.assertIn('unique_entities_extid_idx', indexes)
         self.assertIn('owned_by_relation_p_key', indexes)
         self.assertIn('owned_by_relation_to_idx', indexes)

     def test_eids_seq_range(self):
         with self.admin_access.repo_cnx() as cnx:
@@ -211,11 +211,11 @@
             slave_store = MassiveObjectStore(cnx, slave_mode=True)
         with self.admin_access.repo_cnx() as cnx:
             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
             indexes = [r[0] for r in crs.fetchall()]
         self.assertIn('entities_pkey', indexes)
-        self.assertIn('entities_extid_idx', indexes)
+        self.assertIn('unique_entities_extid_idx', indexes)
         self.assertIn('owned_by_relation_p_key', indexes)
         self.assertIn('owned_by_relation_to_idx', indexes)

     def test_slave_mode_exception(self):
         with self.admin_access.repo_cnx() as cnx:
@@ -243,22 +243,22 @@

             # Check index
             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
             indexes = [r[0] for r in crs.fetchall()]
             self.assertNotIn('entities_pkey', indexes)
-            self.assertNotIn('entities_extid_idx', indexes)
+            self.assertNotIn('unique_entities_extid_idx', indexes)
             self.assertNotIn('owned_by_relation_p_key', indexes)
             self.assertNotIn('owned_by_relation_to_idx', indexes)

             # Cleanup -> index
             store.cleanup()

             # Check index again
             crs = cnx.system_sql('SELECT indexname FROM pg_indexes')
             indexes = [r[0] for r in crs.fetchall()]
             self.assertIn('entities_pkey', indexes)
-            self.assertIn('entities_extid_idx', indexes)
+            self.assertIn('unique_entities_extid_idx', indexes)
             self.assertIn('owned_by_relation_p_key', indexes)
             self.assertIn('owned_by_relation_to_idx', indexes)

     def test_flush_meta_data(self):
         with self.admin_access.repo_cnx() as cnx:
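
The reordering in insert_massive_meta_data above relies on the NOT EXISTS filter used by metagen_push_relation: once an eid is present in the entities table, that filter skips it, so the metadata relations have to be pushed before the freshly imported eids are recorded in entities. A minimal, self-contained sketch of that ordering dependency, using sqlite3 and simplified stand-in tables rather than the real CubicWeb schema:

import sqlite3

cnx = sqlite3.connect(':memory:')
cnx.executescript("""
    CREATE TABLE entities (eid INTEGER PRIMARY KEY, type TEXT);
    CREATE TABLE cw_person (cw_eid INTEGER PRIMARY KEY);
    CREATE TABLE owned_by_relation (eid_from INTEGER, eid_to INTEGER);
""")
cnx.executemany('INSERT INTO cw_person VALUES (?)', [(1,), (2,)])

# 1. push the metadata relation first: both eids still pass the NOT EXISTS test
#    (42 is a dummy owner eid for the sake of the example)
cnx.execute("INSERT INTO owned_by_relation (eid_from, eid_to) "
            "SELECT cw_eid, 42 FROM cw_person "
            "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)")
# 2. only then record the imported eids in the entities table
cnx.execute("INSERT INTO entities (eid, type) "
            "SELECT cw_eid, 'Person' FROM cw_person "
            "WHERE NOT EXISTS (SELECT 1 FROM entities WHERE eid=cw_eid)")

assert cnx.execute('SELECT count(*) FROM owned_by_relation').fetchone()[0] == 2
# Swapping steps 1 and 2 would leave owned_by_relation empty: after the
# entities insert, the NOT EXISTS filter rejects every row in step 1.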