Skip to content

Commit

Permalink
indexer: fix bulk indexing
Browse files Browse the repository at this point in the history
* Fixes a race condition during parallel indexing of documents with
person creation.
* Adds a CLI command to create persons afterwards from a documents file.
* Adds an online update function for persons.

without person     : poetry run setup  503,53s user 35,71s system 62% cpu 14:19,69 total
with person        : poetry run setup  508,56s user 34,93s system 62% cpu 14:32,93 total
with person enqueue: poetry run setup -k  495,98s user 34,25s system 62% cpu 14:12,54 total

Co-Authored-by: Peter Weber <[email protected]>
  • Loading branch information
rerowep and rerowep committed Sep 11, 2020
1 parent d08189d commit 948c602
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 48 deletions.
113 changes: 103 additions & 10 deletions rero_ils/modules/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 22,7 @@

import difflib
import gc
import itertools
import json
import logging
import multiprocessing
Expand All @@ -32,6 33,7 @@
from collections import OrderedDict
from glob import glob
from pprint import pprint
from time import sleep

import click
import polib
Expand All @@ -40,6 42,7 @@
import xmltodict
import yaml
from babel import Locale, core
from celery.task.control import inspect
from dojson.contrib.marc21.utils import create_record, split_stream
from flask import current_app
from flask.cli import with_appcontext
Expand All @@ -65,6 68,7 @@
from .items.cli import create_items, reindex_items
from .loans.cli import create_loans
from .patrons.cli import import_users
from .persons.tasks import create_mef_record_online
from .tasks import process_bulk_queue
from .utils import get_record_class_from_schema_or_pid_type, read_json_record
from ..modules.providers import append_fixtures_new_identifiers
Expand Down Expand Up @@ -112,6 116,53 @@ def utils():
"""Misc management commands."""


def queue_count():
    """Count tasks in celery.

    Adds up the reserved and the active tasks reported by the celery
    inspector for every worker.

    :return: total number of reserved and active tasks
    """
    inspector = inspect()
    task_count = 0
    # Query reserved tasks first, then active ones (same order as before).
    for tasks_by_worker in (inspector.reserved(), inspector.active()):
        # A falsy reply (nothing reported) contributes nothing.
        if tasks_by_worker:
            # Accumulate over all workers instead of keeping only the
            # size of the last list seen.
            task_count += sum(
                len(tasks) for tasks in tasks_by_worker.values()
            )
    return task_count


def wait_empty_tasks(delay, verbose=False):
    """Wait for tasks to be empty.

    The celery queue is sampled twice, five seconds apart, and both
    samples are added up, so that a task seen in only one of the two
    samples still keeps the wait going.

    :param delay: seconds to sleep between two sampling rounds
    :param verbose: display a spinner while waiting
    """
    spinner = itertools.cycle(['-', '\\', '|', '/'])

    def show_spinner():
        """Print the next spinner frame on the same terminal line."""
        if verbose:
            click.echo(
                'Waiting: {spinner}\r'.format(spinner=next(spinner)),
                nl=False
            )

    def pending_tasks():
        """Return the sum of two queue samples taken 5 seconds apart."""
        first_sample = queue_count()
        sleep(5)
        # Keep the first sample: previously it was overwritten, which made
        # the first queue_count() call pointless.
        return first_sample + queue_count()

    show_spinner()
    while pending_tasks():
        show_spinner()
        sleep(delay)


@utils.command('wait_empty_tasks')
@click.option('-d', '--delay', 'delay', default=3)
@with_appcontext
def wait_empty_tasks_cli(delay):
    """Wait for tasks to be empty."""
    # The docstring above is shown by click as the command help text.
    # `delay` is handed to wait_empty_tasks; the command line variant
    # always asks for the verbose spinner.
    wait_empty_tasks(delay, verbose=True)
    # Only reached once wait_empty_tasks has returned.
    click.secho('No active celery tasks.', fg='green')


@utils.command('show')
@click.argument('pid_value', nargs=1)
@click.option('-t', '--pid-type', 'pid-type, default(document_id)',
Expand Down Expand Up @@ -222,7 273,7 @@ def init(force):
bar.label = name


@click.command('create')
@fixtures.command('create')
@click.option('-a', '--append', 'append', is_flag=True, default=False)
@click.option('-r', '--reindex', 'reindex', is_flag=True, default=False)
@click.option('-c', '--dbcommit', 'dbcommit', is_flag=True, default=False)
Expand Down Expand Up @@ -337,23 388,18 @@ def create(infile, append, reindex, dbcommit, verbose, debug, schema, pid_type,
error_file.write(']')


fixtures.add_command(create)


@click.command('count')
@fixtures.command('count')
@click.option('-l', '--lazy', 'lazy', is_flag=True, default=False)
@click.argument('infile', type=click.File('r'), default=sys.stdin)
def count(infile, lazy):
def count_cli(infile, lazy):
"""Count records in file.
:param infile: Json file
:param lazy: lazy reads file
:return: count of records
"""
click.secho(
'Count records from {file_name}.'.format(
file_name=infile.name
),
'Count records from {file_name}.'.format(file_name=infile.name),
fg='green'
)
if lazy:
Expand All @@ -368,7 414,54 @@ def count(infile, lazy):
click.echo('Count: {count}'.format(count=count))


fixtures.add_command(count)
@fixtures.command('get_all_mef_records')
@click.argument('infile', type=click.File('r'), default=sys.stdin)
@click.option('-l', '--lazy', 'lazy', is_flag=True, default=False,
              help="lazy reads file")
@click.option('-k', '--enqueue', 'enqueue', is_flag=True, default=False,
              help="Enqueue record creation.")
@click.option('-v', '--verbose', 'verbose', is_flag=True, default=True,
              help='verbose')
@click.option('-w', '--wait', 'wait', is_flag=True, default=False,
              help="wait for enqueued tasks to finish")
@with_appcontext
def get_all_mef_records(infile, lazy, verbose, enqueue, wait):
    """Get all persons for given document file.

    :param infile: Json file
    :param lazy: lazy reads file
    :param verbose: print one line per distinct person reference
    :param enqueue: create the persons through celery tasks
    :param wait: wait for enqueued tasks to finish
    """
    click.secho(
        'Get all persons for {file_name}.'.format(file_name=infile.name),
        fg='green'
    )
    if lazy:
        # try to lazy read json file (slower, better memory management)
        records = read_json_record(infile)
    else:
        # load everything in memory (faster, bad memory management)
        records = json.load(infile)
    count = 0
    # References already handled; every $ref is processed only once.
    seen_refs = set()
    for record in records:
        for contribution in record.get('contribution', []):
            ref = contribution['agent'].get('$ref')
            if not ref or ref in seen_refs:
                continue
            # Count every distinct reference (was reset to 1 each time).
            count += 1
            seen_refs.add(ref)
            if enqueue:
                # Asynchronous creation: msg is whatever .delay() returns.
                msg = create_mef_record_online.delay(ref)
            else:
                pid, online = create_mef_record_online(ref)
                msg = 'person pid: {pid} {online}'.format(
                    pid=pid,
                    online=online
                )
            if verbose:
                click.echo("{count:<10}ref: {ref}\t{msg}".format(
                    count=count,
                    ref=ref,
                    msg=msg
                ))
    if enqueue and wait:
        wait_empty_tasks(delay=3, verbose=True)
    click.echo('Count refs: {count}'.format(count=count))


@utils.command('check_license')
Expand Down
4 changes: 2 additions & 2 deletions rero_ils/modules/documents/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 202,7 @@ def index_persons(self, bulk=False):
person = None
ref = contribution['agent'].get('$ref')
if ref:
person = Person.get_record_by_ref(ref)
person, online = Person.get_record_by_ref(ref)
pid = contribution['agent'].get('pid')
if pid:
person = Person.get_record_by_pid(pid)
Expand Down Expand Up @@ -249,7 249,7 @@ def replace_refs(self):
for idx, contribution in enumerate(contributions):
ref = contribution['agent'].get('$ref')
if ref:
person = Person.get_record_by_ref(ref)
person, online = Person.get_record_by_ref(ref)
if person:
contributions[idx]['agent'] = person
return super(Document, self).replace_refs()
Expand Down
84 changes: 57 additions & 27 deletions rero_ils/modules/persons/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,29 68,35 @@ class Person(IlsRecord):
def get_record_by_ref(cls, ref):
"""Get a record from DB.
If the record dos not exist get it from MEF and creat it.
If the record dos not exist get it from MEF and create it.
"""
pers = None
def get_person(ref_type, ref_pid):
"""Get person."""
pers = None
if ref_type == 'mef':
pers = cls.get_record_by_pid(ref_pid)
else:
if ref_type == 'viaf':
result = PersonsSearch().filter(
'term', viaf_pid=ref_pid
).source('pid').scan()
else:
result = PersonsSearch().filter(
{'term': {'{type}.pid'.format(type=ref_type): ref_pid}}
).source('pid').scan()
try:
pid = next(result).pid
pers = cls.get_record_by_pid(pid)
except StopIteration:
pass
return pers

online = False
ref_split = ref.split('/')
ref_type = ref_split[-2]
ref_pid = ref_split[-1]
db.session.begin_nested()
if ref_type == 'mef':
pers = cls.get_record_by_pid(ref_pid)
else:
if ref_type == 'viaf':
result = PersonsSearch().filter(
'term', viaf_pid=ref_pid
).source('pid').scan()
else:
result = PersonsSearch().filter(
{'term': {'{type}.pid'.format(type=ref_type): ref_pid}}
).source('pid').scan()
try:
pid = next(result).pid
pers = cls.get_record_by_pid(pid)
except StopIteration:
pass
pers = get_person(ref_type, ref_pid)
if not pers:
# We dit not find the record in DB get it from MEF and create it.
try:
Expand All @@ -102,18 108,22 @@ def get_record_by_ref(cls, ref):
# we have to commit because create
# uses db.session.begin_nested
pers = cls.create(metadata, dbcommit=True)
online = True
except Exception as err:
db.session.rollback()
current_app.logger.error('Get MEF record: {type}:{pid}'.format(
type=ref_type,
pid=ref_pid
))
current_app.logger.error(err)
return None
if metadata:
pers = cls.get_record_by_pid(metadata.get('pid'))
if not pers:
current_app.logger.error(
'Get MEF record: {type}:{pid} >>{err}<<'.format(
type=ref_type,
pid=ref_pid,
err=err
)
)
return pers, online
db.session.commit()
if pers:
pers.reindex()
return pers
return pers, online

def dumps_for_document(self):
"""Transform the record into document contribution format."""
Expand Down Expand Up @@ -226,6 236,26 @@ def get_authorized_access_point(self, language):
language=language
)

def update_online(self, dbcommit=False, reindex=False):
    """Update the record with the data fetched for its VIAF pid.

    Nothing is fetched when the record has no ``viaf_pid``. Otherwise the
    data returned by ``_get_mef_data_by_type`` is compared with the
    current record and the record is replaced only when they differ.

    :param dbcommit: commit record to database
    :param reindex: reindex record by record
    :return: updated record status and updated record
    """
    viaf_pid = self.get('viaf_pid')
    if not viaf_pid:
        return False, self
    mef_data = self._get_mef_data_by_type(viaf_pid, 'viaf')
    if not mef_data:
        return False, self
    new_metadata = mef_data['metadata']
    # Carry over the local schema so it does not count as a difference.
    new_metadata['$schema'] = self['$schema']
    updated = dict(self) != new_metadata
    if updated:
        self.replace(data=new_metadata, dbcommit=dbcommit, reindex=reindex)
    return updated, self


class PersonsIndexer(IlsRecordsIndexer):
"""Person indexing class."""
Expand Down
42 changes: 37 additions & 5 deletions rero_ils/modules/persons/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 28,12 @@

@shared_task(ignore_result=True)
def create_mef_records(records, verbose=False):
"""Records creation and indexing."""
"""Records creation and indexing.
:param records: records to create
:param verbose: verbose output
:return: count of records
"""
# TODO: check update an existing record
for record in records:
rec = Person.create(
Expand All @@ -39,16 44,26 @@ def create_mef_records(records, verbose=False):
)
if verbose:
click.echo(
'record uuid: {id}'.format(id=rec.id)
'record pid: {pid}'.format(id=rec.pid)
)
return len(records)


@shared_task(ignore_result=True)
def delete_records(records, force=False, delindex=True, verbose=False):
"""Records deletion and indexing."""
def delete_records(records, verbose=False):
"""Records deletion and indexing.
:param records: records to delete
:param verbose: verbose output
:return: count of records
"""
for record in records:
status = Person.delete(record, force=force, delindex=delindex)
status = Person.delete(
record,
force=False,
dbcommit=True,
delindex=True
)
current_app.logger.info(
'record: {id} | DELETED {status}'.format(
id=record.id,
Expand All @@ -61,3 76,20 @@ def delete_records(records, force=False, delindex=True, verbose=False):
'records deleted: {count}'.format(count=len(records))
)
return len(records)


@shared_task(ignore_result=True)
def create_mef_record_online(ref):
    """Resolve a person reference through ``Person.get_record_by_ref``.

    :param ref: reference to the person record on MEF
    :return: tuple of the person pid (None when no person was found) and
        the ``online`` flag returned by ``Person.get_record_by_ref``
    """
    person, online = Person.get_record_by_ref(ref)
    person_pid = person.get('pid') if person else None
    return person_pid, online
Loading

0 comments on commit 948c602

Please sign in to comment.