Implementing a Retrieval Augmented Generation (RAG) Chatbot with Databricks and Pinecone — AI Reading Summary — 包阅AI

包阅AI Reading Summary

1. Keywords: `RAG`, `Chatbot`, `Databricks`, `Pinecone`, `Vector Database`

2. Summary:

This article describes how to build a RAG chatbot with Databricks and Pinecone. It explains the advantages of RAG, including improved accuracy, support for multiple data types, and more flexible responses, and walks through the implementation steps, from data preparation to storage and querying.

3. Main points:

– Introduces the RAG approach and its advantages

  – Makes chatbots better at understanding and answering complex questions

  – Combines large language models with external knowledge retrieval to improve accuracy and quality

  – Handles different types of data, generates more dynamic and flexible responses, reduces costs, and speeds up development

– Introduces Databricks and Pinecone

  – Databricks processes and analyzes large datasets; Pinecone excels at managing complex data searches with pinpoint accuracy

  – Integrating the two enables more accurate and efficient chatbots

– Implementation steps

  – Prepare the data

    – Ingest raw files from cloud storage with Databricks Autoloader

    – Extract text from PDF/HTML files, create vector embeddings, and store them in a Delta table

  – Store the data in a Pinecone vector database

    – Initialize the client configuration and create an index

    – Transform the data into the schema Pinecone requires and write it

  – Query the Pinecone vector database

    – Query directly via the API, or simplify the work with the LangChain framework

Article URL: https://www.databricks.com/blog/implementing-rag-chatbot-using-databricks-and-pinecone

Source: databricks.com

Author: Databricks

Published: 2024/9/9 11:59

Language: English

Word count: 2,246

Estimated reading time: 9 minutes

Score: 84

Tags: Retrieval Augmented Generation, Chatbot, Databricks, Pinecone, Vector Database


The original article content follows


Imagine giving your business an intelligent bot that talks to customers. Chatbots are commonly used to provide customers with help or information, but conventional chatbots often struggle to answer complicated questions.

What is RAG?

Retrieval Augmented Generation (RAG) is a method that makes chatbots better at understanding and responding to tough questions. This Generative AI design pattern combines large language models (LLMs) with external knowledge retrieval. It allows real-time data to be integrated into your AI applications during the generation process (inference time). By providing the LLM with this contextual information, RAG significantly improves the accuracy and quality of the generated outputs.
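
To make the pattern concrete, here is a minimal sketch of the RAG loop. The retrieve_context and generate helpers are hypothetical stand-ins for a vector database query and an LLM call; this is illustrative only, not the implementation used later in this post.

# Minimal RAG sketch; retrieve_context and generate are hypothetical stand-ins
def answer_with_rag(question: str) -> str:
    # 1. Retrieval: fetch documents relevant to the question from an external knowledge store
    context_docs = retrieve_context(question, top_k=5)  # e.g., a vector database similarity search
    context_block = "\n".join(context_docs)

    # 2. Augmentation: inject the retrieved context into the prompt at inference time
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context_block}\n\n"
        f"Question: {question}"
    )

    # 3. Generation: the LLM produces a response grounded in the retrieved context
    return generate(prompt)  # e.g., a call to an LLM endpoint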

Here are some of the benefits of using RAG:

  • Improved accuracy and quality of AI applications: By providing real-time data as context to the LLM, RAG can improve the accuracy and quality of AI applications. This is because the LLM has access to more information, which it can use to generate more informed and relevant responses.
  • Ability to handle different types of data: RAG can handle different types of data, including unstructured data like documents and emails and structured data like tables. This makes it a versatile tool that can be used in a variety of applications.
  • More dynamic and flexible responses to user queries: RAG can generate more dynamic and flexible responses to user queries, such as tailoring responses to user interests or restricting them based on data access controls. This makes RAG chatbots more engaging and helpful for users while enforcing security controls.
  • Reduced up-front costs and faster development: RAG can be deployed quickly and easily without extensive development work or LLM fine-tuning.

Databricks and Pinecone

Pinecone’s vector database excels at managing complex data searches with pinpoint accuracy, while the Databricks Data Intelligence Platform streamlines the handling and analysis of vast datasets.

The integration with Pinecone is seamless, enabling Databricks to efficiently store and retrieve vector embeddings at scale. This integration simplifies the development of high-performance vector search applications that leverage Pinecone and Databricks.

Using Databricks and Pinecone together, you can create a chatbot that is more accurate and efficient than a traditional one.

Step-by-Step Implementation

In this blog, we walk you through building a chatbot that can answer questions about Databricks by leveraging Databricks documentation and whitepapers.

There are four key stages required in building a chatbot. The first stage is data ingestion and preparation. The next stage is storing the data in a vector database like Pinecone for efficient information retrieval. The third stage is setting up a RAG retriever and chain that uses Pinecone for retrieval and an LLM like Llama 3.1 to generate responses. The final stage is registering the chatbot to Databricks Unity Catalog and deploying it via Databricks Mosaic AI Model Serving. Continue reading for a step-by-step walkthrough of this process.

Step 1: Prepare Data with Databricks

  1. Ingest raw files located on cloud storage using Databricks Autoloader.
    We use Databricks Auto Loader, which offers a hands-off approach that automatically processes new files as they land in cloud storage, ensuring efficiency and fault tolerance without the need for manual state management. Auto Loader is designed to scale to billions of files and is cost-effective, leveraging native cloud APIs for file discovery to keep costs in check. Moreover, it is intelligent, with built-in schema inference and evolution capabilities that adapt to schema changes. Whether you're dealing with high volumes of data or require near-real-time ingestion, Auto Loader helps simplify and accelerate data ingestion. Streaming tables provide a much simpler experience, especially for handling streaming or incremental data processing.
  2. Extract the text from the PDF/HTML files.
    First, we need to transform the byte content of PDF files into readable text and retrieve specific segments from the text. In this reference implementation, we leverage the PyPdf or UnstructuredIO libraries with a Spark UDF to simplify the text extraction process. We also use a text splitter to break the text into manageable chunks.
  3. Create vector embeddings and save them to a Delta table.
    For creating the vector embeddings, we use the BGE embedding model available via the Databricks Mosaic AI Foundation Model APIs. A Python UDF computes the embeddings using the foundation model endpoints. The extracted data from the PDFs and the embeddings are then stored in a Delta table. A condensed sketch of this whole flow follows this list.
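
For orientation, the fragment below condenses these three steps into one illustrative sketch. The storage paths and the extract_and_chunk_udf helper are hypothetical placeholders, and get_embedding_for_string_udf is the embedding UDF shown in Step 3 of this post; this is not the full notebook code.

from pyspark.sql.functions import col, explode

# 1. Incrementally ingest raw PDFs from cloud storage with Auto Loader
#    (the input path and checkpoint location below are hypothetical placeholders)
raw_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "binaryFile")
    .option("pathGlobFilter", "*.pdf")
    .load("/Volumes/main/default/raw_docs")
)

(
    raw_stream.writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", "/Volumes/main/default/checkpoints/raw_docs")
    .table("pdf_raw")
    .awaitTermination()
)

# 2 & 3. Extract and chunk the text, compute embeddings, and save to a Delta table;
#    extract_and_chunk_udf is a hypothetical Spark UDF wrapping pypdf plus a text splitter,
#    and get_embedding_for_string_udf is the UDF defined in Step 3 below
(
    spark.table("pdf_raw")
    .withColumn("content", explode(extract_and_chunk_udf(col("content"))))
    .withColumn("embedding", get_embedding_for_string_udf(col("content")))
    .write.mode("append")
    .saveAsTable("databricks_pdf_documentation")
)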

Step 2: Store Data in a Pinecone vector database

  1. Initialize Pinecone client configs.

    Before you can upsert vector embeddings into Pinecone, you first create an index. An index is a group of embeddings with the same number of dimensions, and it typically represents the underlying dataset for similar types of use cases. Log in to Pinecone to create a Pinecone API key.

    Databricks Secrets securely manage and store sensitive information such as passwords, API keys, and other credentials that you may need to use within your Databricks notebooks, jobs, and data pipelines. We use Databricks secrets to store sensitive information like the Pinecone API key and other required credentials.

    The code below shows how you can retrieve sensitive information, such as the Pinecone API key, using Databricks secrets. Then, using your Pinecone API key and environment, initialize your client connection to Pinecone.

    # Initialize Pinecone
    from pinecone import Pinecone

    pinecone_api_key = dbutils.secrets.get("your_secrets_scope", "PINECONE_API_KEY")
    project_name = "Starter"    # your-pinecone-project-name
    index_name = "dbdemo-index" # your-pinecone-index-name

    # Connect to the Pinecone index
    pc = Pinecone(api_key=pinecone_api_key)
    index = pc.Index(index_name)
  2. You then create a Pinecone index either using the Pinecone UI or the API.
    from pinecone import ServerlessSpec

    pc.create_index(
        name=index_name,  # pinecone index name
        dimension=1024,   # must match the embedding model; databricks-bge-large-en returns 1024-dim vectors
        metric="cosine",
        spec=ServerlessSpec(
            cloud="aws",
            region="us-east-1"
        )
    )
  3. Transform data to the schema required by Pinecone.

    Pinecone lets you attach metadata key-value pairs to vectors in an index. This can be used to store the original document and its metadata and to specify additional filter expressions. Before we can write to Pinecone, we transform the data from the Delta table by adding a metadata column that captures the content/snippet of the original document and additional metadata like document source and ID, in accordance with Pinecone's schema requirements.

    from pyspark.sql.functions import col, lit, struct, to_json

    df = (
        spark.table("databricks_pdf_documentation")
        .withColumn("metadata", to_json(struct(col("content"), col("url"), col("id"))))
        .withColumn("namespace", lit("dbdemo-namespace"))
        .withColumn("values", col("embedding"))
        .withColumn("sparse_values", lit(None))
        .select("id", "values", "namespace", "metadata", "sparse_values")
    )
  4. Write to the Pinecone index.

    Install the Pinecone Spark connector as described in the documentation. We use the Pinecone Spark connector to write the embeddings to the Pinecone index. Note that the "append" mode allows us to augment the index with new data as well.

    # Write to Pinecone
    (
        df.write
        .option("pinecone.apiKey", pinecone_api_key)
        .option("pinecone.indexName", index_name)
        .format("io.pinecone.spark.pinecone.Pinecone")
        .mode("append")
        .save()
    )

Step 3: Query the Pinecone vector database

We can then query the Pinecone vector index using its query API. This API takes the question embedding as input.

# UDF for embedding
from mlflow.deployments import get_deploy_client
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Client for the Databricks Foundation Model API endpoints
# (assumed setup; the original notebook defines deploy_client elsewhere)
deploy_client = get_deploy_client("databricks")

def get_embedding_for_string(text):
    response = deploy_client.predict(endpoint="databricks-bge-large-en", inputs={"input": text})
    e = response.data
    return e[0]["embedding"]

# Register as a UDF
get_embedding_for_string_udf = udf(get_embedding_for_string, ArrayType(FloatType()))

# Query the Pinecone vector database
question = "How can I track billing usage on my workspaces?"

# Create the query embedding
xq = get_embedding_for_string(question)

# Query Pinecone for the top 5 most similar results
query_response = index.query(
    namespace="dbdemo-namespace",
    top_k=5,
    include_values=True,
    include_metadata=True,
    vector=xq,
)

query_response_docs = []
for match in query_response["matches"]:
    query_response_docs.append([match["metadata"]["url"], match["metadata"]["content"], match["score"]])

print(query_response_docs)

Querying Pinecone directly via the API allows you to integrate Pinecone and Databricks into arbitrary code.

In the next section, we show how to simplify this workflow using the popular LangChain framework.

Step 4: Query a Pinecone vector database using LangChain

LangChain is a framework that simplifies building applications powered by large language models (LLMs). Its DatabricksEmbeddings integration simplifies interacting with embedding models, and its Pinecone integration provides a simplified query interface.

The LangChain wrappers make this easy by handling all the underlying logic and API calls for you. The LangChain code below abstracts away the need to explicitly convert the query text to a vector.

from pprint import pprint

from langchain_community.embeddings import DatabricksEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

# Create the input question embeddings (with Databricks `bge-large-en`)
embedding_model = DatabricksEmbeddings(endpoint="databricks-bge-large-en")

# Connect to the Pinecone index
pc = Pinecone(api_key=pinecone_api_key)
index_pc = pc.Index(pinecone_index_name)

vectorstore = PineconeVectorStore(
    index=index_pc,
    namespace=pinecone_namespace,
    embedding=embedding_model,
    text_key="content"
)

# Call the Pinecone vector database to find similar documents
query = "What is Apache Spark?"
docs = vectorstore.similarity_search(
    query,  # our search query
    k=3     # return the 3 most relevant docs
)
pprint(docs[0])

Step 5: Create a retriever for Pinecone and LangChain

Above, we showed how to do a similarity search on our Pinecone vector index. To create a RAG chatbot, we will use the LangChain Retriever interface to wrap the index.

We first initialize Pinecone to set the API key and environment. Then, we create a VectorStore instance from the existing Pinecone index we created earlier, with the correct namespace and keys.

from langchain_community.embeddings import DatabricksEmbeddings
from langchain_pinecone import PineconeVectorStore
from pinecone import Pinecone

embedding_model = DatabricksEmbeddings(endpoint="databricks-bge-large-en")

def get_retriever(persist_dir: str = None):
    # Initialize Pinecone and connect to the Pinecone index
    pc = Pinecone(api_key=pinecone_api_key)
    index_pc = pc.Index(pinecone_index_name)

    vectorstore = PineconeVectorStore(
        index=index_pc,
        namespace=pinecone_namespace,
        embedding=embedding_model,
        text_key="content"
    )

    return vectorstore.as_retriever()

retriever = get_retriever()

Step 6: Assemble the chatbot chain

Now, we can put the retriever into a chain defining our chatbot!

from operator import itemgetter

from langchain_core.runnables import RunnableLambda

# Helper assumed from the accompanying notebook: pull the latest user question out of the chat history
def extract_question(messages):
    return messages[-1]["content"]

retrieve_document_chain = (
    itemgetter("messages")
    | RunnableLambda(extract_question)
    | retriever
)

# Test the retriever chain
print(retrieve_document_chain.invoke({"messages": [{"role": "user", "content": "What is Apache Spark?"}]}))

Let’s see if our chatbot can correctly extract the question from the chat messages and retrieve relevant context from Pinecone.

Step 7: Deploy the chatbot as a model

As we iterate on our chatbot, we will want to track model objects, model versions, and metadata, as well as manage access controls. For that, we will use MLflow’s Model Registry, integrated with Unity Catalog.

You can register the chatbot chain as a model using mlflow.langchain.log_model with Unity Catalog. The model's signature can be inferred using MLflow's infer_signature. Remember to include pinecone-client in the dependencies, and set mlflow.models.set_model(model=full_chain) in the notebook where you defined the chain. Then, in a new driver notebook, register the chatbot and deploy it to Model Serving.

from mlflow.models import infer_signature
import os

import langchain
import mlflow

mlflow.set_registry_uri("databricks-uc")
model_name = f"{catalog_name}.{schema_name}.rag_with_pinecone_model"  # catalog_name, schema_name and model name

# Specify the full path to the chain notebook
chain_notebook_file = "2.1 - Advanced-Chatbot-Chain - Using Pinecone"  # the name of the notebook that has the chain definition
chain_notebook_path = os.path.join(os.getcwd(), chain_notebook_file)

# input_example and output_example come from the accompanying notebook
with mlflow.start_run():
    signature = infer_signature(input_example, output_example)
    logged_chain_info = mlflow.langchain.log_model(
        lc_model=chain_notebook_path,
        artifact_path="chain",
        registered_model_name=model_name,
        input_example=input_example,
        signature=signature,
        example_no_conversion=True,  # required to allow the schema to work
        extra_pip_requirements=[
            "mlflow==" + mlflow.__version__,
            "langchain==" + langchain.__version__,
            "pinecone-client==3.2.2",
            "langchain-pinecone==0.1.1",
            "langchain-community",
        ],
    )

The model is registered with Databricks Unity Catalog, which centralizes access control, auditing, lineage, and discovery for all data and AI assets.

Step 8: Deploy the chatbot to Databricks Model Serving

Now let's deploy the chatbot chain model as a Model Serving endpoint. Below, we put PINECONE_API_KEY and DATABRICKS_TOKEN into environment variables, since the serving endpoint uses them to talk to Pinecone and the Databricks Foundation Models. This allows us to grant access to the served model without revealing these secrets in code or to users.

# Create or update serving endpoint
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput,
    ServedModelInput,
    ServedModelInputWorkloadSize,
)

# Check for the latest model version
def get_latest_model_version(model_name):
    from mlflow import MlflowClient
    mlflow_client = MlflowClient()
    latest_version = 1
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
        version_int = int(mv.version)
        if version_int > latest_version:
            latest_version = version_int
    return latest_version

# Now create or update the serving endpoint
serving_endpoint_name = "pinecone_rag_chain"
latest_model_version = get_latest_model_version(model_name)
databricks_api_token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()

w = WorkspaceClient()
endpoint_config = EndpointCoreConfigInput(
    name=serving_endpoint_name,
    served_models=[
        ServedModelInput(
            model_name=model_name,
            model_version=latest_model_version,
            workload_size=ServedModelInputWorkloadSize.SMALL,
            scale_to_zero_enabled=True,
            environment_vars={
                "PINECONE_API_KEY": "{{secrets/prasad_kona/PINECONE_API_KEY}}",
                "DATABRICKS_TOKEN": "{{secrets/dbdemos/rag_sp_token}}",
            },
        )
    ],
)

existing_endpoint = next(
    (e for e in w.serving_endpoints.list() if e.name == serving_endpoint_name), None
)
serving_endpoint_url = f"{host}/ml/endpoints/{serving_endpoint_name}"
if existing_endpoint is None:
    print(f"Creating the endpoint {serving_endpoint_url}, this will take a few minutes to package and deploy the endpoint...")
    w.serving_endpoints.create_and_wait(name=serving_endpoint_name, config=endpoint_config)
else:
    print(f"Updating the endpoint {serving_endpoint_url} to version {latest_model_version}, this will take a few minutes to package and deploy the endpoint...")
    w.serving_endpoints.update_config_and_wait(served_models=endpoint_config.served_models, name=serving_endpoint_name)

displayHTML(f'Your Model Serving endpoint is now available. Open the <a href="/ml/endpoints/{serving_endpoint_name}">Model Serving Endpoint page</a> for more details.')

The Model Serving UI provides real-time information on the health of the model being served.

Step 9: Test your chatbot

After deploying the chatbot, you can test it via the REST API or the Databricks SDK.

from databricks.sdk.service.serving import DataframeSplitInput

test_dialog = DataframeSplitInput(
    columns=["messages"],
    data=[
        {
            "messages": [
                {"role": "user", "content": "What is Apache Spark?"},
                {
                    "role": "assistant",
                    "content": "Apache Spark is an open-source data processing engine that is widely used in big data analytics.",
                },
                {"role": "user", "content": "Does it support streaming?"},
            ]
        }
    ],
)
answer = w.serving_endpoints.query(serving_endpoint_name, dataframe_split=test_dialog)
print(answer.predictions[0])
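
The SDK call above is one route; to exercise the REST API instead, a minimal sketch along the following lines should work. The workspace URL and secret scope are hypothetical placeholders, and the payload mirrors the shape of the SDK example above.

import requests

# Hypothetical workspace URL; the token is read from a Databricks secret
host = "https://<your-workspace>.cloud.databricks.com"
token = dbutils.secrets.get("your_secrets_scope", "DATABRICKS_TOKEN")

response = requests.post(
    f"{host}/serving-endpoints/pinecone_rag_chain/invocations",
    headers={"Authorization": f"Bearer {token}"},
    json={
        "dataframe_split": {
            "columns": ["messages"],
            "data": [{"messages": [{"role": "user", "content": "What is Apache Spark?"}]}],
        }
    },
)
print(response.json())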

You can also test it using the Query UI available as part of Model Serving.

Next steps

Improve your customer service with Databricks and Pinecone by deploying cutting-edge RAG chatbots. Unlike traditional bots, these advanced chatbots leverage the Databricks Data Intelligence Platform and Pinecone’s vector database to deliver precise, timely responses. They rapidly sift through vast data to find the exact information needed, providing customers with accurate answers in seconds. This not only elevates the customer experience but also sets a new benchmark for digital engagement.

For business leaders, embracing this technology is more than just an upgrade—it’s a strategic move to lead in customer service innovation. By adopting data-driven, intelligent solutions, you can place your business at the forefront of customer engagement, showcasing a commitment to excellence that resonates with your audience.

Check out our free training to learn more about Generative AI with Databricks, and read additional Pinecone and Databricks documentation here. Access sample notebooks from this blog here.