Compare commits

...

9 Commits

Author SHA1 Message Date
Jesse Gross
a50199cd70 mlxrunner: batch the sampler across multiple sequences
Register sequences with Add/Remove; each Sample call takes any subset of
registered slots and samples one token per row, appending to each slot's
ring-buffer history. When all slots share Options and penalty rings are
full, one fused transform pass runs over the whole batch via a persistent
pooled history tensor; otherwise calls fall back to per-slot serial
processing indexed against the same pool.

Performance is unchanged for a single sequence, which is all that is
exposed for now.
2026-04-21 15:09:19 -07:00
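
For reference, a minimal sketch of the slot lifecycle this commit introduces. Only the identifiers (sample.New, Add, Sample, Remove, Result.Arrays) come from the diffs below; the surrounding wiring (promptTokens, logits, numCtx) is illustrative, not part of the change:

    // Illustrative wiring of the batched sampler API.
    s := sample.New(numCtx) // numCtx: the runner's context window

    // Register a sequence; prior tokens seed the penalty ring buffer.
    s.Add(0, sample.Options{Temperature: 0.8, RepeatLastN: 64, RepeatPenalty: 1.1}, promptTokens)

    // One decode step: sample a token for each named slot from the [B, V] logits.
    res := s.Sample([]int{0}, logits)
    mlx.Pin(res.Arrays()...) // Result groups Token/Logprob/TopTokens/TopLogprobs
    mlx.Eval(res.Arrays()...)
    token := int32(res.Token.Int())

    s.Remove(0) // drop the slot (and its pooled history row) when the sequence ends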
Jesse Gross
5264ba9194 mlxrunner: track sampler history in a fixed-size ring buffer
AppendToken used to concatenate the new token onto the history tensor
and slice it back to RepeatLastN every decode step, churning the graph
shape and reallocating a fresh tensor each call. The stateful penalties
don't care about order within the window, so a fixed-capacity ring with
one SliceUpdate per append keeps the tensor shape constant across
steps.
2026-04-21 14:40:19 -07:00
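
The ring arithmetic is easy to see on a plain slice. A runnable scalar sketch of the idea (illustrative only; the real change applies the same wrap-around index to a tensor):

    package main

    import "fmt"

    // appendToken overwrites the oldest entry in a fixed-capacity ring.
    // The penalties read the window unordered, so this is equivalent to
    // the old concatenate-then-slice, but the storage never changes shape.
    func appendToken(ring []int32, n int, tok int32) int {
    	ring[n%len(ring)] = tok // wrap instead of reallocating
    	return n + 1
    }

    func main() {
    	ring := make([]int32, 4) // repeat_last_n = 4
    	n := 0
    	for _, t := range []int32{10, 11, 12, 13, 14, 15} {
    		n = appendToken(ring, n, t)
    	}
    	fmt.Println(ring) // [14 15 12 13]: oldest two entries overwritten
    }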
Jesse Gross
ce99f24731 mlxrunner: tokenize prompts in request handler goroutines
Move tokenization out of the single GPU processing goroutine and
into each request's HTTP handler goroutine. This allows the next
request's prompt to be tokenized on the CPU while the current
request is executing on the GPU.
2026-04-21 14:38:49 -07:00
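
The shape of the pipelining, as a self-contained sketch (illustrative pattern only; the real types are Request, Runner.Prepare, and the Requests channel in the diffs below):

    package main

    import "fmt"

    type request struct {
    	tokens    []int32
    	responses chan string
    }

    func main() {
    	work := make(chan request)

    	// Single "GPU" goroutine: only receives pre-tokenized requests.
    	go func() {
    		for req := range work {
    			req.responses <- fmt.Sprintf("decoded %d tokens", len(req.tokens))
    		}
    	}()

    	// Each HTTP handler goroutine tokenizes on the CPU before handing off,
    	// so the next prompt's encoding overlaps the current prompt's decode.
    	req := request{tokens: tokenize("hello world"), responses: make(chan string, 1)}
    	work <- req
    	fmt.Println(<-req.responses)
    }

    // tokenize stands in for r.Tokenizer.Encode; it is CPU-bound.
    func tokenize(prompt string) []int32 {
    	out := make([]int32, 0, len(prompt))
    	for _, r := range prompt {
    		out = append(out, int32(r))
    	}
    	return out
    }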
Jesse Gross
04f5f0cdb4 mlx: improve thread safety of array management
Use atomic.Int32 for Array.pinned and a sync.Mutex for the global
arrays slice so MLX arrays can be created and pinned from multiple
goroutines without racing on those structures. Convert Array value
receivers to pointer receivers and struct fields from Array to
*Array to avoid copying the atomic.

This does not fully achieve thread safety even when building
completely independent graphs. The tracing flag and traceScratch
slice in compile.go are unprotected, so concurrent Compile calls
will race. MLX itself is not fully thread-safe either, although
work is underway to improve that.
2026-04-21 14:38:49 -07:00
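
Why the receiver change matters, in a runnable sketch (illustrative; not taken from the diff): a value receiver operates on a copy of the struct, so the increment lands on the copy's counter rather than the caller's, and atomic types embed a noCopy marker that go vet's copylocks check flags when copied:

    package main

    import "sync/atomic"

    type pinned struct {
    	count atomic.Int32
    }

    // Buggy shape (what the diff removes): operates on a copy.
    //   func (p pinned) Pin() { p.count.Add(1) }

    // Pointer receiver mutates the shared counter.
    func (p *pinned) Pin() { p.count.Add(1) }

    func main() {
    	p := &pinned{}
    	p.Pin()
    	println(p.count.Load()) // 1
    }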
Matteo Celani
fb36a01ffe app/ui: fix model picker showing stale model after switching chats (#15280)
* app/ui: fix model picker showing stale model after switching chats

Optimistic messages created during streaming were storing the full
Model object instead of the model name string. When switching back
to a chat with cached streaming data, the restore effect read an
object where it expected a string, causing the model picker to fail
matching and remain stuck on the previous chat's model.

* app/ui: fix two more instances of Model object passed as model name

Fix the same bug at lines 523 and 536 in the assistant_with_tools
event handler, where selectedModel (object) was used instead of
selectedModel.model (string).
2026-04-21 15:08:06 -04:00
Michael Verrilli
0c65ed33bc cmd: populate model capabilities in launchInteractiveModel (#15712)
launchInteractiveModel was introduced in PR #14609 without the
client.Show() capability-detection block that RunHandler uses.
This left opts.MultiModal always false in the TUI path, causing
image/audio file paths to be treated as unknown commands instead
of being loaded as multimodal attachments.

Mirror the Show() call, pull-on-404 fallback, cloud auth handling,
and MultiModal/Think population from RunHandler into
launchInteractiveModel.

Fixes #15711
2026-04-21 14:37:36 -04:00
Jesse Gross
22d6c817f8 mlxrunner: fuse top-P and top-K into a single sort pass
When both filters are active, avoid paying for a full sort in top-P
and a partial sort in top-K. Single-filter paths are unchanged.
Improves generation throughput on gemma4:e4b by 1.5%.
2026-04-20 17:43:00 -07:00
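
A scalar sketch of the fused filter (illustrative only; the tensor version in the diffs below does the same with one ArgsortAxis over the vocabulary):

    package main

    import (
    	"fmt"
    	"math"
    	"sort"
    )

    // fusedTopKTopP keeps a token only if it sits in the first K positions of
    // one descending sort and its exclusive cumulative probability is still
    // below topP; the single sort serves both filters.
    func fusedTopKTopP(logits []float64, topK int, topP float64) map[int]bool {
    	order := make([]int, len(logits))
    	for i := range order {
    		order[i] = i
    	}
    	sort.Slice(order, func(a, b int) bool { return logits[order[a]] > logits[order[b]] })

    	var z float64
    	for _, l := range logits {
    		z += math.Exp(l) // softmax normalizer
    	}

    	keep := make(map[int]bool)
    	cum := 0.0
    	for rank, idx := range order {
    		if rank < topK && cum < topP { // both filters, same pass
    			keep[idx] = true
    		}
    		cum += math.Exp(logits[idx]) / z
    	}
    	return keep
    }

    func main() {
    	fmt.Println(fusedTopKTopP([]float64{2, 1, 0.5, 0.1}, 3, 0.9)) // map[0:true 1:true 2:true]
    }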
Jesse Gross
ca01373b28 mlxrunner: use MaxAxis in the min-P sampler
One reduction op instead of Argmax + TakeAlongAxis.
2026-04-20 17:43:00 -07:00
Jesse Gross
24e038d56a mlxrunner: add logprobs support
Match the ollamarunner and OpenAI semantics: raw, full-vocab log-softmax
with the top-K ranked by probability. The computation is skipped on the
GPU when the request doesn't ask for logprobs, so ordinary decode doesn't
pay for it.
2026-04-20 17:43:00 -07:00
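
The semantics in scalar form, as a runnable sketch (illustrative; the tensor code below subtracts the row max before the logsumexp for the same stability reason):

    package main

    import (
    	"fmt"
    	"math"
    	"sort"
    )

    // logSoftmax: log p_i = x_i - max(x) - log(sum_j exp(x_j - max(x))).
    // Subtracting the max keeps exp finite even for very large logits.
    func logSoftmax(x []float64) []float64 {
    	maxv := math.Inf(-1)
    	for _, v := range x {
    		maxv = math.Max(maxv, v)
    	}
    	var sum float64
    	for _, v := range x {
    		sum += math.Exp(v - maxv)
    	}
    	lse := math.Log(sum)
    	out := make([]float64, len(x))
    	for i, v := range x {
    		out[i] = v - maxv - lse
    	}
    	return out
    }

    func main() {
    	lp := logSoftmax([]float64{1000, 999, 998}) // stays finite
    	idx := []int{0, 1, 2}
    	// top-K ranked by probability == ranked by logprob, descending
    	sort.Slice(idx, func(a, b int) bool { return lp[idx[a]] > lp[idx[b]] })
    	for _, i := range idx {
    		fmt.Printf("token %d: logprob %.4f\n", i, lp[i])
    	}
    }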
18 changed files with 1338 additions and 386 deletions

View File

@@ -381,7 +381,7 @@ export const useSendMessage = (chatId: string) => {
           role: "assistant",
           content: "",
           thinking: "",
-          model: effectiveModel,
+          model: effectiveModel.model,
         }),
       );
       lastMessage = newMessages[newMessages.length - 1];
@@ -433,7 +433,7 @@ export const useSendMessage = (chatId: string) => {
           role: "assistant",
           content: "",
           thinking: "",
-          model: effectiveModel,
+          model: effectiveModel.model,
         }),
       );
       lastMessage = newMessages[newMessages.length - 1];
@@ -520,7 +520,7 @@ export const useSendMessage = (chatId: string) => {
         thinkingTimeStart:
           lastMessage.thinkingTimeStart || event.thinkingTimeStart,
         thinkingTimeEnd: event.thinkingTimeEnd,
-        model: selectedModel,
+        model: selectedModel.model,
       });
       newMessages[newMessages.length - 1] = updatedMessage;
     } else {
@@ -533,7 +533,7 @@ export const useSendMessage = (chatId: string) => {
         tool_calls: event.toolCalls,
         thinkingTimeStart: event.thinkingTimeStart,
         thinkingTimeEnd: event.thinkingTimeEnd,
-        model: selectedModel,
+        model: selectedModel.model,
       }),
     );
   }
@@ -699,7 +699,7 @@ export const useSendMessage = (chatId: string) => {
     queryClient.setQueryData(["chat", newId], {
       chat: new Chat({
         id: newId,
-        model: effectiveModel,
+        model: effectiveModel.model,
         messages: [
           new Message({
             role: "user",

View File

@@ -1975,8 +1975,61 @@ func launchInteractiveModel(cmd *cobra.Command, modelName string) error {
 		Options:     map[string]any{},
 		ShowConnect: true,
 	}
-	// loadOrUnloadModel is cloud-safe here: remote/cloud models skip local preload
-	// and only validate auth/connectivity before interactive chat starts.
+	client, err := api.ClientFromEnvironment()
+	if err != nil {
+		return err
+	}
+
+	requestedCloud := modelref.HasExplicitCloudSource(modelName)
+	info, err := func() (*api.ShowResponse, error) {
+		showReq := &api.ShowRequest{Name: modelName}
+		info, err := client.Show(cmd.Context(), showReq)
+		var se api.StatusError
+		if errors.As(err, &se) && se.StatusCode == http.StatusNotFound {
+			if requestedCloud {
+				return nil, err
+			}
+			if err := PullHandler(cmd, []string{modelName}); err != nil {
+				return nil, err
+			}
+			return client.Show(cmd.Context(), &api.ShowRequest{Name: modelName})
+		}
+		return info, err
+	}()
+	if err != nil {
+		if handleCloudAuthorizationError(err) {
+			return nil
+		}
+		return err
+	}
+
+	ensureCloudStub(cmd.Context(), client, modelName)
+
+	opts.Think, err = inferThinkingOption(&info.Capabilities, &opts, false)
+	if err != nil {
+		return err
+	}
+
+	audioCapable := slices.Contains(info.Capabilities, model.CapabilityAudio)
+	opts.MultiModal = slices.Contains(info.Capabilities, model.CapabilityVision) || audioCapable
+
+	// TODO: remove the projector info and vision info checks below,
+	// these are left in for backwards compatibility with older servers
+	// that don't have the capabilities field in the model info
+	if len(info.ProjectorInfo) != 0 {
+		opts.MultiModal = true
+	}
+	for k := range info.ModelInfo {
+		if strings.Contains(k, ".vision.") {
+			opts.MultiModal = true
+			break
+		}
+	}
+
+	applyShowResponseToRunOptions(&opts, info)
+
 	if err := loadOrUnloadModel(cmd, &opts); err != nil {
 		return fmt.Errorf("error loading model: %w", err)
 	}

View File

@@ -406,10 +406,6 @@ func TestAPIShowModel(t *testing.T) {
 }
 
 func TestAPIGenerateLogprobs(t *testing.T) {
-	if testModel != "" {
-		// Logprobs requires runner support (e.g. llama.cpp has it, MLX does not).
-		t.Skip("logprobs not supported by all runners")
-	}
 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
 	defer cancel()
@@ -523,10 +519,6 @@
 }
 
 func TestAPIChatLogprobs(t *testing.T) {
-	if testModel != "" {
-		// Logprobs requires runner support (e.g. llama.cpp has it, MLX does not).
-		t.Skip("logprobs not supported by all runners")
-	}
 	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
 	defer cancel()

View File

@@ -151,22 +151,11 @@ func (c *Client) WaitUntilRunning(ctx context.Context) error {
 	}
 }
 
-// completionRequest is a properly-tagged version of llm.CompletionRequest for JSON serialization.
-type completionRequest struct {
-	Prompt  string          `json:"prompt"`
-	Options *completionOpts `json:"options,omitempty"`
-}
-
-type completionOpts struct {
-	Temperature      float32 `json:"temperature,omitempty"`
-	TopP             float32 `json:"top_p,omitempty"`
-	MinP             float32 `json:"min_p,omitempty"`
-	TopK             int     `json:"top_k,omitempty"`
-	RepeatLastN      int     `json:"repeat_last_n,omitempty"`
-	RepeatPenalty    float32 `json:"repeat_penalty,omitempty"`
-	PresencePenalty  float32 `json:"presence_penalty,omitempty"`
-	FrequencyPenalty float32 `json:"frequency_penalty,omitempty"`
-	NumPredict       int     `json:"num_predict,omitempty"`
-}
+type CompletionRequest struct {
+	Prompt      string
+	Options     api.Options
+	Logprobs    bool
+	TopLogprobs int
+}
 
 type CompletionResponse struct {
@@ -179,6 +168,8 @@ type CompletionResponse struct {
 	EvalCount    int
 	EvalDuration time.Duration
 
+	Logprobs []llm.Logprob
+
 	Error *api.StatusError
 }
@@ -203,21 +194,13 @@ func (c *Client) Close() error {
 
 // Completion implements llm.LlamaServer.
 func (c *Client) Completion(ctx context.Context, req llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
-	creq := completionRequest{
+	creq := CompletionRequest{
 		Prompt: req.Prompt,
+		Logprobs:    req.Logprobs,
+		TopLogprobs: req.TopLogprobs,
 	}
 	if req.Options != nil {
-		creq.Options = &completionOpts{
-			Temperature:      req.Options.Temperature,
-			TopP:             req.Options.TopP,
-			MinP:             req.Options.MinP,
-			TopK:             req.Options.TopK,
-			RepeatLastN:      req.Options.RepeatLastN,
-			RepeatPenalty:    req.Options.RepeatPenalty,
-			PresencePenalty:  req.Options.PresencePenalty,
-			FrequencyPenalty: req.Options.FrequencyPenalty,
-			NumPredict:       req.Options.NumPredict,
-		}
+		creq.Options = *req.Options
 	}
 
 	body, err := json.Marshal(creq)
@@ -243,7 +226,7 @@ func (c *Client) Completion(ctx context.Context, req llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
 	if resp.StatusCode != http.StatusOK {
 		respBody, _ := io.ReadAll(resp.Body)
-		return fmt.Errorf("%s", strings.TrimSpace(string(respBody)))
+		return api.StatusError{StatusCode: resp.StatusCode, ErrorMessage: strings.TrimSpace(string(respBody))}
 	}
 
 	scanner := bufio.NewScanner(resp.Body)
@@ -266,6 +249,7 @@ func (c *Client) Completion(ctx context.Context, req llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
 			PromptEvalDuration: raw.PromptEvalDuration,
 			EvalCount:          raw.EvalCount,
 			EvalDuration:       raw.EvalDuration,
+			Logprobs:           raw.Logprobs,
 		}
 
 		fn(cresp)

View File

@@ -10,6 +10,8 @@ import (
 	"reflect"
 	"sort"
 	"strings"
+	"sync"
+	"sync/atomic"
 	"unsafe"
 
 	"github.com/ollama/ollama/logutil"
@@ -18,20 +20,28 @@ import (
 type Array struct {
 	ctx  C.mlx_array
 	name string
 
-	pinned int
+	pinned atomic.Int32
 }
 
-var arrays []*Array
+var (
+	arrays   []*Array
+	arraysMu sync.Mutex
+)
 
 // constructor utilities
 
 func New(name string) *Array {
 	t := &Array{name: name}
 	if tracing {
 		traceScratch = append(traceScratch, t)
 	} else {
+		arraysMu.Lock()
+		defer arraysMu.Unlock()
 		arrays = append(arrays, t)
 	}
 	return t
 }
@@ -131,7 +141,7 @@ func (t *Array) Clone() *Array {
 func Pin(s ...*Array) {
 	for _, t := range s {
 		if t != nil {
-			t.pinned++
+			t.pinned.Add(1)
 		}
 	}
 }
@@ -140,8 +150,7 @@ func Pin(s ...*Array) {
 func Unpin(s ...*Array) {
 	for _, t := range s {
 		if t != nil {
-			t.pinned--
-			if t.pinned < 0 {
+			if t.pinned.Add(-1) < 0 {
 				panic(fmt.Sprintf("mlx.Unpin: negative pin count on array %q", t.name))
 			}
 		}
@@ -151,9 +160,11 @@
 // Sweep releases all unpinned arrays, primarily intermediate tensors. MLX will truly
 // free them when there are no other references, including dependencies in the graph.
 func Sweep() {
+	arraysMu.Lock()
+	defer arraysMu.Unlock()
 	n := 0
 	for _, t := range arrays {
-		if t.pinned > 0 && t.Valid() {
+		if t.pinned.Load() > 0 && t.Valid() {
 			arrays[n] = t
 			n++
 		} else if t.Valid() {
@@ -180,7 +191,7 @@ func (t *Array) String() string {
 func (t *Array) LogValue() slog.Value {
 	attrs := []slog.Attr{
 		slog.String("name", t.name),
-		slog.Int("pinned", t.pinned),
+		slog.Int("pinned", int(t.pinned.Load())),
 	}
 	if t.Valid() {
 		attrs = append(attrs,
@@ -194,19 +205,19 @@ func (t *Array) LogValue() slog.Value {
 // shape utilities
 
-func (t Array) Size() int {
+func (t *Array) Size() int {
 	return int(C.mlx_array_size(t.ctx))
 }
 
-func (t Array) NumBytes() int {
+func (t *Array) NumBytes() int {
 	return int(C.mlx_array_nbytes(t.ctx))
 }
 
-func (t Array) NumDims() int {
+func (t *Array) NumDims() int {
 	return int(C.mlx_array_ndim(t.ctx))
 }
 
-func (t Array) Dims() []int {
+func (t *Array) Dims() []int {
 	dims := make([]int, t.NumDims())
 	for i := range dims {
 		dims[i] = t.Dim(i)
@@ -215,29 +226,32 @@ func (t Array) Dims() []int {
 	return dims
 }
 
-func (t Array) Dim(dim int) int {
+func (t *Array) Dim(dim int) int {
 	return int(C.mlx_array_dim(t.ctx, C.int(dim)))
 }
 
-func (t Array) DType() DType {
+func (t *Array) DType() DType {
 	return DType(C.mlx_array_dtype(t.ctx))
 }
 
 // data utilities
 
-func (t Array) Int() int {
+func (t *Array) Int() int {
 	var item C.int64_t
 	C.mlx_array_item_int64(&item, t.ctx)
 	return int(item)
 }
 
-func (t Array) Float() float64 {
+func (t *Array) Float() float64 {
 	var item C.double
 	C.mlx_array_item_float64(&item, t.ctx)
 	return float64(item)
 }
 
-func (t Array) Ints() []int {
+func (t *Array) Ints() []int {
+	if dt := t.DType(); dt != DTypeInt32 {
+		panic(fmt.Sprintf("mlx: Ints requires DTypeInt32, got %v", dt))
+	}
 	ints := make([]int, t.Size())
 	for i, f := range unsafe.Slice(C.mlx_array_data_int32(t.ctx), len(ints)) {
 		ints[i] = int(f)
@@ -245,7 +259,10 @@ func (t Array) Ints() []int {
 	return ints
 }
 
-func (t Array) Floats() []float32 {
+func (t *Array) Floats() []float32 {
+	if dt := t.DType(); dt != DTypeFloat32 {
+		panic(fmt.Sprintf("mlx: Floats requires DTypeFloat32, got %v", dt))
+	}
 	floats := make([]float32, t.Size())
 	for i, f := range unsafe.Slice(C.mlx_array_data_float32(t.ctx), len(floats)) {
 		floats[i] = float32(f)
@@ -253,7 +270,7 @@
 	return floats
 }
 
-func (t Array) Save(name string) error {
+func (t *Array) Save(name string) error {
 	cName := C.CString(name)
 	defer C.free(unsafe.Pointer(cName))
 	C.mlx_save(cName, t.ctx)
@@ -262,6 +279,8 @@
 // LogArrays logs all live arrays, sorted by size
 func LogArrays() {
+	arraysMu.Lock()
+	defer arraysMu.Unlock()
 	sort.Slice(arrays, func(i, j int) bool {
 		return arrays[i].NumBytes() > arrays[j].NumBytes()
 	})
@@ -270,7 +289,7 @@
 	for _, t := range arrays {
 		nb := t.NumBytes()
 		total += nb
-		logutil.Trace(fmt.Sprintf("tensor %-60s %5s %5s pinned=%d %v", t.name, t.DType(), PrettyBytes(nb), t.pinned, t.Dims()))
+		logutil.Trace(fmt.Sprintf("tensor %-60s %5s %5s pinned=%d %v", t.name, t.DType(), PrettyBytes(nb), t.pinned.Load(), t.Dims()))
 	}
 	logutil.Trace(fmt.Sprintf("tensors total: %d, size: %s, active: %s", len(arrays), PrettyBytes(total), PrettyBytes(ActiveMemory())))
 }

View File

@@ -150,7 +150,7 @@ func closureCallback(res *C.mlx_vector_array, input C.mlx_vector_array, payload
 	traceScratch = nil
 	defer func() {
 		for _, a := range traceScratch {
-			if a.pinned > 0 {
+			if a.pinned.Load() > 0 {
 				panic("mlx: traced array was pinned during compilation")
 			}
 			if a.Valid() {

View File

@@ -24,8 +24,8 @@ func ScaledDotProductAttention(query, key, value, mask *Array, scale float32) *Array {
 }
 
 type LayerNorm struct {
-	Weight Array `weight:"weight"`
-	Bias   Array `weight:"bias"`
+	Weight *Array `weight:"weight"`
+	Bias   *Array `weight:"bias"`
 }
 
 func (r *LayerNorm) Forward(x *Array, eps float32) *Array {
@@ -35,10 +35,10 @@ func (r *LayerNorm) Forward(x *Array, eps float32) *Array {
 }
 
 type RMSNorm struct {
-	Weight Array `weight:"weight"`
+	Weight *Array `weight:"weight"`
 }
 
-func (r RMSNorm) Forward(x *Array, eps float32) *Array {
+func (r *RMSNorm) Forward(x *Array, eps float32) *Array {
 	out := New("FAST_RMSNORM")
 	C.mlx_fast_rms_norm(&out.ctx, x.ctx, r.Weight.ctx, C.float(eps), DefaultStream().ctx)
 	return out

View File

@@ -1,12 +1,12 @@
 package mlx
 
 type Linear struct {
-	Weight Array `weight:"weight"`
-	Bias   Array `weight:"bias"`
+	Weight *Array `weight:"weight"`
+	Bias   *Array `weight:"bias"`
 }
 
 // Forward computes the linear transformation: x @ Weight.T + Bias
-func (m Linear) Forward(x *Array) *Array {
+func (m *Linear) Forward(x *Array) *Array {
 	w := m.Weight.Transpose(1, 0)
 	if m.Bias.Valid() {
 		return m.Bias.Addmm(x, w, 1.0, 1.0)
@@ -15,14 +15,14 @@ func (m Linear) Forward(x *Array) *Array {
 	return x.Matmul(w)
 }
 
-func (m Linear) Gather(x, lhs, rhs *Array, sorted bool) *Array {
+func (m *Linear) Gather(x, lhs, rhs *Array, sorted bool) *Array {
 	w := m.Weight.Transpose(0, 2, 1)
 	// TODO: bias
 	return x.GatherMM(w, lhs, rhs, sorted)
 }
 
 type Embedding struct {
-	Weight Array `weight:"weight"`
+	Weight *Array `weight:"weight"`
 }
 
 func (e *Embedding) Forward(indices *Array) *Array {

View File

@@ -72,6 +72,10 @@ func (t *Array) AsStrided(shape []int, strides []int, offset int) *Array {
 }
 
 func (t *Array) Concatenate(axis int, others ...*Array) *Array {
+	if len(others) == 0 {
+		return t
+	}
+
 	vector := C.mlx_vector_array_new()
 	defer C.mlx_vector_array_free(vector)
@@ -127,9 +131,9 @@ func (t *Array) GatherMM(other, lhs, rhs *Array, sorted bool) *Array {
 	return out
 }
 
-func (t *Array) Logsumexp(keepDims bool) *Array {
-	out := New("LOGSUMEXP")
-	C.mlx_logsumexp(&out.ctx, t.ctx, C.bool(keepDims), DefaultStream().ctx)
+func (t *Array) LogsumexpAxis(axis int, keepDims bool) *Array {
+	out := New("LOGSUMEXP_AXIS")
+	C.mlx_logsumexp_axis(&out.ctx, t.ctx, C.int(axis), C.bool(keepDims), DefaultStream().ctx)
 	return out
 }
 
@@ -139,6 +143,12 @@ func (t *Array) Less(other *Array) *Array {
 	return out
 }
 
+func (t *Array) MaxAxis(axis int, keepDims bool) *Array {
+	out := New("MAX_AXIS")
+	C.mlx_max_axis(&out.ctx, t.ctx, C.int(axis), C.bool(keepDims), DefaultStream().ctx)
+	return out
+}
+
 func (t *Array) Matmul(other *Array) *Array {
 	out := New("MATMUL")
 	C.mlx_matmul(&out.ctx, t.ctx, other.ctx, DefaultStream().ctx)

View File

@@ -376,6 +376,9 @@ func Concatenate(arrays []*Array, axis int) *Array {
 	if len(arrays) == 0 {
 		return nil
 	}
+	if len(arrays) == 1 {
+		return arrays[0]
+	}
 	return arrays[0].Concatenate(axis, arrays[1:]...)
 }
} }

View File

@@ -6,36 +6,60 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
-	"net/http"
+	"sort"
 	"time"
 
-	"github.com/ollama/ollama/api"
+	"github.com/ollama/ollama/llm"
 	"github.com/ollama/ollama/logutil"
 	"github.com/ollama/ollama/x/mlxrunner/mlx"
+	sampler "github.com/ollama/ollama/x/mlxrunner/sample"
+	"github.com/ollama/ollama/x/tokenizer"
 )
 
 func prefillChunkSize() int {
 	return 2 << 10
 }
 
-func (r *Runner) TextGenerationPipeline(request Request) error {
+// Prepare tokenizes the prompt and validates it against the model's
+// context length. It is safe to call from any goroutine. On success it
+// populates request.Tokens and adjusts request.Options.NumPredict.
+func (r *Runner) Prepare(request *Request) error {
 	if r.Model == nil {
 		return errors.New("model not loaded")
 	}
 
+	tokens := r.Tokenizer.Encode(request.Prompt, r.Tokenizer.AddBOS())
+	if len(tokens) == 0 {
+		return errors.New("empty prompt")
+	}
+	if len(tokens) >= r.contextLength {
+		return fmt.Errorf("input length (%d tokens) exceeds the model's maximum context length (%d tokens)", len(tokens), r.contextLength)
+	}
+
+	// Cap generation to stay within the model's context length
+	maxGenerate := r.contextLength - len(tokens)
+	if request.Options.NumPredict <= 0 {
+		request.Options.NumPredict = maxGenerate
+	} else {
+		request.Options.NumPredict = min(request.Options.NumPredict, maxGenerate)
+	}
+
+	request.Tokens = tokens
+	return nil
+}
+
+// The runner serializes requests today so we just use a fixed slot ID.
+const pipelineSlot = 0
+
+func (r *Runner) TextGenerationPipeline(ctx context.Context, request Request) error {
 	mlx.ResetPeakMemory()
 
-	ctx := request.Ctx
-
-	var (
-		sample     *mlx.Array
-		nextSample *mlx.Array
-	)
+	var sample, nextSample sampler.Result
+
 	defer func() {
-		if request.Sampler != nil {
-			request.Sampler.Free()
-		}
-		mlx.Unpin(sample)
-		mlx.Unpin(nextSample)
+		r.Sampler.Remove(pipelineSlot)
+		mlx.Unpin(sample.Arrays()...)
+		mlx.Unpin(nextSample.Arrays()...)
 		mlx.Sweep()
 		mlx.ClearCache()
@@ -46,27 +70,7 @@ func (r *Runner) TextGenerationPipeline(request Request) error {
 		slog.Info("peak memory", "size", mlx.PrettyBytes(mlx.PeakMemory()))
 	}()
 
-	inputs := r.Tokenizer.Encode(request.Prompt, r.Tokenizer.AddBOS())
-	if len(inputs) == 0 {
-		return errors.New("empty prompt")
-	}
-	if len(inputs) >= r.contextLength {
-		return api.StatusError{
-			StatusCode:   http.StatusBadRequest,
-			ErrorMessage: fmt.Sprintf("input length (%d tokens) exceeds the model's maximum context length (%d tokens)", len(inputs), r.contextLength),
-		}
-	}
-
-	// Cap generation to stay within the model's context length
-	maxGenerate := r.contextLength - len(inputs)
-	if request.Options.MaxTokens <= 0 {
-		request.Options.MaxTokens = maxGenerate
-	} else {
-		request.Options.MaxTokens = min(request.Options.MaxTokens, maxGenerate)
-	}
-
-	request.Sampler.ResetHistory(inputs)
+	inputs := request.Tokens
 
 	session := r.cache.begin(r.Model, inputs)
 	defer session.close()
@@ -118,7 +122,7 @@ func (r *Runner) TextGenerationPipeline(request Request) error {
 		}
 	}
 
-	r.Model.Forward(mlx.FromValues(tokens[processed:processed+n], n).ExpandDims(0), caches)
+	r.Model.Forward(mlx.FromValues(tokens[processed:processed+n], 1, n), caches)
 	mlx.Sweep()
 	materializeCaches()
 	processed += n
@@ -135,40 +139,44 @@ func (r *Runner) TextGenerationPipeline(request Request) error {
 		mlx.ClearCache()
 	}
 
-	step := func(token *mlx.Array) *mlx.Array {
-		fwd := r.Model.Forward(token.ExpandDims(0), caches)
+	// Register the sampler after prefill completes.
+	r.Sampler.Add(pipelineSlot, request.SamplerOpts, inputs)
+
+	step := func(token *mlx.Array) sampler.Result {
+		fwd := r.Model.Forward(token, caches)
 		logits := r.Model.Unembed(fwd)
 		logits = logits.Slice(mlx.Slice(), mlx.Slice(logits.Dim(1)-1), mlx.Slice()).Squeeze(1)
 
-		sample := request.Sampler.Sample(logits)
-
-		mlx.Pin(sample)
+		sample := r.Sampler.Sample([]int{pipelineSlot}, logits)
+		mlx.Pin(sample.Arrays()...)
 		mlx.Sweep()
-		mlx.AsyncEval(sample)
+		mlx.AsyncEval(sample.Arrays()...)
 		return sample
 	}
 
-	sample = step(mlx.FromValues(tokens[processed:], total-processed))
+	sample = step(mlx.FromValues(tokens[processed:], 1, total-processed))
 
-	var b bytes.Buffer
+	dec := decoder{
+		tokenizer:       r.Tokenizer,
+		wantLogprobs:    request.SamplerOpts.Logprobs,
+		wantTopLogprobs: request.SamplerOpts.TopLogprobs,
+	}
 
-	final := CompletionResponse{Done: true, PromptEvalCount: len(inputs), EvalCount: request.Options.MaxTokens, DoneReason: 1}
-	for i := range request.Options.MaxTokens {
+	final := CompletionResponse{Done: true, PromptEvalCount: len(inputs), EvalCount: request.Options.NumPredict, DoneReason: 1}
+	for i := range request.Options.NumPredict {
 		if err := ctx.Err(); err != nil {
 			return err
 		}
 
-		request.Sampler.AppendToken(sample)
-		nextSample = step(sample)
+		nextSample = step(sample.Token.ExpandDims(-1))
 
 		if i == 0 {
-			mlx.Eval(sample)
+			mlx.Eval(sample.Arrays()...)
 			final.PromptEvalDuration = time.Since(now)
 			now = time.Now()
 		}
 
-		output := int32(sample.Int())
+		output := int32(sample.Token.Int())
 		session.outputs = append(session.outputs, output)
 
 		if r.Tokenizer.IsEOS(output) {
@@ -177,17 +185,16 @@ func (r *Runner) TextGenerationPipeline(request Request) error {
 			break
 		}
 
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case request.Responses <- CompletionResponse{
-			Content: r.Decode(output, &b),
-		}:
+		if resp, ok := dec.decode(sample); ok {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			case request.Responses <- resp:
+			}
 		}
 
-		mlx.Unpin(sample)
-		sample = nextSample
-		nextSample = nil
+		mlx.Unpin(sample.Arrays()...)
+		sample, nextSample = nextSample, sampler.Result{}
 
 		if i%256 == 0 {
 			mlx.ClearCache()
@@ -203,13 +210,69 @@ func (r *Runner) TextGenerationPipeline(request Request) error {
 	}
 }
 
-func (r Runner) Decode(sample int32, b *bytes.Buffer) string {
-	token := r.Tokenizer.Decode([]int32{sample})
-
-	if _, err := b.WriteString(token); err != nil {
-		slog.Error("Failed to write token to buffer", "error", err)
-		return ""
-	}
-
-	return flushValidUTF8Prefix(b)
+// decoder serializes sampled tokens into response chunks, holding bytes
+// whose UTF-8 sequence hasn't completed yet and the logprobs that belong
+// with those bytes so Content and Logprobs stay aligned when a chunk does
+// flush.
+type decoder struct {
+	tokenizer       *tokenizer.Tokenizer
+	buf             bytes.Buffer
+	logprobs        []llm.Logprob
+	wantLogprobs    bool
+	wantTopLogprobs int
+}
+
+func (d *decoder) decode(res sampler.Result) (CompletionResponse, bool) {
+	output := int32(res.Token.Int())
+	d.buf.WriteString(d.tokenizer.Decode([]int32{output}))
+	d.logprobs = append(d.logprobs, buildLogprob(res, d.wantLogprobs, d.wantTopLogprobs, d.tokenizer.Decode)...)
+
+	content := flushValidUTF8Prefix(&d.buf)
+	if content == "" {
+		return CompletionResponse{}, false
+	}
+
+	resp := CompletionResponse{Content: content, Logprobs: d.logprobs}
+	d.logprobs = nil
+	return resp, true
+}
+
+// buildLogprob converts the sampler's logprob tensors into the wire-format
+// llm.Logprob entries the caller wants. The sampler populates its logprob
+// tensors whenever any registered slot requested them, so the caller must
+// gate emission on its own request config (wantLogprobs / wantTopLogprobs)
+// rather than on whether the tensors happen to be non-nil.
+func buildLogprob(sample sampler.Result, wantLogprobs bool, wantTopLogprobs int, decode func([]int32) string) []llm.Logprob {
+	if !wantLogprobs || sample.Logprob == nil {
+		return nil
+	}
+
+	tok := func(id int32) string { return decode([]int32{id}) }
+	out := llm.Logprob{
+		TokenLogprob: llm.TokenLogprob{
+			Token:   tok(int32(sample.Token.Int())),
+			Logprob: float64(sample.Logprob.Floats()[0]),
+		},
+	}
+
+	if wantTopLogprobs > 0 && sample.TopTokens != nil {
+		ids := sample.TopTokens.Ints()
+		vals := sample.TopLogprobs.Floats()
+		pairs := make([]llm.TokenLogprob, len(ids))
+		for i, id := range ids {
+			pairs[i] = llm.TokenLogprob{
+				Token:   tok(int32(id)),
+				Logprob: float64(vals[i]),
+			}
+		}
+		// The sampler emits the top maxK across registered slots via
+		// Argpartition, which leaves entries unsorted.
+		sort.Slice(pairs, func(i, j int) bool {
+			return pairs[i].Logprob > pairs[j].Logprob
+		})
+		if wantTopLogprobs < len(pairs) {
+			pairs = pairs[:wantTopLogprobs]
+		}
+		out.TopLogprobs = pairs
+	}
+
+	return []llm.Logprob{out}
 }

View File

@@ -18,38 +18,25 @@ import (
 	"github.com/ollama/ollama/x/tokenizer"
 )
 
+// Request is a short-lived struct that carries a completion request through
+// a channel from the HTTP handler to the runner goroutine. The ctx field
+// must travel with the request so that cancellation propagates across the
+// channel boundary.
 type Request struct {
-	TextCompletionsRequest
+	CompletionRequest
 	Responses chan CompletionResponse
-	Pipeline  func(Request) error
-	Ctx       context.Context
+	Pipeline  func(context.Context, Request) error
+	Ctx       context.Context //nolint:containedctx
+	Tokens    []int32
 
-	Sampler *sample.Sampler
-}
-
-type TextCompletionsRequest struct {
-	Prompt  string `json:"prompt"`
-	Options struct {
-		Temperature      float32 `json:"temperature"`
-		TopP             float32 `json:"top_p"`
-		MinP             float32 `json:"min_p"`
-		TopK             int     `json:"top_k"`
-		RepeatLastN      int     `json:"repeat_last_n"`
-		RepeatPenalty    float32 `json:"repeat_penalty"`
-		PresencePenalty  float32 `json:"presence_penalty"`
-		FrequencyPenalty float32 `json:"frequency_penalty"`
-		MaxTokens        int     `json:"max_tokens"`
-		// Deprecated: use MaxTokens instead
-		NumPredict int `json:"num_predict"`
-	} `json:"options"`
+	SamplerOpts sample.Options
 }
 
 type Runner struct {
 	Model     base.Model
 	Tokenizer *tokenizer.Tokenizer
 	Requests  chan Request
+	Sampler   *sample.Sampler
 
 	cache         kvCache
 	contextLength int
 }
@@ -81,6 +68,7 @@ func (r *Runner) Load(modelName string) error {
 	r.Model = m
 	r.Tokenizer = m.Tokenizer()
 	r.contextLength = m.MaxContextLength()
+	r.Sampler = sample.New(r.contextLength)
 
 	mlx.EnableCompile()
 	return nil
@@ -149,7 +137,7 @@ func (r *Runner) Run(host, port string, mux http.Handler) error {
 		case <-ctx.Done():
 			return nil
 		case request := <-r.Requests:
-			if err := request.Pipeline(request); err != nil {
+			if err := request.Pipeline(request.Ctx, request); err != nil {
 				slog.Info("Request terminated", "error", err)
 				var statusErr api.StatusError
 				if !errors.As(err, &statusErr) {

View File

@@ -0,0 +1,286 @@
//go:build mlx
package sample
import (
"math"
"sort"
"testing"
"github.com/ollama/ollama/x/mlxrunner/mlx"
)
// logprobEntry is the (token id, logprob) pair returned by the sampler's
// top-K extraction, used after the test-side descending sort.
type logprobEntry struct {
id int
logprob float64
}
// runSampleLogprobs drives Sample on a fresh Sampler configured for logprobs
// and returns the greedily-sampled token id, its logprob, and the top-K
// entries sorted descending by logprob. Logits must be a [vocab]-shaped
// slice; the helper reshapes it to [1, vocab] before calling the sampler.
func runSampleLogprobs(t *testing.T, logits []float32, topK int) (int, float64, []logprobEntry) {
t.Helper()
s := New(128)
defer func() {
s.Free()
mlx.Sweep()
}()
s.Add(0, Options{Logprobs: true, TopLogprobs: topK}, nil)
tensor := mlx.FromValues(logits, 1, len(logits))
res := s.Sample([]int{0}, tensor)
mlx.Pin(res.Arrays()...)
defer mlx.Unpin(res.Arrays()...)
mlx.Sweep()
mlx.Eval(res.Arrays()...)
selected := res.Token.Int()
selLP := float64(res.Logprob.Floats()[0])
var top []logprobEntry
if topK > 0 && res.TopTokens != nil {
ids := res.TopTokens.Ints()
vals := res.TopLogprobs.Floats()
top = make([]logprobEntry, len(ids))
for i, id := range ids {
top[i] = logprobEntry{id: id, logprob: float64(vals[i])}
}
sort.Slice(top, func(i, j int) bool { return top[i].logprob > top[j].logprob })
}
return selected, selLP, top
}
func TestSampleLogprobsBasic(t *testing.T) {
tests := []struct {
name string
logits []float32
topK int
wantSelectedID int
wantTopLen int
}{
{
name: "single token without top logprobs",
logits: []float32{1.0, 0.5, 0.3, 0.1},
topK: 0,
wantSelectedID: 0,
wantTopLen: 0,
},
{
name: "single token with top logprobs",
logits: []float32{1.0, 0.5, 0.3, 0.1},
topK: 3,
wantSelectedID: 0,
wantTopLen: 3,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selected, _, top := runSampleLogprobs(t, tt.logits, tt.topK)
if selected != tt.wantSelectedID {
t.Errorf("selected = %d, want %d", selected, tt.wantSelectedID)
}
if len(top) != tt.wantTopLen {
t.Errorf("top-K length = %d, want %d", len(top), tt.wantTopLen)
}
})
}
}
func TestSampleLogprobsNumericalStability(t *testing.T) {
logits := []float32{1000.0, 999.0, 998.0}
_, selLP, top := runSampleLogprobs(t, logits, 3)
if math.IsInf(selLP, 0) || math.IsNaN(selLP) {
t.Errorf("selected logprob is not finite: %f", selLP)
}
for i, e := range top {
if math.IsInf(e.logprob, 0) || math.IsNaN(e.logprob) {
t.Errorf("top[%d] logprob is not finite: %f", i, e.logprob)
}
}
for i := 1; i < len(top); i++ {
if top[i].logprob > top[i-1].logprob {
t.Errorf("top logprobs not descending: %f > %f", top[i].logprob, top[i-1].logprob)
}
}
}
func TestSampleLogprobsProbabilityCorrectness(t *testing.T) {
tests := []struct {
name string
logits []float32
}{
{"uniform", []float32{1.0, 1.0, 1.0, 1.0}},
{"different", []float32{2.0, 1.0, 0.5, 0.1}},
{"negative", []float32{-1.0, -2.0, -3.0, -4.0}},
{"mixed", []float32{5.0, -5.0, 0.0, 2.5}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selected, selLP, top := runSampleLogprobs(t, tt.logits, len(tt.logits))
if selLP > 0 {
t.Errorf("selected logprob should be <= 0, got %f", selLP)
}
for i, e := range top {
if e.logprob > 0 {
t.Errorf("top[%d] logprob should be <= 0, got %f", i, e.logprob)
}
}
if tt.name == "uniform" {
want := 1.0 / float64(len(tt.logits))
got := math.Exp(selLP)
if math.Abs(got-want) > 1e-6 {
t.Errorf("uniform logits: selected prob = %f, want %f", got, want)
}
}
for i := 1; i < len(top); i++ {
if top[i].logprob > top[i-1].logprob {
t.Errorf("top logprobs not descending at %d: %f > %f",
i, top[i].logprob, top[i-1].logprob)
}
}
found := false
for _, e := range top {
if e.id == selected {
found = true
if math.Abs(e.logprob-selLP) > 1e-6 {
t.Errorf("selected logprob mismatch: selLP=%f top=%f", selLP, e.logprob)
}
break
}
}
if !found {
t.Errorf("selected token %d not present in top-K", selected)
}
})
}
}
func TestSampleLogprobsSoftmaxCorrectness(t *testing.T) {
tests := []struct {
name string
logits []float32
}{
{"small vocabulary", []float32{1.0, 2.0, 3.0}},
{"large differences", []float32{10.0, 0.0, -10.0}},
{"all equal", []float32{5.0, 5.0, 5.0, 5.0, 5.0}},
{"very large values", []float32{500.0, 499.0, 498.0}},
{"very small values", []float32{-500.0, -499.0, -498.0}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, _, top := runSampleLogprobs(t, tt.logits, len(tt.logits))
if len(top) != len(tt.logits) {
t.Fatalf("top-K length = %d, want %d", len(top), len(tt.logits))
}
var sum float64
for _, e := range top {
p := math.Exp(e.logprob)
if p < 0 || p > 1 {
t.Errorf("token %d: probability %f out of [0,1]", e.id, p)
}
sum += p
}
if math.Abs(sum-1.0) > 1e-5 {
t.Errorf("probabilities sum = %f, want 1.0", sum)
}
})
}
}
func TestSampleLogprobsSelectedTokenCorrectness(t *testing.T) {
logits := []float32{3.0, 1.0, 2.0, 0.5}
maxIdx := 0
for i, v := range logits[1:] {
if v > logits[maxIdx] {
maxIdx = i + 1
}
}
selected, selLP, top := runSampleLogprobs(t, logits, len(logits))
if selected != maxIdx {
t.Errorf("selected = %d, want argmax %d", selected, maxIdx)
}
if top[0].id != maxIdx {
t.Errorf("top[0].id = %d, want argmax %d", top[0].id, maxIdx)
}
if math.Abs(top[0].logprob-selLP) > 1e-6 {
t.Errorf("top[0].logprob = %f, want selected %f", top[0].logprob, selLP)
}
}
// TestBatchedLogprobsPerRow verifies that per-row logprobs in a batched
// sample call match the per-slot reference. The numerically-stable softmax
// must reduce along the last axis only, not over the whole batch.
func TestBatchedLogprobsPerRow(t *testing.T) {
rowA := []float32{2, 1, 0}
rowB := []float32{0, 5, 0}
_, wantA, _ := runSampleLogprobs(t, rowA, 0)
_, wantB, _ := runSampleLogprobs(t, rowB, 0)
s := New(128)
t.Cleanup(func() {
s.Free()
mlx.Sweep()
})
s.Add(1, Options{Logprobs: true}, nil)
s.Add(2, Options{Logprobs: true}, nil)
logits := mlx.FromValues(append(append([]float32{}, rowA...), rowB...), 2, 3)
res := s.Sample([]int{1, 2}, logits)
mlx.Pin(res.Arrays()...)
t.Cleanup(func() { mlx.Unpin(res.Arrays()...) })
mlx.Eval(res.Arrays()...)
got := res.Logprob.Floats()
if len(got) != 2 {
t.Fatalf("Logprob length = %d, want 2", len(got))
}
if math.Abs(float64(got[0])-wantA) > 1e-5 {
t.Errorf("row 0 logprob = %f, want %f (per-slot reference)", got[0], wantA)
}
if math.Abs(float64(got[1])-wantB) > 1e-5 {
t.Errorf("row 1 logprob = %f, want %f (per-slot reference)", got[1], wantB)
}
}
func TestSampleLogprobsTopKOrdering(t *testing.T) {
// Logits chosen so argmax order differs from index order.
logits := []float32{2.0, 5.0, 1.0, 4.0, 3.0}
wantOrder := []int{1, 3, 4, 0, 2}
_, _, top := runSampleLogprobs(t, logits, len(logits))
if len(top) != len(wantOrder) {
t.Fatalf("top-K length = %d, want %d", len(top), len(wantOrder))
}
for i, e := range top {
if e.id != wantOrder[i] {
t.Errorf("top[%d].id = %d, want %d", i, e.id, wantOrder[i])
}
}
for i := 1; i < len(top); i++ {
if top[i].logprob > top[i-1].logprob {
t.Errorf("top[%d].logprob (%f) > top[%d].logprob (%f)",
i, top[i].logprob, i-1, top[i-1].logprob)
}
}
}

View File

@@ -1,14 +1,14 @@
package sample package sample
import ( import (
"fmt"
"math" "math"
"slices"
"github.com/ollama/ollama/x/mlxrunner/mlx" "github.com/ollama/ollama/x/mlxrunner/mlx"
) )
type Transform func(*Sampler, *mlx.Array) *mlx.Array type Options struct {
type Sampler struct {
Temperature float32 Temperature float32
TopP float32 TopP float32
MinP float32 MinP float32
@@ -18,198 +18,557 @@ type Sampler struct {
PresencePenalty float32 PresencePenalty float32
FrequencyPenalty float32 FrequencyPenalty float32
history *mlx.Array // Logprobs causes Sample to populate Result.Logprob with the selected
// token's log-probability. TopLogprobs (when > 0) adds top-K pairs.
Logprobs bool
TopLogprobs int
}
// Result bundles the outputs of one decode step. Logprob/TopTokens/
// TopLogprobs are populated whenever any registered slot has Logprobs
// (respectively TopLogprobs>0). Consumers need to filter by their
// per-slot Options.
type Result struct {
Token *mlx.Array // sampled token ids, shape [B]
Logprob *mlx.Array // sampled-token logprobs, shape [B,1]; nil unless any registered slot has Logprobs
TopTokens *mlx.Array // top-K token ids, shape [B,maxK]; nil unless any registered slot has TopLogprobs>0
TopLogprobs *mlx.Array // top-K logprobs, shape [B,maxK]; same
}
// Arrays returns the tensor fields as a slice so callers can drive the mlx
// lifecycle verbs (Pin, Unpin, Eval, AsyncEval) over the whole group. Unset
// fields stay nil; the mlx helpers skip them.
func (r Result) Arrays() []*mlx.Array {
return []*mlx.Array{r.Token, r.Logprob, r.TopTokens, r.TopLogprobs}
}
// Sampler is a batched, slot-based sampler. Sequences are registered with
// Add and released with Remove. Each Sample call takes a subset of
// registered slots (in any order) with their [B,V] logits, samples one
// token per row, and appends it to that slot's ring-buffer history. Slots
// not named in a given call are untouched.
type Sampler struct {
slots []*slotState
byID map[int]*slotState
// history is the pooled ring-buffer storage, [B, W] int32. Row i
// belongs to slots[i]; W is max(RepeatLastN) across penalty slots.
// Allocated on the first penalty slot, rebuilt only in Add/Remove.
history *mlx.Array
// allSameOpts: every registered slot shares Options. When true the
// canonical shared value is s.slots[0].opts.
allSameOpts bool
// anyLogprobs / maxTopLogprobs: compute-for-all output config.
// Sample populates Logprob (and Top* when maxTopLogprobs>0) whenever
// any registered slot requests them, even if that slot isn't in the
// current call.
anyLogprobs bool
maxTopLogprobs int
// numCtx is the runner's context window; normalize uses it to
// resolve the repeat_last_n == -1 sentinel.
numCtx int
}
type slotState struct {
opts Options
transforms []transform
historyLen int historyLen int
transforms []Transform
} }
func New(temp, top_p, min_p float32, top_k, repeatLastN int, repeatPenalty, presencePenalty, frequencyPenalty float32) *Sampler { type slotCtx struct {
if repeatPenalty <= 0 { opts Options
repeatPenalty = 1 history *mlx.Array // 2D [B, W] when penalties are configured; nil otherwise
}
type transform func(*slotCtx, *mlx.Array) *mlx.Array
// New constructs an empty sampler with no registered slots. numCtx is
// the runner's context window and must be positive.
func New(numCtx int) *Sampler {
return &Sampler{
byID: make(map[int]*slotState),
allSameOpts: true,
numCtx: numCtx,
}
}
// historyWidth returns the column count of the pooled history tensor,
// or 0 when no penalty slot has forced it to be allocated.
func (s *Sampler) historyWidth() int {
if s.history == nil {
return 0
}
return s.history.Dim(1)
}
func (o Options) usesHistory() bool {
// RepeatLastN == 0 disables the penalty ring per the repeat_last_n API
// contract (0 = disabled), overriding any penalty coefficients.
if o.RepeatLastN == 0 {
return false
}
return o.RepeatPenalty != 1 || o.PresencePenalty != 0 || o.FrequencyPenalty != 0
}
func (o Options) normalize(numCtx int) Options {
if o.RepeatPenalty <= 0 {
o.RepeatPenalty = 1
}
// Resolve the repeat_last_n == -1 sentinel ("-1 = num_ctx") against
// the caller's context window.
if o.RepeatLastN < 0 {
o.RepeatLastN = numCtx
}
if !o.usesHistory() {
// Zero the ring capacity so slots that differ only in a spurious
// RepeatLastN still batch together and don't inflate pool width.
o.RepeatLastN = 0
}
return o
}
func (o Options) buildTransforms() []transform {
var ts []transform
if o.usesHistory() {
ts = append(ts, penalty)
} }
s := &Sampler{ hasTopP := o.TopP > 0 && o.TopP < 1
Temperature: temp, hasTopK := o.TopK > 0
TopP: top_p, switch {
MinP: min_p, case hasTopP:
TopK: top_k, // topKTopP always does a full descending sort for the top-P
RepeatLastN: repeatLastN, // cumulative mask and opportunistically masks top-K during the
RepeatPenalty: repeatPenalty, // same pass when it is also configured.
PresencePenalty: presencePenalty, ts = append(ts, topKTopP)
FrequencyPenalty: frequencyPenalty, case hasTopK:
// Argpartition (partial sort) is cheaper than a full sort.
ts = append(ts, topK)
} }
var transforms []Transform if o.MinP != 0 {
if s.usesHistory() { ts = append(ts, minP)
transforms = append(transforms, penalty)
} }
if top_p > 0 && top_p < 1 { if o.Temperature == 0 {
transforms = append(transforms, topP) ts = append(ts, greedy)
}
if min_p != 0 {
transforms = append(transforms, minP)
}
if top_k > 0 {
transforms = append(transforms, topK)
}
if temp == 0 {
transforms = append(transforms, greedy)
} else { } else {
transforms = append(transforms, temperature) ts = append(ts, temperature)
} }
return ts
s.transforms = transforms
return s
} }
func (s *Sampler) usesHistory() bool { // Add registers a sequence under seqID. The last RepeatLastN entries of
return s.RepeatPenalty != 1 || s.PresencePenalty != 0 || s.FrequencyPenalty != 0 // priorTokens seed the ring buffer.
} func (s *Sampler) Add(seqID int, opts Options, priorTokens []int32) {
if _, dup := s.byID[seqID]; dup {
func (s *Sampler) setHistory(history *mlx.Array, historyLen int) { panic(fmt.Sprintf("sample.Sampler.Add: seqID %d already registered", seqID))
if history != nil {
mlx.Pin(history)
} }
if s.history != nil {
opts = opts.normalize(s.numCtx)
slot := &slotState{
opts: opts,
transforms: opts.buildTransforms(),
}
// Grow the pool to hold this slot's row. The pool is lazy — the first
// penalty slot allocates it — and thereafter every registered slot
// gets a row (rows for non-penalty slots are zero and never read).
// Invariant: s.history is pinned whenever non-nil.
if s.history != nil || opts.usesHistory() {
targetWidth := max(opts.RepeatLastN, s.historyWidth())
newRow := makeHistoryRow(priorTokens, opts.RepeatLastN, targetWidth)
var pool *mlx.Array
switch {
case s.history == nil && len(s.slots) == 0:
pool = newRow
case s.history == nil:
// First penalty slot with non-penalty slots already registered;
// seed zero rows so s.slots and pool row indices stay aligned.
zeros := mlx.Zeros(mlx.DTypeInt32, len(s.slots), targetWidth)
pool = zeros.Concatenate(0, newRow)
case targetWidth > s.historyWidth():
pad := mlx.Zeros(mlx.DTypeInt32, s.history.Dim(0), targetWidth-s.historyWidth())
pool = s.history.Concatenate(1, pad).Concatenate(0, newRow)
default:
pool = s.history.Concatenate(0, newRow)
}
mlx.Pin(pool)
mlx.Unpin(s.history) mlx.Unpin(s.history)
s.history = pool
if opts.usesHistory() {
// Cap on seed so the next write's ring position
// (historyLen % RepeatLastN) lands at 0, overwriting the
// oldest entry when the ring was filled from priors.
slot.historyLen = min(len(priorTokens), opts.RepeatLastN)
}
} }
s.history = history
s.historyLen = historyLen s.slots = append(s.slots, slot)
s.byID[seqID] = slot
s.recomputeInvariants()
} }
func (s *Sampler) ResetHistory(history []int32) { // makeHistoryRow builds a [1, width] int32 row with the last repeatLastN
if !s.usesHistory() { // entries of priorTokens packed into [0, min(len, repeatLastN)), zeros
// elsewhere.
func makeHistoryRow(priorTokens []int32, repeatLastN, width int) *mlx.Array {
take := min(len(priorTokens), repeatLastN)
if take <= 0 {
return mlx.Zeros(mlx.DTypeInt32, 1, width)
}
row := make([]int32, width)
copy(row, priorTokens[len(priorTokens)-take:])
return mlx.NewArrayInt32(row, []int32{1, int32(width)})
}
// recomputeInvariants refreshes allSameOpts and anyLogprobs/maxTopLogprobs
// from s.slots. Called at the end of Add and Remove.
func (s *Sampler) recomputeInvariants() {
if len(s.slots) == 0 {
s.allSameOpts = true
s.anyLogprobs = false
s.maxTopLogprobs = 0
return return
} }
if s.RepeatLastN > 0 && len(history) > s.RepeatLastN { first := s.slots[0].opts
history = history[len(history)-s.RepeatLastN:] s.allSameOpts = true
s.anyLogprobs = false
s.maxTopLogprobs = 0
for _, slot := range s.slots {
if slot.opts != first {
s.allSameOpts = false
}
if slot.opts.Logprobs {
s.anyLogprobs = true
if slot.opts.TopLogprobs > s.maxTopLogprobs {
s.maxTopLogprobs = slot.opts.TopLogprobs
}
}
} }
if len(history) == 0 { }
s.setHistory(nil, 0)
// Remove releases the slot. The pool tensor is rebuilt to drop the row.
func (s *Sampler) Remove(seqID int) {
slot, ok := s.byID[seqID]
if !ok {
return
}
delete(s.byID, seqID)
row := slices.Index(s.slots, slot)
s.slots = slices.Delete(s.slots, row, row+1)
s.recomputeInvariants()
if s.history == nil {
return return
} }
tokens := append([]int32(nil), history...) n := s.history.Dim(0)
s.setHistory(mlx.NewArrayInt32(tokens, []int32{int32(len(tokens))}), len(tokens)) var newHistory *mlx.Array
} switch {
case n == 1:
func (s *Sampler) AppendToken(token *mlx.Array) { newHistory = nil
if !s.usesHistory() || token == nil { case row == 0:
return newHistory = s.history.Slice(mlx.Slice(1, n), mlx.Slice())
} case row == n-1:
newHistory = s.history.Slice(mlx.Slice(0, row), mlx.Slice())
next := token.AsType(mlx.DTypeInt32) default:
nextLen := next.Size() before := s.history.Slice(mlx.Slice(0, row), mlx.Slice())
after := s.history.Slice(mlx.Slice(row+1, n), mlx.Slice())
if s.history != nil && s.historyLen > 0 { newHistory = before.Concatenate(0, after)
next = s.history.Concatenate(0, next) }
nextLen += s.historyLen
} mlx.Pin(newHistory)
mlx.Unpin(s.history)
if s.RepeatLastN > 0 && nextLen > s.RepeatLastN { s.history = newHistory
trim := nextLen - s.RepeatLastN
next = next.Slice(mlx.Slice(trim, nextLen))
nextLen = s.RepeatLastN
}
s.setHistory(next, nextLen)
} }
// Free releases the pooled history tensor and resets the sampler to the
// New-equivalent state so it may be reused.
func (s *Sampler) Free() { func (s *Sampler) Free() {
s.setHistory(nil, 0) mlx.Unpin(s.history)
} *s = Sampler{
byID: make(map[int]*slotState),
func (s *Sampler) Sample(logits *mlx.Array) *mlx.Array { allSameOpts: true,
for _, transform := range s.transforms { numCtx: s.numCtx,
logits = transform(s, logits)
} }
return logits
} }
func greedy(_ *Sampler, logits *mlx.Array) *mlx.Array { // Sample draws one token per row of logits ([B,V]); seqIDs[i] names the
return logits.Argmax(-1, false) // slot whose logits live at row i. Each sampled token is appended to its
} // slot's ring. Slots not named in seqIDs are untouched.
func (s *Sampler) Sample(seqIDs []int, logits *mlx.Array) Result {
func temperature(s *Sampler, logits *mlx.Array) *mlx.Array { if len(seqIDs) == 0 {
return mlx.DivScalar(logits, s.Temperature).Categorical(-1) return Result{}
}
func topP(s *Sampler, logits *mlx.Array) *mlx.Array {
if s.TopP <= 0 || s.TopP >= 1 {
return logits
} }
order := logits.Negative().ArgsortAxis(-1) slots := make([]*slotState, len(seqIDs))
sortedLogits := logits.TakeAlongAxis(order, -1) for i, id := range seqIDs {
sortedProbs := mlx.SoftmaxAxis(sortedLogits, -1, true) slot, ok := s.byID[id]
prevCumProbs := sortedProbs.Cumsum(-1, false, true).Subtract(sortedProbs) if !ok {
keep := prevCumProbs.Less(mlx.FromValue(s.TopP)) panic(fmt.Sprintf("sample.Sampler.Sample: seqID %d not registered", id))
filtered := mlx.Where(keep, sortedLogits, mlx.FromValue(float32(math.Inf(-1)))) }
return logits.PutAlongAxis(order, filtered, -1) slots[i] = slot
}
func minP(s *Sampler, logits *mlx.Array) *mlx.Array {
if s.MinP <= 0 || s.MinP > 1 {
return logits
} }
maxLogits := logits.TakeAlongAxis(logits.Argmax(-1, true), -1) var token *mlx.Array
minLogits := mlx.AddScalar(maxLogits, float32(math.Log(float64(s.MinP)))) if opts0, ok := s.canBatch(slots); ok {
token = s.sampleTokensUniform(slots, opts0, logits)
} else {
token = s.sampleTokensSerial(slots, logits)
}
res := Result{Token: token}
if s.anyLogprobs {
// Log-softmax over original logits so every row holds a truthful
// value (compute-for-all; consumers filter per-slot). Subtract
// max first for numerical stability in the logsumexp.
lp := logits.AsType(mlx.DTypeFloat32)
lp = lp.Subtract(lp.MaxAxis(-1, true))
lp = lp.Subtract(lp.LogsumexpAxis(-1, true))
res.Logprob = lp.TakeAlongAxis(token.ExpandDims(-1), -1)
if s.maxTopLogprobs > 0 {
k := s.maxTopLogprobs
if vocab := lp.Dim(lp.NumDims() - 1); k > vocab {
k = vocab
}
// Argpartition on the negated values places the K largest
// (unsorted) in positions [0:K].
idx := lp.Negative().ArgpartitionAxis(k-1, -1).Slice(mlx.Slice(), mlx.Slice(0, k))
res.TopTokens = idx.AsType(mlx.DTypeInt32)
res.TopLogprobs = lp.TakeAlongAxis(idx, -1)
}
}
return res
}
// canBatch reports whether the call can take the uniform batched path.
// All slots must share Options; when penalties are active the call must
// additionally cover every registered slot in registration order with a
// full ring, because the uniform path indexes the pool positionally.
func (s *Sampler) canBatch(slots []*slotState) (Options, bool) {
if !s.allSameOpts {
return Options{}, false
}
// slots is non-empty (Sample guards) and every slot is registered,
// so s.slots[0].opts is the canonical shared value.
shared := s.slots[0].opts
if !shared.usesHistory() {
return shared, true
}
if len(slots) != len(s.slots) {
return Options{}, false
}
for i, slot := range slots {
if s.slots[i] != slot || slot.historyLen < shared.RepeatLastN {
return Options{}, false
}
}
return shared, true
}
// sampleTokensUniform runs one fused transform pass over the whole batch.
// Reached only when canBatch is true, which lets the pool be used in place
// with a single PutAlongAxis write-back and no gather.
func (s *Sampler) sampleTokensUniform(slots []*slotState, opts Options, logits *mlx.Array) *mlx.Array {
B := len(slots)
var hist *mlx.Array
if opts.usesHistory() {
hist = s.history
if s.historyWidth() > opts.RepeatLastN {
hist = hist.Slice(mlx.Slice(), mlx.Slice(0, opts.RepeatLastN))
}
}
ctx := &slotCtx{opts: opts, history: hist}
scores := logits
for _, t := range slots[0].transforms {
scores = t(ctx, scores)
}
token := scores
if !opts.usesHistory() {
return token
}
writeIdxData := make([]int32, B)
for i, slot := range slots {
writeIdxData[i] = int32(slot.historyLen % opts.RepeatLastN)
slot.historyLen++
}
writeIdx := mlx.NewArrayInt32(writeIdxData, []int32{int32(B), 1})
s.history.Set(s.history.PutAlongAxis(writeIdx, token.ExpandDims(-1), 1))
return token
}
// sampleTokensSerial runs each slot's transforms against its own row of
// logits.
func (s *Sampler) sampleTokensSerial(slots []*slotState, logits *mlx.Array) *mlx.Array {
perSlotTokens := make([]*mlx.Array, len(slots))
rowOf := make(map[*slotState]int, len(s.slots))
for i, slot := range s.slots {
rowOf[slot] = i
}
for i, slot := range slots {
row := logits.Slice(mlx.Slice(i, i+1), mlx.Slice())
var hist *mlx.Array
if slot.opts.usesHistory() && slot.historyLen > 0 && s.history != nil {
poolRow := rowOf[slot]
fill := min(slot.historyLen, slot.opts.RepeatLastN)
hist = s.history.Slice(
mlx.Slice(poolRow, poolRow+1),
mlx.Slice(0, fill),
)
}
ctx := &slotCtx{opts: slot.opts, history: hist}
scores := row
for _, t := range slot.transforms {
scores = t(ctx, scores)
}
perSlotTokens[i] = scores
}
token := mlx.Concatenate(perSlotTokens, 0)
if s.history != nil {
// For each writing slot collect its flat (row-major) pool offset
// and the call-order position of its token. One PutAlongAxis on a
// flat view of the pool scatters all writes in a single op.
flatOffsets := make([]int32, 0, len(slots))
tokenPos := make([]int32, 0, len(slots))
for i, slot := range slots {
if !slot.opts.usesHistory() {
continue
}
ringPos := slot.historyLen % slot.opts.RepeatLastN
flatOffsets = append(flatOffsets, int32(rowOf[slot]*s.historyWidth()+ringPos))
tokenPos = append(tokenPos, int32(i))
slot.historyLen++
}
if len(flatOffsets) > 0 {
m := len(flatOffsets)
flatIdx := mlx.NewArrayInt32(flatOffsets, []int32{int32(m), 1})
writingTokens := token
if m != len(slots) {
tokenPosIdx := mlx.NewArrayInt32(tokenPos, []int32{int32(m)})
writingTokens = token.TakeAxis(tokenPosIdx, 0)
}
flatHist := s.history.Reshape(s.history.Dim(0)*s.historyWidth(), 1)
s.history.Set(flatHist.PutAlongAxis(flatIdx, writingTokens.ExpandDims(-1), 0).Reshape(s.history.Dim(0), s.historyWidth()))
}
}
return token
}
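The single scatter works because Reshape exposes the [rows, width] pool as one row-major vector, so cell (row, ringPos) lives at flat index row*width + ringPos. A hypothetical helper making the mapping explicit:

// flatOffset maps a (row, ringPos) pool cell to its index in the flattened
// row-major view used by the PutAlongAxis scatter; e.g. row 2 in a pool of
// width 64 with ringPos 5 maps to 2*64+5 = 133.
func flatOffset(row, width, ringPos int) int32 {
	return int32(row*width + ringPos)
}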
func greedy(_ *slotCtx, scores *mlx.Array) *mlx.Array {
return scores.Argmax(-1, false).AsType(mlx.DTypeInt32)
}
func temperature(ctx *slotCtx, scores *mlx.Array) *mlx.Array {
return mlx.DivScalar(scores, ctx.opts.Temperature).Categorical(-1).AsType(mlx.DTypeInt32)
}
// topKTopP applies top-P in a descending sort pass and, when top-K is also
// configured, masks any surviving value below the K-th largest in the same
// pass. Callers dispatch here whenever top-P is enabled — the top-K-only case
// uses a cheaper partial sort via the topK transform.
func topKTopP(ctx *slotCtx, scores *mlx.Array) *mlx.Array {
vocab := scores.Dim(scores.NumDims() - 1)
applyTopK := ctx.opts.TopK > 0 && ctx.opts.TopK < vocab
order := scores.Negative().ArgsortAxis(-1)
sorted := scores.TakeAlongAxis(order, -1)
negInf := mlx.FromValue(float32(math.Inf(-1)))
// Top-P: in descending order, keep tokens whose exclusive cumulative
// probability is still below TopP.
probs := mlx.SoftmaxAxis(sorted, -1, true)
prevCumProbs := probs.Cumsum(-1, false, true).Subtract(probs)
keep := prevCumProbs.Less(mlx.FromValue(ctx.opts.TopP))
sorted = mlx.Where(keep, sorted, negInf)
out := scores.PutAlongAxis(order, sorted, -1)
// Top-K: sorted is already in descending order, so positions [K, V) are
// the ones to drop. Scatter -inf through their original-layout indices
// (order[K:]). Positional (not value-based) so exactly K tokens survive —
// ties at the K-th logit get broken by the sort order rather than
// promoted through the filter.
if applyTopK {
dropOrder := order.Slice(mlx.Slice(), mlx.Slice(ctx.opts.TopK, mlx.End))
out = out.PutAlongAxis(dropOrder, negInf, -1)
}
return out
}
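The exclusive cumulative sum is what guarantees at least one survivor: the first (largest) token always sees prior mass 0 < TopP. A plain-Go sketch of the keep rule over probabilities already sorted descending (illustration, not the fused MLX pass):

// keepTopP reports which descending-sorted probabilities survive nucleus
// filtering: a token is kept while the mass strictly before it is below topP.
func keepTopP(sortedProbs []float64, topP float64) []bool {
	keep := make([]bool, len(sortedProbs))
	prev := 0.0 // exclusive cumulative probability
	for i, p := range sortedProbs {
		keep[i] = prev < topP
		prev += p
	}
	return keep
}

With probabilities {0.5, 0.3, 0.2} and topP = 0.4, the exclusive sums are {0, 0.5, 0.8}, so only the first token survives, matching the top-p case in the tests below.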
func minP(ctx *slotCtx, scores *mlx.Array) *mlx.Array {
if ctx.opts.MinP <= 0 || ctx.opts.MinP > 1 {
return scores
}
maxScore := scores.MaxAxis(-1, true)
threshold := mlx.AddScalar(maxScore, float32(math.Log(float64(ctx.opts.MinP))))
return mlx.Where(
scores.Less(threshold),
mlx.FromValue(float32(math.Inf(-1))),
scores,
)
}
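The threshold works in log space: p_i < MinP * p_max is equivalent to logit_i < max(logits) + log(MinP), because log is monotone and softmax preserves order, so no softmax is required. A plain-Go restatement of the keep rule (sketch, not the MLX version; assumes the math package is imported):

// minPKeep reports which tokens survive min-p filtering: keep token i iff
// logit_i >= maxLogit + log(minP), i.e. p_i >= minP * p_max.
func minPKeep(logits []float64, minP float64) []bool {
	maxV := math.Inf(-1)
	for _, v := range logits {
		if v > maxV {
			maxV = v
		}
	}
	threshold := maxV + math.Log(minP)
	keep := make([]bool, len(logits))
	for i, v := range logits {
		keep[i] = v >= threshold
	}
	return keep
}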
func topK(ctx *slotCtx, scores *mlx.Array) *mlx.Array {
if ctx.opts.TopK <= 0 {
return scores
}
vocab := scores.Dim(scores.NumDims() - 1)
if ctx.opts.TopK >= vocab {
return scores
}
mask := scores.Negative().ArgpartitionAxis(ctx.opts.TopK-1, -1).Slice(mlx.Slice(), mlx.Slice(ctx.opts.TopK, mlx.End))
return scores.PutAlongAxis(mask, mlx.FromValue(float32(math.Inf(-1))), -1)
}
func penalty(ctx *slotCtx, scores *mlx.Array) *mlx.Array {
tokenIndices := ctx.history
if tokenIndices == nil {
return scores
}
if ctx.opts.RepeatPenalty != 1 || ctx.opts.PresencePenalty != 0 {
adjusted := scores.TakeAlongAxis(tokenIndices, -1)
if ctx.opts.RepeatPenalty != 1 {
factor := mlx.Where(
adjusted.Less(mlx.FromValue(float32(0))),
mlx.FromValue(ctx.opts.RepeatPenalty),
mlx.FromValue(1/ctx.opts.RepeatPenalty),
)
adjusted = adjusted.Multiply(factor)
}
if ctx.opts.PresencePenalty != 0 {
adjusted = mlx.AddScalar(adjusted, -ctx.opts.PresencePenalty)
}
scores = scores.PutAlongAxis(tokenIndices, adjusted, -1)
}
if ctx.opts.FrequencyPenalty != 0 {
scores = scores.ScatterAddAxis(tokenIndices, mlx.FromValue(-ctx.opts.FrequencyPenalty), -1)
}
return scores
}
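Spelled out, the three penalties compose per token as: repeat divides a positive logit by RepeatPenalty and multiplies a negative one (both push the token toward less likely), presence subtracts a flat amount once for any token seen in the window, and frequency subtracts an amount per occurrence. A scalar plain-Go sketch of that arithmetic (the MLX version does the same with gathers and scatters; duplicate history entries affect only the frequency term; assumes repeat > 0):

// applyPenalties adjusts one token's logit given its occurrence count in
// the history window, mirroring the arithmetic in penalty above.
func applyPenalties(logit float64, count int, repeat, presence, frequency float64) float64 {
	if count == 0 {
		return logit // unseen tokens are untouched
	}
	if logit < 0 {
		logit *= repeat // negative: multiply to push further down
	} else {
		logit /= repeat // positive: divide to shrink
	}
	logit -= presence                   // flat, once per seen token
	logit -= frequency * float64(count) // scales with occurrences
	return logit
}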

@@ -9,94 +9,283 @@ import (
"github.com/ollama/ollama/x/mlxrunner/mlx" "github.com/ollama/ollama/x/mlxrunner/mlx"
) )
// slotLogits builds a [1, V] logits tensor for a single-slot Sample call.
func slotLogits(values []float32) *mlx.Array {
return mlx.FromValues(values, 1, len(values))
}
// batchLogits stacks per-row float32 slices of equal length into a [B, V]
// logits tensor.
func batchLogits(rows ...[]float32) *mlx.Array {
v := len(rows[0])
flat := make([]float32, 0, len(rows)*v)
for _, r := range rows {
if len(r) != v {
panic("batchLogits: rows must share vocab size")
}
flat = append(flat, r...)
}
return mlx.FromValues(flat, len(rows), v)
}
// sampleOne runs Sample on a freshly-added single slot and returns the
// sampled token id. Used both for the single-slot options table and as the
// reference oracle for the batched-equivalence test.
func sampleOne(t *testing.T, opts Options, priorTokens []int32, values []float32) int {
t.Helper()
s := New(128)
t.Cleanup(func() {
s.Free()
mlx.Sweep()
})
s.Add(0, opts, priorTokens)
got := s.Sample([]int{0}, slotLogits(values)).Token
mlx.Eval(got)
return got.Int()
}
// logOf returns log(p) as a float32 so tests can build logits that softmax to
// a chosen probability distribution.
func logOf(p float64) float32 { return float32(math.Log(p)) }
// TestSampleSingleSlotOptions pins the per-slot behavior of each Options
// knob against a concrete expected token. Expected values are worked out by
// hand from the math of each transform, not from a second call into the
// sampler — so a regression in any single transform shows up here.
func TestSampleSingleSlotOptions(t *testing.T) {
cases := []struct {
name string
opts Options
priors []int32
logits []float32
want int
}{
{
name: "presence penalty",
opts: Options{RepeatLastN: 1, PresencePenalty: 6},
priors: []int32{1},
logits: []float32{0, 5, 4},
want: 2, // token 1: 5 - 6 = -1, argmax shifts to 2
},
{
name: "repeat penalty on positive logits",
opts: Options{RepeatLastN: 1, RepeatPenalty: 2},
priors: []int32{1},
logits: []float32{0, 5, 4},
want: 2, // token 1 positive → divided: 5/2 = 2.5, argmax shifts to 2
},
{
name: "repeat penalty on negative logits",
opts: Options{RepeatLastN: 1, RepeatPenalty: 4},
priors: []int32{1},
logits: []float32{-5, -1, -3},
want: 2, // token 1 negative → multiplied: -1*4 = -4, argmax shifts to 2
},
{
name: "frequency penalty",
opts: Options{RepeatLastN: 4, FrequencyPenalty: 2},
priors: []int32{1, 1},
logits: []float32{0, 5, 4},
want: 2, // 5 - 2*count(1)=2*2=4 → 1, argmax shifts to 2
},
{
name: "top-k",
opts: Options{Temperature: 1, TopK: 1},
logits: []float32{1, 5, 4},
want: 1, // only argmax survives → deterministic even with temperature
},
{
name: "top-p",
opts: Options{Temperature: 1, TopP: 0.4},
logits: []float32{logOf(0.5), logOf(0.3), logOf(0.2)},
want: 0, // exclusive cumsum below 0.4 keeps only token 0
},
{
name: "min-p",
opts: Options{Temperature: 1, MinP: 0.7},
logits: []float32{logOf(0.5), logOf(0.3), logOf(0.2)},
want: 0, // threshold 0.5*0.7=0.35 drops all but the top token
},
{
name: "RepeatLastN=0 disables penalties",
opts: Options{RepeatLastN: 0, RepeatPenalty: 2, PresencePenalty: 10},
priors: []int32{1},
logits: []float32{0, 5, 4},
want: 1, // 0 = disabled per API contract, argmax unchanged
},
{
name: "RepeatLastN=-1 resolves to num_ctx",
opts: Options{RepeatLastN: -1, PresencePenalty: 6},
priors: []int32{1},
logits: []float32{0, 5, 4},
want: 2, // -1 → num_ctx (128); penalty applies, argmax shifts
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
if got := sampleOne(t, tc.opts, tc.priors, tc.logits); got != tc.want {
t.Errorf("got %d, want %d", got, tc.want)
}
})
}
}
// TestSampleHistoryWindow verifies that penalty history respects the
// RepeatLastN window: priors longer than RepeatLastN are trimmed on Add,
// and once the ring wraps, tokens that rotate out no longer contribute
// to penalties.
func TestSampleHistoryWindow(t *testing.T) {
s := New(128)
t.Cleanup(func() {
s.Free()
mlx.Sweep()
})
// RepeatLastN=2 with priors {1, 2, 3}: makeHistoryRow keeps only
// {2, 3}. Token 1 was trimmed — its penalty is NOT active.
s.Add(0, Options{RepeatLastN: 2, PresencePenalty: 10}, []int32{1, 2, 3})
// Step 1: logits favor token 1 (trimmed). If the trim were broken it
// would be penalized and the argmax would move.
step1 := s.Sample([]int{0}, slotLogits([]float32{0, 5, 0, 0, 0})).Token
mlx.Eval(step1)
if got := step1.Int(); got != 1 {
t.Fatalf("step 1 = %d, want 1 (token 1 trimmed from priors)", got)
}
// After step 1 the ring holds {1, 3}; token 2 has rotated out.
// Step 2: logits favor token 2 (rotated out). If the ring wrap were
// wrong, token 2 would still be penalized.
step2 := s.Sample([]int{0}, slotLogits([]float32{0, 0, 5, 0, 0})).Token
mlx.Eval(step2)
if got := step2.Int(); got != 2 {
t.Fatalf("step 2 = %d, want 2 (token 2 rotated out of ring)", got)
}
}
// TestBatchSamplingPreservesPerSlotBehavior is the core equivalence test:
// for every representative dispatch branch (uniform, serial on mixed opts,
// serial on partial ring, subset/out-of-order), a batched Sample call must
// produce the same token per row as running the same slot alone.
func TestBatchSamplingPreservesPerSlotBehavior(t *testing.T) {
type slot struct {
id int
opts Options
priors []int32
}
cases := []struct {
name string
slots []slot
sample []int
rows [][]float32
}{
{
name: "uniform",
slots: []slot{
{10, Options{RepeatLastN: 2, PresencePenalty: 5}, []int32{1, 2}},
{20, Options{RepeatLastN: 2, PresencePenalty: 5}, []int32{0, 2}},
},
sample: []int{10, 20},
rows: [][]float32{{0, 5, 4}, {3, 0, 0}},
},
{
name: "serial — mixed opts",
slots: []slot{
{1, Options{RepeatLastN: 1, RepeatPenalty: 2}, []int32{1}},
{2, Options{Temperature: 1, TopK: 1}, nil},
},
sample: []int{1, 2},
rows: [][]float32{{0, 5, 4, 1}, {2, 1, 5, 3}},
},
{
name: "serial — partial ring",
slots: []slot{
{1, Options{RepeatLastN: 4, PresencePenalty: 5}, []int32{1, 1, 1, 1}},
{2, Options{RepeatLastN: 4, PresencePenalty: 5}, []int32{2}},
},
sample: []int{1, 2},
rows: [][]float32{{0, 5, 4}, {0, 4, 5}},
},
{
name: "subset out-of-order",
slots: []slot{
{10, Options{RepeatLastN: 2, PresencePenalty: 10}, []int32{1, 1}},
{20, Options{RepeatLastN: 2, PresencePenalty: 10}, []int32{2, 2}},
{30, Options{RepeatLastN: 2, PresencePenalty: 10}, []int32{3, 3}},
},
sample: []int{30, 10},
rows: [][]float32{{5, 5, 5, 0, 5, 5}, {5, 0, 5, 5, 0, 5}},
},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// Per-slot reference for each sampled seq.
want := make([]int, len(tc.sample))
for i, id := range tc.sample {
var spec slot
for _, s := range tc.slots {
if s.id == id {
spec = s
break
}
}
want[i] = sampleOne(t, spec.opts, spec.priors, tc.rows[i])
}
// Batched call.
s := New(128)
t.Cleanup(func() {
s.Free()
mlx.Sweep()
})
for _, spec := range tc.slots {
s.Add(spec.id, spec.opts, spec.priors)
}
res := s.Sample(tc.sample, batchLogits(tc.rows...))
mlx.Eval(res.Token)
got := res.Token.Ints()
for i, id := range tc.sample {
if got[i] != want[i] {
t.Errorf("seq %d: batched = %d, per-slot = %d", id, got[i], want[i])
}
}
})
}
}
// TestRemoveDoesNotLeakHistory: after Remove, a newly-added slot at the
// recycled row must start from its own priors only — no carryover from
// the removed slot's history.
func TestRemoveDoesNotLeakHistory(t *testing.T) {
opts := Options{RepeatLastN: 1, PresencePenalty: 10}
s := New(128)
t.Cleanup(func() {
s.Free()
mlx.Sweep()
})
s.Add(1, opts, []int32{1})
s.Add(2, opts, []int32{2})
s.Remove(1)
s.Add(3, opts, []int32{0})
// Slot 2 retains history {2}; slot 3 retains history {0}. With
// equal logits and PresencePenalty=10 the argmax drops to the first
// unpenalized token.
res := s.Sample([]int{2, 3}, batchLogits(
[]float32{3, 3, 0},
[]float32{3, 3, 0},
))
mlx.Eval(res.Token)
tokens := res.Token.Ints()
if tokens[0] != 0 {
t.Errorf("slot 2 = %d, want 0 (token 2 penalized)", tokens[0])
}
if tokens[1] != 1 {
t.Errorf("slot 3 = %d, want 1 (token 0 penalized, no slot-1 carryover)", tokens[1])
}
}

@@ -2,7 +2,6 @@ package mlxrunner
import (
"bytes"
"context"
"encoding/json"
"flag"
@@ -87,25 +86,30 @@ func Execute(args []string) error {
mux.HandleFunc("POST /v1/completions", func(w http.ResponseWriter, r *http.Request) {
request := Request{Responses: make(chan CompletionResponse)}
if err := json.NewDecoder(r.Body).Decode(&request.CompletionRequest); err != nil {
slog.Error("Failed to decode request", "error", err)
http.Error(w, "Bad Request", http.StatusBadRequest)
return
}
request.Pipeline = runner.TextGenerationPipeline
request.SamplerOpts = sample.Options{
Temperature: request.Options.Temperature,
TopP: request.Options.TopP,
MinP: request.Options.MinP,
TopK: request.Options.TopK,
RepeatLastN: request.Options.RepeatLastN,
RepeatPenalty: request.Options.RepeatPenalty,
PresencePenalty: request.Options.PresencePenalty,
FrequencyPenalty: request.Options.FrequencyPenalty,
Logprobs: request.Logprobs,
TopLogprobs: request.TopLogprobs,
}
if err := runner.Prepare(&request); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var cancel context.CancelFunc
request.Ctx, cancel = context.WithCancel(r.Context())

@@ -144,6 +144,8 @@ func TestRouterForwardMatchesLegacy(t *testing.T) {
gotScores, gotInds := r.Forward(x, cfg)
wantScores, wantInds := legacyRouterForward(r, x, cfg)
gotInds = gotInds.AsType(mlx.DTypeInt32)
wantInds = wantInds.AsType(mlx.DTypeInt32)
mlx.Eval(gotScores, gotInds, wantScores, wantInds)
if got, want := gotInds.Ints(), wantInds.Ints(); !intSlicesEqual(got, want) {

@@ -169,8 +169,8 @@ func TestQuantizedLinearMXFP4MatchesDequantizedWeight(t *testing.T) {
dequantizedWeight := mlx.Dequantize(ql.Weight, ql.Scales, ql.QBiases, 32, 4, "mxfp4")
mlx.Eval(dequantizedWeight)
qOut := ql.Forward(input).AsType(mlx.DTypeFloat32)
dOut := NewLinear(dequantizedWeight, nil).Forward(input).AsType(mlx.DTypeFloat32)
mlx.Eval(qOut, dOut)
got := qOut.Floats()