[Cubicweb] Enhancing `bulk` writes in CubicWeb

Aurélien Campéas aurelien.campeas at logilab.fr
Mon May 5 19:16:54 CEST 2014


On 02/05/2014 15:05, Aurélien Campéas wrote:
[...]
> 
> Going forward
> ~~~~~~~~~~~~~
> 
> The new hooks runner for faster insertions has the following API:
> 
> * `HooksRunner` (logger, session, disabled_regids,
>                  deferred_entity_hooks, deferred_relation_hooks)
> 
> * `call_etype_hooks` (event, etype, entities, inlinedrtypes)
> * `call_rtype_hooks` (event, rtype, relations)
> 
> It will prune hooks whenever possible for the duration of the
> `session` (read: transaction), not run the disabled regids, not run
> the deferred regids.
> 
> For the deferred regids, only entities and relations are
> collected. These objects can be used later to make the relevant hooks:
> 
> * run in a subsequent transaction (makes sense for notifications of
>   FTI), or
> 
> * run, but in a different form (e.g. some of the hooks/op code is
>   folded into a single method)
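
To make the disabled/deferred split quoted above concrete, here is a toy sketch. It is not the CubicWeb `HooksRunner`: the class name `ToyHooksRunner`, its constructor arguments and the `call_hooks` / `run_deferred` methods are all made up for illustration, and hooks are plain callables keyed by regid. It only shows the idea that disabled regids never run, while deferred regids just collect objects to be processed later in one go.

```python
from collections import defaultdict


class ToyHooksRunner:
    """Hypothetical stand-in, NOT the CubicWeb class: it only mimics the
    disabled/deferred behaviour described in the quoted text."""

    def __init__(self, hooks, disabled_regids=(), deferred_regids=()):
        self.hooks = hooks                    # regid -> callable(entities)
        self.disabled = set(disabled_regids)
        self.deferred = set(deferred_regids)
        self.collected = defaultdict(list)    # regid -> entities seen so far

    def call_hooks(self, entities):
        for regid, hook in self.hooks.items():
            if regid in self.disabled:
                continue                      # disabled: never run
            if regid in self.deferred:
                # deferred: only remember the objects for later
                self.collected[regid].extend(entities)
                continue
            hook(entities)                    # everything else runs right away

    def run_deferred(self):
        """Run each deferred hook once over everything collected for it."""
        for regid, entities in self.collected.items():
            self.hooks[regid](entities)
        self.collected.clear()


log = []
runner = ToyHooksRunner(
    hooks={
        'check': lambda ents: log.append(('check', list(ents))),
        'notify': lambda ents: log.append(('notify', list(ents))),
        'audit': lambda ents: log.append(('audit', list(ents))),
    },
    disabled_regids=['audit'],
    deferred_regids=['notify'],
)
runner.call_hooks(['e1', 'e2'])
runner.call_hooks(['e3'])
runner.run_deferred()   # e.g. in a subsequent transaction
print(log)
```

Here 'check' runs on every call, 'audit' never runs, and 'notify' runs once at the end over all three collected entities.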


Uh, I forgot an important piece: the problem with hooks as they
currently exist is that they apply to *one entity at a time* ...

One interesting optimisation is to vectorize the execution of some of
them. Here's an example (the interesting part is the handling of
RQLConstraint)::

 if regid == 'checkattrconstraint':
     for etype, entities in entities_by_regid.iteritems():
         done = 0
         eschema = schema[etype]
         insertattrs = set(entities[0].cw_attr_cache)
         for rtype in insertattrs:
             if schema[rtype].inlined:
                 continue
             for constraint in eschema.rdef(rtype).constraints:
                 if isinstance(constraint, RQLConstraint):
                     if not check_attribute_repo_constraint(
                             session, self.logger, entities, constraint):
                         for entity in entities:
                             signalerror(etype, entity.eid, rtype, 'subject')
                     done += 1
                     continue
                 for entity in entities:
                     value = entity.cw_attr_cache.get(rtype)
                     if value is not None:
                         if not constraint.check(entity, rtype, value):
                             signalerror(etype, entity.eid, rtype, 'subject')
                         done += 1
         if done:
             self.logger.info('%s: checked %s entities (for %s)',
                              regid, done, etype)



With::

 import numpy

 def contiguousboundaries(eids):
     """Group sorted, non-empty `eids` into inclusive (first, last) runs.

     >>> r = [1, 2, 3, 4, 7, 55, 56, 57, 98, 99]
     >>> assert r == sorted(r)
     >>> contiguousboundaries(r)
     [(1, 4), (7, 7), (55, 57), (98, 99)]
     """
     partitionindices = numpy.where(numpy.diff(eids) != 1)[0]
     boundaries = []
     i = 0
     for j in partitionindices:
         boundaries.append((eids[i], eids[j]))
         i = j+1
     boundaries.append((eids[i], eids[len(eids) - 1]))
     return boundaries

 def check_attribute_repo_constraint(session, logger, entities, constraint):
     eids = [e.eid for e in entities]
     eidboundaries = contiguousboundaries(eids)
     for mineid, maxeid in eidboundaries:
         if not _check_attribute_repo_constraint(
                 session, logger, mineid, maxeid, constraint):
             return False
     return True

 def _check_attribute_repo_constraint(session, logger, mineid, maxeid,
                                      constraint):
     expression = ('S eid > %(mineid)s, S eid < %(maxeid)s, '
                   + constraint.expression)
     args = {'mineid': mineid - 1, 'maxeid': maxeid + 1}
     if 'U' in constraint.rqlst.defined_vars:
         expression = 'U eid %(u)s, ' + expression
         args['u'] = session.user.eid
     rql = 'Any %s WHERE %s' % (','.join(sorted(constraint.mainvars)),
                                expression)
     if constraint.distinct_query:
         rql = 'DISTINCT ' + rql
     logger.info('constraint execution: %s (args: %s)', rql, args)
     rset = session.execute(rql, args, build_descr=False)
     return rset.rowcount == (maxeid - mineid) + 1
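
The trick above is: split the eids into contiguous ranges, run the constraint query once per range, and accept the range only if the number of rows returned equals the number of eids in it. Here is a minimal self-contained sketch of that idea without numpy or a repository. The names `contiguous_ranges`, `all_satisfy` and `count_matching` are illustrative assumptions, and `count_matching(mineid, maxeid)` is a hypothetical stand-in for the RQL query (it returns how many entities in the inclusive range pass the constraint).

```python
def contiguous_ranges(eids):
    """Group a sorted list of distinct integers into inclusive (first, last) runs."""
    if not eids:
        return []
    ranges = []
    start = prev = eids[0]
    for eid in eids[1:]:
        if eid != prev + 1:          # gap found: close the current run
            ranges.append((start, prev))
            start = eid
        prev = eid
    ranges.append((start, prev))     # close the last run
    return ranges


def all_satisfy(eids, count_matching):
    """Return True if every eid passes, using one count per contiguous range."""
    for mineid, maxeid in contiguous_ranges(sorted(eids)):
        # A range passes only if every eid in it was matched.
        if count_matching(mineid, maxeid) != maxeid - mineid + 1:
            return False
    return True


# Toy "database": eid -> attribute value; the toy constraint is value > 0.
values = {1: 5, 2: 3, 3: 8, 4: 1, 7: 2, 55: 9, 56: 0, 57: 4}


def count_positive(mineid, maxeid):
    # Stand-in for the per-range query: count eids in range with value > 0.
    return sum(1 for eid, v in values.items()
               if mineid <= eid <= maxeid and v > 0)


print(contiguous_ranges([1, 2, 3, 4, 7, 55, 56, 57, 98, 99]))
# -> [(1, 4), (7, 7), (55, 57), (98, 99)]
print(all_satisfy([1, 2, 3, 4, 7], count_positive))   # -> True
print(all_satisfy([55, 56, 57], count_positive))      # -> False (eid 56 fails)
```

The payoff depends on how contiguous the eids are: a bulk insert that allocates eids in sequence gives a handful of ranges, hence a handful of queries, instead of one query per entity.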



