WIP add functions and tools to export RDF dumps

author: Simon Chabot <simon.chabot@logilab.fr>
changeset: 244e4e12e585
branch: default
phase: draft
hidden: no
parent revision: #2f3cb7f5a92f chore(flake8): update flake8-ok-files.txt with files that passes flake8 test
child revision: <not specified>
files modified by this revision:
cubicweb/rdf.py
# HG changeset patch
# User Simon Chabot <simon.chabot@logilab.fr>
# Date 1581689331 -3600
# Fri Feb 14 15:08:51 2020 +0100
# Node ID 244e4e12e585c2f6edc979e3e19261f12e0eafbc
# Parent 2f3cb7f5a92f4c3a70a5abf30227d6228b614804
WIP add functions and tools to export RDF dumps

diff --git a/cubicweb/rdf.py b/cubicweb/rdf.py
@@ -13,13 +13,21 @@
1  # details.
2  #
3  # You should have received a copy of the GNU Lesser General Public License
4  # along with this program. If not, see <http://www.gnu.org/licenses/>.
5 
6 -from rdflib import plugin, namespace
7 +import os
8 +import tarfile
9 +
10 +from rdflib import plugin, namespace, Namespace, ConjunctiveGraph
11  import rdflib_jsonld  # noqa
12 
13 +from cubicweb import xy
14 +from cubicweb.cwctl import CWCTL
15 +from cubicweb.toolsutils import Command
16 +from cubicweb.utils import admincnx
17 +
18  plugin.register("jsonld", plugin.Serializer, "rdflib_jsonld.serializer", "JsonLDSerializer")
19 
20  RDF_MIMETYPE_TO_FORMAT = {
21      'application/rdf+xml': 'xml',
22      'text/turtle': 'turtle',
@@ -44,12 +52,149 @@
23      "schema": namespace.Namespace("http://schema.org/"),
24      "cubicweb": namespace.Namespace("http://ns.cubicweb.org/cubicweb/0.0/")
25  }
26 
27 
def register_prefixes_in_xy():
    """Declare every known RDF namespace prefix in ``cubicweb.xy``.

    Each ``(prefix, uri)`` pair of the namespace mapping is passed to
    ``xy.register_prefix`` with ``overwrite=True`` (so a prefix that is
    already registered is presumably replaced).
    """
    # NOTE(review): this reads ``namespaces`` whereas ``conjunctive_graph``
    # reads ``NS_VARS``; neither name is defined in the visible part of the
    # module -- confirm which mapping is intended.
    registered = namespaces
    for prefix_name, ns_uri in registered.items():
        xy.register_prefix(prefix_name, ns_uri, overwrite=True)
31 +
32 +
class VocabAdapter:
    """Memoize which adapter class applies to an (entity type, vocabulary).

    The first request for a given ``(entity.__regid__, vocab)`` pair calls
    ``entity.cw_adapt_to(vocab)`` and remembers the class of the returned
    adapter (or ``None`` when there is none). Later requests for the same
    pair instantiate the remembered class directly.

    NOTE(review): the cache key only includes the entity type, so this
    assumes adapter selection does not depend on per-entity state -- confirm
    for the adapters in use.
    """

    def __init__(self):
        # (entity.__regid__, vocab) -> adapter class, or None when
        # ``cw_adapt_to`` returned no adapter for that pair
        self.adapter_cache = {}

    def adapt(self, entity, vocab):
        """Return an adapter of ``entity`` for ``vocab``, or ``None``."""
        key = (entity.__regid__, vocab)
        try:
            cls = self.adapter_cache[key]
        except KeyError:
            # first time this pair is seen: resolve it and remember the class
            found = entity.cw_adapt_to(vocab)
            self.adapter_cache[key] = None if found is None else found.__class__
            return found
        if cls is None:
            # known to have no adapter for this vocabulary
            return None
        return cls(entity._cw, entity=entity)
51 +
52 +
def conjunctive_graph():
    """Build a new, empty ``ConjunctiveGraph`` with every prefix of
    ``NS_VARS`` bound to its namespace, and return it.
    """
    # NOTE(review): ``register_prefixes_in_xy`` iterates ``namespaces``
    # instead of ``NS_VARS``; confirm both refer to the same mapping.
    new_graph = ConjunctiveGraph()
    bindings = NS_VARS.items()
    for prefix, ns in bindings:
        new_graph.bind(prefix, ns)
    return new_graph
60 +
61 +
def iter_rdf_adapters(entity):
    """Lazily yield the adapters of ``entity`` declared for its type.

    The adapter identifiers listed under ``entity.__regid__`` in
    ``ETYPES_ADAPTERS`` are tried in order; identifiers for which
    ``cw_adapt_to`` returns a falsy value are skipped. Entity types absent
    from ``ETYPES_ADAPTERS`` yield nothing.
    """
    adapter_ids = ETYPES_ADAPTERS.get(entity.__regid__, ())
    candidates = (entity.cw_adapt_to(adapter_id) for adapter_id in adapter_ids)
    for found in filter(None, candidates):
        yield found
67 +
68 +
def add_entity_to_graph(graph, entity):
    """Add the RDF description of ``entity`` to ``graph``.

    Triples come from the entity's ``"rdf"`` adapter, and the prefixes that
    adapter lists in ``used_namespaces`` are bound on ``graph``. When no
    such adapter is available, ``graph`` is left untouched.
    """
    rdf_adapter = entity.cw_adapt_to("rdf")
    if not rdf_adapter:
        # nothing to export for this entity
        return
    for rdf_triple in rdf_adapter.triples():
        graph.add(rdf_triple)
    for ns_prefix, ns_uri in rdf_adapter.used_namespaces.items():
        graph.bind(ns_prefix, ns_uri)
76 +
77 +
def _add_etype_to_graph(cnx, graph, etype, limit, offset):
    """Add one page of entities of type ``etype`` to ``graph``.

    Runs an RQL query returning at most ``limit`` entities of ``etype``,
    skipping the first ``offset`` ones, and adds each of them to ``graph``
    with ``add_entity_to_graph``. The ``ORDERBY X`` clause gives a fixed
    ordering so that successive pages (increasing ``offset``) do not overlap
    as long as the data is not modified in between.
    """
    rql = "Any X ORDERBY X LIMIT %s OFFSET %s WHERE X is %s" % (limit, offset, etype)
    rset = cnx.execute(rql)
    for entity in rset.entities():
        # ``add_entity_to_graph`` only accepts (graph, entity): passing an
        # extra ``build_dump`` keyword raised TypeError for every entity.
        add_entity_to_graph(graph, entity)
84 +
85 +
86 +# dumps #######################################################################
87 +
def create_dumps_etype(cnx, output_dir, etype, formats, chunksize=2000):
    """Serialize all entities of type ``etype`` into chunked RDF files.

    Entities are processed ``chunksize`` at a time. Each chunk is built into
    its own graph and serialized once per entry of ``formats`` into
    ``<output_dir>/<etype lower-cased>_<offset, 6 digits>.<format>``.
    Existing files with the same name are overwritten.

    :returns: the list of written file paths, in chunk-then-format order.
    """
    nb_entities = cnx.execute("Any COUNT(X) WHERE X is %s" % etype)[0][0]
    filenames = []
    for offset in range(0, nb_entities, chunksize):
        graph = conjunctive_graph()
        _add_etype_to_graph(cnx, graph, etype, chunksize, offset)
        for _format in formats:
            filename = "%s_%06d.%s" % (etype.lower(), offset, _format)
            filepath = os.path.join(output_dir, filename)
            data = graph.serialize(format=_format)
            if isinstance(data, str):
                # rdflib >= 6 returns str when no encoding is given,
                # older versions return bytes
                data = data.encode("utf-8")
            # "wb" rather than "ab": re-running the export must replace a
            # previous dump instead of appending a second serialized
            # document to it (which e.g. yields an invalid RDF/XML file)
            with open(filepath, "wb") as dump_file:
                dump_file.write(data)
            filenames.append(filepath)
        # clean as much as possible to avoid memory exhaustion
        cnx.drop_entity_cache()
    return filenames
103 +
104 +
def make_archive(output_dir, label, filenames, formats):
    """Bundle dump files into one gzipped tarball per format.

    For every format ``fmt`` in ``formats``, writes
    ``<output_dir>/<label>_<fmt>.tar.gz`` containing only the entries of
    ``filenames`` whose name ends with ``.<fmt>``. Members are stored under
    their basename so the archive has no nested directory structure.
    The source files are left on disk.
    """
    for _format in formats:
        suffix = "." + _format
        archive_name = "%s_%s.tar.gz" % (label, _format)
        with tarfile.open(os.path.join(output_dir, archive_name), "w:gz") as tar:
            for filename in filenames:
                if not filename.endswith(suffix):
                    # this file belongs to another format's archive
                    continue
                # add file but specify basename as the alternative filename
                # to avoid nested directory structure in the archive
                tar.add(filename, arcname=os.path.basename(filename))
114 +
115 +
def create_dumps(cnx, config):
    """Export every configured entity type to RDF dump archives.

    Reads the ``output-dir``, ``formats`` and ``etypes`` options from
    ``config`` and creates the output directory when missing. Then, for
    each entity type, it writes the chunked dump files and bundles them
    with ``make_archive`` under the label ``<etype lower-cased>``.
    """
    output_dir = config.get("output-dir")
    formats = config.get("formats")
    etypes = config.get("etypes")

    # exist_ok avoids the race between an existence check and the creation
    os.makedirs(output_dir, exist_ok=True)
    for etype in etypes:
        filenames = create_dumps_etype(cnx, output_dir, etype, formats)
        make_archive(output_dir, etype.lower(), filenames, formats)
126 +
127 +
class RdfDump(Command):
    """Export the data of an instance to RDF dump archives.

    <instance>
      identifier of the instance to export
    """

    name = "rdfdump"
    arguments = "<instance>"
    min_args = max_args = 1
    options = [
        (
            "etypes",
            {
                "type": "csv",
                "default": list(ETYPES_ADAPTERS),
                # help previously also mentioned "de la dernière date de
                # moissonnage" (last harvest date), which this command
                # has nothing to do with
                "help": "liste des types d'entité à exporter",
            },
        ),
        (
            "output-dir",
            {
                "type": "string",
                "default": "/tmp",
                "help": "répertoire dans lequel les archives seront créées",
            },
        ),
        (
            "formats",
            {
                "type": "csv",
                "default": ("nt", "n3", "xml"),
                "help": "liste des formats dans lesquels on veut sérialiser le rdf",
            },
        ),
    ]

    def run(self, args):
        """Open an admin connection on the instance and create the dumps.

        The command object itself is passed as the configuration, so
        ``create_dumps`` reads the options declared above from it.
        """
        appid = args[0]
        with admincnx(appid) as cnx:
            create_dumps(cnx, self)


CWCTL.register(RdfDump)
150 +                "type": "string",
151 +                "default": "/tmp",
152 +                "help": ("répertoire dans lequel les archives seront créées"),
153 +            },
154 +        ),
155 +        (
156 +            "formats",
157 +            {
158 +                "type": "csv",
159 +                "default": ("nt", "n3", "xml"),
160 +                "help": ("liste des formats dans lequel on veut sérialiser le rdf"),
161 +            },
162 +        ),
163 +    ]
164 +
165 +    def run(self, args):
166 +        appid = args[0]
167 +        with admincnx(appid) as cnx:
168 +            create_dumps(cnx, self)
169 +
170 +
171 +CWCTL.register(RdfDump)