Add priorities to mutl-threaded task schedule

This commit is contained in:
kayomn 2025-08-12 23:40:21 +01:00
parent 90f51656f0
commit 9b0368a568
12 changed files with 510 additions and 590 deletions

103
src/coral/Tasks.zig Normal file
View File

@ -0,0 +1,103 @@
const Queue = @import("./Tasks/Queue.zig");
const builtin = @import("builtin");
const coral = @import("./coral.zig");
const std = @import("std");
prioritised_tasks: [priorities.len]?*Queue = [_]?*Queue{null} ** priorities.len,
pub const Priority = enum {
high_priority,
low_priority,
background,
fn getThreadCount(self: Priority, cpu_count: usize) usize {
const cpu_share: f64 = @floatFromInt(cpu_count);
return @intFromFloat(switch (self) {
.high_priority => cpu_share * 0.375,
.low_priority => cpu_share * 0.25,
.background => cpu_share * 0.125,
});
}
};
const Self = @This();
pub fn create(self: *Self, allocator: std.mem.Allocator, priority: Priority, task: anytype) std.mem.Allocator.Error!*@TypeOf(task) {
const Task = @TypeOf(task);
const run_fn = coral.meta.hasFn(Task, "run") orelse {
@compileError(std.fmt.comptimePrint("{s} requires a .run fn to be a valid task type", .{@typeName(Task)}));
};
if (run_fn.return_type.? != void) {
@compileError(std.fmt.comptimePrint("{s}.run fn must return a void type", .{@typeName(Task)}));
}
if (run_fn.params.len != 1 or run_fn.params[0].type != *Task) {
@compileError(std.fmt.comptimePrint("{s}.run fn must accept a {s} as it's one and only parameter, not {s}", .{
@typeName(Task),
@typeName(*Task),
if (run_fn.params[0].type) |Type| @typeName(Type) else "anytype",
}));
}
const created_task = try allocator.create(Task);
created_task.* = task;
if (self.prioritised_tasks[@intFromEnum(priority)]) |tasks| {
tasks.enqueue(.initRef(created_task, Task.run));
} else {
created_task.run();
}
return created_task;
}
pub fn finish(self: *Self, priority: Priority) void {
if (self.prioritised_tasks[@intFromEnum(priority)]) |tasks| {
tasks.finish();
}
}
const priorities = std.enums.values(Priority);
pub fn start(self: *Self) std.Thread.SpawnError!void {
self.* = .{};
if (!builtin.single_threaded) {
if (std.Thread.getCpuCount()) |cpu_count| {
errdefer {
self.stop();
}
inline for (priorities, &self.prioritised_tasks) |priority, *has_tasks| {
const thread_count = priority.getThreadCount(cpu_count);
if (thread_count != 0) {
has_tasks.* = try Queue.spawn(thread_count);
}
}
} else |cpu_count_error| {
std.log.warn("Failed to get number of CPU cores available: {s}", .{@errorName(cpu_count_error)});
}
}
}
pub fn stop(self: *Self) void {
inline for (&self.prioritised_tasks) |*has_tasks| {
if (has_tasks.*) |tasks| {
tasks.stop();
}
has_tasks.* = null;
}
for (priorities) |priority| {
self.finish(priority);
}
}

117
src/coral/Tasks/Queue.zig Normal file
View File

@ -0,0 +1,117 @@
const coral = @import("../coral.zig");
const std = @import("std");
threads_active: std.atomic.Value(usize) = .init(0),
buffer: [max]Runnable = undefined,
head: usize = 0,
tail: usize = 0,
access: std.Thread.Mutex = .{},
not_empty: std.Thread.Condition = .{},
not_full: std.Thread.Condition = .{},
waiting: std.Thread.WaitGroup = .{},
const Runnable = coral.Callable(void, &.{});
const Self = @This();
pub fn dequeue(self: *Self) Runnable {
self.access.lock();
defer {
self.access.unlock();
}
while (self.head == self.tail) {
self.not_empty.wait(&self.access);
}
const item = self.buffer[self.head];
const wrapped_head = (self.head + 1) % max;
self.head = wrapped_head;
self.not_full.signal();
return item;
}
pub fn enqueue(self: *Self, item: Runnable) void {
self.access.lock();
defer {
self.access.unlock();
}
const wrapped_tail = (self.tail + 1) % max;
while (wrapped_tail == self.head) {
self.not_full.wait(&self.access);
}
self.waiting.start();
self.buffer[self.tail] = item;
self.tail = wrapped_tail;
self.not_empty.signal();
}
pub fn finish(self: *Self) void {
self.waiting.wait();
self.waiting.reset();
}
fn main(self: *Self) void {
_ = self.threads_active.fetchAdd(1, .monotonic);
while (true) {
const runner = self.dequeue();
if (runner.isFn(poisonPill)) {
break;
}
runner.call(.{});
self.waiting.finish();
}
const previous_threads_active = self.threads_active.fetchSub(1, .monotonic);
std.debug.assert(previous_threads_active != 0);
if (previous_threads_active == 1) {
coral.heap.allocator.destroy(self);
}
}
const max = 512;
fn poisonPill() void {}
pub fn stop(self: *Self) void {
var threads_remaining = self.threads_active.load(.monotonic);
const poison_pill = Runnable.initFn(poisonPill);
while (threads_remaining != 0) : (threads_remaining -= 1) {
self.enqueue(poison_pill);
}
}
pub fn spawn(thread_count: usize) std.Thread.SpawnError!*Self {
if (thread_count == 0) {
return error.Unexpected;
}
const self = try coral.heap.allocator.create(Self);
self.* = .{};
var threads_spawned: usize = 0;
while (threads_spawned < thread_count) : (threads_spawned += 1) {
(try std.Thread.spawn(.{}, main, .{self})).detach();
}
return self;
}

View File

@ -1,275 +0,0 @@
const builtin = @import("builtin");
const coral = @import("./coral.zig");
const std = @import("std");
pub fn CallTask(comptime function: anytype) type {
const Function = @TypeOf(function);
const function_fn = switch (@typeInfo(Function)) {
.@"fn" => |@"fn"| @"fn",
else => @compileError("expeceted param `exec` to be an fn type"),
};
const Args = std.meta.ArgsTuple(Function);
return struct {
args: Args,
returned: Future(function_fn.return_type.?),
const Self = @This();
pub fn init(args: Args) Self {
return .{
.args = args,
.returned = .empty,
};
}
pub fn run(self: *Self) void {
std.debug.assert(self.returned.resolve(@call(.auto, function, self.args)));
}
};
}
pub fn BlockingQueue(comptime max: usize, comptime T: type) type {
const circular_max = max + 1;
return struct {
buffer: [circular_max]T,
head: usize,
tail: usize,
access: std.Thread.Mutex,
not_empty: std.Thread.Condition,
not_full: std.Thread.Condition,
const Self = @This();
pub fn dequeue(self: *Self) T {
self.access.lock();
defer {
self.access.unlock();
}
while (self.head == self.tail) {
self.not_empty.wait(&self.access);
}
const item = self.buffer[self.head];
const wrapped_head = (self.head + 1) % circular_max;
self.head = wrapped_head;
self.not_full.signal();
return item;
}
pub const empty = Self{
.buffer = undefined,
.head = 0,
.tail = 0,
.access = .{},
.not_empty = .{},
.not_full = .{},
};
pub fn enqueue(self: *Self, item: T) void {
self.access.lock();
defer {
self.access.unlock();
}
const wrapped_tail = (self.tail + 1) % circular_max;
while (wrapped_tail == self.head) {
self.not_full.wait(&self.access);
}
self.buffer[self.tail] = item;
self.tail = wrapped_tail;
self.not_empty.signal();
}
};
}
pub const Event = Future(void);
pub fn Future(comptime Payload: type) type {
return struct {
payload: ?Payload,
resolved: std.Thread.ResetEvent,
const Self = @This();
pub const empty = Self{
.payload = null,
.resolved = .{},
};
pub fn get(self: *Self) *Payload {
// TODO: Make async.
self.resolved.wait();
return &self.payload.?;
}
pub fn resolve(self: *Self, payload: Payload) bool {
if (self.resolved.isSet()) {
return false;
}
self.payload = payload;
self.resolved.set();
return true;
}
};
}
pub const TaskQueue = struct {
threads: []std.Thread,
arena: std.heap.ArenaAllocator,
tasks: *Buffer,
const Buffer = struct {
waiting: std.Thread.WaitGroup,
pending: BlockingQueue(512, Runnable),
fn consume(self: *Buffer) void {
while (true) {
const runner = self.pending.dequeue();
if (runner.isFn(poison_pill)) {
break;
}
runner.call(.{});
self.waiting.finish();
}
}
fn poison(self: *Buffer, threads: []const std.Thread) void {
var threads_remaining = threads.len;
while (threads_remaining != 0) : (threads_remaining -= 1) {
self.pending.enqueue(.initFn(poison_pill));
}
for (threads) |thread| {
thread.join();
}
}
fn poison_pill() void {}
};
const Runnable = coral.Callable(void, &.{});
pub fn create(self: *TaskQueue, task: anytype) std.mem.Allocator.Error!*const @TypeOf(task) {
const Task = @TypeOf(task);
switch (@hasDecl(Task, "run")) {
true => switch (@TypeOf(Task.run)) {
fn (*Task) void, fn (*const Task) void => {},
else => @compileError("param `task` must contain a `run` declaration"),
},
else => @compileError("`run` declaration on param `task` must be an fn type"),
}
switch (builtin.single_threaded) {
true => {
Runnable.initRef(&task, task.run).call(.{});
return &task;
},
else => {
const allocated_task = try self.arena.allocator().create(Task);
allocated_task.* = task;
self.tasks.waiting.start();
self.tasks.pending.enqueue(.initRef(allocated_task, Task.run));
return allocated_task;
},
}
}
pub fn execute(self: *TaskQueue, task: anytype) std.mem.Allocator.Error!void {
_ = try self.create(task);
}
pub fn deinit(self: *TaskQueue) void {
self.tasks.waiting.wait();
self.tasks.poison(self.threads);
self.arena.deinit();
coral.heap.allocator.destroy(self.tasks);
coral.heap.allocator.free(self.threads);
self.* = undefined;
}
pub fn finish(self: *TaskQueue) void {
self.tasks.waiting.wait();
self.tasks.waiting.reset();
_ = self.arena.reset(.retain_capacity);
}
pub fn init(thread_count: usize) (std.mem.Allocator.Error || std.Thread.SpawnError)!TaskQueue {
const tasks = try coral.heap.allocator.create(Buffer);
errdefer {
coral.heap.allocator.destroy(tasks);
}
tasks.* = .{
.pending = .empty,
.waiting = .{},
};
const threads = try coral.heap.allocator.alloc(std.Thread, if (builtin.single_threaded) 0 else thread_count);
var threads_spawned: usize = 0;
errdefer {
tasks.poison(threads[0..threads_spawned]);
coral.heap.allocator.free(threads);
}
for (threads) |*thread| {
thread.* = try .spawn(.{}, Buffer.consume, .{tasks});
threads_spawned += 1;
}
std.debug.assert(threads_spawned == thread_count);
for (threads) |thread| {
thread.setName("ona worker") catch |set_name_error| {
switch (set_name_error) {
error.Unsupported, error.NameTooLong => break,
else => continue,
}
};
}
return .{
.arena = .init(coral.heap.allocator),
.tasks = tasks,
.threads = threads,
};
}
};
pub fn wait_all(comptime Payload: type, futures: []const *Future(Payload)) void {
for (futures) |future| {
_ = future.get();
}
}

View File

@ -1,8 +1,8 @@
pub const Box = @import("./Box.zig"); pub const Box = @import("./Box.zig");
pub const TypeId = @import("./TypeId.zig"); const Tasks = @import("./Tasks.zig");
pub const asio = @import("./asio.zig"); pub const TypeId = @import("./TypeId.zig");
pub const bytes = @import("./bytes.zig"); pub const bytes = @import("./bytes.zig");
@ -24,6 +24,49 @@ pub const tree = @import("./tree.zig");
pub const utf8 = @import("./utf8.zig"); pub const utf8 = @import("./utf8.zig");
pub fn CallTask(comptime function: anytype) type {
const Function = @TypeOf(function);
const function_fn = switch (@typeInfo(Function)) {
.@"fn" => |@"fn"| @"fn",
else => @compileError("expeceted param `function` to be an fn type"),
};
return struct {
args: std.meta.ArgsTuple(Function),
payload: Return = undefined,
resolved: std.Thread.ResetEvent = .{},
const Return = function_fn.return_type.?;
const Self = @This();
pub fn get(self: *const Self) Return {
std.debug.assert(self.resolved.isSet());
self.resolved.wait();
self.resolved.reset();
return self.payload;
}
pub fn resolve(self: *Self, payload: Return) bool {
if (self.resolved.isSet()) {
return false;
}
self.payload = payload;
self.resolved.set();
return true;
}
pub fn run(self: *Self) void {
std.debug.assert(self.resolve(@call(.auto, function, self.args)));
}
};
}
pub fn Callable(comptime Output: type, comptime input_types: []const type) type { pub fn Callable(comptime Output: type, comptime input_types: []const type) type {
const InputTuple = std.meta.Tuple(input_types); const InputTuple = std.meta.Tuple(input_types);
@ -135,3 +178,5 @@ pub fn expect(function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) me
else => @call(.auto, function, args), else => @call(.auto, function, args),
}; };
} }
pub var tasks = Tasks{};

View File

@ -1,8 +1,8 @@
const std = @import("std"); const std = @import("std");
fn Generic(comptime Item: type, comptime Buffer: type) type { fn Generic(comptime Item: type, comptime Storage: type) type {
return struct { return struct {
items: Buffer, items: Storage,
const Self = @This(); const Self = @This();
@ -217,8 +217,10 @@ pub fn Sequential(comptime Item: type) type {
return error.OutOfMemory; return error.OutOfMemory;
}; };
self.ptr = (try allocator.realloc(self.ptr[0..self.cap], new_cap)).ptr; const reallocation = try allocator.realloc(self.ptr[0..self.cap], new_cap);
self.cap = @intCast(new_cap);
self.ptr = reallocation.ptr;
self.cap = new_cap;
} }
pub fn set(self: *Self, index: usize, value: Item) bool { pub fn set(self: *Self, index: usize, value: Item) bool {

View File

@ -50,8 +50,8 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits(
fn getMax(self: *Node) *Node { fn getMax(self: *Node) *Node {
var node = self; var node = self;
while (self.has_lesser) |lesser| { while (node.has_greater) |greater| {
node = lesser; node = greater;
} }
return node; return node;
@ -60,7 +60,7 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits(
fn getMin(self: *Node) *Node { fn getMin(self: *Node) *Node {
var node = self; var node = self;
while (self.has_lesser) |lesser| { while (node.has_lesser) |lesser| {
node = lesser; node = lesser;
} }
@ -133,40 +133,36 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits(
nodes: ?*Node, nodes: ?*Node,
pub fn next(self: *KeyValues) ?coral.KeyValuePair(Key, *Value) { pub fn next(self: *KeyValues) ?coral.KeyValuePair(Key, *Value) {
var nodes = self.nodes; const current_node = self.nodes orelse {
while (nodes) |node| {
const left = node.has_lesser orelse {
self.nodes = node.has_greater;
return .{
.key = node.key,
.value = &node.value,
};
};
// Find the rightmost node in left subtree or link back to current
var pred = left;
while (pred.has_greater != null and pred.has_greater != node) {
pred = pred.has_greater.?;
}
if (pred.has_greater != null) {
pred.has_greater = null;
self.nodes = node.has_greater;
return .{
.key = node.key,
.value = &node.value,
};
}
pred.has_greater = node;
self.nodes = node.has_lesser;
nodes = self.nodes;
}
return null; return null;
};
var successor_node: ?*Node = null;
if (current_node.has_greater) |greater| {
successor_node = greater.getMin();
} else {
// Climb until at a left-child.
var search_node = current_node;
var has_parent = search_node.has_parent;
while (has_parent) |parent| {
if (parent.has_lesser == search_node) {
successor_node = parent;
break;
}
search_node = parent;
has_parent = parent.has_parent;
}
}
self.nodes = successor_node;
return .{
.key = current_node.key,
.value = &current_node.value,
};
} }
pub fn nextKey(self: *KeyValues) ?Key { pub fn nextKey(self: *KeyValues) ?Key {
@ -310,7 +306,7 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits(
pub fn getKey(self: Self, key: Key) ?Key { pub fn getKey(self: Self, key: Key) ?Key {
if (self.has_root) |root| { if (self.has_root) |root| {
if (root.find(key)) |node| { if (root.find(key)) |node| {
return &node.key; return node.key;
} }
} }
@ -322,7 +318,11 @@ pub fn Binary(comptime Key: type, comptime Value: type, comptime traits: Traits(
} }
pub fn keyValues(self: *const Self) KeyValues { pub fn keyValues(self: *const Self) KeyValues {
return .{ .nodes = self.has_root }; if (self.has_root) |root| {
return .{ .nodes = root.getMin() };
}
return .{ .nodes = null };
} }
pub fn remove(self: *Self, key: Key) ?coral.KeyValuePair(Key, Value) { pub fn remove(self: *Self, key: Key) ?coral.KeyValuePair(Key, Value) {

View File

@ -2,14 +2,72 @@ const ona = @import("ona");
const std = @import("std"); const std = @import("std");
const CRT = extern struct {
width: f32,
height: f32,
time: f32,
padding: [4]u8 = undefined,
};
const Effects = struct {
render_texture: ona.AssetHandle = .default,
image_textures: [2]ona.AssetHandle = [_]ona.AssetHandle{.default} ** 2,
last_time: f64 = 0,
image_index: usize = 0,
crt_effect: ona.AssetHandle = .default,
};
pub fn main() void { pub fn main() void {
ona.realtime_app ona.realtime_app
.with(.initModule(ona.hid)) .with(.initModule(ona.hid))
.with(.initModule(ona.gfx)) .with(.initModule(ona.gfx))
.with(.initSystem(.render, .of(render))) // .with(.initSystem(.render, .of(render)))
.run(); .run();
} }
fn render(commands: ona.gfx.Commands) void { // fn update(effects: ona.Write(Effects), loop: ona.Read(ona.Loop)) void {
_ = commands; // const update_seconds = 5;
}
// if ((loop.state.elapsed_time - effects.state.last_time) > update_seconds) {
// effects.state.image_index = (effects.state.image_index + 1) % effects.state.image_textures.len;
// effects.state.last_time = loop.state.elapsed_time;
// }
// }
// fn render(commands: ona.gfx.Commands) void {
// try commands.setTarget(.{
// .texture = effects.state.render_texture,
// .clear_color = gfx.colors.black,
// .clear_depth = 0,
// .clear_stencil = 0,
// });
// const width: f32 = @floatFromInt(display.state.width);
// const height: f32 = @floatFromInt(display.state.height);
// try commands.draw_texture(.{
// .texture = effects.state.image_textures[effects.state.image_index],
// .size = .{ width, height },
// });
// try commands.set_effect(.{
// .effect = effects.state.crt_effect,
// .properties = std.mem.asBytes(&CRT{
// .width = width,
// .height = height,
// .time = @floatCast(loop.state.elapsed_time),
// }),
// });
// try commands.set_target(.{
// .clear_color = null,
// .clear_depth = null,
// .clear_stencil = null,
// });
// try commands.draw_texture(.{
// .texture = effects.state.render_texture,
// .size = .{ width, height },
// });
// }

View File

@ -82,16 +82,16 @@ pub fn init() error{OutOfMemory}!Self {
pub fn on(self: *Self, comptime schedule: anytype, behavior: *const Behavior) error{OutOfMemory}!void { pub fn on(self: *Self, comptime schedule: anytype, behavior: *const Behavior) error{OutOfMemory}!void {
const schedule_name = scheduleName(schedule); const schedule_name = scheduleName(schedule);
const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .empty)).?; const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .{})).?;
try systems.insert(self, behavior); try systems.insert(self, behavior);
} }
pub fn run(self: *Self, tasks: *coral.asio.TaskQueue, comptime schedule: anytype) RunError!void { pub fn run(self: *Self, comptime schedule: anytype) RunError!void {
const schedule_name = scheduleName(schedule); const schedule_name = scheduleName(schedule);
const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .empty)).?; const systems = self.named_systems.get(schedule_name) orelse (try self.named_systems.insert(coral.heap.allocator, schedule_name, .{})).?;
try systems.run(self, tasks); try systems.run(self);
systems.applyDeferred(self); systems.applyDeferred(self);
} }

View File

@ -4,13 +4,14 @@ const ona = @import("../ona.zig");
const std = @import("std"); const std = @import("std");
edges: Map(Edge), edges: Map(Edge) = .empty,
processed: Map(void), processed: Map(void) = .empty,
state_readers: AccessMap, state_readers: AccessMap = .empty,
state_writers: AccessMap, state_writers: AccessMap = .empty,
blocking_work: WorkSet, blocking_work: WorkSet = .empty,
parallel_work: WorkSet, parallel_work: WorkSet = .empty,
parallel_work_ranges: coral.stack.Sequential(usize), parallel_work_ranges: coral.stack.Sequential(usize) = .empty,
run_arena: std.heap.ArenaAllocator = .init(coral.heap.allocator),
const AccessMap = coral.tree.Binary(*const coral.TypeId, BehaviorSet, coral.tree.scalarTraits(*const coral.TypeId)); const AccessMap = coral.tree.Binary(*const coral.TypeId, BehaviorSet, coral.tree.scalarTraits(*const coral.TypeId));
@ -61,6 +62,7 @@ pub fn deinit(self: *Self) void {
self.parallel_work.deinit(allocator); self.parallel_work.deinit(allocator);
self.parallel_work_ranges.deinit(allocator); self.parallel_work_ranges.deinit(allocator);
self.blocking_work.deinit(allocator); self.blocking_work.deinit(allocator);
self.run_arena.deinit();
inline for (.{ &self.edges, &self.state_readers, &self.state_writers }) |map| { inline for (.{ &self.edges, &self.state_readers, &self.state_writers }) |map| {
var key_values = map.keyValues(); var key_values = map.keyValues();
@ -76,10 +78,10 @@ pub fn deinit(self: *Self) void {
pub fn dependOnBehavior(self: *Self, app: *ona.App, dependant: *const ona.App.Behavior, dependency: *const ona.App.Behavior) std.mem.Allocator.Error!void { pub fn dependOnBehavior(self: *Self, app: *ona.App, dependant: *const ona.App.Behavior, dependency: *const ona.App.Behavior) std.mem.Allocator.Error!void {
try self.insert(app, dependant); try self.insert(app, dependant);
const edges = self.edges.get(dependant).?; const edge = self.edges.get(dependant).?;
if (std.mem.indexOfScalar(*const ona.App.Behavior, edges.dependencies.items.slice(), dependency) == null) { if (std.mem.indexOfScalar(*const ona.App.Behavior, edge.dependencies.items.slice(), dependency) == null) {
try edges.dependencies.pushGrow(coral.heap.allocator, dependency); try edge.dependencies.pushGrow(coral.heap.allocator, dependency);
} }
} }
@ -123,16 +125,6 @@ pub fn dependOnType(self: *Self, app: *ona.App, dependant: *const ona.App.Behavi
} }
} }
pub const empty = Self{
.edges = .empty,
.processed = .empty,
.state_readers = .empty,
.state_writers = .empty,
.blocking_work = .empty,
.parallel_work = .empty,
.parallel_work_ranges = .empty,
};
pub fn applyDeferred(self: *Self, app: *ona.App) void { pub fn applyDeferred(self: *Self, app: *ona.App) void {
for (self.parallel_work.items.slice()) |work| { for (self.parallel_work.items.slice()) |work| {
work.behavior.apply(work.local_state, app); work.behavior.apply(work.local_state, app);
@ -146,15 +138,16 @@ pub fn applyDeferred(self: *Self, app: *ona.App) void {
pub fn insert(self: *Self, app: *ona.App, behavior: *const ona.App.Behavior) std.mem.Allocator.Error!void { pub fn insert(self: *Self, app: *ona.App, behavior: *const ona.App.Behavior) std.mem.Allocator.Error!void {
self.processed.clear(); self.processed.clear();
if (try self.edges.insert(coral.heap.allocator, behavior, undefined)) |edge| { // TODO: Refactor so partial initialisation isn't happening here anymore.
if (try self.edges.insert(coral.heap.allocator, behavior, .{
.local_state = undefined,
.dependencies = .empty,
})) |edge| {
errdefer { errdefer {
std.debug.assert(self.edges.remove(behavior) != null); std.debug.assert(self.edges.remove(behavior) != null);
} }
edge.* = .{ edge.local_state = try behavior.bind(behavior, app, self);
.local_state = try behavior.bind(behavior, app, self),
.dependencies = .empty,
};
} }
} }
@ -186,7 +179,13 @@ fn process(self: *Self, behavior: *const ona.App.Behavior, edge: Edge) !Processe
return processed; return processed;
} }
pub fn run(self: *Self, app: *const ona.App, tasks: *coral.asio.TaskQueue) RunError!void { pub fn run(self: *Self, app: *const ona.App) RunError!void {
defer {
if (!self.run_arena.reset(.retain_capacity)) {
std.log.warn("Failed to retain run arena capacity for system graph", .{});
}
}
if (self.processed.isEmpty()) { if (self.processed.isEmpty()) {
errdefer { errdefer {
self.processed.clear(); self.processed.clear();
@ -215,20 +214,19 @@ pub fn run(self: *Self, app: *const ona.App, tasks: *coral.asio.TaskQueue) RunEr
} }
{ {
const parallel_work = self.parallel_work.items.slice(); const run_allocator = self.run_arena.allocator();
var parallel_work_offset: usize = 0; var parallel_work = self.parallel_work.items.slice();
for (self.parallel_work_ranges.items.slice()) |parallel_work_range| { for (self.parallel_work_ranges.items.slice()) |parallel_work_range| {
try tasks.execute(coral.asio.CallTask(runWorkGroup).init(.{ _ = try coral.tasks.create(run_allocator, .high_priority, coral.CallTask(runWorkGroup){
app, .args = .{ app, parallel_work[0..parallel_work_range] },
parallel_work[parallel_work_offset .. parallel_work_offset + parallel_work_range], });
}));
parallel_work_offset += parallel_work_range; parallel_work = parallel_work[parallel_work_range..];
} }
} }
tasks.finish(); coral.tasks.finish(.high_priority);
runWorkGroup(app, self.blocking_work.items.slice()); runWorkGroup(app, self.blocking_work.items.slice());
} }

View File

@ -1,133 +0,0 @@
const coral = @import("coral");
const ext = @import("ext");
const ona = @import("./ona.zig");
const std = @import("std");
storage: *ext.SDL_Storage,
pub const Error = error{
AssetNotFound,
};
const Instance = struct {
storage: *ext.SDL_Storage,
pub fn deinit(self: *Instance) void {
while (!ext.SDL_StorageReady(self.storage)) {
ext.SDL_Delay(1);
}
if (!ext.SDL_CloseStorage(self.storage)) {
std.log.warn("Failed to close title storage", .{});
}
self.storage = undefined;
}
};
const Self = @This();
pub fn bind(app: *ona.App) !void {
const title_storage = ext.SDL_OpenTitleStorage(null, 0) orelse {
std.log.err("Failed to open title storage", .{});
return error.SdlFailure;
};
try app.setState(Instance{
.storage = title_storage,
});
}
pub fn init(instance: *Instance) Self {
return .{
.storage = instance.storage,
};
}
pub fn load(self: Self, path: []const u8, allocator: std.mem.Allocator) (std.mem.Allocator.Error || Error)![]u8 {
const path_buffer = try normalize_path(path);
while (!ext.SDL_StorageReady(self.storage)) {
ext.SDL_Delay(1);
}
var asset_size: u64 = 0;
if (!ext.SDL_GetStorageFileSize(self.storage, path_buffer.ptr(), &asset_size)) {
return error.AssetNotFound;
}
const file_buffer = try allocator.alloc(u8, asset_size);
errdefer {
allocator.free(file_buffer);
}
if (!ext.SDL_ReadStorageFile(self.storage, path_buffer.ptr(), file_buffer.ptr, file_buffer.len)) {
return error.AssetNotFound;
}
return file_buffer;
}
pub fn normalize_path(data: []const u8) Error!coral.ShortString {
const separator = '/';
var result = coral.ShortString.empty;
for (0..data.len) |i| {
if (data[i] == separator) {
const next_i = i + 1;
if (next_i == data.len) {
continue;
}
if (data[next_i] == separator) {
continue;
}
}
result.append(data[i]) catch |append_error| {
return switch (append_error) {
error.StringTooLong => error.AssetNotFound,
};
};
}
if (result.isEmpty()) {
return error.AssetNotFound;
}
return result;
}
pub fn read(self: Self, path: []const u8, output: []u8) Error!void {
const path_buffer = try normalize_path(path);
while (!ext.SDL_StorageReady(self.storage)) {
ext.SDL_Delay(1);
}
if (!ext.SDL_ReadStorageFile(self.storage, path_buffer.ptr(), output.ptr, output.len)) {
return error.AssetNotFound;
}
}
pub fn size(self: Self, path: []const u8) Error!usize {
const path_buffer = try normalize_path(path);
var asset_size: u64 = 0;
while (!ext.SDL_StorageReady(self.storage)) {
ext.SDL_Delay(1);
}
if (!ext.SDL_GetStorageFileSize(self.storage, path_buffer.ptr(), &asset_size)) {
return error.AssetNotFound;
}
return asset_size;
}

84
src/ona/asset.zig Normal file
View File

@ -0,0 +1,84 @@
const coral = @import("coral");
const std = @import("std");
pub const Handle = packed struct {
index: u24,
salt: u20,
type_id: u20,
};
pub const Path = struct {
buffer: [256]u8,
};
pub fn Store(comptime Asset: type) type {
return struct {
released_handles: coral.stack.Sequential(Handle) = .empty,
assets: coral.stack.Parallel(struct {
asset: Asset,
state: AssetState,
}) = .empty,
const AssetState = struct {
usage: enum { vacant, reserved, occupied },
salt: u20,
};
pub fn reserve(self: *Store) std.mem.Allocator.Error!Handle {
if (self.released_handles.pop()) |handle| {
const state = &self.assets.items.slice(.state)[handle.index];
state.usage = .reserved;
return handle;
}
const handle = Handle{
.type_id = type_id,
.index = self.assets.items.len,
.salt = 1,
};
try self.assets.pushGrow(coral.heap.allocator, .{
.asset = undefined,
.state = .{
.usage = .reserved,
.salt = handle.salt,
},
});
return handle;
}
pub fn resolve(self: *Store, reserved_handle: Handle, asset: Asset) bool {
const state = self.assets.items.slice(.state)[reserved_handle.index];
if (state.usage != .reserved) {
return false;
}
if (reserved_handle.salt != state.salt) {
return false;
}
self.assets.items.slice(.asset)[reserved_handle.index] = asset;
return true;
}
const type_id = generateTypeId();
};
}
fn generateTypeId() error{Overflow}!u20 {
const id = struct {
var last_assigned: u20 = 0;
};
id.last_assigned = try std.math.add(u20, id.last_assigned, 1);
return id.last_assigned;
}

View File

@ -1,5 +1,7 @@
pub const App = @import("./App.zig"); pub const App = @import("./App.zig");
const asset = @import("./asset.zig");
const coral = @import("coral"); const coral = @import("coral");
pub const gfx = @import("./gfx.zig"); pub const gfx = @import("./gfx.zig");
@ -8,116 +10,36 @@ pub const hid = @import("./hid.zig");
const std = @import("std"); const std = @import("std");
pub const AssetPath = struct { u32 }; pub const AssetHandle = asset.Handle;
pub const AssetPath = asset.Path;
pub fn Assets(comptime Asset: type) type { pub fn Assets(comptime Asset: type) type {
const Store = asset.Store(Asset);
return struct { return struct {
queue: *Queue, cached: *Store,
store: *Store,
pub const Handle = packed struct {
index: u32,
salt: u32,
};
const Queue = coral.stack.Sequential(struct {
asset_path: AssetPath,
reserved_handle: Handle,
});
const Self = @This(); const Self = @This();
const State = struct { pub fn init(store: *Store) Self {
queue: Queue,
};
pub const Store = struct {
released_handles: coral.stack.Sequential(Handle) = .empty,
assets: coral.stack.Parallel(struct {
asset: Asset,
state: AssetState,
}) = .empty,
const AssetState = struct {
usage: enum(u32) { vacant, reserved, occupied },
salt: u32,
};
pub fn reserve(self: *Store) std.mem.Allocator.Error!Handle {
if (self.released_handles.pop()) |handle| {
const state = &self.assets.items.slice(.state)[handle.index];
std.debug.assert(state.usage == .vacant);
std.debug.assert(state.salt == handle.salt);
state.usage = .reserved;
return handle;
}
const handle = Handle{
.index = self.assets.items.len,
.salt = 1,
};
try self.assets.pushGrow(coral.heap.allocator, .{
.asset = undefined,
.state = .{
.usage = .reserved,
.salt = handle.salt,
},
});
return handle;
}
pub fn resolve(self: *Store, reserved_handle: Handle, asset: Asset) bool {
const state = self.assets.items.slice(.state)[reserved_handle.index];
if (state.usage != .reserved) {
return false;
}
if (reserved_handle.salt != state.salt) {
return false;
}
self.assets.items.slice(.asset)[reserved_handle.index] = asset;
return true;
}
};
pub fn bind() State {
return .{
.queue = .empty,
};
}
pub fn init(state: *State, store: *Store) Self {
return .{ return .{
.store = store, .store = store,
.queue = &state.queue,
}; };
} }
pub fn insert(self: Self, asset: Asset) std.mem.Allocator.Error!Handle { pub fn insert(self: Self, uncached_asset: Asset) std.mem.Allocator.Error!AssetHandle {
const reserved_handle = try self.store.reserve(); const reserved_handle = try self.store.reserve();
std.debug.assert(self.store.resolve(reserved_handle, asset)); std.debug.assert(self.store.resolve(reserved_handle, uncached_asset));
return reserved_handle; return reserved_handle;
} }
pub fn load(self: Self, path: AssetPath) std.mem.Allocator.Error!Handle { pub fn load(self: Self, load_path: AssetPath) std.mem.Allocator.Error!AssetHandle {
const reserved_handle = try self.store.reserve(); const reserved_handle = try self.store.reserve();
try self.queue.pushGrow(coral.heap.allocator, .{ _ = load_path;
.asset_path = path,
.reserved_handle = reserved_handle,
});
return reserved_handle; return reserved_handle;
} }
@ -311,7 +233,7 @@ pub fn Write(comptime State: type) type {
pub const realtime_app = App.Setup.init(run_realtime_loop); pub const realtime_app = App.Setup.init(run_realtime_loop);
pub fn registerAsset(app: *App, comptime Asset: type) std.mem.Allocator.Error!void { pub fn registerAsset(app: *App, comptime Asset: type) std.mem.Allocator.Error!void {
const AssetStore = Assets(Asset).Store; const AssetStore = asset.Store(Asset);
try app.setState(AssetStore{}); try app.setState(AssetStore{});
} }
@ -331,14 +253,13 @@ fn run_realtime_loop(app: *App) !void {
const time = app.hasState(App.Time).?; const time = app.hasState(App.Time).?;
const exit_channel = app.hasState(Channel(App.Exit)).?; const exit_channel = app.hasState(Channel(App.Exit)).?;
const virtual_thread_count = std.Thread.getCpuCount() catch 0; try coral.tasks.start();
var tasks = try coral.asio.TaskQueue.init(virtual_thread_count / 2);
defer { defer {
tasks.deinit(); coral.tasks.stop();
} }
try app.run(&tasks, .load); try app.run(.load);
const ticks_initial = std.time.milliTimestamp(); const ticks_initial = std.time.milliTimestamp();
var ticks_previous = ticks_initial; var ticks_previous = ticks_initial;
@ -352,19 +273,19 @@ fn run_realtime_loop(app: *App) !void {
time.elapsed = @as(f64, @floatFromInt(ticks_current - ticks_initial)) / milliseconds_per_second; time.elapsed = @as(f64, @floatFromInt(ticks_current - ticks_initial)) / milliseconds_per_second;
ticks_previous = ticks_current; ticks_previous = ticks_current;
try app.run(&tasks, .pre_update); try app.run(.pre_update);
while (accumulated_time >= target_frame_time) : (accumulated_time -= target_frame_time) { while (accumulated_time >= target_frame_time) : (accumulated_time -= target_frame_time) {
try app.run(&tasks, .update); try app.run(.update);
} }
try app.run(&tasks, .post_update); try app.run(.post_update);
try app.run(&tasks, .render); try app.run(.render);
const exit_messages = exit_channel.messages(); const exit_messages = exit_channel.messages();
if (exit_messages.len != 0) { if (exit_messages.len != 0) {
try app.run(&tasks, .exit); try app.run(.exit);
switch (exit_messages[exit_messages.len - 1]) { switch (exit_messages[exit_messages.len - 1]) {
.success => { .success => {