OpenAI Assistants API

Trace OpenAI Assistants API calls in Muster using the @observe() decorator pattern.

The native Muster OpenAI wrapper does not support the OpenAI Assistants API, because that API keeps its state (threads, messages, and runs) on the server. Instead, wrap each assistant lifecycle call manually with the @observe() decorator.

Setup

pip install --upgrade openai langfuse

import os

os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_BASE_URL"] = "https://app.getmuster.io"
os.environ["OPENAI_API_KEY"] = "sk-..."
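
A quick check before running the steps below can catch an unset variable early. This is a minimal sketch and not part of the SDK: `missing_env_vars` and `REQUIRED_VARS` are hypothetical names, and the list simply mirrors the four variables set above.

```python
import os

# Names mirror the variables set in the setup snippet above.
REQUIRED_VARS = (
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
    "LANGFUSE_BASE_URL",
    "OPENAI_API_KEY",
)

def missing_env_vars(names=REQUIRED_VARS, env=None):
    """Return the names that are unset or empty in the given environment mapping."""
    env = os.environ if env is None else env
    return [name for name in names if not env.get(name)]
```

Calling `missing_env_vars()` with no arguments inspects `os.environ`; an empty list means all four variables are present.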

Implementation

1. Create the assistant

from langfuse import observe
from openai import OpenAI

@observe()
def create_assistant():
    client = OpenAI()
    assistant = client.beta.assistants.create(
        name="Math Tutor",
        instructions="You are a personal math tutor. Answer questions briefly, in a sentence or less.",
        model="gpt-4",
    )
    return assistant

assistant = create_assistant()

2. Run the assistant

@observe()
def run_assistant(assistant_id, user_input):
    client = OpenAI()
    thread = client.beta.threads.create()

    client.beta.threads.messages.create(
        thread_id=thread.id,
        role="assistant",
        content="I am a math tutor that likes to help students. How can I help?",
    )
    client.beta.threads.messages.create(
        thread_id=thread.id, role="user", content=user_input
    )

    run = client.beta.threads.runs.create(
        thread_id=thread.id,
        assistant_id=assistant_id,
    )
    return run, thread

3. Retrieve responses

@observe()
def get_response(thread_id, run_id):
    client = OpenAI()
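    # With order="asc" the newest message is last. This assumes the run has
    # completed, so the assistant's reply is the final message in the thread.
    messages = client.beta.threads.messages.list(thread_id=thread_id, order="asc")
    assistant_response = messages.data[-1].content[0].text.value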

    run_log = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
    return assistant_response, run_log

4. Compose the workflow

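import time

@observe()
def run_math_tutor(user_input):
    assistant = create_assistant()
    run, thread = run_assistant(assistant.id, user_input)

    # Poll until the run is no longer pending instead of sleeping a fixed time.
    client = OpenAI()
    while run.status in ("queued", "in_progress"):
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run.id)

    response, _run_log = get_response(thread.id, run.id)
    return response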
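
The wait between starting the run and reading the response can also be factored into a reusable helper with a timeout. This is a sketch under stated assumptions, not an SDK feature: `wait_for_run` and `PENDING_STATUSES` are hypothetical names, and the set of pending statuses is an assumption about the run lifecycle. The helper accepts any zero-argument callable, so it can be exercised without network access.

```python
import time

# Statuses in which a run is assumed to be still in flight.
PENDING_STATUSES = {"queued", "in_progress", "cancelling"}

def wait_for_run(fetch_run, timeout_s=60.0, interval_s=1.0,
                 sleep=time.sleep, clock=time.monotonic):
    """Call fetch_run() until the run leaves a pending status or the timeout expires.

    fetch_run is any zero-argument callable returning an object with a
    `.status` attribute.
    """
    deadline = clock() + timeout_s
    while True:
        run = fetch_run()
        if run.status not in PENDING_STATUSES:
            return run
        if clock() >= deadline:
            raise TimeoutError(f"run still {run.status!r} after {timeout_s} seconds")
        sleep(interval_s)
```

In the workflow, `fetch_run` could be `lambda: client.beta.threads.runs.retrieve(thread_id=thread.id, run_id=run_id)`, with `run_id` captured before the call. The returned run's `status` should still be checked, since a run can end in a state other than `completed`.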

Each @observe()-wrapped step becomes a span on the trace, giving you a unified view of the multi-step assistant lifecycle.
