[PATCH 4 of 4 celerytask] Migrate task logs from redis and database to logs files

Philippe Pepiot philippe.pepiot at logilab.fr
Fri Jun 22 14:59:58 CEST 2018


# HG changeset patch
# User Philippe Pepiot <philippe.pepiot at logilab.fr>
# Date 1529670043 -7200
#      Fri Jun 22 14:20:43 2018 +0200
# Node ID c1d93ed176f23c9d97a544d46b6a2374fa944afe
# Parent  82130ff7c6d5d8e733c34881bc08a6b066c6a962
# Available At https://hg.logilab.org/review/cubes/celerytask
#              hg pull https://hg.logilab.org/review/cubes/celerytask -r c1d93ed176f2
Migrate task logs from redis and database to logs files

Having logs stored in redis then in database took too much memory in redis and
storage in database.
Using files is far simpler, but it requires a shared file system (e.g. NFS)
when the worker and the cubicweb instance (the reader) are not on the same
server.

Use the new cw_celerytask_helpers filelogger instead of redislogger.
Logs are stored in the celerytask-log-dir directory as gzip files with a
predictable filename based on task_id (which is unique).

Drop task_logs attribute from CeleryTask and update tests accordingly.
celery-monitor no longer copies logs from redis to the database when a task
ends.

diff --git a/entities.py b/entities.py
--- a/entities.py
+++ b/entities.py
@@ -27,7 +27,7 @@ from cubicweb.view import EntityAdapter
 from cubicweb.predicates import is_instance
 from cubicweb.server.hook import DataOperationMixIn, Operation
 
-from cw_celerytask_helpers import redislogger as loghelper
+from cw_celerytask_helpers.filelogger import get_task_logs
 
 from cubes.celerytask import STATES, FINAL_STATES
 
@@ -193,7 +193,7 @@ class ICeleryTask(EntityAdapter):
 
     @property
     def logs(self):
-        return loghelper.get_task_logs(self.task_id) or b''
+        return get_task_logs(self.task_id) or b''
 
     @property
     def result(self):
@@ -353,15 +353,6 @@ class CeleryTaskAdapter(ICeleryTask):
             self._cw.commit()
 
     @property
-    def logs(self):
-        task_logs = self.entity.task_logs
-        if task_logs is not None:
-            task_logs.seek(0)
-            return task_logs.read()
-        else:
-            return super(CeleryTaskAdapter, self).logs
-
-    @property
     def state(self):
         db_state = self.entity.cw_adapt_to('IWorkflowable').state
         db_final_state_map = {'done': STATES.SUCCESS, 'failed': STATES.FAILURE}
diff --git a/hooks.py b/hooks.py
--- a/hooks.py
+++ b/hooks.py
@@ -24,11 +24,10 @@ import os.path
 from celery import current_app
 import celery.task.control
 
-from cubicweb import Binary
-from cubicweb.predicates import on_fire_transition, is_instance
+from cubicweb.predicates import is_instance
 from cubicweb.server.hook import Hook, DataOperationMixIn, Operation
 
-from cw_celerytask_helpers.redislogger import flush_task_logs
+from cw_celerytask_helpers.filelogger import flush_task_logs
 
 
 class FlushCeleryTaskLogsOp(DataOperationMixIn, Operation):
@@ -38,21 +37,6 @@ class FlushCeleryTaskLogsOp(DataOperatio
             flush_task_logs(task_id)
 
 
-class CeleryTaskFinishedHook(Hook):
-    __regid__ = 'celerytask.celerytask_finished'
-    __select__ = (Hook.__select__ &
-                  on_fire_transition('CeleryTask', ('finish', 'fail')))
-    events = ('after_add_entity',)
-
-    def __call__(self):
-        if current_app.conf.CELERY_ALWAYS_EAGER:
-            return
-        entity = self.entity.for_entity
-        logs = Binary(entity.cw_adapt_to('ICeleryTask').logs)
-        entity.cw_set(task_logs=logs)
-        FlushCeleryTaskLogsOp.get_instance(self._cw).add_data(entity.task_id)
-
-
 class DeleteCeleryTaskOp(DataOperationMixIn, Operation):
 
     def postcommit_event(self):
diff --git a/migration/0.6.0_Any.py b/migration/0.6.0_Any.py
--- a/migration/0.6.0_Any.py
+++ b/migration/0.6.0_Any.py
@@ -1,1 +1,4 @@
+from cubes.celerytask.migration.utils import migrate_task_logs_to_bfss
 option_added('celerytask-log-dir')
+migrate_task_logs_to_bfss(cnx)
+drop_attribute('CeleryTask', 'task_logs')
diff --git a/migration/__init__.py b/migration/__init__.py
new file mode 100644
diff --git a/migration/utils.py b/migration/utils.py
new file mode 100644
--- /dev/null
+++ b/migration/utils.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# copyright 2018 LOGILAB S.A. (Paris, FRANCE), all rights reserved.
+# contact http://www.logilab.fr -- mailto:contact at logilab.fr
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation, either version 2.1 of the License, or (at your option)
+# any later version.
+#
+# This program is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import gzip
+
+from logilab.common.shellutils import ProgressBar
+from cw_celerytask_helpers.filelogger import get_log_filename
+from cw_celerytask_helpers.redislogger import get_task_logs, flush_task_logs
+
+
+def migrate_task_logs_to_bfss(cnx):
+    """Migrate logs from redis and from database to logs files in
+    celerytask-log-dir"""
+    to_flush = set()
+    rset = cnx.execute('Any X, T WHERE X is CeleryTask, X task_id T')
+    pb = ProgressBar(len(rset))
+    for eid, task_id in rset:
+        entity = cnx.entity_from_eid(eid)
+        logs = get_task_logs(task_id)
+        if logs is not None:
+            to_flush.add(task_id)
+        else:
+            if entity.task_logs is not None:
+                logs = entity.task_logs.read()
+        if logs is not None:
+            fname = get_log_filename(task_id)
+            with gzip.open(fname, 'wb') as f:
+                f.write(logs)
+        pb.update()
+    cnx.commit()
+    for task_id in to_flush:
+        flush_task_logs(task_id)
diff --git a/schema.py b/schema.py
--- a/schema.py
+++ b/schema.py
@@ -18,7 +18,7 @@
 """cubicweb-celerytask schema"""
 
 
-from yams.buildobjs import RelationDefinition, String, Bytes
+from yams.buildobjs import RelationDefinition, String
 
 from cubicweb.schema import WorkflowableEntityType
 
@@ -26,7 +26,6 @@ from cubicweb.schema import Workflowable
 class CeleryTask(WorkflowableEntityType):
     task_id = String(maxsize=40, required=False, indexed=True, unique=True)
     task_name = String(required=True)
-    task_logs = Bytes()
 
 
 class parent_task(RelationDefinition):
diff --git a/test/data/tasks/tasks.py b/test/data/tasks/tasks.py
--- a/test/data/tasks/tasks.py
+++ b/test/data/tasks/tasks.py
@@ -10,7 +10,8 @@ import six
 from celery import current_app as app, chord
 from celery.utils.log import get_task_logger
 
-from cw_celerytask_helpers.redislogger import redirect_stdouts, get_redis_client
+from cw_celerytask_helpers.utils import get_redis_client
+from cw_celerytask_helpers import redirect_stdouts
 
 cw_logger = logging.getLogger('cubes.tasks')
 dummy_logger = logging.getLogger('dummy')
diff --git a/test/test_celerytask.py b/test/test_celerytask.py
--- a/test/test_celerytask.py
+++ b/test/test_celerytask.py
@@ -21,6 +21,7 @@ import collections
 import logging
 import unittest
 import time
+import os.path
 
 import six
 import celery
@@ -33,7 +34,8 @@ from cubes.celerytask.entities import (s
                                        run_all_tasks)
 from cubes.celerytask.testutils import BaseCeleryTaskTC
 
-import cw_celerytask_helpers.redislogger as loghelper
+from cw_celerytask_helpers.utils import get_redis_client
+from cw_celerytask_helpers.filelogger import get_log_filename
 
 
 def wait_until(func, timeout=10, retry=1):
@@ -85,7 +87,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
             self.wait_async_task(cnx, cwtask.task_id)
             wf = cwtask.cw_adapt_to('IWorkflowable')
             self.assertEqual(wf.state, 'failed')
-            logs = cwtask.task_logs.read().decode('utf8')
+            logs = cwtask.cw_adapt_to('ICeleryTask').logs.decode('utf-8')
             if six.PY2:
                 self.assertIn(
                     (u"""raise RuntimeError(u'Cette tâche a échoué'."""
@@ -118,15 +120,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
             run_all_tasks(cnx)
             cwtask = cnx.entity_from_eid(cwtask_eid)
             self.wait_async_task(cnx, cwtask.task_id)
-
-            # logs should be flushed from redis to database
-            redis_logs = loghelper.get_task_logs(cwtask.task_id)
-            self.assertIsNone(redis_logs)
-            cwtask.task_logs.seek(0)
-            task_logs = cwtask.task_logs.read()
             logs = cwtask.cw_adapt_to('ICeleryTask').logs
-            self.assertEqual(task_logs, logs)
-
             self.assertIn(b'out should be in logs', logs)
             self.assertIn(b'err should be in logs', logs)
             self.assertIn(b'cw warning should be in logs', logs)
@@ -139,7 +133,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
             self.assertIn(b'raise Exception("oops")', logs)
 
     def test_task_deleted(self):
-        rdb = loghelper.get_redis_client()
+        rdb = get_redis_client()
         with self.admin_access.cnx() as cnx:
             # this 'buggy_task_revoked' key is used simulate the 'revoke' since
             # it's not handled by celery in threaded solo mode that we use in
@@ -157,8 +151,7 @@ class CeleryTaskTC(BaseCeleryTaskTC):
             rdb.set('buggy_task_revoked', 'yes')
             revoke.assert_called_once_with([task.task_id], signal='SIGKILL',
                                            terminate=True)
-            self.assertNotIn(loghelper.get_log_key(task.task_id),
-                             rdb.keys('cw:celerytask:log:*'))
+            self.assertFalse(os.path.exists(get_log_filename(task.task_id)))
 
     @unittest.skipIf(celery.VERSION.major == 3, "not supported with celery 3")
     def test_workflow_chain(self):
diff --git a/testutils.py b/testutils.py
--- a/testutils.py
+++ b/testutils.py
@@ -50,6 +50,10 @@ class BaseCeleryTaskTC(testlib.CubicWebT
         conf.CELERY_RESULT_SERIALIZER = 'json'
         conf.CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']
         conf.CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'tasks')
+        # this is required since we use a non-cubicweb worker, so the startup
+        # hook setting CUBICWEB_CELERYTASK_LOGDIR won't run.
+        conf.CUBICWEB_CELERYTASK_LOGDIR = os.path.join(
+            cls.config.appdatahome, 'logs')
         import tasks  # noqa
         cls.worker = multiprocessing.Process(target=cls.start_worker)
         cls.worker.start()


More information about the cubicweb-devel mailing list