From b842cbde9ef26d0f8214bb7a1592f7b1719f7db1 Mon Sep 17 00:00:00 2001 From: Kayomn Date: Sun, 28 Nov 2021 21:08:12 +0000 Subject: [PATCH] Initial commit --- .gitignore | 74 ++++ .idea/inspectionProfiles/Project_Default.xml | 7 + .idea/misc.xml | 6 + .idea/modules.xml | 8 + .idea/runConfigurations.xml | 10 + .idea/vcs.xml | 6 + distsys.iml | 11 + src/net/kayomn/ControlNode.java | 76 ++++ src/net/kayomn/DistributionNode.java | 61 +++ src/net/kayomn/UserNode.java | 42 +++ src/net/kayomn/common/Encodable.java | 14 + src/net/kayomn/common/Request.java | 72 ++++ src/net/kayomn/common/Response.java | 116 ++++++ src/net/kayomn/common/Service.java | 377 +++++++++++++++++++ 14 files changed, 880 insertions(+) create mode 100644 .gitignore create mode 100644 .idea/inspectionProfiles/Project_Default.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/runConfigurations.xml create mode 100644 .idea/vcs.xml create mode 100644 distsys.iml create mode 100644 src/net/kayomn/ControlNode.java create mode 100644 src/net/kayomn/DistributionNode.java create mode 100644 src/net/kayomn/UserNode.java create mode 100644 src/net/kayomn/common/Encodable.java create mode 100644 src/net/kayomn/common/Request.java create mode 100644 src/net/kayomn/common/Response.java create mode 100644 src/net/kayomn/common/Service.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..962444f --- /dev/null +++ b/.gitignore @@ -0,0 +1,74 @@ +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..5ad4eb5 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,7 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..8fd4bcb --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..22736e7 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations.xml b/.idea/runConfigurations.xml new file mode 100644 index 0000000..797acea --- /dev/null +++ b/.idea/runConfigurations.xml @@ -0,0 +1,10 @@ + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/distsys.iml b/distsys.iml new file mode 100644 index 0000000..c90834f --- /dev/null +++ b/distsys.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/src/net/kayomn/ControlNode.java b/src/net/kayomn/ControlNode.java new file mode 100644 index 0000000..9225ae4 --- /dev/null +++ b/src/net/kayomn/ControlNode.java @@ -0,0 +1,76 @@ +package net.kayomn; + +import net.kayomn.common.*; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Scanner; + +public final class ControlNode { + public static final String PrimaryIP = "127.0.0.1"; + + public static final int PrimaryPort = 50000; + + public static final int ServicesExpected = 1; + + public static void main(String[] args) { + var service = new Service("Control"); + + try (var server = service.listen(PrimaryPort)) { + record RemoteService(String name, InetSocketAddress inetAddress) { + + } + + var remoteServices = Collections.synchronizedList(new ArrayList()); + var inScanner = new Scanner(System.in); + var isRunning = true; + + server.onRequest("helo", (inetAddress, data) -> { + var remoteServiceName = new String(data, StandardCharsets.UTF_8); + + if (remoteServiceName.equals("distribution")) { + remoteServices.add(new RemoteService(remoteServiceName, inetAddress)); + + return Response.EmptyOk; + } + + return new Response(Response.Status.ClientFail, "service unknown".getBytes(StandardCharsets.UTF_8)); + }); + + server.onRequest("redy", (inetAddress, data) -> { + if (remoteServices.size() >= ServicesExpected) { + var bodyBuilder = new StringBuilder(); + + for (var remoteService : remoteServices) { + var remoteAddress = remoteService.inetAddress(); + + bodyBuilder.append(remoteService.name()); + bodyBuilder.append('\t'); + bodyBuilder.append(remoteAddress.getHostString()); + bodyBuilder.append('\t'); + bodyBuilder.append(remoteAddress.getPort()); + bodyBuilder.append('\n'); + } + + return new Response(Response.Status.Ok, bodyBuilder.toString().getBytes(StandardCharsets.UTF_8)); + } + + return Response.EmptyBusy; + }); + + server.onQuit((inetAdress, data) -> { + remoteServices.removeIf(remoteService -> remoteService.inetAddress().equals(inetAdress)); + }); + + while (isRunning) { + if (inScanner.nextLine().equals("quit")) { + isRunning = false; + } else { + System.out.println("Unknown command"); + } + } + } + } +} diff --git a/src/net/kayomn/DistributionNode.java b/src/net/kayomn/DistributionNode.java new file mode 100644 index 0000000..3cf72ea --- /dev/null +++ b/src/net/kayomn/DistributionNode.java @@ -0,0 +1,61 @@ +package net.kayomn; + +import net.kayomn.common.Request; +import net.kayomn.common.Response; +import net.kayomn.common.Service; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; + +public class DistributionNode { + public static void main(String[] args) { + var service = new Service("Distribution"); + + try (var client = service.connect(new InetSocketAddress(ControlNode.PrimaryIP, ControlNode.PrimaryPort))) { + var charset = StandardCharsets.UTF_8; + var heloOptionalResponse = client.request(new Request("helo", "distribution".getBytes(charset))).join(); + + if (heloOptionalResponse.isPresent()) { + var heloResponse = heloOptionalResponse.get(); + + switch (heloResponse.status()) { + case Ok -> { + var redyRequest = new Request("redy"); + var redyOptionalResponse = client.request(redyRequest).join(); + + while (true) { + if (redyOptionalResponse.isEmpty()) { + break; + } + + var redyResponse = redyOptionalResponse.get(); + + if (redyResponse.status() != Response.Status.Busy) { + break; + } + } + + if (redyOptionalResponse.isPresent()) { + var redyResponse = redyOptionalResponse.get(); + + if (redyResponse.status() == Response.Status.Ok) { + System.out.println(new String(redyResponse.body(), StandardCharsets.UTF_8)); + } else { + service.log(Service.LogLevel.Critical, "Failed to reach remote service"); + } + } else { + service.log(Service.LogLevel.Critical, "Server failed to respond"); + } + } + + case ClientFail -> service.log(Service.LogLevel.Critical, "Service unrecognized by control node"); + default -> service.log(Service.LogLevel.Critical, "Unknown server error"); + } + } else { + service.log(Service.LogLevel.Critical, "Failed to reach remote service"); + } + + client.request(new Request("quit")).join(); + } + } +} diff --git a/src/net/kayomn/UserNode.java b/src/net/kayomn/UserNode.java new file mode 100644 index 0000000..adb434c --- /dev/null +++ b/src/net/kayomn/UserNode.java @@ -0,0 +1,42 @@ +package net.kayomn; + +import net.kayomn.common.*; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Scanner; + +public final class UserNode { + public static void main(String[] args) { + var service = new Service("User"); + + try (var client = service.connect(new InetSocketAddress(ControlNode.PrimaryIP, ControlNode.PrimaryPort))) { + var inScanner = new Scanner(System.in); + var isRunning = true; + + while (isRunning) { + var commandName = inScanner.next(); + var commandData = inScanner.nextLine(); + + var optionalResponse = client.request(new Request( + commandName, + commandData.getBytes(StandardCharsets.UTF_8) + )).join(); + + if (optionalResponse.isPresent()) { + var response = optionalResponse.get(); + + if (response.status() == Response.Status.Ok) { + if (commandName.equals("quit")) { + isRunning = false; + } else { + System.out.println(new String(response.body(), StandardCharsets.UTF_8)); + } + } + } else { + service.log(Service.LogLevel.Critical, "Request to reach remote service"); + } + } + } + } +} diff --git a/src/net/kayomn/common/Encodable.java b/src/net/kayomn/common/Encodable.java new file mode 100644 index 0000000..940e385 --- /dev/null +++ b/src/net/kayomn/common/Encodable.java @@ -0,0 +1,14 @@ +package net.kayomn.common; + +import java.nio.ByteBuffer; + +/** + * A data transfer type that can be transformed into a {@link ByteBuffer} for transmission over serial devices like file + * systems and networks. + */ +public interface Encodable { + /** + * Transforms the contained data into a series of bytes and returns them as a {@link ByteBuffer}. + */ + ByteBuffer encode(); +} diff --git a/src/net/kayomn/common/Request.java b/src/net/kayomn/common/Request.java new file mode 100644 index 0000000..f8f7333 --- /dev/null +++ b/src/net/kayomn/common/Request.java @@ -0,0 +1,72 @@ +package net.kayomn.common; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +/** + * Data transfer type used by {@link Service}s for requesting {@link Response}s from remote server {@link Service}s. + * + * {@link Request#name()} marks the utf-8 encoded name of the request, which is used by {@link Service}s to determine + * how it is processed on the server-side. + * + * {@link Request#data()} contains any request-specific argument data that may be acknowledged and used by the + * processing {@link Service}. + */ +public record Request(String name, byte[] data) implements Encodable { + private static final Charset NameCharset = StandardCharsets.UTF_8; + + private static final int HeaderSize = (Integer.BYTES + Integer.BYTES); + + /** + * Constructs a new {@link Request} instance with {@code name} as the request name and no additional data. + */ + public Request(String name) { + this(name, new byte[0]); + } + + /** + * Attempts to decode {@code buffer} into a new {@link Request} instance, returning it or nothing wrapped in an + * {@link Optional}. + * + * Nothing is returned if the data contained in {@code buffer} does not follow a supported byte pattern for + * decoding. + */ + public static Optional decode(ByteBuffer buffer) { + if (buffer.capacity() > HeaderSize) { + // Request name must be at least one byte long. + var nameSize = buffer.getInt(0); + var dataSize = buffer.getInt(Integer.BYTES); + var name = new byte[nameSize]; + var data = new byte[dataSize]; + + buffer.get(HeaderSize, name, 0, nameSize); + buffer.get(HeaderSize + nameSize, data, 0, dataSize); + + return Optional.of(new Request(new String(name, NameCharset), data)); + } + + return Optional.empty(); + } + + /** + * Transforms the {@link Request} into a series of bytes, returning it as a {@link ByteBuffer}. + * + * The returned {@link ByteBuffer} may be passed to {@link #decode(ByteBuffer)} to be decoded back into a + * {@link Request}. + */ + @Override + public ByteBuffer encode() { + var nameBytes = this.name.getBytes(NameCharset); + var nameSize = nameBytes.length; + var dataSize = this.data.length; + + return ByteBuffer.allocate(HeaderSize + nameSize + dataSize) + .putInt(nameSize) + .putInt(dataSize) + .put(nameBytes) + .put(this.data) + .position(0); + } +} diff --git a/src/net/kayomn/common/Response.java b/src/net/kayomn/common/Response.java new file mode 100644 index 0000000..2e1484c --- /dev/null +++ b/src/net/kayomn/common/Response.java @@ -0,0 +1,116 @@ +package net.kayomn.common; + +import java.nio.ByteBuffer; +import java.util.Optional; + +/** + * Data transfer type used by {@link Service}s for responding to received {@link Request}s from remote client + * {@link Service}s. + * + * {@link Response#status()} identifies the success of the response, with {@link Status#Ok} acting as the generic + * "success" state. See {@link Status} for more information on the potential status codes that a {@link Response} may + * hold. + * + * {@link Response#body()} contains the data requested as raw bytes. + */ +public record Response(Status status, byte[] body) implements Encodable { + /** + * Shortcut constant to avoid constructing another {@link Response} that acts as an empty "Busy" status. + */ + public static final Response EmptyBusy = new Response(Status.Busy); + + /** + * Shortcut constant to avoid constructing another {@link Response} that acts as an empty "Ok" status. + */ + public static final Response EmptyOk = new Response(Status.Ok); + + private static final int HeaderSize = (Integer.BYTES + Integer.BYTES); + + /** + * Strongly typed abstractions over the raw response status code integer values. + */ + public enum Status { + Ok, + Busy, + ClientFail, + ServerFail, + } + + /** + * Constructs a new {@link Response} with {@code status} as the status code and an empty body. + */ + public Response(Status status) { + this(status, new byte[0]); + } + + /** + * Attempts to decode {@code buffer} into a new {@link Response} instance, returning it or nothing wrapped in an + * {@link Optional}. + * + * Nothing is returned if the data contained in {@code buffer} does not follow a supported byte pattern for + * decoding. + */ + public static Optional decode(ByteBuffer buffer) { + if (buffer.capacity() >= HeaderSize) { + var statusCode = buffer.getInt(0); + + if ((statusCode >= 200) && (statusCode < 300)) { + var bodySize = buffer.getInt(Integer.BYTES); + var bodyBytes = new byte[bodySize]; + + buffer.get(HeaderSize, bodyBytes, 0, bodySize); + + return Optional.of(new Response(Status.Ok, bodyBytes)); + } + + if ((statusCode >= 300) && (statusCode < 400)) { + var bodySize = buffer.getInt(Integer.BYTES); + var bodyBytes = new byte[bodySize]; + + buffer.get(HeaderSize, bodyBytes, 0, bodySize); + + return Optional.of(new Response(Status.Busy, bodyBytes)); + } + + if ((statusCode >= 400) && (statusCode < 500)) { + var bodySize = buffer.getInt(Integer.BYTES); + var bodyBytes = new byte[bodySize]; + + buffer.get(HeaderSize, bodyBytes, 0, bodySize); + + return Optional.of(new Response(Status.ClientFail, bodyBytes)); + } + + if ((statusCode >= 500) && (statusCode < 600)) { + var bodySize = buffer.getInt(Integer.BYTES); + var bodyBytes = new byte[bodySize]; + + buffer.get(HeaderSize, bodyBytes, 0, bodySize); + + return Optional.of(new Response(Status.ServerFail, bodyBytes)); + } + } + + return Optional.empty(); + } + + /** + * Transforms the {@link Response} into a series of bytes, returning it as a {@link ByteBuffer}. + * + * The returned {@link ByteBuffer} may be passed to {@link #decode(ByteBuffer)} to be decoded back into a + * {@link Response}. + */ + public ByteBuffer encode() { + var bodySize = this.body.length; + var buffer = ByteBuffer.allocate(HeaderSize + bodySize); + + buffer.putInt(switch (this.status) { + case Ok -> 200; + case Busy -> 300; + case ClientFail -> 400; + case ServerFail -> 500; + }); + + return buffer.putInt(bodySize).put(this.body).position(0); + } +} diff --git a/src/net/kayomn/common/Service.java b/src/net/kayomn/common/Service.java new file mode 100644 index 0000000..c5a672f --- /dev/null +++ b/src/net/kayomn/common/Service.java @@ -0,0 +1,377 @@ +package net.kayomn.common; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.StandardSocketOptions; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.*; + +/** + * General-purpose networking abstraction capable of spawning {@link Client} and {@link Server} instances for + * communicating with other remote {@link Service}s over a network. + */ +public class Service { + /** + * Client-server connection for sending requests to a remote server. + */ + public interface Client extends AutoCloseable { + /** + * Closes the client connection to the server, freeing any associated network resources. + */ + @Override + void close(); + + /** + * Submits {@code request} as a request to the connected server, returning a {@link CompletableFuture} that will + * hold the resulting {@link Response} wrapped in a {@link Optional} at a later point. + * + * If the request failed to reach the server, the {@link CompletableFuture} will hold an empty {@link Optional} + * instead. + * + * See {@link CompletableFuture} for more information on how to await and query for the future value. + */ + CompletableFuture> request(Request request); + } + + /** + * Log severity level identifiers used for hinting how their associated message should be logged. + */ + public enum LogLevel { + Info, + Warning, + Critical, + } + + /** + * Client-server broadcaster for receiving requests from remote clients. + */ + public interface Server extends AutoCloseable { + /** + * Closes the server broadcast from all clients, freeing any associated network resources. + */ + @Override + void close(); + + /** + * Assigns {@code quitConsumer} as the handling logic for any future client quit requests. + */ + void onQuit(BiConsumer quitConsumer); + + /** + * Assigns {@code requestProcessor} as the handling logic for any future client requests submitted with the name + * {@code requestName}. + */ + void onRequest(String requestName, BiFunction requestProcessor); + } + + private final String name; + + /** + * Constructs a new {@link Service} with {@code serviceName} as its name. + */ + public Service(String serviceName) { + this.name = serviceName; + } + + /** + * Attempts to connect to a service located at {@code socketAddress}, returning a {@link Client} instance + * representing the established connection. + * + * Should any exception occur while establishing the connection to the remote service, an {@link IOException} is + * thrown. + * + * {@link Client#close} must be called to close the connection and release any associated network resources. + */ + public Client connect(InetSocketAddress socketAddress) { + final class ClientImplementation implements Client, Runnable { + record Event(Request request, Consumer> optionalResponseConsumer) { + + } + + private final ArrayBlockingQueue eventQueue; + + private final AtomicBoolean isRunning; + + public ClientImplementation() { + this.eventQueue = new ArrayBlockingQueue<>(64); + this.isRunning = new AtomicBoolean(true); + } + + @Override + public void close() { + this.isRunning.set(false); + } + + @Override + public CompletableFuture> request(Request request) { + // The "complete" function which sets the held data of the future is passed to a list of functions to be + // called later, once the request has been processed in the event queue by the server and the remote + // server has generated a response. + var responseFuture = new CompletableFuture>(); + + this.eventQueue.offer(new Event(request, responseFuture::complete)); + + return responseFuture; + } + + @Override + public void run() { + // Attempt to open a connection. + try (var socketChannel = SocketChannel.open(socketAddress)) { + log(LogLevel.Info, "Client started connection to " + socketAddress); + + var buffer = ByteBuffer.allocateDirect(1024); + + // Handle connection to server. + while (this.isRunning.get()) { + while (!this.eventQueue.isEmpty()) { + try { + // Each event is submitted to the remote server one-by-one from the queue. + var event = this.eventQueue.poll(); + var encodedRequest = event.request().encode(); + + if (socketChannel.write(encodedRequest) == encodedRequest.capacity()) { + if (socketChannel.read(buffer) > 0) { + // Send the decoded request to the "complete" function of the future created + // earlier in "request". + event.optionalResponseConsumer().accept(Response.decode(buffer)); + } + + buffer.clear(); + } + } catch (IOException exception) { + log( + LogLevel.Warning, + "Failed to communicate with remote service: " + exception.getMessage() + ); + } + } + } + } catch (IOException exception) { + log(LogLevel.Critical, "Failed to connect to " + socketAddress + ": " + exception.getMessage()); + + Optional optionalResponse = Optional.empty(); + + while (this.isRunning.get()) { + // Respond to any future requests after the exception with empty optionals. + while (!this.eventQueue.isEmpty()) { + this.eventQueue.poll().optionalResponseConsumer().accept(optionalResponse); + } + } + } + } + } + + var client = new ClientImplementation(); + + new Thread(client).start(); + + return client; + } + + /** + * Attempts to listen on the port {@code portNumber}, returning a {@link Server} instance representing the running + * broadcast. + * + * A {@code portNumber} value of {@code 0} will make the server choose the first available port it can find. + * Otherwise, any integer between {@code 1} and {@code 65535} is a valid port number. + * + * While most request names are freely programmable, certain ones are hardcoded, namely: + * + * * "quit" is reserved to disconnect the client connection from the server and respond with + * {@link Response#EmptyOk} if successful. + * + * * "noop" is reserved as a quiet operation with no side effects to server state, responding with + * {@link Response#EmptyOk} if successful. NOOP requests are useful for checking the validity of the connection + * from the client-side. + * + * Should any exception occur while starting the broadcast, an {@link IOException} is thrown. + * + * {@link Server#close} must be called to kill the broadcast and release any associated network resources. + */ + public Server listen(int portNumber) { + final class ServerImplementation implements Runnable, Server { + private final HashMap> requestProcessors; + + private Optional> optionalQuitConsumer; + + private final AtomicBoolean isRunning; + + public ServerImplementation() { + this.requestProcessors = new HashMap<>(); + this.isRunning = new AtomicBoolean(true); + this.optionalQuitConsumer = Optional.empty(); + } + + @Override + public void close() { + this.isRunning.set(false); + } + + private void handle(SocketChannel clientSocketChannel) { + try { + var inBuffer = ByteBuffer.allocateDirect(1024); + + if (clientSocketChannel.read(inBuffer) > 0) { + var decodedRequest = Request.decode(inBuffer); + + if (decodedRequest.isPresent()) { + var request = decodedRequest.get(); + var requestName = request.name(); + + // Some requests are hardcoded into service servers. + switch (requestName) { + case "quit" -> { + // Attempting to acquire the local address after closing the socket channel throws + // an unchecked exception. + var address = clientSocketChannel.getRemoteAddress(); + + this.optionalQuitConsumer.ifPresent(inetSocketAddressBiConsumer -> { + inetSocketAddressBiConsumer.accept((InetSocketAddress)address, request.data()); + }); + + clientSocketChannel.write(new Response(Response.Status.Ok).encode()); + clientSocketChannel.close(); + log(LogLevel.Info, address + " disconnected"); + } + + case "noop" -> clientSocketChannel.write(Response.EmptyOk.encode()); + + default -> { + log( + LogLevel.Info, + clientSocketChannel.getRemoteAddress() + " requested \"" + requestName + "\"" + ); + + var requestProcessor = Optional.ofNullable( + this.requestProcessors.get(requestName) + ); + + if (requestProcessor.isPresent()) { + clientSocketChannel.write( + requestProcessor.get().apply( + (InetSocketAddress)clientSocketChannel.getRemoteAddress(), + request.data() + ).encode() + ); + } else { + clientSocketChannel.write(new Response( + Response.Status.ClientFail, + "request name unsupported".getBytes(StandardCharsets.UTF_8) + ).encode()); + } + } + } + } else { + clientSocketChannel.write(new Response( + Response.Status.ClientFail, + "request corrupt".getBytes(StandardCharsets.UTF_8) + ).encode()); + } + } else { + clientSocketChannel.write(new Response( + Response.Status.ClientFail, + "request empty".getBytes(StandardCharsets.UTF_8) + ).encode()); + } + } catch (IOException exception) { + log(LogLevel.Warning, "Failed to reach client: " + exception.getMessage()); + + try { + clientSocketChannel.close(); + } catch (IOException closeException) { + log(LogLevel.Warning, "Failed to close client connection: " + closeException.getMessage()); + } + } + } + + @Override + public void onQuit(BiConsumer quitConsumer) { + this.optionalQuitConsumer = Optional.of(quitConsumer); + } + + public void onRequest(String requestName, BiFunction requestProcessor) { + this.requestProcessors.put(requestName, requestProcessor); + } + + @Override + public void run() { + try { + var selector = Selector.open(); + + var serverSocketChannel = ServerSocketChannel.open() + .setOption(StandardSocketOptions.SO_REUSEADDR, true) + .bind(new InetSocketAddress(portNumber)); + + serverSocketChannel.configureBlocking(false); + serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); + log(LogLevel.Info, "Server started on " + portNumber); + + while (this.isRunning.get()) { + try { + selector.select(100); + + var selectedKeyIterator = selector.selectedKeys().iterator(); + + while (selectedKeyIterator.hasNext()) { + var selectedKey = selectedKeyIterator.next(); + + selectedKeyIterator.remove(); + + if (selectedKey.isAcceptable()) { + // Handle incoming connection... + SocketChannel clientSocketChannel = serverSocketChannel.accept(); + + clientSocketChannel.configureBlocking(false); + clientSocketChannel.register(selector, SelectionKey.OP_READ); + log(LogLevel.Info, clientSocketChannel.getRemoteAddress() + " connected"); + } + + if (selectedKey.isReadable()) { + // Handle incoming request... + this.handle((SocketChannel)selectedKey.channel()); + } + } + } catch (IOException exception) { + log(LogLevel.Warning, "Failed to handle client: " + exception.getMessage()); + } + } + + serverSocketChannel.close(); + selector.close(); + + log(LogLevel.Info, "Closed"); + } catch (IOException exception) { + log(LogLevel.Critical, "Failed to bind service on port: " + exception.getMessage()); + } + } + } + + var server = new ServerImplementation(); + + new Thread(server).start(); + + return server; + } + + /** + * Logs {@code message} using {@code logLevel} as the log severity. + * + * See {@link LogLevel} for more information on log severity levels. + */ + public void log(LogLevel logLevel, String message) { + var composedMessage = ("[" + this.name + "] " + message); + + switch (logLevel) { + case Info -> System.out.println(composedMessage); + case Warning -> System.out.println(composedMessage); + case Critical -> System.err.println(composedMessage); + } + } +}