Integrations
Instructor
Combine the Muster OpenAI wrapper with the Instructor library for structured LLM outputs and full observability.
Instructor is a library for structured LLM outputs — JSON schemas, Pydantic models, and tool/function-calling. Combine it with the Muster OpenAI integration to trace every structured generation.
Setup
pip install langfuse openai pydantic instructor --upgradeimport os
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_BASE_URL"] = "https://app.getmuster.io"
os.environ["OPENAI_API_KEY"] = "sk-proj-..."Basic Usage
The integration combines the Muster OpenAI integration with Instructor's patching functionality. Both sync and async clients are supported.
Synchronous
import instructor
from langfuse.openai import OpenAI
from pydantic import BaseModel
client = instructor.patch(OpenAI())
class WeatherDetail(BaseModel):
city: str
temperature: int
weather_info = client.chat.completions.create(
model="gpt-4o",
response_model=WeatherDetail,
messages=[
{"role": "user", "content": "The weather in Paris is 18 degrees Celsius."},
],
)
print(weather_info.model_dump_json(indent=2))Asynchronous
import instructor
from langfuse.openai import AsyncOpenAI
from pydantic import BaseModel
client = instructor.apatch(AsyncOpenAI())
class WeatherDetail(BaseModel):
city: str
temperature: int
response = await client.chat.completions.create(
model="gpt-4o",
response_model=WeatherDetail,
messages=[
{"role": "user", "content": "The weather in Paris is 18 degrees Celsius."},
],
)Comprehensive example: classify + score feedback
This example classifies customer feedback into categories
(PRAISE, SUGGESTION, BUG, QUESTION) and scores relevance on a
0.0-1.0 scale using asynchronous processing.
from typing import List, Tuple
from enum import Enum
import asyncio
import instructor
from langfuse.openai import AsyncOpenAI
from langfuse import observe, get_client
from pydantic import BaseModel, Field, field_validator
client = AsyncOpenAI()
client = instructor.patch(client, mode=instructor.Mode.TOOLS)
langfuse = get_client()
sem = asyncio.Semaphore(5)
class FeedbackType(Enum):
PRAISE = "PRAISE"
SUGGESTION = "SUGGESTION"
BUG = "BUG"
QUESTION = "QUESTION"
class FeedbackClassification(BaseModel):
feedback_text: str = Field(...)
classification: List[FeedbackType] = Field(description="Predicted categories")
relevance_score: float = Field(default=0.0)
@field_validator("classification", mode="before")
def validate_classification(cls, v):
if not isinstance(v, list):
v = [v]
return v
@observe()
async def classify_feedback(feedback: str) -> Tuple[FeedbackClassification, float]:
async with sem:
response = await client.chat.completions.create(
model="gpt-4o",
response_model=FeedbackClassification,
max_retries=2,
messages=[{"role": "user", "content": f"Classify and score this feedback: {feedback}"}],
)
observation_id = langfuse.get_current_observation_id()
return feedback, response, observation_id
def score_relevance(trace_id, observation_id, relevance_score):
langfuse.create_score(
trace_id=trace_id,
observation_id=observation_id,
name="feedback-relevance",
value=relevance_score,
)
@observe()
async def main(feedbacks):
tasks = [classify_feedback(f) for f in feedbacks]
results = []
for task in asyncio.as_completed(tasks):
feedback, classification, observation_id = await task
trace_id = langfuse.get_current_trace_id()
score_relevance(trace_id, observation_id, classification.relevance_score)
results.append({
"feedback": feedback,
"classification": [c.value for c in classification.classification],
"relevance_score": classification.relevance_score,
})
langfuse.flush()
return results
feedbacks = [
"The chat bot on your website does not work.",
"Your customer service is exceptional!",
"Could you add more features to your app?",
"I have a question about my recent order.",
]
results = await main(feedbacks)