feat: Citations event via __event_emitter__ #3615
Conversation
Would be good to validate the schema. I left the API the same as the one used on the frontend; feel free to change it if you've got any opinions.
Thanks!
Groovy 😎
Screen.Recording.2024-07-07.at.12.39.39.AM.mp4
Is it working now?!? Can someone share a full code snippet please? Will this also work inside pipelines? I imagine I have to wait for the next release... this is an incredible feature! Also wondering if we can have pipelines stream multiple replies. My use case is LangGraph: I want to show the execution as it computes, and I may also have multiple LLMs streaming replies that build off of each other.
Screen_Recording_2024-07-08_at_1.53.27_PM.mp4

"""
title: Web Search using SearXNG and Scrape first N Pages
author: constLiakos with enhancements by justinh-rahb
funding_url: https://github.com/open-webui
version: 0.1.3
license: MIT
"""
import concurrent.futures
import json
import re
import unicodedata
from typing import Any, Callable
from urllib.parse import urlparse

import requests
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field


class Tools:
    class Valves(BaseModel):
        SEARXNG_ENGINE_API_BASE_URL: str = Field(
            default="https://example.com/search",
            description="The base URL for Search Engine",
        )
        SEARXNG_ENGINE_RESULT_NO: int = Field(
            default=3,
            description="The number of Search Engine Results",
        )
        CITATION_LINKS: bool = Field(
            default=False,
            description="If True, send custom citations with links",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def search_web(
        self,
        query: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Search the web and get the content of the relevant pages. Search for
        unknown knowledge, news, info, public contact info, weather, etc.
        :param query: Web query used in the search engine.
        :return: The content of the pages in JSON format.
        """

        def get_base_url(url):
            parsed_url = urlparse(url)
            return f"{parsed_url.scheme}://{parsed_url.netloc}"

        def process_result(result):
            title_site = result["title"]
            url_site = result["url"]
            snippet = result.get("snippet", "")  # Get snippet if available
            try:
                response_site = requests.get(url_site, timeout=120)
                response_site.raise_for_status()
                html_content = response_site.text

                soup = BeautifulSoup(html_content, "html.parser")
                content_site = soup.get_text(separator=" ", strip=True)
                content_site = unicodedata.normalize("NFKC", content_site)
                # Collapse runs of whitespace into single spaces
                content_site = re.sub(r"\s+", " ", content_site)
                content_site = content_site.strip()

                links = []
                if self.valves.CITATION_LINKS:
                    for a in soup.find_all("a", href=True):
                        links.append(
                            {
                                "title": a.text.strip(),
                                "link": get_base_url(url_site) + a["href"],
                            }
                        )

                return {
                    "title": title_site,
                    "url": url_site,
                    "content": content_site,
                    "snippet": snippet,
                    "links": links,
                }
            except requests.exceptions.RequestException as e:
                return {
                    "title": title_site,
                    "url": url_site,
                    "content": f"Failed to retrieve the page. Error: {str(e)}",
                    "snippet": snippet,
                }

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "in_progress",
                        "description": f"Initiating web search for: {query}",
                        "done": False,
                    },
                }
            )

        number_of_results = self.valves.SEARXNG_ENGINE_RESULT_NO
        search_engine_url = self.valves.SEARXNG_ENGINE_API_BASE_URL
        params = {"q": query, "format": "json", "number_of_results": number_of_results}

        try:
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "in_progress",
                            "description": "Sending request to search engine",
                            "done": False,
                        },
                    }
                )
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
            }
            resp = requests.get(
                search_engine_url, params=params, headers=headers, timeout=120
            )
            resp.raise_for_status()
            data = resp.json()

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "in_progress",
                            "description": f"Retrieved {len(data.get('results', []))} search results",
                            "done": False,
                        },
                    }
                )
        except requests.exceptions.RequestException as e:
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "error",
                            "description": f"Error during search: {str(e)}",
                            "done": True,
                        },
                    }
                )
            return json.dumps({"error": str(e)})

        results_json = []
        if "results" in data:
            results = data["results"][:number_of_results]

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "in_progress",
                            "description": f"Processing {len(results)} search results",
                            "done": False,
                        },
                    }
                )

            # Fetch and parse the result pages concurrently
            with concurrent.futures.ThreadPoolExecutor() as executor:
                futures = [
                    executor.submit(process_result, result) for result in results
                ]
                results_json = [
                    future.result()
                    for future in concurrent.futures.as_completed(futures)
                ]

            # Add custom citations only if CITATION_LINKS is True
            if self.valves.CITATION_LINKS and __event_emitter__:
                for result in results_json:
                    await __event_emitter__(
                        {
                            "type": "citation",
                            "data": {
                                "document": [result["snippet"] or result["title"]],
                                "metadata": [{"source": result["url"]}],
                                "source": {"name": result["title"]},
                            },
                        }
                    )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete",
                        "description": f"Web search completed. Retrieved content from {len(results_json)} pages",
                        "done": True,
                    },
                }
            )

        return json.dumps(results_json, ensure_ascii=False)

    async def get_website(
        self,
        url: str,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> str:
        """
        Get the content of the URL provided.
        :param url: The URL of the website.
        :return: The content of the page from the URL in JSON format.
        """

        def get_base_url(url):
            parsed_url = urlparse(url)
            return f"{parsed_url.scheme}://{parsed_url.netloc}"

        def generate_excerpt(content, max_length=200):
            return (
                content[:max_length] + "..." if len(content) > max_length else content
            )

        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "in_progress",
                        "description": f"Fetching content from URL: {url}",
                        "done": False,
                    },
                }
            )

        results_json = []
        try:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
            }
            response_site = requests.get(url, headers=headers, timeout=120)
            response_site.raise_for_status()
            html_content = response_site.text

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "in_progress",
                            "description": "Parsing website content",
                            "done": False,
                        },
                    }
                )

            soup = BeautifulSoup(html_content, "html.parser")
            page_title = soup.title.string if soup.title else "No title found"
            page_title = unicodedata.normalize("NFKC", page_title.strip())
            title_site = page_title
            url_site = url
            content_site = soup.get_text(separator=" ", strip=True)
            content_site = unicodedata.normalize("NFKC", content_site)
            # Collapse runs of whitespace into single spaces
            content_site = re.sub(r"\s+", " ", content_site)
            content_site = content_site.strip()

            links = []
            if self.valves.CITATION_LINKS:
                for a in soup.find_all("a", href=True):
                    links.append(
                        {
                            "title": a.text.strip(),
                            "link": get_base_url(url_site) + a["href"],
                        }
                    )

            result_site = {
                "title": title_site,
                "url": url_site,
                "content": content_site,
                "excerpt": generate_excerpt(content_site),
                "links": links,
            }
            results_json.append(result_site)

            if self.valves.CITATION_LINKS and __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "citation",
                        "data": {
                            "document": [result_site["excerpt"]],
                            # List form, for consistency with search_web above
                            "metadata": [{"source": url_site}],
                            "source": {"name": title_site},
                        },
                    }
                )

            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "complete",
                            "description": "Website content retrieved and processed successfully",
                            "done": True,
                        },
                    }
                )
        except requests.exceptions.RequestException as e:
            results_json.append(
                {
                    "url": url,
                    "content": f"Failed to retrieve the page. Error: {str(e)}",
                }
            )
            if __event_emitter__:
                await __event_emitter__(
                    {
                        "type": "status",
                        "data": {
                            "status": "error",
                            "description": f"Error fetching website content: {str(e)}",
                            "done": True,
                        },
                    }
                )

        return json.dumps(results_json, ensure_ascii=False)
THANKS. Bless you sir. Does this also work for pipelines? That's honestly where I'd be using it. I need to emit the state of my graph computation. My pipeline will call a LangServe endpoint, and being able to show the progress in the UI would be essential.
Unfortunately not. As of now it works with tools and filter functions (but not regular functions/manifolds). I was planning to make it work for regular functions but
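For reference, a minimal sketch of a filter function using the emitter. The Filter class shape and the injected __event_emitter__ parameter are assumptions based on how the tool code above receives the emitter; treat the exact signature as illustrative, not a documented API:

# Hypothetical sketch: a filter function emitting a status event after
# post-processing a response. Signature details are assumptions.
from typing import Any, Callable


class Filter:
    async def outlet(
        self,
        body: dict,
        __event_emitter__: Callable[[dict], Any] = None,
    ) -> dict:
        if __event_emitter__:
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete",
                        "description": "Filter finished post-processing",
                        "done": True,
                    },
                }
            )
        return body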
Motivation: I would like to create custom citations for my search tool. Example usage:
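A minimal sketch of what that usage might look like, reusing the citation payload shape from the tool code above (the URL, snippet, and title are placeholders):

# Sketch: emit a custom citation from inside a tool method, assuming
# __event_emitter__ is the async callable injected by the framework.
await __event_emitter__(
    {
        "type": "citation",
        "data": {
            "document": ["Relevant snippet of the cited page"],
            "metadata": [{"source": "https://example.com/article"}],
            "source": {"name": "Example Article Title"},
        },
    }
)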
This currently doesn't work: I'm guessing messages is not rendered directly?

Pull Request Checklist
Note to first-time contributors: Please open a discussion post in Discussions and describe your changes before submitting a pull request.
Before submitting, make sure you've checked the following:
The pull request targets the dev branch.

Changelog Entry
Description

Citations can now be emitted via __event_emitter__.

Added

Citations event via __event_emitter__.

Breaking Changes

__event_emitter__ now requires a type key to specify which kind of event we'd like to emit, with data containing the payload.
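To illustrate the change, a before/after sketch; the old call shape here is an assumption, while the new shape matches the tool code above:

# Before (hypothetical): the payload was passed directly.
await __event_emitter__({"description": "Searching...", "done": False})

# After: "type" selects the event kind, "data" carries the payload.
await __event_emitter__(
    {
        "type": "status",
        "data": {"description": "Searching...", "done": False},
    }
)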