# SPDX-FileCopyrightText: 2026 Ahmed Imamović
# SPDX-FileCopyrightText: 2026 Tarik Hamedović
# SPDX-License-Identifier: BSD-2-Clause
# streams.py
#
# UberClock high-speed streaming primitives (UC domain)
# =====================================================
# UC-domain streaming building blocks used to feed the UberDDR3 S2MM DMA writer:
#
# 1) RampSource:
# Deterministic 16-bit-lane ramp packed into BUS_DATA_WIDTH-bit beats.
#
# 2) SamplePackerStream:
# Samples one value per UC cycle, packs 16-bit samples into BUS_DATA_WIDTH
# beats (LANES = BUS_DATA_WIDTH/16), and outputs a valid/ready beat stream.
#
# 3) UCStreamMux:
# Select between RampSource and an external beat-stream.
#
# Notes:
# - These blocks are UC-domain only. CDC to sys must be done outside.
# - Migen Signal() does NOT accept `description=...` (that's for LiteX CSRs).
#
from __future__ import annotations
from migen import *
from litex.gen import *
from litex.gen import LiteXModule
from migen.genlib.fifo import SyncFIFO
# -----------------------------------------------------------------------------
# RampSource
# -----------------------------------------------------------------------------
[docs]
class RampSource(LiteXModule):
    """
    Deterministic ramp test source that drives a valid/ready beat stream.

    Packing:
    - Beat width: bus_data_width bits
    - Lanes: LANES = bus_data_width / 16
    - Lane i carries (base_step + i), truncated to 16 bits
    - base_step is cleared by `start` and advances by LANES per accepted beat
      (a beat is accepted when the source is running and `ready` is high)

    Parameters:
    - bus_data_width: beat width in bits; must be a positive multiple of 16.
    - max_beats: upper clamp for `length_beats`; also sizes the beat counter.

    Signals:
    - start (in): pulse; (re)starts the ramp from beat 0 / base_step 0.
    - length_beats (in): requested number of beats, clamped into [1..max_beats].
      It is used combinationally (not latched at `start`).
    - valid/ready/data/bytes/last: beat stream. `bytes` is the constant
      bus_data_width / 8; `last` marks the final beat of the run.
    """
    def __init__(self, bus_data_width: int = 256, max_beats: int = (1 << 23)):
        assert bus_data_width > 0 and bus_data_width % 16 == 0, \
            "bus_data_width must be a positive multiple of 16"
        assert max_beats >= 1, "max_beats must be at least 1"
        LANES = bus_data_width // 16

        # Control / stream interface
        self.start = Signal()  # pulse to start ramp streaming (UC domain)
        self.valid = Signal()
        self.ready = Signal()
        self.data = Signal(bus_data_width)
        self.bytes = Signal(max=(bus_data_width // 8) + 1)
        self.last = Signal()
        self.length_beats = Signal(32, reset=256)

        # Internal state
        running = Signal()
        base_step = Signal(16)            # lane0 value (increments by LANES per beat)
        beat_idx = Signal(max=max_beats)  # beat index counter, range 0..max_beats-1

        # Clamp length into [1..max_beats].
        # The clamped length must be able to hold max_beats itself, which is one
        # past the largest beat index; sizing it like beat_idx would drop the top
        # bit whenever max_beats is a power of two (e.g. the default 1 << 23).
        length_clamped = Signal(max=max_beats + 1)
        self.comb += [
            If(self.length_beats == 0,
                length_clamped.eq(1)
            ).Elif(self.length_beats >= max_beats,
                length_clamped.eq(max_beats)
            ).Else(
                # Here length_beats < max_beats, so the low bits carry the full value.
                length_clamped.eq(self.length_beats[:len(beat_idx)])
            )
        ]

        # Single shared "this is the final beat" compare (length_clamped >= 1,
        # so the subtraction never underflows).
        last_beat = Signal()
        self.comb += last_beat.eq(beat_idx == (length_clamped - 1))

        # Beat counter / ramp progression
        self.sync += [
            If(self.start,
                running.eq(1),
                beat_idx.eq(0),
                base_step.eq(0),
            ).Elif(running & self.ready,
                base_step.eq(base_step + LANES),
                If(last_beat,
                    running.eq(0)
                ).Else(
                    beat_idx.eq(beat_idx + 1)
                )
            )
        ]

        # Outputs
        self.comb += [
            self.valid.eq(running),
            self.bytes.eq(bus_data_width // 8),
            self.last.eq(running & last_beat),
        ]

        # Ramp packing: lane i = base_step + i
        for i in range(LANES):
            self.comb += self.data[16*i:16*(i+1)].eq(base_step + i)
# -----------------------------------------------------------------------------
# SamplePackerStream
# -----------------------------------------------------------------------------
[docs]
class SamplePackerStream(LiteXModule):
    """
    Samples one value per UC cycle, packs 16-bit samples into bus_data_width beats,
    and streams the beats out of a beat FIFO with valid/ready handshaking.

    Packing:
    - `sample_in` (sample_width bits) is sign-extended to 16 bits
    - LANES = bus_data_width / 16 samples are packed into one beat
    - The first sample goes to lane 0 (bits [15:0]), then lane 1, ..., lane LANES-1
    - Two buffers alternate; a completed buffer is flagged pending and is written
      into the beat FIFO once the FIFO is writable

    Parameters:
    - sample_width: input sample width in bits; must be in 1..16.
    - bus_data_width: beat width in bits; must be a positive multiple of 16.
    - beat_fifo_depth: depth (in beats) of the output FIFO.

    Signals:
    - start (in): pulse; clears the packer state, counters and `overflow`, then
      starts capturing.
    - frames (in): number of beats to capture/emit (one beat = LANES samples).
    - valid/ready/data/bytes/last: beat stream from the FIFO. `bytes` is the
      constant bus_data_width / 8; `last` is raised on beat number `frames`.
    - overflow: set (and capture stopped) when a beat completes while a previously
      completed buffer is still waiting to be enqueued; cleared by `start`.

    NOTE(review): `start` does not reset the beat FIFO, so beats left over from an
    earlier run stay queued - confirm callers drain the stream before restarting.
    NOTE(review): `frames == 0` is not special-cased - confirm callers never use it.
    """
    def __init__(self, sample_width: int = 12, bus_data_width: int = 256, beat_fifo_depth: int = 512):
        assert bus_data_width > 0 and bus_data_width % 16 == 0, \
            "bus_data_width must be a positive multiple of 16"
        assert 1 <= sample_width <= 16, \
            "sample_width must be in 1..16 (samples are stored in 16-bit lanes)"
        LANES = bus_data_width // 16

        # Inputs
        self.sample_in = Signal(sample_width)
        self.start = Signal()     # pulse
        self.frames = Signal(32)  # number of beats (frames) to capture/emit

        # Stream outputs
        self.valid = Signal()
        self.ready = Signal()
        self.data = Signal(bus_data_width)
        self.bytes = Signal(max=(bus_data_width // 8) + 1)
        self.last = Signal()
        self.overflow = Signal()
        self.comb += self.bytes.eq(bus_data_width // 8)

        # Sign-extend to 16-bit
        sample16 = Signal(16)
        if sample_width == 16:
            # Already lane-sized: avoid building a zero-count Replicate.
            self.comb += sample16.eq(self.sample_in)
        else:
            sign_bit = self.sample_in[sample_width - 1]
            self.comb += sample16.eq(
                Cat(self.sample_in, Replicate(sign_bit, 16 - sample_width))
            )

        # Beat FIFO (UC domain)
        bf = SyncFIFO(width=bus_data_width, depth=beat_fifo_depth)
        self.submodules.bf = bf

        # Ping-pong packing buffers
        buf0 = Array(Signal(16) for _ in range(LANES))
        buf1 = Array(Signal(16) for _ in range(LANES))
        active = Signal()             # 0 -> fill buf0, 1 -> fill buf1
        fill_idx = Signal(max=LANES)  # lane being written
        pend0 = Signal()              # buf0 complete, waiting enqueue
        pend1 = Signal()              # buf1 complete, waiting enqueue
        running = Signal()
        frames_packed = Signal(32)    # complete beats formed (not necessarily enqueued yet)
        frames_sent = Signal(32)      # beats popped downstream

        # Assemble words from buffers
        word0 = Signal(bus_data_width)
        word1 = Signal(bus_data_width)
        for i in range(LANES):
            self.comb += [
                word0[16*i:16*(i+1)].eq(buf0[i]),
                word1[16*i:16*(i+1)].eq(buf1[i]),
            ]

        # Sampling / packing (every UC cycle).
        # frames_sent is reset by its own sync block further down.
        self.sync += [
            If(self.start,
                running.eq(1),
                active.eq(0),
                fill_idx.eq(0),
                pend0.eq(0),
                pend1.eq(0),
                frames_packed.eq(0),
                self.overflow.eq(0),
            ).Elif(running & ~self.overflow,
                If(active == 0,
                    buf0[fill_idx].eq(sample16)
                ).Else(
                    buf1[fill_idx].eq(sample16)
                ),
                If(fill_idx == (LANES - 1),
                    # Beat complete. If any buffer is still waiting for the FIFO,
                    # flag overflow and stop; otherwise mark the buffer just
                    # filled as pending.
                    If(pend0 | pend1,
                        self.overflow.eq(1),
                        running.eq(0),
                    ).Elif(active == 0,
                        pend0.eq(1),
                    ).Else(
                        pend1.eq(1),
                    ),
                    active.eq(~active),
                    fill_idx.eq(0),
                    If(frames_packed == (self.frames - 1),
                        running.eq(0),
                    ).Else(
                        frames_packed.eq(frames_packed + 1),
                    )
                ).Else(
                    fill_idx.eq(fill_idx + 1)
                )
            )
        ]

        # Enqueue pending buffers into the beat FIFO (buf0 has priority).
        do_enq0 = Signal()
        do_enq1 = Signal()
        self.comb += [
            do_enq0.eq(pend0 & bf.writable),
            do_enq1.eq(~do_enq0 & pend1 & bf.writable),
        ]
        self.comb += [
            bf.we.eq(do_enq0 | do_enq1),
            bf.din.eq(Mux(do_enq0, word0, word1)),
        ]
        # Added after the packing block on purpose: a pending flag is only set
        # there when neither flag is already set, so set and clear never collide.
        self.sync += [
            If(do_enq0, pend0.eq(0)),
            If(do_enq1, pend1.eq(0)),
        ]

        # Output stream from beat FIFO
        self.comb += [
            self.valid.eq(bf.readable),
            self.data.eq(bf.dout),
            bf.re.eq(self.valid & self.ready),
            self.last.eq(self.valid & (frames_sent == (self.frames - 1))),
        ]
        # Count beats handed downstream; the counter holds at frames - 1.
        self.sync += [
            If(self.start,
                frames_sent.eq(0)
            ).Elif(self.valid & self.ready,
                If(frames_sent != (self.frames - 1),
                    frames_sent.eq(frames_sent + 1)
                )
            )
        ]
# -----------------------------------------------------------------------------
# UCStreamMux
# -----------------------------------------------------------------------------
[docs]
class UCStreamMux(LiteXModule):
    """
    Two-way beat-stream selector.

    Sources:
    - use_external = 0: the internal RampSource test pattern
    - use_external = 1: the external beat stream on the ext_* ports
      (typically a SamplePackerStream)

    Control:
    - start: forwarded to the ramp generator only while use_external = 0
    - use_external: source select
    - ramp_length_beats: ramp length in beats, passed to the ramp generator

    The downstream `ready` is routed back only to the selected source; the
    other source sees ready = 0.
    """
    def __init__(self, bus_data_width: int = 256, max_beats: int = (1 << 23)):
        bytes_bound = (bus_data_width // 8) + 1  # exclusive upper bound of the byte count

        # Control
        self.start = Signal()
        self.use_external = Signal()
        self.ramp_length_beats = Signal(32, reset=256)

        # Muxed stream outputs
        self.valid = Signal()
        self.ready = Signal()
        self.data = Signal(bus_data_width)
        self.bytes = Signal(max=bytes_bound)
        self.last = Signal()

        # External stream inputs
        self.ext_valid = Signal()
        self.ext_ready = Signal()
        self.ext_data = Signal(bus_data_width)
        self.ext_bytes = Signal(max=bytes_bound)
        self.ext_last = Signal()

        # Internal ramp generator
        self.submodules.ramp = RampSource(bus_data_width=bus_data_width, max_beats=max_beats)
        ramp = self.ramp
        sel_ext = self.use_external

        # Ramp control: only started when the ramp is the selected source.
        self.comb += [
            ramp.length_beats.eq(self.ramp_length_beats),
            ramp.start.eq(self.start & ~sel_ext),
        ]

        # Forward path: each output field comes from the selected source.
        for field in ("valid", "data", "bytes", "last"):
            external_side = getattr(self, "ext_" + field)
            ramp_side = getattr(ramp, field)
            self.comb += getattr(self, field).eq(Mux(sel_ext, external_side, ramp_side))

        # Backpressure: only the selected source is allowed to advance.
        self.comb += [
            self.ext_ready.eq(self.ready & sel_ext),
            ramp.ready.eq(self.ready & ~sel_ext),
        ]