Musterby Elitery
Integrations

Instructor

Combine the Muster OpenAI wrapper with the Instructor library for structured LLM outputs and full observability.

Instructor is a library for structured LLM outputs — JSON schemas, Pydantic models, and tool/function-calling. Combine it with the Muster OpenAI integration to trace every structured generation.

Setup

pip install langfuse openai pydantic instructor --upgrade
import os

os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_BASE_URL"] = "https://app.getmuster.io"
os.environ["OPENAI_API_KEY"] = "sk-proj-..."

Basic Usage

The integration combines the Muster OpenAI integration with Instructor's patching functionality. Both sync and async clients are supported.

Synchronous

import instructor
from langfuse.openai import OpenAI
from pydantic import BaseModel

client = instructor.patch(OpenAI())

class WeatherDetail(BaseModel):
    city: str
    temperature: int

weather_info = client.chat.completions.create(
    model="gpt-4o",
    response_model=WeatherDetail,
    messages=[
        {"role": "user", "content": "The weather in Paris is 18 degrees Celsius."},
    ],
)

print(weather_info.model_dump_json(indent=2))

Asynchronous

import instructor
from langfuse.openai import AsyncOpenAI
from pydantic import BaseModel

client = instructor.apatch(AsyncOpenAI())

class WeatherDetail(BaseModel):
    city: str
    temperature: int

response = await client.chat.completions.create(
    model="gpt-4o",
    response_model=WeatherDetail,
    messages=[
        {"role": "user", "content": "The weather in Paris is 18 degrees Celsius."},
    ],
)

Comprehensive example: classify + score feedback

This example classifies customer feedback into categories (PRAISE, SUGGESTION, BUG, QUESTION) and scores relevance on a 0.0-1.0 scale using asynchronous processing.

from typing import List, Tuple
from enum import Enum
import asyncio
import instructor

from langfuse.openai import AsyncOpenAI
from langfuse import observe, get_client
from pydantic import BaseModel, Field, field_validator

client = AsyncOpenAI()
client = instructor.patch(client, mode=instructor.Mode.TOOLS)
langfuse = get_client()
sem = asyncio.Semaphore(5)

class FeedbackType(Enum):
    PRAISE = "PRAISE"
    SUGGESTION = "SUGGESTION"
    BUG = "BUG"
    QUESTION = "QUESTION"

class FeedbackClassification(BaseModel):
    feedback_text: str = Field(...)
    classification: List[FeedbackType] = Field(description="Predicted categories")
    relevance_score: float = Field(default=0.0)

    @field_validator("classification", mode="before")
    def validate_classification(cls, v):
        if not isinstance(v, list):
            v = [v]
        return v

@observe()
async def classify_feedback(feedback: str) -> Tuple[FeedbackClassification, float]:
    async with sem:
        response = await client.chat.completions.create(
            model="gpt-4o",
            response_model=FeedbackClassification,
            max_retries=2,
            messages=[{"role": "user", "content": f"Classify and score this feedback: {feedback}"}],
        )
        observation_id = langfuse.get_current_observation_id()
        return feedback, response, observation_id

def score_relevance(trace_id, observation_id, relevance_score):
    langfuse.create_score(
        trace_id=trace_id,
        observation_id=observation_id,
        name="feedback-relevance",
        value=relevance_score,
    )

@observe()
async def main(feedbacks):
    tasks = [classify_feedback(f) for f in feedbacks]
    results = []
    for task in asyncio.as_completed(tasks):
        feedback, classification, observation_id = await task
        trace_id = langfuse.get_current_trace_id()
        score_relevance(trace_id, observation_id, classification.relevance_score)
        results.append({
            "feedback": feedback,
            "classification": [c.value for c in classification.classification],
            "relevance_score": classification.relevance_score,
        })
    langfuse.flush()
    return results

feedbacks = [
    "The chat bot on your website does not work.",
    "Your customer service is exceptional!",
    "Could you add more features to your app?",
    "I have a question about my recent order.",
]

results = await main(feedbacks)

Instructor trace in Muster

See also