
Commit

Merge pull request jina-ai#488 from jina-ai/add-multi-field-search-469
feat: add the support for multi-field search
hanxiao authored Jun 4, 2020
2 parents c9167cb + aee8ee9 commit f4de9ad
Showing 23 changed files with 591 additions and 185 deletions.
6 changes: 3 additions & 3 deletions docs/chapters/jep/jep-3/main.rst
@@ -29,8 +29,8 @@ The Multi-field search is commonly used in practice. Concretely,
.. highlights::
   as a user, I want to limit the query within some selected fields.

-In the following use case, there are two documents and two fields in each of them, i.e. ``title`` and ``summary``.
-The user wants to query ``painter`` but **only** from the ``title`` field. The expected result will be ``{'doc_id': 11, 'title': 'hackers and painters'}``.
+In the following use case, there are two documents and two fields in each of them, i.e. `title` and `summary`.
+The user wants to query ``painter`` but **only** from the `title` field. The expected result will be `{'doc_id': 11, 'title': 'hackers and painters'}`.

.. highlight:: json
.. code-block:: json
@@ -57,7 +57,7 @@ Modify ``jina.proto``

Let's take the following ``Flow`` as an example.
The ``FieldsMapper`` is a ``Crafter`` that splits each ``Document`` into fields and adds the ``field_name`` information to the ``Chunks``.
-Afterwards, the ``Chunks`` containing the ``title`` and the ``summary`` information are processed differently in two pathways and stored separately.
+Afterwards, the ``Chunks`` containing the `title` and the `summary` information are processed differently in two pathways and stored separately.

.. image:: JEP3-index-design.png
   :align: center
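The ``FieldsMapper`` crafter mentioned in the JEP text above is not part of this diff. As a reading aid, here is a minimal, hypothetical sketch of the role it plays, turning one document into per-field chunks that carry the ``field_name`` introduced by this PR; the function name and dict layout are illustrative, not Jina's actual API.

```python
# Hypothetical sketch only: split a document's fields into chunks tagged
# with field_name, so that downstream drivers can filter on it.
def split_into_field_chunks(doc: dict) -> list:
    chunks = []
    for field_name, text in doc.items():
        if field_name == 'doc_id':
            continue
        chunks.append({
            'doc_id': doc['doc_id'],
            'field_name': field_name,  # consumed later by filter_by
            'text': text,
        })
    return chunks

print(split_into_field_chunks(
    {'doc_id': 11, 'title': 'hackers and painters', 'summary': 'some summary text'}))
# [{'doc_id': 11, 'field_name': 'title', ...}, {'doc_id': 11, 'field_name': 'summary', ...}]
```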
17 changes: 14 additions & 3 deletions docs/chapters/proto/modify.rst
@@ -21,7 +21,12 @@ Take MacOS as an example,
cp -r ~/Downloads/protoc-3.7.1-osx-x86_64/include/* /usr/local/include/
-#. Install gRPC tools dependencies: :command:`brew install automake autoconf libtool`
+#. Install gRPC tools dependencies:
+
+   .. highlight:: bash
+   .. code-block:: bash
+
+      brew install automake autoconf libtool
#. Install gRPC and ``grpc_python_plugin`` from the source:

@@ -33,6 +38,12 @@ Take MacOS as an example,
make grpc_python_plugin
-#. This will compile the grpc-python-plugin and build it to, e.g., :file:`/Documents/grpc/bins/opt/grpc_python_plugin`
+#. This will compile the grpc-python-plugin and build it to, e.g., :file:`~/Documents/grpc/bins/opt/grpc_python_plugin`

-#. Generate the python interfaces via :command:`jina/proto/build-proto.sh`
+#. Generate the python interfaces.
+
+   .. highlight:: bash
+   .. code-block:: bash
+
+      cd jina/proto
+      bash build-proto.sh ~/Documents/grpc/bins/opt/grpc_python_plugin
2 changes: 1 addition & 1 deletion jina/__init__.py
@@ -7,7 +7,7 @@

# do not change this line manually
# this is managed by proto/build-proto.sh and updated on every execution
-__proto_version__ = '0.0.31'
+__proto_version__ = '0.0.32'

import platform
import sys
10 changes: 10 additions & 0 deletions jina/clients/python/request.py
@@ -21,6 +21,7 @@ def _generate(data: Union[Iterator[bytes], Iterator['jina_pb2.Document'], Iterat
              first_doc_id: int = 0, first_request_id: int = 0,
              random_doc_id: bool = False, mode: ClientMode = ClientMode.INDEX, top_k: int = 50,
              mime_type: str = None,
+              filter_by: Union[Iterator[str], str] = None,
              *args, **kwargs) -> Iterator['jina_pb2.Message']:
    buffer_sniff = False

@@ -48,6 +49,15 @@ def _generate(data: Union[Iterator[bytes], Iterator['jina_pb2.Document'], Iterat
        else:
            req.search.top_k = top_k

+        if filter_by:
+            _filter_by = []
+            if isinstance(filter_by, str):
+                _filter_by.append(filter_by)
+            else:
+                for _field in filter_by:
+                    _filter_by.append(_field)
+            req.search.filter_by.extend(_filter_by)
+
        for _raw in pi:
            d = getattr(req, str(mode).lower()).docs.add()
            if isinstance(_raw, jina_pb2.Document):
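To trace the request path, the snippet below replays in isolation the normalization logic this hunk adds to ``_generate``: a bare string and an iterable of field names both end up as a flat list before being copied into ``req.search.filter_by``. The standalone function is a sketch for illustration, not part of the PR.

```python
from typing import Iterator, List, Union

def normalize_filter_by(filter_by: Union[Iterator[str], str, None]) -> List[str]:
    # Mirrors the branching added to _generate: a bare string becomes a
    # one-element list; any other iterable is flattened into a list.
    _filter_by: List[str] = []
    if not filter_by:
        return _filter_by
    if isinstance(filter_by, str):
        _filter_by.append(filter_by)
    else:
        for _field in filter_by:
            _filter_by.append(_field)
    return _filter_by

assert normalize_filter_by('title') == ['title']
assert normalize_filter_by(['title', 'summary']) == ['title', 'summary']
assert normalize_filter_by(None) == []
```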
7 changes: 6 additions & 1 deletion jina/drivers/encode.py
@@ -3,6 +3,7 @@

from . import BaseExecutableDriver
from .helper import extract_chunks, array2pb
+from typing import Union, List, Tuple


class BaseEncodeDriver(BaseExecutableDriver):
@@ -15,9 +16,13 @@ def __init__(self, executor: str = None, method: str = 'encode', *args, **kwargs
class EncodeDriver(BaseEncodeDriver):
    """Extract the chunk-level content from documents and call executor and do encoding
    """

+    def __init__(self, filter_by: Union[List[str], Tuple[str]] = [], *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.filter_by = filter_by
+
    def __call__(self, *args, **kwargs):
-        contents, chunk_pts, no_chunk_docs, bad_chunk_ids = extract_chunks(self.req.docs, embedding=False)
+        contents, chunk_pts, no_chunk_docs, bad_chunk_ids = \
+            extract_chunks(self.req.docs, self.filter_by, embedding=False)

        if no_chunk_docs:
            self.logger.warning('these docs contain no chunk: %s' % no_chunk_docs)
10 changes: 7 additions & 3 deletions jina/drivers/helper.py
@@ -5,7 +5,7 @@
import os
import urllib.parse
import urllib.request
-from typing import Dict, Any, Iterable, Tuple
+from typing import Dict, Any, Iterable, Tuple, List, Union

import numpy as np

@@ -70,10 +70,12 @@ def array2pb(x: 'np.ndarray', quantize: str = None) -> 'jina_pb2.NdArray':
return blob


-def extract_chunks(docs: Iterable['jina_pb2.Document'], embedding: bool) -> Tuple:
+def extract_chunks(
+        docs: Iterable['jina_pb2.Document'], filter_by: Union[Tuple[str], List[str]], embedding: bool) -> Tuple:
    """Iterate over a list of protobuf documents and extract chunk-level information from them
    :param docs: an iterable of protobuf documents
+    :param filter_by: a list of field names used to filter the chunks
    :param embedding: an indicator of extracting embedding or not.
        If ``True`` then all chunk-level embeddings are extracted.
        If ``False`` then ``text``, ``buffer``, ``blob`` info of each chunk is extracted
@@ -90,7 +92,7 @@ def extract_chunks(docs: Iterable['jina_pb2.Document'], embedding: bool) -> Tupl
    bad_chunk_ids = []

    if embedding:
-        _extract_fn = lambda c: c.embedding.buffer and pb2array(c.embedding)
+        _extract_fn = lambda c: c.embedding.buffer and pb2array(c.embedding) if c.embedding.buffer else None
    else:
        _extract_fn = lambda c: c.text or c.buffer or (c.blob and pb2array(c.blob))

@@ -101,6 +103,8 @@ def extract_chunks(docs: Iterable['jina_pb2.Document'], embedding: bool) -> Tupl

        for c in d.chunks:
            _c = _extract_fn(c)
+            if len(filter_by) > 0 and c.field_name not in filter_by:
+                continue
            if _c is not None:
                _contents.append(_c)
                chunk_pts.append(c)
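The filtering rule added to ``extract_chunks`` is easiest to see in isolation: a non-empty ``filter_by`` acts as an allow-list on each chunk's ``field_name``, while an empty list keeps everything. The ``SimpleNamespace`` chunks below stand in for the protobuf objects; this is a sketch, not the library code.

```python
from types import SimpleNamespace

def keep_chunk(chunk, filter_by) -> bool:
    # Inverse of the skip condition added above: skip only when filter_by
    # is non-empty and the chunk's field_name is not listed.
    return not (len(filter_by) > 0 and chunk.field_name not in filter_by)

chunks = [
    SimpleNamespace(field_name='title', text='hackers and painters'),
    SimpleNamespace(field_name='summary', text='some summary text'),
]

assert [c.text for c in chunks if keep_chunk(c, ['title'])] == ['hackers and painters']
assert len([c for c in chunks if keep_chunk(c, [])]) == 2  # empty list = no filtering
```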
30 changes: 24 additions & 6 deletions jina/drivers/index.py
@@ -5,6 +5,7 @@

from . import BaseExecutableDriver
from .helper import extract_chunks
+from typing import Union, List, Tuple


class BaseIndexDriver(BaseExecutableDriver):
@@ -18,9 +19,13 @@ class VectorIndexDriver(BaseIndexDriver):
    """Extract chunk-level embeddings and add them to the executor
    """

+    def __init__(self, filter_by: Union[List[str], Tuple[str]] = [], *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.filter_by = filter_by
+
    def __call__(self, *args, **kwargs):
-        embed_vecs, chunk_pts, no_chunk_docs, bad_chunk_ids = extract_chunks(self.req.docs, embedding=True)
+        embed_vecs, chunk_pts, no_chunk_docs, bad_chunk_ids = \
+            extract_chunks(self.req.docs, self.filter_by, embedding=True)

        if no_chunk_docs:
            self.pea.logger.warning('these docs contain no chunk: %s' % no_chunk_docs)
@@ -46,7 +51,7 @@ class KVIndexDriver(BaseIndexDriver):
- C is the number of chunks per query/doc
"""

-    def __init__(self, level: str, *args, **kwargs):
+    def __init__(self, level: str, filter_by: Union[List[str], Tuple[str]] = [], *args, **kwargs):
"""
:param level: index level "chunk" or "doc", or "all"
@@ -55,15 +60,18 @@ def __init__(self, level: str, *args, **kwargs):
"""
        super().__init__(*args, **kwargs)
        self.level = level
+        self.filter_by = filter_by

    def __call__(self, *args, **kwargs):
        from google.protobuf.json_format import MessageToJson
        if self.level == 'doc':
            content = {f'd{d.doc_id}': MessageToJson(d) for d in self.req.docs}
        elif self.level == 'chunk':
-            content = {f'c{c.chunk_id}': MessageToJson(c) for d in self.req.docs for c in d.chunks}
+            content = {f'c{c.chunk_id}': MessageToJson(c) for d in self.req.docs for c in d.chunks
+                       if self.filter_by and c.field_name in self.filter_by}
        elif self.level == 'all':
-            content = {f'c{c.chunk_id}': MessageToJson(c) for d in self.req.docs for c in d.chunks}
+            content = {f'c{c.chunk_id}': MessageToJson(c) for d in self.req.docs for c in d.chunks
+                       if self.filter_by and c.field_name in self.filter_by}
            content.update({f'd{d.doc_id}': MessageToJson(d) for d in self.req.docs})
        else:
            raise TypeError(f'level={self.level} is not supported, must choose from "chunk" or "doc" ')
@@ -79,6 +87,16 @@ def __init__(self, level: str = 'doc', *args, **kwargs):


class ChunkKVIndexDriver(KVIndexDriver):

-    def __init__(self, level: str = 'chunk', *args, **kwargs):
+    def __init__(self,
+                 level: str = 'chunk', filter_by: Union[str, List[str], Tuple[str]] = None, *args, **kwargs):
        super().__init__(level, *args, **kwargs)
+        self.filter_by = filter_by if self.filter_by else []
+
+    def __call__(self, *args, **kwargs):
+        from google.protobuf.json_format import MessageToJson
+        content = {
+            f'c{c.chunk_id}': MessageToJson(c)
+            for d in self.req.docs for c in d.chunks
+            if len(self.filter_by) > 0 and c.field_name in self.filter_by}
+        if content:
+            self.exec_fn(content)
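Two details worth flagging here. First, ``KVIndexDriver`` guards the chunk-level comprehension with ``self.filter_by and c.field_name in self.filter_by``, so an empty ``filter_by`` selects no chunks at all, the opposite of ``extract_chunks``, where an empty list means no filtering. Second, in ``ChunkKVIndexDriver.__init__`` the expression ``filter_by if self.filter_by else []`` appears to test the attribute already set by the parent class rather than the incoming argument (and the mutable ``[]`` default is a classic Python pitfall, though it is never mutated here). The sketch below reproduces the first condition with plain dicts so the asymmetry is visible; the data is illustrative.

```python
# Stand-in data: chunk_id/field_name mimic the protobuf fields.
docs = [{'doc_id': 11, 'chunks': [
    {'chunk_id': 1, 'field_name': 'title'},
    {'chunk_id': 2, 'field_name': 'summary'},
]}]

def chunk_content(filter_by):
    # Same guard as KVIndexDriver.__call__ at level='chunk'.
    return {f"c{c['chunk_id']}": c['field_name']
            for d in docs for c in d['chunks']
            if filter_by and c['field_name'] in filter_by}

assert chunk_content(['title']) == {'c1': 'title'}
assert chunk_content([]) == {}  # unlike extract_chunks, empty selects nothing
```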
53 changes: 28 additions & 25 deletions jina/drivers/reduce.py
@@ -9,6 +9,8 @@ class MergeDriver(BaseDriver):

    def __call__(self, *args, **kwargs):
        # take unique routes by service identity
+        if self.prev_reqs is None:
+            return
        routes = {(r.pod + r.pod_id): r for m in self.prev_msgs for r in m.envelope.routes}
        self.msg.envelope.ClearField('routes')
        self.msg.envelope.routes.extend(
@@ -18,7 +20,7 @@ def __call__(self, *args, **kwargs):
class MergeTopKDriver(MergeDriver):
"""Merge the topk results from multiple messages into one and sorted
-    Useful in indexer sharding (i.e. ``--replicas > 1``)
+    Useful in indexer sharding (i.e. ``--replicas > 1``) and when running multiple indexers in parallel
Complexity depends on the level:
- ``level=chunk``: D x C x K x R
@@ -40,30 +42,31 @@ def __init__(self, level: str, *args, **kwargs):
        self.level = level

    def __call__(self, *args, **kwargs):
-        if self.level == 'chunk':
-            for _d_id, _doc in enumerate(self.req.docs):
-                for _c_id, _chunk in enumerate(_doc.chunks):
-                    _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].chunks[_c_id].topk_results]
-                    _chunk.ClearField('topk_results')
-                    _chunk.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
-        elif self.level == 'doc':
-            for _d_id, _doc in enumerate(self.req.docs):
-                _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].topk_results]
-                _doc.ClearField('topk_results')
-                _doc.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
-        elif self.level == 'all':
-            for _d_id, _doc in enumerate(self.req.docs):
-                _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].topk_results]
-                _doc.ClearField('topk_results')
-                _doc.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
-
-                for _c_id, _chunk in enumerate(_doc.chunks):
-                    _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].chunks[_c_id].topk_results]
-                    _chunk.ClearField('topk_results')
-                    _chunk.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
-
-        else:
-            raise TypeError(f'level={self.level} is not supported, must choose from "chunk" or "doc" ')
+        if self.prev_reqs is not None:
+            if self.level == 'chunk':
+                for _d_id, _doc in enumerate(self.req.docs):
+                    for _c_id, _chunk in enumerate(_doc.chunks):
+                        _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].chunks[_c_id].topk_results]
+                        _chunk.ClearField('topk_results')
+                        _chunk.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
+            elif self.level == 'doc':
+                for _d_id, _doc in enumerate(self.req.docs):
+                    _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].topk_results]
+                    _doc.ClearField('topk_results')
+                    _doc.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
+            elif self.level == 'all':
+                for _d_id, _doc in enumerate(self.req.docs):
+                    _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].topk_results]
+                    _doc.ClearField('topk_results')
+                    _doc.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
+
+                    for _c_id, _chunk in enumerate(_doc.chunks):
+                        _flat_topk = [k for r in self.prev_reqs for k in r.docs[_d_id].chunks[_c_id].topk_results]
+                        _chunk.ClearField('topk_results')
+                        _chunk.topk_results.extend(sorted(_flat_topk, key=lambda x: x.score.value))
+
+            else:
+                raise TypeError(f'level={self.level} is not supported, must choose from "chunk" or "doc" ')

        super().__call__(*args, **kwargs)

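Stripped of the protobuf bookkeeping, the merge itself is just "concatenate the per-replica top-k lists, then re-sort by score". A minimal standalone sketch with namedtuples in place of ``topk_results``; the ascending sort matches the drivers above because the score value holds a distance, where smaller is better.

```python
from collections import namedtuple

Result = namedtuple('Result', ['chunk_id', 'score'])

# Top-k lists returned by two replicas/shards for the same query chunk.
replica_a = [Result(1, 0.10), Result(7, 0.42)]
replica_b = [Result(3, 0.05), Result(9, 0.33)]

# Same pattern as MergeTopKDriver: flatten, then sort by score value.
_flat_topk = [k for r in (replica_a, replica_b) for k in r]
merged = sorted(_flat_topk, key=lambda x: x.score)

assert [m.chunk_id for m in merged] == [3, 1, 9, 7]
```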
2 changes: 2 additions & 0 deletions jina/drivers/score.py
@@ -35,6 +35,8 @@ def __call__(self, *args, **kwargs):

            # np.uint32 uses 32 bits. np.float32 uses a 23-bit mantissa, so integers greater than 2^23 will have their
            # least significant bits truncated.
+            if not match_idx:
+                continue
            match_idx = np.array(match_idx, dtype=np.float64)

            doc_idx = self.exec_fn(match_idx, query_chunk_meta, match_chunk_meta)
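The mantissa comment above is easy to verify: ``float32`` carries 24 significant bits (23 stored plus an implicit leading one), so consecutive integers stop being representable just past 2^24, which is why the match ids are cast to ``float64``.

```python
import numpy as np

exact = np.uint32(2**24 + 1)      # first integer float32 cannot represent
truncated = np.float32(exact)

assert int(truncated) == 2**24              # least significant bit was lost
assert int(np.float64(exact)) == 2**24 + 1  # float64 keeps ids exact
```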
33 changes: 21 additions & 12 deletions jina/drivers/search.py
@@ -47,12 +47,12 @@ def __call__(self, *args, **kwargs):
        elif self.level == 'chunk':
            for d in self.req.docs:
                for c in d.chunks:
-                    self._update_topk_chunks(c)
+                    self._update_topk_chunks(c, self.req.filter_by)
        elif self.level == 'all':
            for d in self.req.docs:
                self._update_topk_docs(d)
                for c in d.chunks:
-                    self._update_topk_chunks(c)
+                    self._update_topk_chunks(c, self.req.filter_by)
        else:
            raise TypeError(f'level={self.level} is not supported, must choose from "chunk" or "doc" ')

@@ -68,14 +68,16 @@ def _update_topk_docs(self, d):
        d.ClearField('topk_results')
        d.topk_results.extend(hit_sr)

-    def _update_topk_chunks(self, c):
+    def _update_topk_chunks(self, c, filter_by):
        hit_sr = []  #: hit scored results; note that some searches may not end with a result, especially in shards
        for tk in c.topk_results:
            r = self.exec_fn(f'c{tk.match_chunk.chunk_id}')
            if r:
                sr = ScoredResult()
                sr.score.CopyFrom(tk.score)
                sr.match_chunk.CopyFrom(r)
+                if filter_by and sr.match_chunk.field_name not in filter_by:
+                    continue
                hit_sr.append(sr)
        c.ClearField('topk_results')
        c.topk_results.extend(hit_sr)
@@ -101,19 +103,26 @@ class VectorSearchDriver(BaseSearchDriver):
"""

    def __call__(self, *args, **kwargs):
-        embed_vecs, chunk_pts, no_chunk_docs, bad_chunk_ids = extract_chunks(self.req.docs, embedding=True)
+        embed_vecs, chunk_pts, no_chunk_docs, bad_chunk_ids = \
+            extract_chunks(self.req.docs, filter_by=[], embedding=True)

        if no_chunk_docs:
            self.logger.warning('these docs contain no chunk: %s' % no_chunk_docs)

        if bad_chunk_ids:
            self.logger.warning('these bad chunks can not be added: %s' % bad_chunk_ids)

-        idx, dist = self.exec_fn(embed_vecs, top_k=self.req.top_k)
-        op_name = self.exec.__class__.__name__
-        for c, topks, scs in zip(chunk_pts, idx, dist):
-            for m, s in zip(topks, scs):
-                r = c.topk_results.add()
-                r.match_chunk.chunk_id = m
-                r.score.value = s
-                r.score.op_name = op_name
+        if chunk_pts:
+            try:
+                idx, dist = self.exec_fn(embed_vecs, top_k=self.req.top_k)
+                op_name = self.exec.__class__.__name__
+                for c, topks, scs in zip(chunk_pts, idx, dist):
+                    for m, s in zip(topks, scs):
+                        r = c.topk_results.add()
+                        r.match_chunk.chunk_id = m
+                        r.score.value = s
+                        r.score.op_name = op_name
+            except ValueError:
+                pass
+        else:
+            self.logger.warning('no chunks to query, (filter_by: {})'.format(self.req.filter_by))
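Note that at query time the filter is applied on the way out rather than on the way in: matches are first fetched from the key-value index, then dropped when their ``field_name`` is not allowed. A small stand-in sketch of that post-filtering step, with ``SimpleNamespace`` in place of the scored-result protobufs:

```python
from types import SimpleNamespace

def post_filter_matches(matches, filter_by):
    # Mirrors the check added to _update_topk_chunks: a falsy filter_by
    # keeps every retrieved match, otherwise field_name must be listed.
    hit_sr = []
    for sr in matches:
        if filter_by and sr.field_name not in filter_by:
            continue
        hit_sr.append(sr)
    return hit_sr

matches = [SimpleNamespace(field_name='title', chunk_id=1),
           SimpleNamespace(field_name='summary', chunk_id=2)]
assert [m.chunk_id for m in post_filter_matches(matches, ['title'])] == [1]
assert len(post_filter_matches(matches, None)) == 2
```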
4 changes: 4 additions & 0 deletions jina/executors/indexers/vector/numpy.py
@@ -71,6 +71,8 @@ def get_query_handler(self) -> Optional['np.ndarray']:
                    f'the size of the keys and vectors are inconsistent ({self.int2ext_key.shape[0]} != {vecs.shape[0]}), '
                    f'did you write to this index twice?')
                return None
+            if vecs.shape[0] == 0:
+                self.logger.warning(f'an empty index is loaded, size: {vecs.shape[0]}')
            return vecs
        else:
            return None
@@ -121,6 +123,8 @@ def query(self, keys: np.ndarray, top_k: int, *args, **kwargs) -> Tuple['np.ndar
        Distance (the smaller the better) is returned, not the score.
        """
        if self.query_handler is None:
+            raise ValueError('query handler is empty')
        if self.metric not in {'cosine', 'euclidean'} or self.backend == 'scipy':
            try:
                from scipy.spatial.distance import cdist
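The two new guards behave differently on purpose: a handler of ``None`` means the index could not be loaded at all (so ``query`` now raises), while zero rows is a loadable but empty index (warn only). A quick sketch of the distinction; the ``print`` stands in for the logger.

```python
import numpy as np

def check_query_handler(vecs):
    if vecs is None:
        raise ValueError('query handler is empty')   # as in query()
    if vecs.shape[0] == 0:
        print(f'an empty index is loaded, size: {vecs.shape[0]}')  # warn only
    return vecs

check_query_handler(np.zeros((0, 16)))  # warns, but the handler is still usable
try:
    check_query_handler(None)
except ValueError as e:
    print(e)  # 'query handler is empty'
```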
1 change: 1 addition & 0 deletions jina/helper.py
@@ -506,6 +506,7 @@ def valid_yaml_path(path: str, to_stream: bool = False):
def get_parsed_args(kwargs, parser, parser_name: str = None):
    args = kwargs2list(kwargs)
    try:
+        # TODO: fix this to parse list of strings
        p_args, unknown_args = parser.parse_known_args(args)
        if unknown_args:
            from .logging import default_logger