[index] refresh the index after performing the `es_index` operation. (closes #51594388)

If this is not done, the `elasticsearch.scan` method retrieves stale index values, so wrong values will be copied while synchronizing different indexes.

We should probably do this by default in the `indexer.es_index` method, and probably in the `indexer.es_delete` method as well.

see also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html

author: Katia Saurfelt <katia.saurfelt@logilab.fr>
changeset: 2a9a7b4ef362
branch: default
phase: public
hidden: no
parent revision: #49e1524640df Added tag 0.6.1, debian/0.6.1-1, centos/0.6.1-1 for changeset f691acea28d1
child revision: #a568b4704d5f [pkg] 0.6.2-1, #cb6200854d36 [pkg] 0.6.2
files modified by this revision
cubicweb_elasticsearch/entities.py
# HG changeset patch
# User Katia Saurfelt <katia.saurfelt@logilab.fr>
# Date 1524129695 -7200
# Thu Apr 19 11:21:35 2018 +0200
# Node ID 2a9a7b4ef36297970604c6e1adbc7a15f62ad853
# Parent 49e1524640dfa23bd05aef4ad690d3757e2018d2
[index] refresh the index after performing the `es_index` operation. (closes #51594388)

If this is not done, elasticsearch.scan method retrieves the old index values
so wrong values will be copied while synchronizing different indexes.

We probably should do it by default in `indexer.es_index` method.
We probably should do it in `indexer.es_delete` method.

see also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-refresh.html

diff --git a/cubicweb_elasticsearch/entities.py b/cubicweb_elasticsearch/entities.py
@@ -17,10 +17,12 @@
1 
2  """cubicweb-elasticsearch entity's classes"""
3 
4  import collections
5 
6 +from functools import partial
7 +
8  from urllib3.exceptions import ProtocolError
9 
10  from elasticsearch.exceptions import ConnectionError, NotFoundError
11 
12  from logilab.common.decorators import cachedproperty
@@ -84,21 +86,22 @@
13              deep_update(settings, custom_settings)
14          es_cnx = es.get_connection(self._cw.vreg.config)
15          if es_cnx is not None:
16              es.create_index(es_cnx, index_name, settings)
17 
18 -    def es_index(self, entity):
19 +    def es_index(self, entity, params=None):
20          es_cnx = self.get_connection()
21          if es_cnx is None or not self.index_name:
22              self.error('no connection to ES (not configured) skip ES indexing')
23              return
24          serializable = entity.cw_adapt_to('IFullTextIndexSerializable')
25          json = serializable.serialize()
26          if not json:
27              return
28          es_cnx.index(index=self.index_name, id=serializable.es_id,
29 -                     doc_type=serializable.es_doc_type, body=json)
30 +                     doc_type=serializable.es_doc_type, body=json,
31 +                     params=params)
32 
33      def es_delete(self, entity):
34          es_cnx = self.get_connection()
35          if es_cnx is None or not self.index_name:
36              self.error('no connection to ES (not configured) skip ES deletion')
@@ -216,11 +219,11 @@
37          indexer = es_operation.get('indexer', self.default_indexer)
38          entity = es_operation['entity']
39          if self._cw.deleted_in_transaction(entity.eid):
40              es_method = indexer.es_delete
41          elif es_operation['op_type'] == 'index':
42 -            es_method = indexer.es_index
43 +            es_method = partial(indexer.es_index, params={'refresh': True})
44          elif es_operation['op_type'] == 'delete':
45              es_method = indexer.es_delete
46          else:
47              self.info('skipping unknown operation type %s on %s',
48                        es_operation['op_type'], entity.eid)