adds tests for new task system

This commit is contained in:
Mouse Reeve
2021-11-11 15:17:32 -08:00
parent 908c9dc689
commit c33d791974
2 changed files with 24 additions and 12 deletions

View File

@ -112,14 +112,14 @@ def start_import_task(source, job_id):
"""trigger the child tasks for each row"""
job = ImportJob.objects.get(id=job_id)
# these are sub-tasks so that one big task doesn't use up all the memory in celery
for item in job.items.values("id").all():
import_item_task.delay(source, item.id)
for item in job.items.values_list("id", flat=True).all():
import_item_task.delay(source, item)
@app.task(queue="low_priority")
def import_item_task(source, item_id):
"""resolve a row into a book"""
item = models.ImportItem.objets.get(id=item_id)
item = models.ImportItem.objects.get(id=item_id)
try:
item.resolve()
except Exception as err: # pylint: disable=broad-except