From a6df554eb353dcf28c343c5c2ad480a2fa28b9dd Mon Sep 17 00:00:00 2001 From: Abhishek Krishna Date: Sun, 24 May 2026 17:10:59 +0530 Subject: [PATCH] =?UTF-8?q?feat(oracle):=20MuzixStreamingOracle=20?= =?UTF-8?q?=E2=80=94=20on-chain=20consumer=20surface=20(closes=20spec=20Ph?= =?UTF-8?q?ase=201)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements the Phase-1 MVP described in oracle/SPECIFICATION.md: the on-chain consumer-facing contract that authorized pushers submit verified streaming-revenue records to, and that downstream MUSD royalty distributors read from. Scope (matches IStreamingRevenueOracle in the spec): - Per-DSP registry with per-DSP min-confidence override (default 7500 bps). - Authorized pusher set + per-pusher 1h cooldown (spec §2 "Rate limiting"). - Period sanity: periodStart < periodEnd, periodEnd <= block.timestamp. - Append-only history with per-(catalog, dsp) latest pointers for cheap on-chain reads, plus a getRevenueForPeriod view for off-chain RPC consumers. - isDataFresh / freshnessWindow (default 24h, admin-configurable). - Pausable circuit breaker (spec §3 "Fail-Safes"). - subscribeToUpdates emits an event so off-chain indexers can pre-filter their watch list without paying for storage. Out of scope (tracked for later phases): on-chain consensus, node staking/slashing, dispute window, Chainlink fallback, cross-chain. 35 new Foundry tests cover the registry, pusher auth, confidence floor (default + per-DSP override), period invariants, cooldown across single + batch submissions, freshness, period-sum aggregation, pause/unpause, and out-of-bounds history access. Full suite: 50/50 passing. --- oracle/README.md | 43 +++- src/MuzixStreamingOracle.sol | 403 ++++++++++++++++++++++++++++++++ test/MuzixStreamingOracle.t.sol | 401 +++++++++++++++++++++++++++++++ 3 files changed, 838 insertions(+), 9 deletions(-) create mode 100644 src/MuzixStreamingOracle.sol create mode 100644 test/MuzixStreamingOracle.t.sol diff --git a/oracle/README.md b/oracle/README.md index 5c0e9f9e..99560540 100644 --- a/oracle/README.md +++ b/oracle/README.md @@ -14,24 +14,49 @@ The Streaming Revenue Oracle aggregates revenue data from Digital Service Provid - **Confidence Scoring**: Multi-factor scoring for data reliability - **Smart Contract Integration**: Easy consumption by royalty distribution contracts +## Status + +| Component | State | Source | +|-----------|-------|--------| +| Consumer-facing contract | **Shipped** (Phase 1 MVP) | [`src/MuzixStreamingOracle.sol`](../src/MuzixStreamingOracle.sol) | +| Off-chain pusher node | In design | [SPECIFICATION.md §"Oracle Node Network"](./SPECIFICATION.md) | +| On-chain consensus + slashing | Not started | [SPECIFICATION.md Phase 2/3](./SPECIFICATION.md) | +| Chainlink fallback wiring | Not started | — | + ## Quick Start ```solidity -import {IStreamingRevenueOracle} from "./interfaces/IStreamingRevenueOracle.sol"; - -contract MyContract { - IStreamingRevenueOracle oracle; - - function getRevenue(bytes32 catalogId) external view returns (uint256) { - StreamingRevenue memory revenue = oracle.getLatestRevenue(catalogId); - return revenue.revenueUsd; +import {MuzixStreamingOracle} from "../src/MuzixStreamingOracle.sol"; + +contract RoyaltyDistributor { + MuzixStreamingOracle public oracle; + + function distributeIfFresh(bytes32 catalogId) external { + require(oracle.isDataFresh(catalogId), "stale"); + MuzixStreamingOracle.StreamingRevenue memory rev = oracle.getLatestRevenue(catalogId); + // ... settle MUSD against royalty splits using rev.revenueUsd } } ``` +## What the on-chain contract enforces + +The deployed contract trusts the off-chain node network for consensus and verifies a focused +set of on-chain invariants per submission: + +- **Pusher authorization** — only addresses added via `setPusher` may submit. +- **DSP registry** — submissions for unregistered or deactivated DSPs revert. +- **Confidence floor** — per-DSP `minConfidenceScore` overrides the default 7500 bps. +- **Period sanity** — `periodStart < periodEnd` and `periodEnd <= block.timestamp`. +- **Cooldown** — one submission round per pusher per `SUBMISSION_COOLDOWN` (1h); + batches share the cooldown so a single round can land an arbitrary number of records. +- **Circuit breaker** — `pause()` halts all new submissions; reads continue working. + ## Architecture -See [SPECIFICATION.md](./SPECIFICATION.md) for detailed architecture and implementation guide. +See [SPECIFICATION.md](./SPECIFICATION.md) for the full architecture, node-network design, and +roadmap. The on-chain contract implements the Phase 1 (MVP) consumer surface; subsequent +phases add the node-network and dispute machinery. ## Data Flow diff --git a/src/MuzixStreamingOracle.sol b/src/MuzixStreamingOracle.sol new file mode 100644 index 00000000..2c365544 --- /dev/null +++ b/src/MuzixStreamingOracle.sol @@ -0,0 +1,403 @@ +// SPDX-License-Identifier: MIT +pragma solidity ^0.8.20; + +import "@openzeppelin/contracts/access/Ownable.sol"; +import "@openzeppelin/contracts/utils/Pausable.sol"; + +/** + * @title MuzixStreamingOracle + * @author kcolbchain + * @notice On-chain streaming-revenue feed for music catalogs. Authorized + * pushers submit per-(catalog, DSP, period) revenue records produced + * off-chain by the Muzix oracle node network. Consumers (e.g. an MUSD + * royalty distributor) read the latest verified data per catalog to + * trigger settlement. + * + * This contract implements the consumer-facing surface of + * `oracle/SPECIFICATION.md` Phase 1 (MVP): + * - Per-DSP registry with per-DSP min-confidence override. + * - Authorized pusher set (off-chain consensus is enforced by the + * node network; this contract trusts the post-consensus push). + * - Per-submission confidence floor (default 7500 bps) and + * freshness window (default 24h) — both admin-configurable. + * - Per-pusher submission cooldown (default 1h) per spec §2. + * - Append-only history for off-chain auditability; latest pointer + * per catalog and per (catalog, dsp) for cheap on-chain reads. + * - Pausable circuit breaker (spec §3 "Fail-Safes"). + * + * Out of scope for this contract (tracked separately): + * - On-chain node staking, slashing, and consensus aggregation. + * - Chainlink fallback wiring. + * - Dispute / challenge windows. + * - Cross-chain delivery. + * + * Revenue is denominated in USD with 6 decimals — same convention + * the off-chain pipeline emits and downstream MUSD payouts consume. + */ +contract MuzixStreamingOracle is Ownable, Pausable { + // ----------------------------------------------------------------------- + // Constants + // ----------------------------------------------------------------------- + + /// @notice Basis-points denominator for confidence scores. + uint256 public constant CONFIDENCE_BASIS = 10000; + + /// @notice Default minimum acceptable confidence score (75% per spec). + uint256 public constant DEFAULT_MIN_CONFIDENCE = 7500; + + /// @notice Default freshness window for `isDataFresh` (24h per spec). + uint256 public constant DEFAULT_FRESHNESS_WINDOW = 24 hours; + + /// @notice Per-pusher submission cooldown (spec §2 "Rate limiting"). + uint256 public constant SUBMISSION_COOLDOWN = 1 hours; + + // ----------------------------------------------------------------------- + // Types + // ----------------------------------------------------------------------- + + /// @notice Per-(catalog, DSP, period) verified revenue record. Layout + /// tracks `oracle/SPECIFICATION.md` §"Data Schema" exactly so + /// off-chain producers can re-encode without surprises. + struct StreamingRevenue { + bytes32 catalogId; + bytes32 dspId; + uint256 totalStreams; + uint256 revenueUsd; + uint256 periodStart; + uint256 periodEnd; + bytes32 territoryHash; + bytes32 dataSourceHash; + uint256 lastUpdated; + uint256 confidenceScore; + } + + /// @notice DSP registry entry. `minConfidenceScore == 0` falls back to + /// `DEFAULT_MIN_CONFIDENCE` at submission time so admins can + /// opt into the default without an explicit write. + struct DSPInfo { + bytes32 dspId; + string name; + uint256 weight; + bool isActive; + uint256 minConfidenceScore; + } + + // ----------------------------------------------------------------------- + // Storage + // ----------------------------------------------------------------------- + + /// @notice Freshness window used by `isDataFresh`. Admin-configurable so + /// long-tail catalogs (monthly settlement) can opt into a wider + /// window without re-deploying. + uint256 public freshnessWindow; + + mapping(bytes32 => DSPInfo) internal _dsps; + bytes32[] internal _dspIds; + mapping(bytes32 => bool) internal _dspRegistered; + + mapping(address => bool) public isPusher; + mapping(address => uint256) public lastSubmissionAt; + + // catalog → dsp → latest record for that (catalog, DSP) pair. + mapping(bytes32 => mapping(bytes32 => StreamingRevenue)) internal _latestByDsp; + // catalog → most-recently-updated record across all DSPs. + mapping(bytes32 => StreamingRevenue) internal _latest; + // catalog → append-only history (one entry per accepted submission). + mapping(bytes32 => StreamingRevenue[]) internal _history; + // catalog → has at least one accepted record (distinguishes "never seen" + // from "all-zero record"). + mapping(bytes32 => bool) internal _hasRecord; + + // ----------------------------------------------------------------------- + // Events + // ----------------------------------------------------------------------- + + event RevenueUpdated( + bytes32 indexed catalogId, + bytes32 indexed dspId, + uint256 revenueUsd, + uint256 confidenceScore, + uint256 periodEnd, + uint256 timestamp + ); + + event DSPRegistered(bytes32 indexed dspId, string name, uint256 weight, uint256 minConfidenceScore); + + event DSPUpdated(bytes32 indexed dspId, uint256 weight, bool isActive, uint256 minConfidenceScore); + + event PusherUpdated(address indexed pusher, bool authorized); + + event FreshnessWindowUpdated(uint256 oldWindow, uint256 newWindow); + + event SubscribedToUpdates(bytes32 indexed catalogId, address indexed subscriber); + + // ----------------------------------------------------------------------- + // Errors + // ----------------------------------------------------------------------- + + error NotPusher(address caller); + error CatalogIdRequired(); + error DspIdRequired(); + error DSPNotRegistered(bytes32 dspId); + error DSPAlreadyRegistered(bytes32 dspId); + error DSPInactive(bytes32 dspId); + error InvalidPeriod(uint256 periodStart, uint256 periodEnd); + error PeriodInFuture(uint256 periodEnd, uint256 nowTs); + error ConfidenceOutOfRange(uint256 confidenceScore); + error ConfidenceTooLow(uint256 got, uint256 min); + error CooldownActive(address pusher, uint256 nextAllowed); + error EmptyBatch(); + error ZeroAddress(); + error HistoryIndexOutOfBounds(uint256 idx, uint256 length); + + // ----------------------------------------------------------------------- + // Constructor + // ----------------------------------------------------------------------- + + constructor(address initialOwner) Ownable(initialOwner) { + if (initialOwner == address(0)) revert ZeroAddress(); + freshnessWindow = DEFAULT_FRESHNESS_WINDOW; + } + + // ----------------------------------------------------------------------- + // Modifiers + // ----------------------------------------------------------------------- + + modifier onlyPusher() { + if (!isPusher[msg.sender]) revert NotPusher(msg.sender); + _; + } + + // ----------------------------------------------------------------------- + // Admin — DSP registry + // ----------------------------------------------------------------------- + + /** + * @notice Register a new DSP. `minConfidenceScore` may be `0` to inherit + * `DEFAULT_MIN_CONFIDENCE` at submission time. + */ + function registerDSP(bytes32 dspId, string calldata name, uint256 weight, uint256 minConfidenceScore) + external + onlyOwner + { + if (dspId == bytes32(0)) revert DspIdRequired(); + if (_dspRegistered[dspId]) revert DSPAlreadyRegistered(dspId); + if (minConfidenceScore > CONFIDENCE_BASIS) revert ConfidenceOutOfRange(minConfidenceScore); + + _dsps[dspId] = DSPInfo({ + dspId: dspId, + name: name, + weight: weight, + isActive: true, + minConfidenceScore: minConfidenceScore + }); + _dspIds.push(dspId); + _dspRegistered[dspId] = true; + + emit DSPRegistered(dspId, name, weight, minConfidenceScore); + } + + /** + * @notice Update an existing DSP's weight, active flag, or min-confidence. + */ + function updateDSP(bytes32 dspId, uint256 weight, bool isActive, uint256 minConfidenceScore) external onlyOwner { + if (!_dspRegistered[dspId]) revert DSPNotRegistered(dspId); + if (minConfidenceScore > CONFIDENCE_BASIS) revert ConfidenceOutOfRange(minConfidenceScore); + + DSPInfo storage info = _dsps[dspId]; + info.weight = weight; + info.isActive = isActive; + info.minConfidenceScore = minConfidenceScore; + + emit DSPUpdated(dspId, weight, isActive, minConfidenceScore); + } + + // ----------------------------------------------------------------------- + // Admin — pushers & circuit breaker + // ----------------------------------------------------------------------- + + function setPusher(address pusher, bool authorized) external onlyOwner { + if (pusher == address(0)) revert ZeroAddress(); + isPusher[pusher] = authorized; + emit PusherUpdated(pusher, authorized); + } + + function setFreshnessWindow(uint256 newWindow) external onlyOwner { + uint256 old = freshnessWindow; + freshnessWindow = newWindow; + emit FreshnessWindowUpdated(old, newWindow); + } + + function pause() external onlyOwner { + _pause(); + } + + function unpause() external onlyOwner { + _unpause(); + } + + // ----------------------------------------------------------------------- + // Submission + // ----------------------------------------------------------------------- + + /** + * @notice Submit a single verified revenue record. The off-chain node + * network is expected to have already reached consensus; this + * contract enforces only on-chain invariants (registry, confidence + * floor, period sanity, pusher auth, cooldown). + */ + function submitRevenue(StreamingRevenue calldata data) external whenNotPaused onlyPusher { + _checkCooldown(msg.sender); + _submit(data); + lastSubmissionAt[msg.sender] = block.timestamp; + } + + /** + * @notice Batch variant. Cooldown is applied once for the whole batch + * (a batch is one logical aggregation round per spec §"Consensus + * Mechanism"), so a pusher can land an arbitrary number of + * records per round without artificially serialising them. + */ + function batchSubmitRevenue(StreamingRevenue[] calldata updates) external whenNotPaused onlyPusher { + if (updates.length == 0) revert EmptyBatch(); + _checkCooldown(msg.sender); + for (uint256 i = 0; i < updates.length; i++) { + _submit(updates[i]); + } + lastSubmissionAt[msg.sender] = block.timestamp; + } + + function _checkCooldown(address pusher) internal view { + uint256 last = lastSubmissionAt[pusher]; + if (last == 0) return; + uint256 nextAllowed = last + SUBMISSION_COOLDOWN; + if (block.timestamp < nextAllowed) revert CooldownActive(pusher, nextAllowed); + } + + function _submit(StreamingRevenue calldata data) internal { + if (data.catalogId == bytes32(0)) revert CatalogIdRequired(); + if (data.dspId == bytes32(0)) revert DspIdRequired(); + if (!_dspRegistered[data.dspId]) revert DSPNotRegistered(data.dspId); + + DSPInfo memory dsp = _dsps[data.dspId]; + if (!dsp.isActive) revert DSPInactive(data.dspId); + + if (data.confidenceScore > CONFIDENCE_BASIS) revert ConfidenceOutOfRange(data.confidenceScore); + uint256 minConf = dsp.minConfidenceScore == 0 ? DEFAULT_MIN_CONFIDENCE : dsp.minConfidenceScore; + if (data.confidenceScore < minConf) revert ConfidenceTooLow(data.confidenceScore, minConf); + + if (data.periodStart >= data.periodEnd) revert InvalidPeriod(data.periodStart, data.periodEnd); + if (data.periodEnd > block.timestamp) revert PeriodInFuture(data.periodEnd, block.timestamp); + + // Stamp lastUpdated server-side so the on-chain record is canonical + // and off-chain clocks can't desync the freshness window. + StreamingRevenue memory rec = data; + rec.lastUpdated = block.timestamp; + + _latestByDsp[rec.catalogId][rec.dspId] = rec; + _latest[rec.catalogId] = rec; + _hasRecord[rec.catalogId] = true; + _history[rec.catalogId].push(rec); + + emit RevenueUpdated(rec.catalogId, rec.dspId, rec.revenueUsd, rec.confidenceScore, rec.periodEnd, rec.lastUpdated); + } + + // ----------------------------------------------------------------------- + // Consumer reads + // ----------------------------------------------------------------------- + + /** + * @notice Most recently submitted record for a catalog (across all DSPs). + * Returns a zeroed struct if no record has ever been accepted — + * pair with `hasRecord` to disambiguate. + */ + function getLatestRevenue(bytes32 catalogId) external view returns (StreamingRevenue memory) { + return _latest[catalogId]; + } + + /** + * @notice Most recently submitted record for a specific (catalog, DSP). + */ + function getLatestRevenueByDsp(bytes32 catalogId, bytes32 dspId) + external + view + returns (StreamingRevenue memory) + { + return _latestByDsp[catalogId][dspId]; + } + + /** + * @notice Sum of `revenueUsd` over every historical record for this + * catalog whose `[periodStart, periodEnd]` is fully contained + * within the queried window. + * @dev Iterates the full history for `catalogId`. Intended for + * off-chain RPC consumers; on-chain callers should prefer + * `_latestByDsp` lookups or event indexing to avoid unbounded gas. + */ + function getRevenueForPeriod(bytes32 catalogId, uint256 periodStart, uint256 periodEnd) + external + view + returns (uint256 totalRevenue) + { + StreamingRevenue[] storage h = _history[catalogId]; + for (uint256 i = 0; i < h.length; i++) { + if (h[i].periodStart >= periodStart && h[i].periodEnd <= periodEnd) { + totalRevenue += h[i].revenueUsd; + } + } + } + + /** + * @notice True if the latest record for `catalogId` is within + * `freshnessWindow` seconds of `block.timestamp`. + */ + function isDataFresh(bytes32 catalogId) external view returns (bool) { + if (!_hasRecord[catalogId]) return false; + return block.timestamp - _latest[catalogId].lastUpdated <= freshnessWindow; + } + + /** + * @notice Off-chain indexers can pre-filter their watch list by listening + * for `SubscribedToUpdates`. Pure on-chain bookkeeping — no + * callback, no storage write — so subscribing is gas-cheap and + * carries no economic commitment. + */ + function subscribeToUpdates(bytes32 catalogId) external { + if (catalogId == bytes32(0)) revert CatalogIdRequired(); + emit SubscribedToUpdates(catalogId, msg.sender); + } + + // ----------------------------------------------------------------------- + // Introspection + // ----------------------------------------------------------------------- + + function hasRecord(bytes32 catalogId) external view returns (bool) { + return _hasRecord[catalogId]; + } + + function historyLength(bytes32 catalogId) external view returns (uint256) { + return _history[catalogId].length; + } + + function historyAt(bytes32 catalogId, uint256 idx) external view returns (StreamingRevenue memory) { + StreamingRevenue[] storage h = _history[catalogId]; + if (idx >= h.length) revert HistoryIndexOutOfBounds(idx, h.length); + return h[idx]; + } + + function dspCount() external view returns (uint256) { + return _dspIds.length; + } + + function dspIdAt(uint256 idx) external view returns (bytes32) { + return _dspIds[idx]; + } + + function getDSP(bytes32 dspId) external view returns (DSPInfo memory) { + return _dsps[dspId]; + } + + function isDSPRegistered(bytes32 dspId) external view returns (bool) { + return _dspRegistered[dspId]; + } +} diff --git a/test/MuzixStreamingOracle.t.sol b/test/MuzixStreamingOracle.t.sol new file mode 100644 index 00000000..4cac6f2d --- /dev/null +++ b/test/MuzixStreamingOracle.t.sol @@ -0,0 +1,401 @@ +// SPDX-License-Identifier: MIT +pragma solidity ^0.8.20; + +import "forge-std/Test.sol"; +import "../src/MuzixStreamingOracle.sol"; +import {Ownable} from "@openzeppelin/contracts/access/Ownable.sol"; +import {Pausable} from "@openzeppelin/contracts/utils/Pausable.sol"; + +contract MuzixStreamingOracleTest is Test { + MuzixStreamingOracle internal oracle; + + address internal owner = address(0xA11CE); + address internal pusher = address(0xBEEF); + address internal stranger = address(0xC0DE); + address internal subscriber = address(0xD00D); + + bytes32 internal constant SPOTIFY = keccak256("spotify"); + bytes32 internal constant APPLE = keccak256("apple_music"); + bytes32 internal constant CATALOG_A = keccak256("catalog/A"); + + uint256 internal periodStart; + uint256 internal periodEnd; + + function setUp() public { + // Warp far enough that we can submit "past" periods that start at + // non-trivial timestamps without underflow. + vm.warp(1_900_000_000); + periodStart = block.timestamp - 30 days; + periodEnd = block.timestamp - 1 days; + + vm.prank(owner); + oracle = new MuzixStreamingOracle(owner); + + vm.startPrank(owner); + oracle.registerDSP(SPOTIFY, "Spotify", 5000, 0); // 0 ⇒ default min confidence (7500) + oracle.registerDSP(APPLE, "Apple Music", 3000, 8000); // explicit override + oracle.setPusher(pusher, true); + vm.stopPrank(); + } + + // --------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------- + + function _rec(bytes32 dsp, uint256 revenueUsd, uint256 confidence) + internal + view + returns (MuzixStreamingOracle.StreamingRevenue memory r) + { + r.catalogId = CATALOG_A; + r.dspId = dsp; + r.totalStreams = 1_000_000; + r.revenueUsd = revenueUsd; + r.periodStart = periodStart; + r.periodEnd = periodEnd; + r.territoryHash = keccak256("US,UK,DE"); + r.dataSourceHash = keccak256("source-hash-v1"); + r.lastUpdated = 0; // overwritten on-chain + r.confidenceScore = confidence; + } + + // --------------------------------------------------------------------- + // Constructor + // --------------------------------------------------------------------- + + function testConstructorRejectsZeroOwner() public { + vm.expectRevert( + abi.encodeWithSelector(Ownable.OwnableInvalidOwner.selector, address(0)) + ); + new MuzixStreamingOracle(address(0)); + } + + function testConstructorSetsDefaults() public view { + assertEq(oracle.freshnessWindow(), oracle.DEFAULT_FRESHNESS_WINDOW()); + assertEq(oracle.owner(), owner); + } + + // --------------------------------------------------------------------- + // DSP registry + // --------------------------------------------------------------------- + + function testRegisterDSPStoresRecord() public view { + MuzixStreamingOracle.DSPInfo memory info = oracle.getDSP(SPOTIFY); + assertEq(info.dspId, SPOTIFY); + assertEq(info.name, "Spotify"); + assertEq(info.weight, 5000); + assertTrue(info.isActive); + assertEq(info.minConfidenceScore, 0); + assertEq(oracle.dspCount(), 2); + assertEq(oracle.dspIdAt(0), SPOTIFY); + assertEq(oracle.dspIdAt(1), APPLE); + assertTrue(oracle.isDSPRegistered(SPOTIFY)); + } + + function testRegisterDSPRevertsOnZeroId() public { + vm.prank(owner); + vm.expectRevert(MuzixStreamingOracle.DspIdRequired.selector); + oracle.registerDSP(bytes32(0), "x", 0, 0); + } + + function testRegisterDSPRevertsOnDuplicate() public { + vm.prank(owner); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.DSPAlreadyRegistered.selector, SPOTIFY)); + oracle.registerDSP(SPOTIFY, "Spotify-2", 1, 0); + } + + function testRegisterDSPRevertsOnInvalidConfidence() public { + vm.prank(owner); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.ConfidenceOutOfRange.selector, 10001)); + oracle.registerDSP(keccak256("youtube"), "YouTube", 1, 10001); + } + + function testUpdateDSPRevertsWhenNotRegistered() public { + bytes32 unknown = keccak256("tidal"); + vm.prank(owner); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.DSPNotRegistered.selector, unknown)); + oracle.updateDSP(unknown, 1, true, 0); + } + + function testUpdateDSPMutatesFields() public { + vm.prank(owner); + oracle.updateDSP(SPOTIFY, 7000, false, 9000); + + MuzixStreamingOracle.DSPInfo memory info = oracle.getDSP(SPOTIFY); + assertEq(info.weight, 7000); + assertFalse(info.isActive); + assertEq(info.minConfidenceScore, 9000); + } + + // --------------------------------------------------------------------- + // Pushers / circuit breaker / access control + // --------------------------------------------------------------------- + + function testRegisterDSPRevertsForNonOwner() public { + vm.prank(stranger); + vm.expectRevert(abi.encodeWithSelector(Ownable.OwnableUnauthorizedAccount.selector, stranger)); + oracle.registerDSP(keccak256("amazon"), "Amazon", 1, 0); + } + + function testSetPusherRevertsForNonOwner() public { + vm.prank(stranger); + vm.expectRevert(abi.encodeWithSelector(Ownable.OwnableUnauthorizedAccount.selector, stranger)); + oracle.setPusher(stranger, true); + } + + function testSetPusherZeroAddress() public { + vm.prank(owner); + vm.expectRevert(MuzixStreamingOracle.ZeroAddress.selector); + oracle.setPusher(address(0), true); + } + + function testSubmitRevertsForUnauthorized() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1_000_000, 8000); + vm.prank(stranger); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.NotPusher.selector, stranger)); + oracle.submitRevenue(r); + } + + function testPauseBlocksSubmission() public { + vm.prank(owner); + oracle.pause(); + + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1_000_000, 8000); + vm.prank(pusher); + vm.expectRevert(Pausable.EnforcedPause.selector); + oracle.submitRevenue(r); + + vm.prank(owner); + oracle.unpause(); + + vm.prank(pusher); + oracle.submitRevenue(r); // succeeds after unpause + assertTrue(oracle.hasRecord(CATALOG_A)); + } + + // --------------------------------------------------------------------- + // Submission validation + // --------------------------------------------------------------------- + + function testSubmitRevenueHappyPath() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1_234_000_000, 9500); + + vm.prank(pusher); + oracle.submitRevenue(r); + + MuzixStreamingOracle.StreamingRevenue memory latest = oracle.getLatestRevenue(CATALOG_A); + assertEq(latest.revenueUsd, 1_234_000_000); + assertEq(latest.confidenceScore, 9500); + assertEq(latest.dspId, SPOTIFY); + assertEq(latest.lastUpdated, block.timestamp); + + MuzixStreamingOracle.StreamingRevenue memory byDsp = oracle.getLatestRevenueByDsp(CATALOG_A, SPOTIFY); + assertEq(byDsp.revenueUsd, 1_234_000_000); + + assertEq(oracle.historyLength(CATALOG_A), 1); + assertEq(oracle.historyAt(CATALOG_A, 0).revenueUsd, 1_234_000_000); + assertTrue(oracle.hasRecord(CATALOG_A)); + assertTrue(oracle.isDataFresh(CATALOG_A)); + } + + function testSubmitRevertsOnUnregisteredDSP() public { + bytes32 unknown = keccak256("tidal"); + MuzixStreamingOracle.StreamingRevenue memory r = _rec(unknown, 1, 9500); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.DSPNotRegistered.selector, unknown)); + oracle.submitRevenue(r); + } + + function testSubmitRevertsOnInactiveDSP() public { + vm.prank(owner); + oracle.updateDSP(SPOTIFY, 5000, false, 0); + + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.DSPInactive.selector, SPOTIFY)); + oracle.submitRevenue(r); + } + + function testSubmitRevertsOnZeroCatalog() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + r.catalogId = bytes32(0); + vm.prank(pusher); + vm.expectRevert(MuzixStreamingOracle.CatalogIdRequired.selector); + oracle.submitRevenue(r); + } + + function testSubmitRevertsOnZeroDsp() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + r.dspId = bytes32(0); + vm.prank(pusher); + vm.expectRevert(MuzixStreamingOracle.DspIdRequired.selector); + oracle.submitRevenue(r); + } + + function testSubmitConfidenceTooLowUsesDefault() public { + // Spotify has minConfidenceScore == 0, which falls back to DEFAULT_MIN_CONFIDENCE (7500). + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 7499); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.ConfidenceTooLow.selector, 7499, 7500)); + oracle.submitRevenue(r); + } + + function testSubmitConfidenceTooLowUsesPerDspOverride() public { + // Apple has minConfidenceScore == 8000. + MuzixStreamingOracle.StreamingRevenue memory r = _rec(APPLE, 1, 7999); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.ConfidenceTooLow.selector, 7999, 8000)); + oracle.submitRevenue(r); + } + + function testSubmitConfidenceOutOfRange() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 10001); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.ConfidenceOutOfRange.selector, 10001)); + oracle.submitRevenue(r); + } + + function testSubmitInvalidPeriod() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + r.periodStart = periodEnd; + r.periodEnd = periodStart; + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.InvalidPeriod.selector, periodEnd, periodStart)); + oracle.submitRevenue(r); + } + + function testSubmitPeriodInFuture() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + r.periodEnd = block.timestamp + 1 days; + vm.prank(pusher); + vm.expectRevert( + abi.encodeWithSelector(MuzixStreamingOracle.PeriodInFuture.selector, r.periodEnd, block.timestamp) + ); + oracle.submitRevenue(r); + } + + // --------------------------------------------------------------------- + // Cooldown + // --------------------------------------------------------------------- + + function testCooldownBlocksFastResubmit() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + vm.prank(pusher); + oracle.submitRevenue(r); + + uint256 nextAllowed = block.timestamp + oracle.SUBMISSION_COOLDOWN(); + vm.warp(block.timestamp + 30 minutes); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.CooldownActive.selector, pusher, nextAllowed)); + oracle.submitRevenue(r); + } + + function testCooldownLiftsAfterWindow() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + vm.prank(pusher); + oracle.submitRevenue(r); + + vm.warp(block.timestamp + 1 hours + 1); + vm.prank(pusher); + oracle.submitRevenue(r); // second submission ok + assertEq(oracle.historyLength(CATALOG_A), 2); + } + + function testBatchSharesCooldown() public { + MuzixStreamingOracle.StreamingRevenue[] memory batch = new MuzixStreamingOracle.StreamingRevenue[](2); + batch[0] = _rec(SPOTIFY, 1_000, 9500); + batch[1] = _rec(APPLE, 2_000, 8500); + + vm.prank(pusher); + oracle.batchSubmitRevenue(batch); + // Cooldown applies to subsequent calls; calling again immediately reverts. + uint256 nextAllowed = block.timestamp + oracle.SUBMISSION_COOLDOWN(); + vm.prank(pusher); + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.CooldownActive.selector, pusher, nextAllowed)); + oracle.batchSubmitRevenue(batch); + } + + function testBatchEmptyReverts() public { + MuzixStreamingOracle.StreamingRevenue[] memory empty; + vm.prank(pusher); + vm.expectRevert(MuzixStreamingOracle.EmptyBatch.selector); + oracle.batchSubmitRevenue(empty); + } + + function testBatchUpdatesAllPointers() public { + MuzixStreamingOracle.StreamingRevenue[] memory batch = new MuzixStreamingOracle.StreamingRevenue[](2); + batch[0] = _rec(SPOTIFY, 1_000, 9500); + batch[1] = _rec(APPLE, 2_000, 8500); + + vm.prank(pusher); + oracle.batchSubmitRevenue(batch); + + assertEq(oracle.getLatestRevenueByDsp(CATALOG_A, SPOTIFY).revenueUsd, 1_000); + assertEq(oracle.getLatestRevenueByDsp(CATALOG_A, APPLE).revenueUsd, 2_000); + // Latest-across is the last one submitted in the batch. + assertEq(oracle.getLatestRevenue(CATALOG_A).dspId, APPLE); + assertEq(oracle.historyLength(CATALOG_A), 2); + } + + // --------------------------------------------------------------------- + // Period sum, freshness, subscription + // --------------------------------------------------------------------- + + function testGetRevenueForPeriodSumsMatching() public { + MuzixStreamingOracle.StreamingRevenue memory inside = _rec(SPOTIFY, 1_000, 9500); + MuzixStreamingOracle.StreamingRevenue memory outside = _rec(APPLE, 9_999, 9500); + outside.periodStart = periodStart - 60 days; // fully before window — should not count + outside.periodEnd = periodStart - 30 days; + + MuzixStreamingOracle.StreamingRevenue[] memory batch = new MuzixStreamingOracle.StreamingRevenue[](2); + batch[0] = inside; + batch[1] = outside; + + vm.prank(pusher); + oracle.batchSubmitRevenue(batch); + + uint256 sum = oracle.getRevenueForPeriod(CATALOG_A, periodStart - 1, periodEnd + 1); + assertEq(sum, 1_000); + } + + function testIsDataFreshFalseAfterStale() public { + MuzixStreamingOracle.StreamingRevenue memory r = _rec(SPOTIFY, 1, 9500); + vm.prank(pusher); + oracle.submitRevenue(r); + assertTrue(oracle.isDataFresh(CATALOG_A)); + + vm.warp(block.timestamp + oracle.DEFAULT_FRESHNESS_WINDOW() + 1); + assertFalse(oracle.isDataFresh(CATALOG_A)); + } + + function testIsDataFreshFalseWhenNoRecord() public view { + assertFalse(oracle.isDataFresh(keccak256("never-seen"))); + } + + function testSetFreshnessWindowAdminOnly() public { + vm.prank(stranger); + vm.expectRevert(abi.encodeWithSelector(Ownable.OwnableUnauthorizedAccount.selector, stranger)); + oracle.setFreshnessWindow(1 days); + + vm.prank(owner); + oracle.setFreshnessWindow(7 days); + assertEq(oracle.freshnessWindow(), 7 days); + } + + function testSubscribeEmitsEvent() public { + vm.expectEmit(true, true, false, true); + emit MuzixStreamingOracle.SubscribedToUpdates(CATALOG_A, subscriber); + vm.prank(subscriber); + oracle.subscribeToUpdates(CATALOG_A); + } + + function testSubscribeRevertsOnZeroCatalog() public { + vm.expectRevert(MuzixStreamingOracle.CatalogIdRequired.selector); + oracle.subscribeToUpdates(bytes32(0)); + } + + function testHistoryAtOutOfBounds() public { + vm.expectRevert(abi.encodeWithSelector(MuzixStreamingOracle.HistoryIndexOutOfBounds.selector, 0, 0)); + oracle.historyAt(CATALOG_A, 0); + } +}