[Cubicweb] work on dataimport stores

aurélien campéas aurelien.campeas at gmail.com
Wed Feb 25 18:30:31 CET 2015


I pushed a patch on fastimport that provides a store.

Using it, the dibench code can shrink a bit.
I reran the tests.

You can use the attached diff on dibench.
Note that to make the test pass, I had to de-inline the pref_label relation.

I'm seriously considering adding inlined relation support to
.insert_relations.
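
Roughly what I have in mind (a sketch only -- the SQL naming scheme and
cursor access below are my assumptions about CubicWeb's storage
conventions, not the actual fastimport code):

    def insert_relations(self, rtype, eid_pairs):
        """Bulk-insert (subject_eid, object_eid) pairs for `rtype`."""
        rschema = self.cnx.vreg.schema.rschema(rtype)
        cursor = self.cnx.cnxset.cu  # hypothetical handle on the db cursor
        if rschema.inlined:
            # an inlined relation lives as a column on the subject's
            # entity table, so it is an UPDATE, not an INSERT into a
            # separate relation table
            subj_etype = rschema.subjects()[0]  # assume one subject type
            sql = ('UPDATE cw_%s SET cw_%s=%%(o)s WHERE cw_eid=%%(s)s'
                   % (subj_etype.type.lower(), rtype))
        else:
            sql = ('INSERT INTO %s_relation (eid_from, eid_to)'
                   ' VALUES (%%(s)s, %%(o)s)' % rtype)
        cursor.executemany(sql, [{'s': s, 'o': o} for s, o in eid_pairs])

With something like that, de-inlining pref_label in the test schema
would no longer be necessary.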

Here are the results for the "eurovoc.xml" test:

==========  ========== =========
Store       time.clock time.time
==========  ========== =========
massive            35         52
sqlgen            172        213
fastimport        245        360
==========  ========== =========
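
(For readers of these tables: the two columns come from dibench's
@timed decorator; I assume it records both CPU time and wall-clock
time, along these lines -- this is a guess at the implementation,
written for the Python 2 of the time:

    import time
    from functools import wraps

    def timed(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            clock0, time0 = time.clock(), time.time()
            try:
                return func(*args, **kwargs)
            finally:
                # time.clock is CPU time on Unix, time.time wall-clock
                print('%s: %.2f (time.clock) / %.2f (time.time)'
                      % (func.__name__,
                         time.clock() - clock0, time.time() - time0))
        return wrapper

so a large gap between the two columns means the process spent most of
its time waiting on the database rather than computing.)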


2015-02-25 13:08 GMT+01:00 aurélien campéas <aurelien.campeas at gmail.com>:

> Right now, fastimport does not provide a store API, but I plan to add
> one (if we agree to call a "store" the thing that lives in
> cw/dataimport).
>
> see http://www.cubicweb.org/ticket/4985775
>
> This will reduce the need to write complicated code to use it, and will
> provide a drop-in replacement for the other stores.
>
> 2015-02-25 8:10 GMT+01:00 Sylvain Thénault <sylvain.thenault at logilab.fr>:
>
>> On 24 February 17:43, aurélien campéas wrote:
>> > Hi Sylvain,
>>
>> Hi Aurélien,
>>
>> > > That would be great if some people could take a look at it and maybe
>> > > take some time to champion one store or another, as I suppose that the
>> > > current "generic" implementation could be optimized depending on each
>> > > store's particularities.
>> > >
>> > > You'll find setup instructions in the README file. The results.txt file
>> > > states the overall goals I'm looking for, and provides some results and
>> > > discussion. You should probably at least take a look at this if you're
>> > > interested:
>> > > http://hg.logilab.org/users/sthenault/dibench/file/tip/results.txt
>> >
>> > I am interested in the memory consumption issue of fastimport.
>> > However, I stopped the run at around 8 minutes (without seeing a memory
>> > spike, but with Christophe's fix applied).
>> >
>> > Upon investigation, I suspect the FastExtEntitiesImporter is suboptimal,
>> > though I have yet to understand what it does...
>> >
>> > I will dig into this.
>>
>> Great. As I said, it's quite likely that the FastExtEntitiesImporter
>> could be optimized for the needs of one store or another. You're welcome
>> to provide an implementation tweaked for the fastimport store. We should
>> probably do that before starting to discuss a common API.
>>
>> > > You're very welcome to comment about any point on the list or to
>> > > provide some
>> > >
>> > Small note: the patch to handle only eids in fastimport is moot, given
>> > a small patch on dibench that sends entities instead of eids.
>> > I don't think it will affect performance (though I should test it),
>> > since at relation insertion time the entity is actually cached on the
>> > cnx anyway.
>>
>> Though IMO the patch is not that intrusive, and it makes the fastimport
>> store more similar to the other stores (which may not use entities at
>> all, and so have nothing in the cnx cache). We'll probably want this at
>> some point.
>>
>> --
>> Sylvain Thénault, LOGILAB, Paris (01.45.32.03.12) - Toulouse
>> (05.62.17.16.42)
>> Python, Debian, Agile methods training: http://www.logilab.fr/formations
>> Custom software development:             http://www.logilab.fr/services
>> CubicWeb, the semantic web framework:    http://www.cubicweb.org
>>
>
>
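
PS: by "store" here we mean the kind of object that lives in
cw/dataimport. From memory, the interface boils down to the sketch
below; the method names follow the ObjectStore conventions, so take
this as an approximation rather than a spec:

    class Store(object):
        """Minimal dataimport store interface (sketch)."""

        def create_entity(self, etype, **kwargs):
            """Create and return an entity of type `etype`."""

        def relate(self, eid_from, rtype, eid_to):
            """Add an `rtype` relation between two existing eids."""

        def flush(self):
            """Send pending creations/relations to the database."""

        def commit(self):
            """Commit the underlying connection."""

Anything implementing this should be usable as a drop-in replacement
in the dibench benchmark.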
-------------- next part --------------
# HG changeset patch
# User Aurelien Campeas <aurelien.campeas at logilab.fr>
# Date 1424884867 -3600
#      Wed Feb 25 18:21:07 2015 +0100
# Node ID d03dca55f7be76044b90b0929e3cf9061c7916e5
# Parent  a4397436a65ac528220d93c9e30d57ef09fb5f23
use the fastimport store

diff --git a/benchmark.py b/benchmark.py
--- a/benchmark.py
+++ b/benchmark.py
@@ -59,17 +59,17 @@ class PostgresImportTC(CubicWebTC):
             self.assertEqual(label.label, u'communications')
             self.failIf(cnx.execute('Any L WHERE NOT EXISTS(L pref_label_of X) AND NOT EXISTS(L alt_label_of Y) AND NOT EXISTS(L hidden_label_of Z)'))
 
-    #data_file, check_imported = 'eurovoc_skos.rdf', lambda *args: None
-    data_file, check_imported = 'siaf_matieres.xml', check_siaf_shortened
+    data_file, check_imported = 'eurovoc_skos.rdf', lambda *args: None
+    #data_file, check_imported = 'siaf_matieres.xml', check_siaf_shortened
 
     @timed
-    def test_nohook(self):
+    def _test_nohook(self):
         with self.admin_access.repo_cnx() as cnx:
             _skos_import(cnx, self.data_file, 'nohookos')
         self.check_imported()
 
     @timed
-    def test_sqlgen(self):
+    def test_asqlgen(self):
         with self.admin_access.repo_cnx() as cnx:
             _skos_import(cnx, self.data_file, 'sqlgenos')
         self.check_imported()
@@ -141,8 +141,8 @@ def _skos_import(cnx, fpath, impl):
             store = dataimport.NoHookRQLObjectStore(cnx, metagen)
         importercls = ExtEntitiesImporter
     elif impl == 'fastimport':
-        from cubes.fastimport.entities import FlushController
-        store = FlushController(cnx)
+        from cubes.fastimport.store import HookAwareStore
+        store = HookAwareStore(cnx)
         importercls = FastExtEntitiesImporter
     elif impl == 'massive':
         from cubes.dataio.dataimport import MassiveObjectStore
@@ -161,7 +161,8 @@ def _skos_import(cnx, fpath, impl):
         elif impl in 'fastimport':
             from cubes.worker.testutils import run_all_tasks
             errors = []
-            store.run_deferred_hooks(errors)
+            store.fc.run_deferred_hooks(errors)
+            store.flush()
             assert not errors
             cnx.commit()
             run_all_tasks(cnx)
@@ -195,122 +196,13 @@ def iter_rdef(schema):
 
 
 
-class ETypeBuffer(object):
-    def __init__(self):
-        self._queue = []
-        self._etype = None
-
-    def push(self, etype, *args):
-        if etype == 'ConceptScheme':
-            yield etype, [args]
-            return
-        if self._etype is None:
-            self._etype = etype
-            self._queue.append(args)
-        elif etype == self._etype:
-            self._queue.append(args)
-        else:
-            yield self._etype, self._queue
-            self._etype = etype
-            self._queue = [args]
-
-    def flush(self):
-        if self._etype is not None:
-            yield self._etype, self._queue
-            self._etype = None
-            self._queue = []
-
 
 class FastExtEntitiesImporter(ExtEntitiesImporter):
-    def _import_entities(self, entities, uri2eid, queue):
-        store = self.store
-        entity_from_eid = self.cnx.entity_from_eid
-        def _create(buffer_content):
-            for etype, entity_dicts in buffer_content:
-                #print 'creating', len(entity_dicts), etype
-                for entity in store.insert_entities(etype, entity_dicts):
-                    uri2eid[entity.cwuri] = entity.eid
-                    created.add(entity.eid)
-        def _update(buffer_content):
-            for etype, entities in buffer_content:
-                for entity, entity_dict in entities:
-                    # XXX some inlined relations may already exists
-                    entity.cw_set(**entity_dict)
-                    updated.add(entity.eid)
-        deferred = {} # non inlined relations that may be deferred
-        created = set()
-        updated = set()
-        self.import_log.record_debug('importing entities')
-        create_buffer = ETypeBuffer()
-        update_buffer = ETypeBuffer()
-        for extentity in self.iter_ext_entities(entities, uri2eid, deferred, queue):
-            try:
-                entity = entity_from_eid(uri2eid[extentity.extid], extentity.etype)
-            except KeyError:
-                values = extentity.values
-                values['cwuri'] = extentity.extid
-                _create(create_buffer.push(extentity.etype, values, None))
-            else:
-                if extentity:
-                    _update(update_buffer.push(etype, entity, extentity.values))
-        _create(create_buffer.flush())
-        _update(update_buffer.flush())
-        # XXX ensure we remove everything we want from the queue
-        found_some = True
-        schema = self.cnx.vreg.schema
-        while queue and found_some:
-            found_some = False
-            for etype in self.etypes_order_hint:
-                if etype in queue:
-                    create_entities = []
-                    new_queue = []
-                    for extentity in queue[etype]:
-                        if extentity.is_ready(uri2eid):
-                            try:
-                                entity = entity_from_eid(uri2eid[extentity.extid], etype)
-                            except KeyError:
-                                values = extentity.values
-                                values['cwuri'] = extentity.extid
-                                create_entities.append((values, None))
-                            else:
-                                _update([(etype, [(entity, extentity.values)])])
-                        else:
-                            new_queue.append(extentity)
-                    if create_entities:
-                        found_some = True
-                        #print 'post creating', len(create_entities), etype
-                        for entity in store.insert_entities(etype, create_entities):
-                            uri2eid[entity.cwuri] = entity.eid
-                            created.add(entity.eid)
-                    if new_queue:
-                        queue[etype][:] = new_queue
-                    else:
-                        del queue[etype]
-        return deferred, created, updated
 
-    def _create_deferred_relations(self, deferred, uri2eid):
-        rschema = self.cnx.vreg.schema.rschema
-        missing_relations = []
-        for rtype, relations in deferred.items():
-            self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
-            symmetric = rschema(rtype).symmetric
-            existing = self._existing_relations(rtype)
-            new_relations = []
-            for subject_uri, object_uri in relations:
-                try:
-                    subject_eid = uri2eid[subject_uri]
-                    object_eid = uri2eid[object_uri]
-                except KeyError:
-                    missing_relations.append((subject_uri, rtype, object_uri))
-                    continue
-                if (subject_eid, object_eid) not in existing:
-                    new_relations.append((subject_eid, object_eid))
-                    existing.add((subject_eid, object_eid))
-                    if symmetric:
-                        existing.add((object_eid, subject_eid))
-            if new_relations:
-                self.store.insert_relations(rtype, new_relations)
-        return missing_relations
+    def _import_entities(self, *args):
+        values = super(FastExtEntitiesImporter, self)._import_entities(*args)
+        self.store.flush()
+        return values
 
 
 class MassiveExtEntitiesImporter(ExtEntitiesImporter):
diff --git a/results.txt b/results.txt
--- a/results.txt
+++ b/results.txt
@@ -32,13 +32,22 @@ sqlgen           2.66      3.50
 nohook           3.97      7.79
 ========== ========== =========
 
+Auc (& the fastimport store):
+========== ========== =========
+Store      time.clock time.time
+========== ========== =========
+massive          0.82      1.03
+sqlgen           2.44      3.04
+fastimport       4.11      5.62
+========== ========== =========
+
 
 eurovoc.xml
 -----------
 
 http://open-data.europa.eu/fr/data/dataset/eurovoc
 
-XXX entities, XXX relations
+331396 entities, 25943 relations
 
 ==========  ========== =========
 Store       time.clock time.time
@@ -48,8 +57,14 @@ sqlgen          197.90    306.66
 nohook          369.23   4947.68
 ==========  ========== =========
 
-I've not yet been able to import Eurovoc using fastimport (killed by the system because it consumes
-too much memory).
+Auc (& the fastimport store):
+==========  ========== =========
+Store       time.clock time.time
+==========  ========== =========
+massive            35         52
+sqlgen            172        213
+fastimport        245        360
+==========  ========== =========
 
 
 API
@@ -90,4 +105,6 @@ store. In the end, I would expect two st
 a store with fastimport capabilities would be great, but would need some work to have an
 implementation that doesn't rely on the worker cube.
 
+auc: can't the massive object store also slow down considerably concurrent transactions while it runs ?
+
 TODO explain the overall strategy of each store.

