[Cubicweb] work on dataimport stores
aurélien campéas
aurelien.campeas at gmail.com
Wed Feb 25 18:30:31 CET 2015
I pushed a patch on fastimport to provide a store.
Using this, the dibench code can shrink a bit.
I reran the tests.
You can use the attached diff on dibench.
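Condensed from that diff, using the new store boils down to the snippet
below (the data values and the object eid are made up for illustration;
the (values, None) pairs mimic what the dibench importer used to build):

    from cubes.fastimport.store import HookAwareStore

    store = HookAwareStore(cnx)   # cnx: an open repository connection
    uri2eid = {}
    # insert_entities yields the created entities, which lets the
    # importer map external uris to eids
    for entity in store.insert_entities('Label', [
            ({'label': u'communications',
              'cwuri': u'http://example.org/lbl'}, None)]):
        uri2eid[entity.cwuri] = entity.eid
    # relations are plain (subject_eid, object_eid) pairs
    store.insert_relations('pref_label_of',
                           [(uri2eid[u'http://example.org/lbl'], 42)])
    store.flush()
    cnx.commit()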
Note that, to make the test pass, I had to de-inline the pref_label relation.
I'm seriously considering adding inlined relation support to
.insert_relations.
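A rough, untested sketch of what that could look like (both helpers,
_insert_into_relation_table and pairs_with_subject_type, are made up,
and I assume the store has access to the schema and to system_sql):

    def insert_relations(self, rtype, pairs):
        rschema = self.schema.rschema(rtype)
        if not rschema.inlined:
            # current behaviour: bulk insert into the rtype_relation table
            return self._insert_into_relation_table(rtype, pairs)
        # inlined case: the relation is stored as a cw_<rtype> column on
        # the subject's entity table, so update that column instead
        for subj_etype, subj_eid, obj_eid in pairs_with_subject_type(pairs):
            self.cnx.system_sql(
                'UPDATE cw_%s SET cw_%s=%%(o)s WHERE cw_eid=%%(s)s'
                % (subj_etype.lower(), rtype),
                {'o': obj_eid, 's': subj_eid})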
Here are the results for the "eurovoc.xml" test (times in seconds):
========== ========== =========
Store time.clock time.time
========== ========== =========
massive 35 52
sqlgen 172 213
fastimport 245 360
========== ========== =========
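Both columns are seconds: time.clock measures CPU time on unix, time.time
wall-clock time. For readers without dibench at hand, the @timed decorator
in benchmark.py presumably amounts to something like this sketch (not
necessarily the actual implementation):

    import time
    from functools import wraps

    def timed(func):
        # report both cpu and wall-clock durations of a single call
        @wraps(func)
        def wrapper(*args, **kwargs):
            cpu, wall = time.clock(), time.time()
            try:
                return func(*args, **kwargs)
            finally:
                print '%s: time.clock=%.2f time.time=%.2f' % (
                    func.__name__, time.clock() - cpu, time.time() - wall)
        return wrapper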
2015-02-25 13:08 GMT+01:00 aurélien campéas <aurelien.campeas at gmail.com>:
> Right now, fastimport does not provide a store API, but I plan to add
> one (if we agree to call a "store" the thing that lives in
> cw/dataimport).
>
> see http://www.cubicweb.org/ticket/4985775
>
> This will reduce the need to write complicated code to use it and provide
> a drop-in replacement for other stores.
>
> 2015-02-25 8:10 GMT+01:00 Sylvain Thénault <sylvain.thenault at logilab.fr>:
>
>> On 24 février 17:43, aurélien campéas wrote:
>> > Hi Sylvain,
>>
>> Hi Aurélien,
>>
>> > > It would be great if some people could take a look at it and
>> > > maybe take some time to champion one store or another, as I
>> > > suppose that the current "generic" implementation could be
>> > > optimized depending on each store's particularities.
>> > >
>> > > You'll find setup instructions in the README file. The
>> > > results.txt file states the overall goals I'm aiming for, and
>> > > provides some results and discussion. You should probably at
>> > > least take a look at this if you're interested:
>> > > http://hg.logilab.org/users/sthenault/dibench/file/tip/results.txt
>> >
>> > I am interested in the memory consumption issue of fastimport.
>> > However, I stopped it after around 8 minutes of run time (without
>> > seeing a memory spike, but with Christophe's fix applied).
>> >
>> > Upon investigation, I suspect the FastExtEntitiesImporter is
>> > suboptimal, though I have yet to understand what it does...
>> >
>> > I will dig into this.
>>
>> Great. As I said, it's quite likely that the FastExtEntitiesImporter
>> could be optimized for the needs of one store or another. You're
>> welcome to provide an implementation tweaked for the fastimport
>> store. We should probably do that before starting to discuss a
>> common API.
>>
>> > > You're very welcome to comment on any point on the list or to
>> > > provide some [...]
>> > >
>> > Small note: the patch to handle only eids in fastimport is moot,
>> > provided a small patch on dibench that sends entities instead of
>> > eids. I don't think it will affect performance (though I should
>> > test it), since at relation insertion time the entity is actually
>> > cached on the cnx anyway.
>>
>> Though IMO the patch is not that intrusive, and it makes the
>> fastimport store more similar to the other stores (which may not use
>> entities at all, and so have nothing in the cnx cache). We'll
>> probably want this at some point.
>>
>> --
>> Sylvain Thénault, LOGILAB, Paris (01.45.32.03.12) - Toulouse
>> (05.62.17.16.42)
>> Formations Python, Debian, Méth. Agiles: http://www.logilab.fr/formations
>> Développement logiciel sur mesure: http://www.logilab.fr/services
>> CubicWeb, the semantic web framework: http://www.cubicweb.org
>>
>
>
-------------- next part --------------
# HG changeset patch
# User Aurelien Campeas <aurelien.campeas at logilab.fr>
# Date 1424884867 -3600
# Wed Feb 25 18:21:07 2015 +0100
# Node ID d03dca55f7be76044b90b0929e3cf9061c7916e5
# Parent a4397436a65ac528220d93c9e30d57ef09fb5f23
use the fastimport store
diff --git a/benchmark.py b/benchmark.py
--- a/benchmark.py
+++ b/benchmark.py
@@ -59,17 +59,17 @@ class PostgresImportTC(CubicWebTC):
self.assertEqual(label.label, u'communications')
self.failIf(cnx.execute('Any L WHERE NOT EXISTS(L pref_label_of X) AND NOT EXISTS(L alt_label_of Y) AND NOT EXISTS(L hidden_label_of Z)'))
- #data_file, check_imported = 'eurovoc_skos.rdf', lambda *args: None
- data_file, check_imported = 'siaf_matieres.xml', check_siaf_shortened
+ data_file, check_imported = 'eurovoc_skos.rdf', lambda *args: None
+ #data_file, check_imported = 'siaf_matieres.xml', check_siaf_shortened
@timed
- def test_nohook(self):
+ def _test_nohook(self):
with self.admin_access.repo_cnx() as cnx:
_skos_import(cnx, self.data_file, 'nohookos')
self.check_imported()
@timed
- def test_sqlgen(self):
+ def test_asqlgen(self):
with self.admin_access.repo_cnx() as cnx:
_skos_import(cnx, self.data_file, 'sqlgenos')
self.check_imported()
@@ -141,8 +141,8 @@ def _skos_import(cnx, fpath, impl):
store = dataimport.NoHookRQLObjectStore(cnx, metagen)
importercls = ExtEntitiesImporter
elif impl == 'fastimport':
- from cubes.fastimport.entities import FlushController
- store = FlushController(cnx)
+ from cubes.fastimport.store import HookAwareStore
+ store = HookAwareStore(cnx)
importercls = FastExtEntitiesImporter
elif impl == 'massive':
from cubes.dataio.dataimport import MassiveObjectStore
@@ -161,7 +161,8 @@ def _skos_import(cnx, fpath, impl):
elif impl in 'fastimport':
from cubes.worker.testutils import run_all_tasks
errors = []
- store.run_deferred_hooks(errors)
+ store.fc.run_deferred_hooks(errors)
+ store.flush()
assert not errors
cnx.commit()
run_all_tasks(cnx)
@@ -195,122 +196,13 @@ def iter_rdef(schema):
-class ETypeBuffer(object):
- def __init__(self):
- self._queue = []
- self._etype = None
-
- def push(self, etype, *args):
- if etype == 'ConceptScheme':
- yield etype, [args]
- return
- if self._etype is None:
- self._etype = etype
- self._queue.append(args)
- elif etype == self._etype:
- self._queue.append(args)
- else:
- yield self._etype, self._queue
- self._etype = etype
- self._queue = [args]
-
- def flush(self):
- if self._etype is not None:
- yield self._etype, self._queue
- self._etype = None
- self._queue = []
-
class FastExtEntitiesImporter(ExtEntitiesImporter):
- def _import_entities(self, entities, uri2eid, queue):
- store = self.store
- entity_from_eid = self.cnx.entity_from_eid
- def _create(buffer_content):
- for etype, entity_dicts in buffer_content:
- #print 'creating', len(entity_dicts), etype
- for entity in store.insert_entities(etype, entity_dicts):
- uri2eid[entity.cwuri] = entity.eid
- created.add(entity.eid)
- def _update(buffer_content):
- for etype, entities in buffer_content:
- for entity, entity_dict in entities:
- # XXX some inlined relations may already exists
- entity.cw_set(**entity_dict)
- updated.add(entity.eid)
- deferred = {} # non inlined relations that may be deferred
- created = set()
- updated = set()
- self.import_log.record_debug('importing entities')
- create_buffer = ETypeBuffer()
- update_buffer = ETypeBuffer()
- for extentity in self.iter_ext_entities(entities, uri2eid, deferred, queue):
- try:
- entity = entity_from_eid(uri2eid[extentity.extid], extentity.etype)
- except KeyError:
- values = extentity.values
- values['cwuri'] = extentity.extid
- _create(create_buffer.push(extentity.etype, values, None))
- else:
- if extentity:
- _update(update_buffer.push(etype, entity, extentity.values))
- _create(create_buffer.flush())
- _update(update_buffer.flush())
- # XXX ensure we remove everything we want from the queue
- found_some = True
- schema = self.cnx.vreg.schema
- while queue and found_some:
- found_some = False
- for etype in self.etypes_order_hint:
- if etype in queue:
- create_entities = []
- new_queue = []
- for extentity in queue[etype]:
- if extentity.is_ready(uri2eid):
- try:
- entity = entity_from_eid(uri2eid[extentity.extid], etype)
- except KeyError:
- values = extentity.values
- values['cwuri'] = extentity.extid
- create_entities.append((values, None))
- else:
- _update([(etype, [(entity, extentity.values)])])
- else:
- new_queue.append(extentity)
- if create_entities:
- found_some = True
- #print 'post creating', len(create_entities), etype
- for entity in store.insert_entities(etype, create_entities):
- uri2eid[entity.cwuri] = entity.eid
- created.add(entity.eid)
- if new_queue:
- queue[etype][:] = new_queue
- else:
- del queue[etype]
- return deferred, created, updated
- def _create_deferred_relations(self, deferred, uri2eid):
- rschema = self.cnx.vreg.schema.rschema
- missing_relations = []
- for rtype, relations in deferred.items():
- self.import_log.record_debug('importing %s %s relations' % (len(relations), rtype))
- symmetric = rschema(rtype).symmetric
- existing = self._existing_relations(rtype)
- new_relations = []
- for subject_uri, object_uri in relations:
- try:
- subject_eid = uri2eid[subject_uri]
- object_eid = uri2eid[object_uri]
- except KeyError:
- missing_relations.append((subject_uri, rtype, object_uri))
- continue
- if (subject_eid, object_eid) not in existing:
- new_relations.append((subject_eid, object_eid))
- existing.add((subject_eid, object_eid))
- if symmetric:
- existing.add((object_eid, subject_eid))
- if new_relations:
- self.store.insert_relations(rtype, new_relations)
- return missing_relations
+ def _import_entities(self, *args):
+ values = super(FastExtEntitiesImporter, self)._import_entities(*args)
+ self.store.flush()
+ return values
class MassiveExtEntitiesImporter(ExtEntitiesImporter):
diff --git a/results.txt b/results.txt
--- a/results.txt
+++ b/results.txt
@@ -32,13 +32,22 @@ sqlgen 2.66 3.50
nohook 3.97 7.79
========== ========== =========
+Auc (& the fastimport store):
+========== ========== =========
+Store time.clock time.time
+========== ========== =========
+massive 0.82 1.03
+sqlgen 2.44 3.04
+fastimport 4.11 5.62
+========== ========== =========
+
eurovoc.xml
-----------
http://open-data.europa.eu/fr/data/dataset/eurovoc
-XXX entities, XXX relations
+331396 entities, 25943 relations
========== ========== =========
Store time.clock time.time
@@ -48,8 +57,14 @@ sqlgen 197.90 306.66
nohook 369.23 4947.68
========== ========== =========
-I've not yet been able to import Eurovoc using fastimport (killed by the system because it consumes
-too much memory).
+Auc (& the fastimport store):
+========== ========== =========
+Store time.clock time.time
+========== ========== =========
+massive 35 52
+sqlgen 172 213
+fastimport 245 360
+========== ========== =========
API
@@ -90,4 +105,6 @@ store. In the end, I would expect two st
a store with fastimport capabilities would be great, but would need some work to have an
implementation that doesn't rely on the worker cube.
+auc: can't the massive object store also considerably slow down concurrent transactions while it runs?
+
TODO explain the overall strategy of each store.