From 9b0368a568f04019af163f15bc9b481b824e1a58 Mon Sep 17 00:00:00 2001 From: kayomn Date: Tue, 12 Aug 2025 23:40:21 +0100 Subject: [PATCH] Add priorities to mutl-threaded task schedule --- src/coral/Tasks.zig | 103 ++++++++++++++ src/coral/Tasks/Queue.zig | 117 +++++++++++++++ src/coral/asio.zig | 275 ------------------------------------ src/coral/coral.zig | 49 ++++++- src/coral/stack.zig | 10 +- src/coral/tree.zig | 66 ++++----- src/demos/graphics.zig | 66 ++++++++- src/ona/App.zig | 8 +- src/ona/App/SystemGraph.zig | 66 +++++---- src/ona/Assets.zig | 133 ----------------- src/ona/asset.zig | 84 +++++++++++ src/ona/ona.zig | 123 +++------------- 12 files changed, 510 insertions(+), 590 deletions(-) create mode 100644 src/coral/Tasks.zig create mode 100644 src/coral/Tasks/Queue.zig delete mode 100644 src/coral/asio.zig delete mode 100644 src/ona/Assets.zig create mode 100644 src/ona/asset.zig diff --git a/src/coral/Tasks.zig b/src/coral/Tasks.zig new file mode 100644 index 0000000..036e197 --- /dev/null +++ b/src/coral/Tasks.zig @@ -0,0 +1,103 @@ +const Queue = @import("./Tasks/Queue.zig"); + +const builtin = @import("builtin"); + +const coral = @import("./coral.zig"); + +const std = @import("std"); + +prioritised_tasks: [priorities.len]?*Queue = [_]?*Queue{null} ** priorities.len, + +pub const Priority = enum { + high_priority, + low_priority, + background, + + fn getThreadCount(self: Priority, cpu_count: usize) usize { + const cpu_share: f64 = @floatFromInt(cpu_count); + + return @intFromFloat(switch (self) { + .high_priority => cpu_share * 0.375, + .low_priority => cpu_share * 0.25, + .background => cpu_share * 0.125, + }); + } +}; + +const Self = @This(); + +pub fn create(self: *Self, allocator: std.mem.Allocator, priority: Priority, task: anytype) std.mem.Allocator.Error!*@TypeOf(task) { + const Task = @TypeOf(task); + + const run_fn = coral.meta.hasFn(Task, "run") orelse { + @compileError(std.fmt.comptimePrint("{s} requires a .run fn to be a valid task type", .{@typeName(Task)})); + }; + + if (run_fn.return_type.? != void) { + @compileError(std.fmt.comptimePrint("{s}.run fn must return a void type", .{@typeName(Task)})); + } + + if (run_fn.params.len != 1 or run_fn.params[0].type != *Task) { + @compileError(std.fmt.comptimePrint("{s}.run fn must accept a {s} as it's one and only parameter, not {s}", .{ + @typeName(Task), + @typeName(*Task), + if (run_fn.params[0].type) |Type| @typeName(Type) else "anytype", + })); + } + + const created_task = try allocator.create(Task); + + created_task.* = task; + + if (self.prioritised_tasks[@intFromEnum(priority)]) |tasks| { + tasks.enqueue(.initRef(created_task, Task.run)); + } else { + created_task.run(); + } + + return created_task; +} + +pub fn finish(self: *Self, priority: Priority) void { + if (self.prioritised_tasks[@intFromEnum(priority)]) |tasks| { + tasks.finish(); + } +} + +const priorities = std.enums.values(Priority); + +pub fn start(self: *Self) std.Thread.SpawnError!void { + self.* = .{}; + + if (!builtin.single_threaded) { + if (std.Thread.getCpuCount()) |cpu_count| { + errdefer { + self.stop(); + } + + inline for (priorities, &self.prioritised_tasks) |priority, *has_tasks| { + const thread_count = priority.getThreadCount(cpu_count); + + if (thread_count != 0) { + has_tasks.* = try Queue.spawn(thread_count); + } + } + } else |cpu_count_error| { + std.log.warn("Failed to get number of CPU cores available: {s}", .{@errorName(cpu_count_error)}); + } + } +} + +pub fn stop(self: *Self) void { + inline for (&self.prioritised_tasks) |*has_tasks| { + if (has_tasks.*) |tasks| { + tasks.stop(); + } + + has_tasks.* = null; + } + + for (priorities) |priority| { + self.finish(priority); + } +} diff --git a/src/coral/Tasks/Queue.zig b/src/coral/Tasks/Queue.zig new file mode 100644 index 0000000..f38372a --- /dev/null +++ b/src/coral/Tasks/Queue.zig @@ -0,0 +1,117 @@ +const coral = @import("../coral.zig"); + +const std = @import("std"); + +threads_active: std.atomic.Value(usize) = .init(0), +buffer: [max]Runnable = undefined, +head: usize = 0, +tail: usize = 0, +access: std.Thread.Mutex = .{}, +not_empty: std.Thread.Condition = .{}, +not_full: std.Thread.Condition = .{}, +waiting: std.Thread.WaitGroup = .{}, + +const Runnable = coral.Callable(void, &.{}); + +const Self = @This(); + +pub fn dequeue(self: *Self) Runnable { + self.access.lock(); + + defer { + self.access.unlock(); + } + + while (self.head == self.tail) { + self.not_empty.wait(&self.access); + } + + const item = self.buffer[self.head]; + const wrapped_head = (self.head + 1) % max; + + self.head = wrapped_head; + + self.not_full.signal(); + + return item; +} + +pub fn enqueue(self: *Self, item: Runnable) void { + self.access.lock(); + + defer { + self.access.unlock(); + } + + const wrapped_tail = (self.tail + 1) % max; + + while (wrapped_tail == self.head) { + self.not_full.wait(&self.access); + } + + self.waiting.start(); + + self.buffer[self.tail] = item; + self.tail = wrapped_tail; + + self.not_empty.signal(); +} + +pub fn finish(self: *Self) void { + self.waiting.wait(); + self.waiting.reset(); +} + +fn main(self: *Self) void { + _ = self.threads_active.fetchAdd(1, .monotonic); + + while (true) { + const runner = self.dequeue(); + + if (runner.isFn(poisonPill)) { + break; + } + + runner.call(.{}); + self.waiting.finish(); + } + + const previous_threads_active = self.threads_active.fetchSub(1, .monotonic); + + std.debug.assert(previous_threads_active != 0); + + if (previous_threads_active == 1) { + coral.heap.allocator.destroy(self); + } +} + +const max = 512; + +fn poisonPill() void {} + +pub fn stop(self: *Self) void { + var threads_remaining = self.threads_active.load(.monotonic); + const poison_pill = Runnable.initFn(poisonPill); + + while (threads_remaining != 0) : (threads_remaining -= 1) { + self.enqueue(poison_pill); + } +} + +pub fn spawn(thread_count: usize) std.Thread.SpawnError!*Self { + if (thread_count == 0) { + return error.Unexpected; + } + + const self = try coral.heap.allocator.create(Self); + + self.* = .{}; + + var threads_spawned: usize = 0; + + while (threads_spawned < thread_count) : (threads_spawned += 1) { + (try std.Thread.spawn(.{}, main, .{self})).detach(); + } + + return self; +} diff --git a/src/coral/asio.zig b/src/coral/asio.zig deleted file mode 100644 index 0b2e72a..0000000 --- a/src/coral/asio.zig +++ /dev/null @@ -1,275 +0,0 @@ -const builtin = @import("builtin"); - -const coral = @import("./coral.zig"); - -const std = @import("std"); - -pub fn CallTask(comptime function: anytype) type { - const Function = @TypeOf(function); - - const function_fn = switch (@typeInfo(Function)) { - .@"fn" => |@"fn"| @"fn", - else => @compileError("expeceted param `exec` to be an fn type"), - }; - - const Args = std.meta.ArgsTuple(Function); - - return struct { - args: Args, - returned: Future(function_fn.return_type.?), - - const Self = @This(); - - pub fn init(args: Args) Self { - return .{ - .args = args, - .returned = .empty, - }; - } - - pub fn run(self: *Self) void { - std.debug.assert(self.returned.resolve(@call(.auto, function, self.args))); - } - }; -} - -pub fn BlockingQueue(comptime max: usize, comptime T: type) type { - const circular_max = max + 1; - - return struct { - buffer: [circular_max]T, - head: usize, - tail: usize, - access: std.Thread.Mutex, - not_empty: std.Thread.Condition, - not_full: std.Thread.Condition, - - const Self = @This(); - - pub fn dequeue(self: *Self) T { - self.access.lock(); - - defer { - self.access.unlock(); - } - - while (self.head == self.tail) { - self.not_empty.wait(&self.access); - } - - const item = self.buffer[self.head]; - const wrapped_head = (self.head + 1) % circular_max; - - self.head = wrapped_head; - - self.not_full.signal(); - - return item; - } - - pub const empty = Self{ - .buffer = undefined, - .head = 0, - .tail = 0, - .access = .{}, - .not_empty = .{}, - .not_full = .{}, - }; - - pub fn enqueue(self: *Self, item: T) void { - self.access.lock(); - - defer { - self.access.unlock(); - } - - const wrapped_tail = (self.tail + 1) % circular_max; - - while (wrapped_tail == self.head) { - self.not_full.wait(&self.access); - } - - self.buffer[self.tail] = item; - self.tail = wrapped_tail; - - self.not_empty.signal(); - } - }; -} - -pub const Event = Future(void); - -pub fn Future(comptime Payload: type) type { - return struct { - payload: ?Payload, - resolved: std.Thread.ResetEvent, - - const Self = @This(); - - pub const empty = Self{ - .payload = null, - .resolved = .{}, - }; - - pub fn get(self: *Self) *Payload { - // TODO: Make async. - self.resolved.wait(); - - return &self.payload.?; - } - - pub fn resolve(self: *Self, payload: Payload) bool { - if (self.resolved.isSet()) { - return false; - } - - self.payload = payload; - - self.resolved.set(); - - return true; - } - }; -} - -pub const TaskQueue = struct { - threads: []std.Thread, - arena: std.heap.ArenaAllocator, - tasks: *Buffer, - - const Buffer = struct { - waiting: std.Thread.WaitGroup, - pending: BlockingQueue(512, Runnable), - - fn consume(self: *Buffer) void { - while (true) { - const runner = self.pending.dequeue(); - - if (runner.isFn(poison_pill)) { - break; - } - - runner.call(.{}); - self.waiting.finish(); - } - } - - fn poison(self: *Buffer, threads: []const std.Thread) void { - var threads_remaining = threads.len; - - while (threads_remaining != 0) : (threads_remaining -= 1) { - self.pending.enqueue(.initFn(poison_pill)); - } - - for (threads) |thread| { - thread.join(); - } - } - - fn poison_pill() void {} - }; - - const Runnable = coral.Callable(void, &.{}); - - pub fn create(self: *TaskQueue, task: anytype) std.mem.Allocator.Error!*const @TypeOf(task) { - const Task = @TypeOf(task); - - switch (@hasDecl(Task, "run")) { - true => switch (@TypeOf(Task.run)) { - fn (*Task) void, fn (*const Task) void => {}, - else => @compileError("param `task` must contain a `run` declaration"), - }, - - else => @compileError("`run` declaration on param `task` must be an fn type"), - } - - switch (builtin.single_threaded) { - true => { - Runnable.initRef(&task, task.run).call(.{}); - - return &task; - }, - - else => { - const allocated_task = try self.arena.allocator().create(Task); - - allocated_task.* = task; - - self.tasks.waiting.start(); - self.tasks.pending.enqueue(.initRef(allocated_task, Task.run)); - - return allocated_task; - }, - } - } - - pub fn execute(self: *TaskQueue, task: anytype) std.mem.Allocator.Error!void { - _ = try self.create(task); - } - - pub fn deinit(self: *TaskQueue) void { - self.tasks.waiting.wait(); - self.tasks.poison(self.threads); - self.arena.deinit(); - coral.heap.allocator.destroy(self.tasks); - coral.heap.allocator.free(self.threads); - - self.* = undefined; - } - - pub fn finish(self: *TaskQueue) void { - self.tasks.waiting.wait(); - self.tasks.waiting.reset(); - - _ = self.arena.reset(.retain_capacity); - } - - pub fn init(thread_count: usize) (std.mem.Allocator.Error || std.Thread.SpawnError)!TaskQueue { - const tasks = try coral.heap.allocator.create(Buffer); - - errdefer { - coral.heap.allocator.destroy(tasks); - } - - tasks.* = .{ - .pending = .empty, - .waiting = .{}, - }; - - const threads = try coral.heap.allocator.alloc(std.Thread, if (builtin.single_threaded) 0 else thread_count); - var threads_spawned: usize = 0; - - errdefer { - tasks.poison(threads[0..threads_spawned]); - coral.heap.allocator.free(threads); - } - - for (threads) |*thread| { - thread.* = try .spawn(.{}, Buffer.consume, .{tasks}); - threads_spawned += 1; - } - - std.debug.assert(threads_spawned == thread_count); - - for (threads) |thread| { - thread.setName("ona worker") catch |set_name_error| { - switch (set_name_error) { - error.Unsupported, error.NameTooLong => break, - else => continue, - } - }; - } - - return .{ - .arena = .init(coral.heap.allocator), - .tasks = tasks, - .threads = threads, - }; - } -}; - -pub fn wait_all(comptime Payload: type, futures: []const *Future(Payload)) void { - for (futures) |future| { - _ = future.get(); - } -} diff --git a/src/coral/coral.zig b/src/coral/coral.zig index 8781ad4..2a2a20b 100644 --- a/src/coral/coral.zig +++ b/src/coral/coral.zig @@ -1,8 +1,8 @@ pub const Box = @import("./Box.zig"); -pub const TypeId = @import("./TypeId.zig"); +const Tasks = @import("./Tasks.zig"); -pub const asio = @import("./asio.zig"); +pub const TypeId = @import("./TypeId.zig"); pub const bytes = @import("./bytes.zig"); @@ -24,6 +24,49 @@ pub const tree = @import("./tree.zig"); pub const utf8 = @import("./utf8.zig"); +pub fn CallTask(comptime function: anytype) type { + const Function = @TypeOf(function); + + const function_fn = switch (@typeInfo(Function)) { + .@"fn" => |@"fn"| @"fn", + else => @compileError("expeceted param `function` to be an fn type"), + }; + + return struct { + args: std.meta.ArgsTuple(Function), + payload: Return = undefined, + resolved: std.Thread.ResetEvent = .{}, + + const Return = function_fn.return_type.?; + + const Self = @This(); + + pub fn get(self: *const Self) Return { + std.debug.assert(self.resolved.isSet()); + self.resolved.wait(); + self.resolved.reset(); + + return self.payload; + } + + pub fn resolve(self: *Self, payload: Return) bool { + if (self.resolved.isSet()) { + return false; + } + + self.payload = payload; + + self.resolved.set(); + + return true; + } + + pub fn run(self: *Self) void { + std.debug.assert(self.resolve(@call(.auto, function, self.args))); + } + }; +} + pub fn Callable(comptime Output: type, comptime input_types: []const type) type { const InputTuple = std.meta.Tuple(input_types); @@ -135,3 +178,5 @@ pub fn expect(function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) me else => @call(.auto, function, args), }; } + +pub var tasks = Tasks{}; diff --git a/src/coral/stack.zig b/src/coral/stack.zig index ad41509..f49b8a2 100644 --- a/src/coral/stack.zig +++ b/src/coral/stack.zig @@ -1,8 +1,8 @@ const std = @import("std"); -fn Generic(comptime Item: type, comptime Buffer: type) type { +fn Generic(comptime Item: type, comptime Storage: type) type { return struct { - items: Buffer, + items: Storage, const Self = @This(); @@ -217,8 +217,10 @@ pub fn Sequential(comptime Item: type) type { return error.OutOfMemory; }; - self.ptr = (try allocator.realloc(self.ptr[0..self.cap], new_cap)).ptr; - self.cap = @intCast(new_cap); + const reallocation = try allocator.realloc(self.ptr[0..self.cap], new_cap); + + self.ptr = reallocation.ptr; + self.cap = new_cap; } pub fn set(self: *Self, index: usize, value: Item) bool { diff --git a/src/coral/tree.zig b/src/coral/tree.zig index 0a2edc0..756be79 100644 --- a/src/coral/tree.zig +++ b/src/coral/tree.zig @@ -50,8 +50,8 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits( fn getMax(self: *Node) *Node { var node = self; - while (self.has_lesser) |lesser| { - node = lesser; + while (node.has_greater) |greater| { + node = greater; } return node; @@ -60,7 +60,7 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits( fn getMin(self: *Node) *Node { var node = self; - while (self.has_lesser) |lesser| { + while (node.has_lesser) |lesser| { node = lesser; } @@ -133,40 +133,36 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits( nodes: ?*Node, pub fn next(self: *KeyValues) ?coral.KeyValuePair(Key, *Value) { - var nodes = self.nodes; + const current_node = self.nodes orelse { + return null; + }; - while (nodes) |node| { - const left = node.has_lesser orelse { - self.nodes = node.has_greater; + var successor_node: ?*Node = null; - return .{ - .key = node.key, - .value = &node.value, - }; - }; + if (current_node.has_greater) |greater| { + successor_node = greater.getMin(); + } else { + // Climb until at a left-child. + var search_node = current_node; + var has_parent = search_node.has_parent; - // Find the rightmost node in left subtree or link back to current - var pred = left; - while (pred.has_greater != null and pred.has_greater != node) { - pred = pred.has_greater.?; + while (has_parent) |parent| { + if (parent.has_lesser == search_node) { + successor_node = parent; + break; + } + + search_node = parent; + has_parent = parent.has_parent; } - - if (pred.has_greater != null) { - pred.has_greater = null; - self.nodes = node.has_greater; - - return .{ - .key = node.key, - .value = &node.value, - }; - } - - pred.has_greater = node; - self.nodes = node.has_lesser; - nodes = self.nodes; } - return null; + self.nodes = successor_node; + + return .{ + .key = current_node.key, + .value = ¤t_node.value, + }; } pub fn nextKey(self: *KeyValues) ?Key { @@ -310,7 +306,7 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits( pub fn getKey(self: Self, key: Key) ?Key { if (self.has_root) |root| { if (root.find(key)) |node| { - return &node.key; + return node.key; } } @@ -322,7 +318,11 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits( } pub fn keyValues(self: *const Self) KeyValues { - return .{ .nodes = self.has_root }; + if (self.has_root) |root| { + return .{ .nodes = root.getMin() }; + } + + return .{ .nodes = null }; } pub fn remove(self: *Self, key: Key) ?coral.KeyValuePair(Key, Value) { diff --git a/src/demos/graphics.zig b/src/demos/graphics.zig index 0e22cd5..8f61346 100644 --- a/src/demos/graphics.zig +++ b/src/demos/graphics.zig @@ -2,14 +2,72 @@ const ona = @import("ona"); const std = @import("std"); +const CRT = extern struct { + width: f32, + height: f32, + time: f32, + padding: [4]u8 = undefined, +}; + +const Effects = struct { + render_texture: ona.AssetHandle = .default, + image_textures: [2]ona.AssetHandle = [_]ona.AssetHandle{.default} ** 2, + last_time: f64 = 0, + image_index: usize = 0, + crt_effect: ona.AssetHandle = .default, +}; + pub fn main() void { ona.realtime_app .with(.initModule(ona.hid)) .with(.initModule(ona.gfx)) - .with(.initSystem(.render, .of(render))) + // .with(.initSystem(.render, .of(render))) .run(); } -fn render(commands: ona.gfx.Commands) void { - _ = commands; -} +// fn update(effects: ona.Write(Effects), loop: ona.Read(ona.Loop)) void { +// const update_seconds = 5; + +// if ((loop.state.elapsed_time - effects.state.last_time) > update_seconds) { +// effects.state.image_index = (effects.state.image_index + 1) % effects.state.image_textures.len; +// effects.state.last_time = loop.state.elapsed_time; +// } +// } + +// fn render(commands: ona.gfx.Commands) void { +// try commands.setTarget(.{ +// .texture = effects.state.render_texture, +// .clear_color = gfx.colors.black, +// .clear_depth = 0, +// .clear_stencil = 0, +// }); + +// const width: f32 = @floatFromInt(display.state.width); +// const height: f32 = @floatFromInt(display.state.height); + +// try commands.draw_texture(.{ +// .texture = effects.state.image_textures[effects.state.image_index], +// .size = .{ width, height }, +// }); + +// try commands.set_effect(.{ +// .effect = effects.state.crt_effect, + +// .properties = std.mem.asBytes(&CRT{ +// .width = width, +// .height = height, +// .time = @floatCast(loop.state.elapsed_time), +// }), +// }); + +// try commands.set_target(.{ +// .clear_color = null, +// .clear_depth = null, +// .clear_stencil = null, +// }); + +// try commands.draw_texture(.{ +// .texture = effects.state.render_texture, +// .size = .{ width, height }, +// }); +// } diff --git a/src/ona/App.zig b/src/ona/App.zig index 9ef9ea1..a3b53b1 100644 --- a/src/ona/App.zig +++ b/src/ona/App.zig @@ -82,16 +82,16 @@ pub fn init() error{OutOfMemory}!Self { pub fn on(self: *Self, comptime schedule: anytype, behavior: *const Behavior) error{OutOfMemory}!void { const schedule_name = scheduleName(schedule); - const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .empty)).?; + const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .{})).?; try systems.insert(self, behavior); } -pub fn run(self: *Self, tasks: *coral.asio.TaskQueue, comptime schedule: anytype) RunError!void { +pub fn run(self: *Self, comptime schedule: anytype) RunError!void { const schedule_name = scheduleName(schedule); - const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .empty)).?; + const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .{})).?; - try systems.run(self, tasks); + try systems.run(self); systems.applyDeferred(self); } diff --git a/src/ona/App/SystemGraph.zig b/src/ona/App/SystemGraph.zig index 0bfd098..f68afe1 100644 --- a/src/ona/App/SystemGraph.zig +++ b/src/ona/App/SystemGraph.zig @@ -4,13 +4,14 @@ const ona = @import("../ona.zig"); const std = @import("std"); -edges: Map(Edge), -processed: Map(void), -state_readers: AccessMap, -state_writers: AccessMap, -blocking_work: WorkSet, -parallel_work: WorkSet, -parallel_work_ranges: coral.stack.Sequential(usize), +edges: Map(Edge) = .empty, +processed: Map(void) = .empty, +state_readers: AccessMap = .empty, +state_writers: AccessMap = .empty, +blocking_work: WorkSet = .empty, +parallel_work: WorkSet = .empty, +parallel_work_ranges: coral.stack.Sequential(usize) = .empty, +run_arena: std.heap.ArenaAllocator = .init(coral.heap.allocator), const AccessMap = coral.tree.Binary(*const coral.TypeId, BehaviorSet, coral.tree.scalarTraits(*const coral.TypeId)); @@ -61,6 +62,7 @@ pub fn deinit(self: *Self) void { self.parallel_work.deinit(allocator); self.parallel_work_ranges.deinit(allocator); self.blocking_work.deinit(allocator); + self.run_arena.deinit(); inline for (.{ &self.edges, &self.state_readers, &self.state_writers }) |map| { var key_values = map.keyValues(); @@ -76,10 +78,10 @@ pub fn deinit(self: *Self) void { pub fn dependOnBehavior(self: *Self, app: *ona.App, dependant: *const ona.App.Behavior, dependency: *const ona.App.Behavior) std.mem.Allocator.Error!void { try self.insert(app, dependant); - const edges = self.edges.get(dependant).?; + const edge = self.edges.get(dependant).?; - if (std.mem.indexOfScalar(*const ona.App.Behavior, edges.dependencies.items.slice(), dependency) == null) { - try edges.dependencies.pushGrow(coral.heap.allocator, dependency); + if (std.mem.indexOfScalar(*const ona.App.Behavior, edge.dependencies.items.slice(), dependency) == null) { + try edge.dependencies.pushGrow(coral.heap.allocator, dependency); } } @@ -123,16 +125,6 @@ pub fn dependOnType(self: *Self, app: *ona.App, dependant: *const ona.App.Behavi } } -pub const empty = Self{ - .edges = .empty, - .processed = .empty, - .state_readers = .empty, - .state_writers = .empty, - .blocking_work = .empty, - .parallel_work = .empty, - .parallel_work_ranges = .empty, -}; - pub fn applyDeferred(self: *Self, app: *ona.App) void { for (self.parallel_work.items.slice()) |work| { work.behavior.apply(work.local_state, app); @@ -146,15 +138,16 @@ pub fn applyDeferred(self: *Self, app: *ona.App) void { pub fn insert(self: *Self, app: *ona.App, behavior: *const ona.App.Behavior) std.mem.Allocator.Error!void { self.processed.clear(); - if (try self.edges.insert(coral.heap.allocator, behavior, undefined)) |edge| { + // TODO: Refactor so partial initialisation isn't happening here anymore. + if (try self.edges.insert(coral.heap.allocator, behavior, .{ + .local_state = undefined, + .dependencies = .empty, + })) |edge| { errdefer { std.debug.assert(self.edges.remove(behavior) != null); } - edge.* = .{ - .local_state = try behavior.bind(behavior, app, self), - .dependencies = .empty, - }; + edge.local_state = try behavior.bind(behavior, app, self); } } @@ -186,7 +179,13 @@ fn process(self: *Self, behavior: *const ona.App.Behavior, edge: Edge) !Processe return processed; } -pub fn run(self: *Self, app: *const ona.App, tasks: *coral.asio.TaskQueue) RunError!void { +pub fn run(self: *Self, app: *const ona.App) RunError!void { + defer { + if (!self.run_arena.reset(.retain_capacity)) { + std.log.warn("Failed to retain run arena capacity for system graph", .{}); + } + } + if (self.processed.isEmpty()) { errdefer { self.processed.clear(); @@ -215,20 +214,19 @@ pub fn run(self: *Self, app: *const ona.App, tasks: *coral.asio.TaskQueue) RunEr } { - const parallel_work = self.parallel_work.items.slice(); - var parallel_work_offset: usize = 0; + const run_allocator = self.run_arena.allocator(); + var parallel_work = self.parallel_work.items.slice(); for (self.parallel_work_ranges.items.slice()) |parallel_work_range| { - try tasks.execute(coral.asio.CallTask(runWorkGroup).init(.{ - app, - parallel_work[parallel_work_offset .. parallel_work_offset + parallel_work_range], - })); + _ = try coral.tasks.create(run_allocator, .high_priority, coral.CallTask(runWorkGroup){ + .args = .{ app, parallel_work[0..parallel_work_range] }, + }); - parallel_work_offset += parallel_work_range; + parallel_work = parallel_work[parallel_work_range..]; } } - tasks.finish(); + coral.tasks.finish(.high_priority); runWorkGroup(app, self.blocking_work.items.slice()); } diff --git a/src/ona/Assets.zig b/src/ona/Assets.zig deleted file mode 100644 index 395f85e..0000000 --- a/src/ona/Assets.zig +++ /dev/null @@ -1,133 +0,0 @@ -const coral = @import("coral"); - -const ext = @import("ext"); - -const ona = @import("./ona.zig"); - -const std = @import("std"); - -storage: *ext.SDL_Storage, - -pub const Error = error{ - AssetNotFound, -}; - -const Instance = struct { - storage: *ext.SDL_Storage, - - pub fn deinit(self: *Instance) void { - while (!ext.SDL_StorageReady(self.storage)) { - ext.SDL_Delay(1); - } - - if (!ext.SDL_CloseStorage(self.storage)) { - std.log.warn("Failed to close title storage", .{}); - } - - self.storage = undefined; - } -}; - -const Self = @This(); - -pub fn bind(app: *ona.App) !void { - const title_storage = ext.SDL_OpenTitleStorage(null, 0) orelse { - std.log.err("Failed to open title storage", .{}); - - return error.SdlFailure; - }; - - try app.setState(Instance{ - .storage = title_storage, - }); -} - -pub fn init(instance: *Instance) Self { - return .{ - .storage = instance.storage, - }; -} - -pub fn load(self: Self, path: []const u8, allocator: std.mem.Allocator) (std.mem.Allocator.Error || Error)![]u8 { - const path_buffer = try normalize_path(path); - - while (!ext.SDL_StorageReady(self.storage)) { - ext.SDL_Delay(1); - } - - var asset_size: u64 = 0; - - if (!ext.SDL_GetStorageFileSize(self.storage, path_buffer.ptr(), &asset_size)) { - return error.AssetNotFound; - } - - const file_buffer = try allocator.alloc(u8, asset_size); - - errdefer { - allocator.free(file_buffer); - } - - if (!ext.SDL_ReadStorageFile(self.storage, path_buffer.ptr(), file_buffer.ptr, file_buffer.len)) { - return error.AssetNotFound; - } - - return file_buffer; -} - -pub fn normalize_path(data: []const u8) Error!coral.ShortString { - const separator = '/'; - var result = coral.ShortString.empty; - - for (0..data.len) |i| { - if (data[i] == separator) { - const next_i = i + 1; - - if (next_i == data.len) { - continue; - } - - if (data[next_i] == separator) { - continue; - } - } - - result.append(data[i]) catch |append_error| { - return switch (append_error) { - error.StringTooLong => error.AssetNotFound, - }; - }; - } - - if (result.isEmpty()) { - return error.AssetNotFound; - } - - return result; -} - -pub fn read(self: Self, path: []const u8, output: []u8) Error!void { - const path_buffer = try normalize_path(path); - - while (!ext.SDL_StorageReady(self.storage)) { - ext.SDL_Delay(1); - } - - if (!ext.SDL_ReadStorageFile(self.storage, path_buffer.ptr(), output.ptr, output.len)) { - return error.AssetNotFound; - } -} - -pub fn size(self: Self, path: []const u8) Error!usize { - const path_buffer = try normalize_path(path); - var asset_size: u64 = 0; - - while (!ext.SDL_StorageReady(self.storage)) { - ext.SDL_Delay(1); - } - - if (!ext.SDL_GetStorageFileSize(self.storage, path_buffer.ptr(), &asset_size)) { - return error.AssetNotFound; - } - - return asset_size; -} diff --git a/src/ona/asset.zig b/src/ona/asset.zig new file mode 100644 index 0000000..d3f9f3f --- /dev/null +++ b/src/ona/asset.zig @@ -0,0 +1,84 @@ +const coral = @import("coral"); + +const std = @import("std"); + +pub const Handle = packed struct { + index: u24, + salt: u20, + type_id: u20, +}; + +pub const Path = struct { + buffer: [256]u8, +}; + +pub fn Store(comptime Asset: type) type { + return struct { + released_handles: coral.stack.Sequential(Handle) = .empty, + + assets: coral.stack.Parallel(struct { + asset: Asset, + state: AssetState, + }) = .empty, + + const AssetState = struct { + usage: enum { vacant, reserved, occupied }, + salt: u20, + }; + + pub fn reserve(self: *Store) std.mem.Allocator.Error!Handle { + if (self.released_handles.pop()) |handle| { + const state = &self.assets.items.slice(.state)[handle.index]; + + state.usage = .reserved; + + return handle; + } + + const handle = Handle{ + .type_id = type_id, + .index = self.assets.items.len, + .salt = 1, + }; + + try self.assets.pushGrow(coral.heap.allocator, .{ + .asset = undefined, + + .state = .{ + .usage = .reserved, + .salt = handle.salt, + }, + }); + + return handle; + } + + pub fn resolve(self: *Store, reserved_handle: Handle, asset: Asset) bool { + const state = self.assets.items.slice(.state)[reserved_handle.index]; + + if (state.usage != .reserved) { + return false; + } + + if (reserved_handle.salt != state.salt) { + return false; + } + + self.assets.items.slice(.asset)[reserved_handle.index] = asset; + + return true; + } + + const type_id = generateTypeId(); + }; +} + +fn generateTypeId() error{Overflow}!u20 { + const id = struct { + var last_assigned: u20 = 0; + }; + + id.last_assigned = try std.math.add(u20, id.last_assigned, 1); + + return id.last_assigned; +} diff --git a/src/ona/ona.zig b/src/ona/ona.zig index 5ceb398..eb8febf 100644 --- a/src/ona/ona.zig +++ b/src/ona/ona.zig @@ -1,5 +1,7 @@ pub const App = @import("./App.zig"); +const asset = @import("./asset.zig"); + const coral = @import("coral"); pub const gfx = @import("./gfx.zig"); @@ -8,116 +10,36 @@ pub const hid = @import("./hid.zig"); const std = @import("std"); -pub const AssetPath = struct { u32 }; +pub const AssetHandle = asset.Handle; + +pub const AssetPath = asset.Path; pub fn Assets(comptime Asset: type) type { + const Store = asset.Store(Asset); + return struct { - queue: *Queue, - store: *Store, - - pub const Handle = packed struct { - index: u32, - salt: u32, - }; - - const Queue = coral.stack.Sequential(struct { - asset_path: AssetPath, - reserved_handle: Handle, - }); + cached: *Store, const Self = @This(); - const State = struct { - queue: Queue, - }; - - pub const Store = struct { - released_handles: coral.stack.Sequential(Handle) = .empty, - - assets: coral.stack.Parallel(struct { - asset: Asset, - state: AssetState, - }) = .empty, - - const AssetState = struct { - usage: enum(u32) { vacant, reserved, occupied }, - salt: u32, - }; - - pub fn reserve(self: *Store) std.mem.Allocator.Error!Handle { - if (self.released_handles.pop()) |handle| { - const state = &self.assets.items.slice(.state)[handle.index]; - - std.debug.assert(state.usage == .vacant); - std.debug.assert(state.salt == handle.salt); - - state.usage = .reserved; - - return handle; - } - - const handle = Handle{ - .index = self.assets.items.len, - .salt = 1, - }; - - try self.assets.pushGrow(coral.heap.allocator, .{ - .asset = undefined, - - .state = .{ - .usage = .reserved, - .salt = handle.salt, - }, - }); - - return handle; - } - - pub fn resolve(self: *Store, reserved_handle: Handle, asset: Asset) bool { - const state = self.assets.items.slice(.state)[reserved_handle.index]; - - if (state.usage != .reserved) { - return false; - } - - if (reserved_handle.salt != state.salt) { - return false; - } - - self.assets.items.slice(.asset)[reserved_handle.index] = asset; - - return true; - } - }; - - pub fn bind() State { - return .{ - .queue = .empty, - }; - } - - pub fn init(state: *State, store: *Store) Self { + pub fn init(store: *Store) Self { return .{ .store = store, - .queue = &state.queue, }; } - pub fn insert(self: Self, asset: Asset) std.mem.Allocator.Error!Handle { + pub fn insert(self: Self, uncached_asset: Asset) std.mem.Allocator.Error!AssetHandle { const reserved_handle = try self.store.reserve(); - std.debug.assert(self.store.resolve(reserved_handle, asset)); + std.debug.assert(self.store.resolve(reserved_handle, uncached_asset)); return reserved_handle; } - pub fn load(self: Self, path: AssetPath) std.mem.Allocator.Error!Handle { + pub fn load(self: Self, load_path: AssetPath) std.mem.Allocator.Error!AssetHandle { const reserved_handle = try self.store.reserve(); - try self.queue.pushGrow(coral.heap.allocator, .{ - .asset_path = path, - .reserved_handle = reserved_handle, - }); + _ = load_path; return reserved_handle; } @@ -311,7 +233,7 @@ pub fn Write(comptime State: type) type { pub const realtime_app = App.Setup.init(run_realtime_loop); pub fn registerAsset(app: *App, comptime Asset: type) std.mem.Allocator.Error!void { - const AssetStore = Assets(Asset).Store; + const AssetStore = asset.Store(Asset); try app.setState(AssetStore{}); } @@ -331,14 +253,13 @@ fn run_realtime_loop(app: *App) !void { const time = app.hasState(App.Time).?; const exit_channel = app.hasState(Channel(App.Exit)).?; - const virtual_thread_count = std.Thread.getCpuCount() catch 0; - var tasks = try coral.asio.TaskQueue.init(virtual_thread_count / 2); + try coral.tasks.start(); defer { - tasks.deinit(); + coral.tasks.stop(); } - try app.run(&tasks, .load); + try app.run(.load); const ticks_initial = std.time.milliTimestamp(); var ticks_previous = ticks_initial; @@ -352,19 +273,19 @@ fn run_realtime_loop(app: *App) !void { time.elapsed = @as(f64, @floatFromInt(ticks_current - ticks_initial)) / milliseconds_per_second; ticks_previous = ticks_current; - try app.run(&tasks, .pre_update); + try app.run(.pre_update); while (accumulated_time >= target_frame_time) : (accumulated_time -= target_frame_time) { - try app.run(&tasks, .update); + try app.run(.update); } - try app.run(&tasks, .post_update); - try app.run(&tasks, .render); + try app.run(.post_update); + try app.run(.render); const exit_messages = exit_channel.messages(); if (exit_messages.len != 0) { - try app.run(&tasks, .exit); + try app.run(.exit); switch (exit_messages[exit_messages.len - 1]) { .success => {