Integrations
LangServe
Trace LangServe REST endpoints in Muster via the LangChain CallbackHandler.
LangServe deploys LangChain runnables and chains as a REST API on top of FastAPI. The Muster integration is the standard LangChain CallbackHandler attached to the runnable's config.
Setup
!pip install fastapi sse_starlette httpx langserve langfuse langchain-openai langchain
import os
# Placeholder credentials ("...") — replace with real values before running.
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
# NOTE(review): presumably the base URL the tracing handler reports to — confirm.
os.environ["LANGFUSE_HOST"] = "https://app.getmuster.io"
os.environ["OPENAI_API_KEY"] = "sk-..."
Examples
Simple LLM call
from langchain_openai import ChatOpenAI
from langchain_core.runnables.config import RunnableConfig
from langfuse.callback import CallbackHandler
from fastapi import FastAPI
from langserve import add_routes
# Tracing callback handler, constructed without arguments.
# NOTE(review): presumably reads the LANGFUSE_* environment variables set above — confirm.
langfuse_handler = CallbackHandler()
# NOTE(review): presumably validates the configured credentials before any route is served — confirm.
langfuse_handler.auth_check()
llm = ChatOpenAI()
# Attach the handler via the runnable config; `config` is reused by the chain example.
config = RunnableConfig(callbacks=[langfuse_handler])
llm_with_langfuse = llm.with_config(config)
# Single FastAPI app shared by all routes registered in the examples.
app = FastAPI()
# Expose the handler-configured model under /test-simple-llm-call.
add_routes(app, llm_with_langfuse, path="/test-simple-llm-call")
LCEL chain
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
# Prompt template with a single `topic` placeholder.
prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
# LCEL pipeline: prompt -> model -> string output parser.
chain = prompt | llm | StrOutputParser()
# Bind the same `config` (with the callback handler) used for the simple LLM route.
add_routes(app, chain.with_config(config), path="/test-chain")
Agent
from langchain.agents import AgentExecutor
from langchain.agents.format_scratchpad.openai_tools import format_to_openai_tool_messages
from langchain.agents.output_parsers.openai_tools import OpenAIToolsAgentOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_core.utils.function_calling import convert_to_openai_tool
@tool
def word_length(word: str) -> int:
    """Returns the length of the input word."""
    # NOTE(review): the docstring above is presumably what `@tool` exposes to
    # the model as the tool description, so treat its wording as runtime text.
    return len(word)
# Tools the agent may call.
tools = [word_length]
# Bind the tools to the model, converted to the OpenAI tool format.
llm_with_tools = llm.bind(tools=[convert_to_openai_tool(t) for t in tools])
# Agent prompt; `agent_scratchpad` is filled from the executor's intermediate steps.
prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("user", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])
# The agent runnable driven by AgentExecutor: map the executor's inputs onto
# the prompt variables, call the tool-enabled model, then parse its output.
agent = (
    {
        "input": lambda x: x["input"],
        "agent_scratchpad": lambda x: format_to_openai_tool_messages(
            x["intermediate_steps"]
        ),
    }
    | prompt
    | llm_with_tools
    | OpenAIToolsAgentOutputParser()
)
agent_executor = AgentExecutor(agent=agent, tools=tools)
# Separate config so this route's runs carry the run name "agent".
agent_config = RunnableConfig({"run_name": "agent"}, callbacks=[langfuse_handler])
add_routes(app, agent_executor.with_config(agent_config), path="/test-agent")