//! ona/src/coral/system.zig — system scheduling for the coral module
//! (BindContext, Info, Order, Schedule).

const dag = @import("./dag.zig");
const heap = @import("./heap.zig");
const map = @import("./map.zig");
const resource = @import("./resource.zig");
const slices = @import("./slices.zig");
const stack = @import("./stack.zig");
const std = @import("std");
const World = @import("./World.zig");
/// Context handed to `Info.Parameter.bind` while a system is being attached to
/// a [Schedule]; identifies the system's graph node and records which world
/// resources the parameter touches.
pub const BindContext = struct {
    node: dag.Node,
    systems: *Schedule,
    world: *World,

    pub const ResourceAccess = std.meta.Tag(Schedule.ResourceAccess);

    /// Returns `true` when the system bound at `self.node` has already
    /// recorded an access of kind `access` for the resource identified by `id`.
    pub fn accesses_resource(self: BindContext, access: ResourceAccess, id: resource.TypeID) bool {
        const recorded = self.systems.graph.get_ptr(self.node).?.resource_accesses.values;

        for (recorded) |entry| {
            switch (entry) {
                .read_only => |entry_id| {
                    if (access == .read_only and entry_id == id) {
                        return true;
                    }
                },

                .read_write => |entry_id| {
                    if (access == .read_write and entry_id == id) {
                        return true;
                    }
                },
            }
        }

        return false;
    }

    /// Records a read-write access on `Resource` for the bound system and
    /// returns a mutable pointer to the resource, or `null` when the world
    /// does not contain it.
    ///
    /// The access is recorded at most once per resource, and the node is added
    /// at most once to the read-write accessor bundle consulted during
    /// schedule regeneration.
    pub fn register_read_write_resource_access(self: BindContext, thread_restriction: World.ThreadRestriction, comptime Resource: type) std.mem.Allocator.Error!?*Resource {
        const resource_ptr = self.world.get_resource(thread_restriction, Resource) orelse return null;
        const type_id = resource.type_id(Resource);

        if (!self.accesses_resource(.read_write, type_id)) {
            try self.systems.graph.get_ptr(self.node).?.resource_accesses.push_grow(.{.read_write = type_id});
        }

        // Lazily create the per-resource accessor bundle on first use.
        const accessor_nodes = self.systems.read_write_resource_id_nodes.get_ptr(type_id) orelse created: {
            std.debug.assert(try self.systems.read_write_resource_id_nodes.emplace(type_id, .{
                .allocator = heap.allocator,
            }));

            break: created self.systems.read_write_resource_id_nodes.get_ptr(type_id).?;
        };

        if (slices.index_of(accessor_nodes.values, 0, self.node) == null) {
            try accessor_nodes.push_grow(self.node);
        }

        return resource_ptr;
    }

    /// Records a read-only access on `Resource` for the bound system and
    /// returns a const pointer to the resource, or `null` when the world does
    /// not contain it.
    ///
    /// Mirrors `register_read_write_resource_access` but tracks the node in
    /// the read-only accessor bundle instead.
    pub fn register_read_only_resource_access(self: BindContext, thread_restriction: World.ThreadRestriction, comptime Resource: type) std.mem.Allocator.Error!?*const Resource {
        const resource_ptr = self.world.get_resource(thread_restriction, Resource) orelse return null;
        const type_id = resource.type_id(Resource);

        if (!self.accesses_resource(.read_only, type_id)) {
            try self.systems.graph.get_ptr(self.node).?.resource_accesses.push_grow(.{.read_only = type_id});
        }

        // Lazily create the per-resource accessor bundle on first use.
        const accessor_nodes = self.systems.read_only_resource_id_nodes.get_ptr(type_id) orelse created: {
            std.debug.assert(try self.systems.read_only_resource_id_nodes.emplace(type_id, .{
                .allocator = heap.allocator,
            }));

            break: created self.systems.read_only_resource_id_nodes.get_ptr(type_id).?;
        };

        if (slices.index_of(accessor_nodes.values, 0, self.node) == null) {
            try accessor_nodes.push_grow(self.node);
        }

        return resource_ptr;
    }
};
/// Type-erased runtime description of a system: its entry point plus the
/// per-parameter hooks a [Schedule] uses to bind it against a [World].
pub const Info = struct {
    // Invoked when the system runs; receives the used parameter descriptors
    // alongside the per-system opaque state slots produced by `Parameter.bind`.
    execute: *const fn ([]const *const Parameter, *const [max_parameters]?*anyopaque) anyerror!void,

    // Parameter descriptors; only the first `parameter_count` entries are
    // meaningful (see `used_parameters`). Remaining slots stay `undefined`.
    parameters: [max_parameters]*const Parameter = undefined,

    // Number of valid entries in `parameters`.
    // NOTE(review): u4 ranges 0..15 while `max_parameters` is 16 — confirm a
    // system can never actually declare 16 parameters.
    parameter_count: u4 = 0,

    // Which thread the system is restricted to when scheduled.
    thread_restriction: World.ThreadRestriction = .none,

    /// Hooks used by [Schedule] to initialize, bind, and release the opaque
    /// state backing one system parameter.
    pub const Parameter = struct {
        // Thread restriction contributed by this parameter.
        thread_restriction: World.ThreadRestriction,

        // Initializes parameter state in place.
        init: *const fn (*anyopaque, ?*anyopaque) void,

        // Allocates/binds parameter state for a system node; may yield a null
        // state pointer (see `Schedule.then`).
        bind: *const fn (std.mem.Allocator, BindContext) std.mem.Allocator.Error!?*anyopaque,

        // Releases state previously produced by `bind`.
        unbind: *const fn (std.mem.Allocator, ?*anyopaque) void,
    };

    /// Returns the slice of parameter descriptors actually in use.
    pub fn used_parameters(self: *const Info) []const *const Parameter {
        return self.parameters[0 .. self.parameter_count];
    }
};
/// Caller-facing ordering options passed to `Schedule.then`.
pub const Order = struct {
    // Human-readable label for diagnostics; empty selects "anonymous system".
    label: []const u8 = "",

    // Systems this one is ordered to run after.
    run_after: []const *const Info = &.{},

    // Systems this one is ordered to run before.
    run_before: []const *const Info = &.{},
};
/// An ordered collection of systems plus the derived execution plan: a
/// dependency graph of system nodes flattened into bundles of parallelizable
/// work followed by a list of main-thread-only ("blocking") work.
pub const Schedule = struct {
    label: [:0]const u8,
    graph: Graph,
    arena: std.heap.ArenaAllocator,
    system_id_nodes: map.Hashed(usize, NodeBundle, map.usize_traits),
    read_write_resource_id_nodes: ResourceNodeBundle,
    read_only_resource_id_nodes: ResourceNodeBundle,
    parallel_work_bundles: ParallelNodeBundles,
    blocking_work: NodeBundle,

    /// One explicit ordering constraint recorded by `then`; `id` is the
    /// address of the other system's [Info].
    const Dependency = struct {
        kind: Kind,
        id: usize,

        const Kind = enum {
            after,
            before,
        };
    };

    const Graph = dag.Graph(struct {
        info: *const Info,
        label: [:0]u8,
        dependencies: []Dependency,
        parameter_states: [max_parameters]?*anyopaque = [_]?*anyopaque{null} ** max_parameters,
        resource_accesses: stack.Sequential(ResourceAccess),
    });

    const NodeBundle = stack.Sequential(dag.Node);

    const ParallelNodeBundles = stack.Sequential(NodeBundle);

    const ResourceAccess = union(enum) {
        read_only: resource.TypeID,
        read_write: resource.TypeID,
    };

    const ResourceNodeBundle = map.Hashed(resource.TypeID, NodeBundle, map.enum_traits(resource.TypeID));

    /// Releases all owned memory: per-id node bundles, every system's bound
    /// parameter states, resource-access lists, labels and dependency arrays,
    /// the derived work plan, the graph, and finally the label arena.
    pub fn deinit(self: *Schedule) void {
        {
            var nodes = self.system_id_nodes.entries();

            while (nodes.next()) |node| {
                node.value.deinit();
            }
        }

        {
            var nodes = self.read_write_resource_id_nodes.entries();

            while (nodes.next()) |node| {
                node.value.deinit();
            }
        }

        {
            var nodes = self.read_only_resource_id_nodes.entries();

            while (nodes.next()) |node| {
                node.value.deinit();
            }
        }

        var nodes = self.graph.nodes();

        while (nodes.next()) |node| {
            const system = self.graph.get_ptr(node).?;

            for (system.info.used_parameters(), system.parameter_states[0 .. system.info.parameter_count]) |parameter, state| {
                parameter.unbind(self.arena.allocator(), state);
            }

            system.resource_accesses.deinit();
            heap.allocator.free(system.dependencies);
            heap.allocator.free(system.label);
        }

        for (self.parallel_work_bundles.values) |*bundle| {
            bundle.deinit();
        }

        self.parallel_work_bundles.deinit();
        self.blocking_work.deinit();
        self.graph.deinit();
        self.system_id_nodes.deinit();
        self.read_write_resource_id_nodes.deinit();
        self.read_only_resource_id_nodes.deinit();
        self.arena.deinit();
    }

    /// Executes the schedule against `world`, first rebuilding the execution
    /// plan when it has been invalidated (see `invalidate_work`).
    ///
    /// Parallel bundles run on the world's thread pool when one exists,
    /// otherwise inline; blocking (main-thread) work always runs inline last.
    pub fn run(self: *Schedule, world: *World) anyerror!void {
        if (self.is_invalidated()) {
            const work = struct {
                // Rebuilds graph edges from each system's explicit `then`
                // dependencies and its implicit resource-access conflicts.
                fn regenerate_graph(schedule: *Schedule) !void {
                    schedule.graph.clear_edges();

                    var nodes = schedule.graph.nodes();

                    while (nodes.next()) |node| {
                        const system = schedule.graph.get_ptr(node).?;

                        for (system.dependencies) |order| {
                            // Fix: resolve the dependency target via the id
                            // recorded in `then` (`order.id`). Previously this
                            // looked up `@intFromPtr(system.info)` — the
                            // system's OWN id — so every explicit edge pointed
                            // back at the system itself and `order.id` was
                            // never read.
                            const dependencies = schedule.system_id_nodes.get(order.id) orelse {
                                @panic("unable to resolve missing explicit system dependency");
                            };

                            if (dependencies.is_empty()) {
                                @panic("unable to resolve missing explicit system dependency");
                            }

                            switch (order.kind) {
                                .after => {
                                    for (dependencies.values) |dependency_node| {
                                        std.debug.assert(try schedule.graph.insert_edge(node, dependency_node));
                                    }
                                },

                                .before => {
                                    for (dependencies.values) |dependency_node| {
                                        std.debug.assert(try schedule.graph.insert_edge(dependency_node, node));
                                    }
                                },
                            }
                        }

                        for (system.resource_accesses.values) |resource_access| {
                            switch (resource_access) {
                                .read_write => |resource_id| {
                                    // A writer is ordered against every other
                                    // writer of the same resource...
                                    const read_write_dependencies = schedule.read_write_resource_id_nodes.get(resource_id) orelse {
                                        @panic("unable to resolve missing implicit read-write parameter dependency");
                                    };

                                    for (read_write_dependencies.values) |dependency_node| {
                                        std.debug.assert(try schedule.graph.insert_edge(node, dependency_node));
                                    }

                                    // ...and against every reader of it.
                                    if (schedule.read_only_resource_id_nodes.get(resource_id)) |dependencies| {
                                        for (dependencies.values) |dependency_node| {
                                            std.debug.assert(try schedule.graph.insert_edge(node, dependency_node));
                                        }
                                    }
                                },

                                .read_only => |resource_id| {
                                    // NOTE(review): readers are linked against
                                    // other readers here but not against
                                    // writers (writer→reader edges come from
                                    // the branch above). Confirm this matches
                                    // the intended conflict model of dag.Graph.
                                    if (schedule.read_only_resource_id_nodes.get(resource_id)) |dependencies| {
                                        for (dependencies.values) |dependency_node| {
                                            std.debug.assert(try schedule.graph.insert_edge(node, dependency_node));
                                        }
                                    }
                                },
                            }
                        }
                    }
                }

                // Depth-first collects `node` and its not-yet-visited edge
                // targets into `bundle` in post-order.
                fn populate_bundle(bundle: *NodeBundle, graph: *Graph, node: dag.Node) !void {
                    std.debug.assert(graph.mark_visited(node));

                    for (graph.edge_nodes(node).?) |edge| {
                        if (graph.visited(edge).?) {
                            continue;
                        }

                        try populate_bundle(bundle, graph, edge);
                    }

                    try bundle.push_grow(node);
                }

                // Partitions the graph into parallel bundles, then moves any
                // main-thread-restricted suffix of each bundle into the
                // blocking work list.
                fn sort(schedule: *Schedule) !void {
                    defer schedule.graph.reset_visited();

                    var nodes = schedule.graph.nodes();

                    while (nodes.next()) |node| {
                        if (schedule.graph.visited(node).?) {
                            continue;
                        }

                        try schedule.parallel_work_bundles.push_grow(.{.allocator = heap.allocator});

                        const bundle = schedule.parallel_work_bundles.get_ptr().?;

                        errdefer {
                            bundle.deinit();
                            std.debug.assert(schedule.parallel_work_bundles.pop());
                        }

                        try populate_bundle(bundle, &schedule.graph, node);
                    }

                    for (schedule.parallel_work_bundles.values) |*work_bundle| {
                        var index = @as(usize, 0);

                        while (index < work_bundle.len()) : (index += 1) {
                            const node = work_bundle.values[index];

                            switch (schedule.graph.get_ptr(node).?.info.thread_restriction) {
                                .none => continue,

                                .main => {
                                    // Move this node and everything after it so
                                    // intra-bundle ordering is preserved; the
                                    // bundle then ends at `index` and the loop
                                    // terminates.
                                    const extracted_work = work_bundle.values[index ..];

                                    try schedule.blocking_work.grow(extracted_work.len);
                                    std.debug.assert(schedule.blocking_work.push_all(extracted_work));
                                    std.debug.assert(work_bundle.pop_many(extracted_work.len));
                                },
                            }
                        }
                    }
                }
            };

            try work.regenerate_graph(self);
            try work.sort(self);
        }

        // TODO: Refactor so the thread pool is a global resource rather than owned per-world.
        if (world.thread_pool) |thread_pool| {
            const parallel = struct {
                fn run(work_group: *std.Thread.WaitGroup, graph: Graph, bundle: NodeBundle) void {
                    defer work_group.finish();

                    for (bundle.values) |node| {
                        const system = graph.get_ptr(node).?;

                        // TODO: std lib thread pool sucks for many reasons and this is one of them.
                        system.info.execute(system.info.used_parameters(), &system.parameter_states) catch unreachable;
                    }
                }
            };

            var work_group = std.Thread.WaitGroup{};

            for (self.parallel_work_bundles.values) |bundle| {
                work_group.start();
                try thread_pool.spawn(parallel.run, .{&work_group, self.graph, bundle});
            }

            thread_pool.waitAndWork(&work_group);
        } else {
            for (self.parallel_work_bundles.values) |bundle| {
                for (bundle.values) |node| {
                    const system = self.graph.get_ptr(node).?;

                    try system.info.execute(system.info.used_parameters(), &system.parameter_states);
                }
            }
        }

        for (self.blocking_work.values) |node| {
            const system = self.graph.get_ptr(node).?;

            try system.info.execute(system.info.used_parameters(), &system.parameter_states);
        }
    }

    /// Creates an empty schedule; `label` is copied into an owned arena.
    pub fn init(label: []const u8) std.mem.Allocator.Error!Schedule {
        var arena = std.heap.ArenaAllocator.init(heap.allocator);

        errdefer arena.deinit();

        const duped_label = try arena.allocator().dupeZ(u8, label);

        return .{
            .graph = Graph.init(heap.allocator),
            .label = duped_label,
            .arena = arena,
            .system_id_nodes = .{.allocator = heap.allocator},
            .read_write_resource_id_nodes = .{.allocator = heap.allocator},
            .read_only_resource_id_nodes = .{.allocator = heap.allocator},
            .parallel_work_bundles = .{.allocator = heap.allocator},
            .blocking_work = .{.allocator = heap.allocator},
        };
    }

    /// Discards the derived execution plan; the next `run` rebuilds it.
    pub fn invalidate_work(self: *Schedule) void {
        self.blocking_work.clear();

        for (self.parallel_work_bundles.values) |*bundle| {
            bundle.deinit();
        }

        self.parallel_work_bundles.clear();
    }

    /// Returns `true` when no execution plan exists. Note this is also true
    /// for a schedule with no systems, which then regenerates on every `run`.
    pub fn is_invalidated(self: Schedule) bool {
        return self.parallel_work_bundles.is_empty() and self.blocking_work.is_empty();
    }

    /// Appends `info` to the schedule with ordering constraints from `order`,
    /// binding each of the system's parameters against `world`. Invalidates
    /// the current execution plan on success.
    pub fn then(self: *Schedule, world: *World, info: *const Info, order: Order) std.mem.Allocator.Error!void {
        // Lazily create the bundle of graph nodes registered for this Info.
        const nodes = lazily_create: {
            const system_id = @intFromPtr(info);

            break: lazily_create self.system_id_nodes.get_ptr(system_id) orelse insert: {
                std.debug.assert(try self.system_id_nodes.emplace(system_id, .{
                    .allocator = self.system_id_nodes.allocator,
                }));

                break: insert self.system_id_nodes.get_ptr(system_id).?;
            };
        };

        // Flatten run_after/run_before into one owned Dependency array.
        const dependencies = init: {
            const total_run_orders = order.run_after.len + order.run_before.len;
            const dependencies = try heap.allocator.alloc(Dependency, total_run_orders);
            var dependencies_written = @as(usize, 0);

            for (order.run_after) |after_system| {
                dependencies[dependencies_written] = .{
                    .id = @intFromPtr(after_system),
                    .kind = .after,
                };

                dependencies_written += 1;
            }

            for (order.run_before) |before_system| {
                dependencies[dependencies_written] = .{
                    .id = @intFromPtr(before_system),
                    .kind = .before,
                };

                dependencies_written += 1;
            }

            break: init dependencies;
        };

        errdefer heap.allocator.free(dependencies);

        const label = try heap.allocator.dupeZ(u8, if (order.label.len == 0) "anonymous system" else order.label);

        errdefer heap.allocator.free(label);

        const node = try self.graph.append(.{
            .info = info,
            .label = label,
            .dependencies = dependencies,
            .resource_accesses = .{.allocator = heap.allocator},
        });

        const system = self.graph.get_ptr(node).?;

        errdefer {
            // Unbind whichever parameter states were bound before the failure
            // (unbound slots are still null), then drop the node again.
            for (info.used_parameters(), system.parameter_states[0 .. info.parameter_count]) |parameter, state| {
                if (state) |initialized_state| {
                    parameter.unbind(self.arena.allocator(), initialized_state);
                }
            }

            std.debug.assert(self.graph.remove_node(node) != null);
        }

        for (system.parameter_states[0 .. info.parameter_count], info.used_parameters()) |*state, parameter| {
            state.* = try parameter.bind(self.arena.allocator(), .{
                .world = world,
                .node = node,
                .systems = self,
            });
        }

        try nodes.push_grow(node);
        self.invalidate_work();
    }
};
/// Upper bound on the number of parameters a single system's [Info] can hold.
pub const max_parameters = 16;