Initial commit
This commit is contained in:
commit
b842cbde9e
|
@ -0,0 +1,74 @@
|
|||
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
|
||||
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
|
||||
|
||||
# User-specific stuff
|
||||
.idea/**/workspace.xml
|
||||
.idea/**/tasks.xml
|
||||
.idea/**/usage.statistics.xml
|
||||
.idea/**/dictionaries
|
||||
.idea/**/shelf
|
||||
|
||||
# AWS User-specific
|
||||
.idea/**/aws.xml
|
||||
|
||||
# Generated files
|
||||
.idea/**/contentModel.xml
|
||||
|
||||
# Sensitive or high-churn files
|
||||
.idea/**/dataSources/
|
||||
.idea/**/dataSources.ids
|
||||
.idea/**/dataSources.local.xml
|
||||
.idea/**/sqlDataSources.xml
|
||||
.idea/**/dynamic.xml
|
||||
.idea/**/uiDesigner.xml
|
||||
.idea/**/dbnavigator.xml
|
||||
|
||||
# Gradle
|
||||
.idea/**/gradle.xml
|
||||
.idea/**/libraries
|
||||
|
||||
# Gradle and Maven with auto-import
|
||||
# When using Gradle or Maven with auto-import, you should exclude module files,
|
||||
# since they will be recreated, and may cause churn. Uncomment if using
|
||||
# auto-import.
|
||||
# .idea/artifacts
|
||||
# .idea/compiler.xml
|
||||
# .idea/jarRepositories.xml
|
||||
# .idea/modules.xml
|
||||
# .idea/*.iml
|
||||
# .idea/modules
|
||||
# *.iml
|
||||
# *.ipr
|
||||
|
||||
# CMake
|
||||
cmake-build-*/
|
||||
|
||||
# Mongo Explorer plugin
|
||||
.idea/**/mongoSettings.xml
|
||||
|
||||
# File-based project format
|
||||
*.iws
|
||||
|
||||
# IntelliJ
|
||||
out/
|
||||
|
||||
# mpeltonen/sbt-idea plugin
|
||||
.idea_modules/
|
||||
|
||||
# JIRA plugin
|
||||
atlassian-ide-plugin.xml
|
||||
|
||||
# Cursive Clojure plugin
|
||||
.idea/replstate.xml
|
||||
|
||||
# Crashlytics plugin (for Android Studio and IntelliJ)
|
||||
com_crashlytics_export_strings.xml
|
||||
crashlytics.properties
|
||||
crashlytics-build.properties
|
||||
fabric.properties
|
||||
|
||||
# Editor-based Rest Client
|
||||
.idea/httpRequests
|
||||
|
||||
# Android studio 3.1+ serialized cache file
|
||||
.idea/caches/build_file_checksums.ser
|
|
@ -0,0 +1,7 @@
|
|||
<component name="InspectionProjectProfileManager">
|
||||
<profile version="1.0">
|
||||
<option name="myName" value="Project Default" />
|
||||
<inspection_tool class="CodeBlock2Expr" enabled="true" level="WEAK WARNING" enabled_by_default="true" />
|
||||
<inspection_tool class="OptionalUsedAsFieldOrParameterType" enabled="false" level="WARNING" enabled_by_default="false" />
|
||||
</profile>
|
||||
</component>
|
|
@ -0,0 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectRootManager" version="2" languageLevel="JDK_16" default="true" project-jdk-name="openjdk-16" project-jdk-type="JavaSDK">
|
||||
<output url="file://$PROJECT_DIR$/out" />
|
||||
</component>
|
||||
</project>
|
|
@ -0,0 +1,8 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="ProjectModuleManager">
|
||||
<modules>
|
||||
<module fileurl="file://$PROJECT_DIR$/distsys.iml" filepath="$PROJECT_DIR$/distsys.iml" />
|
||||
</modules>
|
||||
</component>
|
||||
</project>
|
|
@ -0,0 +1,10 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="RunConfigurationProducerService">
|
||||
<option name="ignoredProducers">
|
||||
<set>
|
||||
<option value="com.android.tools.idea.compose.preview.runconfiguration.ComposePreviewRunConfigurationProducer" />
|
||||
</set>
|
||||
</option>
|
||||
</component>
|
||||
</project>
|
|
@ -0,0 +1,6 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
|
@ -0,0 +1,11 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<module type="JAVA_MODULE" version="4">
|
||||
<component name="NewModuleRootManager" inherit-compiler-output="true">
|
||||
<exclude-output />
|
||||
<content url="file://$MODULE_DIR$">
|
||||
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
|
||||
</content>
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
</component>
|
||||
</module>
|
|
@ -0,0 +1,76 @@
|
|||
package net.kayomn;
|
||||
|
||||
import net.kayomn.common.*;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Scanner;
|
||||
|
||||
public final class ControlNode {
|
||||
public static final String PrimaryIP = "127.0.0.1";
|
||||
|
||||
public static final int PrimaryPort = 50000;
|
||||
|
||||
public static final int ServicesExpected = 1;
|
||||
|
||||
public static void main(String[] args) {
|
||||
var service = new Service("Control");
|
||||
|
||||
try (var server = service.listen(PrimaryPort)) {
|
||||
record RemoteService(String name, InetSocketAddress inetAddress) {
|
||||
|
||||
}
|
||||
|
||||
var remoteServices = Collections.synchronizedList(new ArrayList<RemoteService>());
|
||||
var inScanner = new Scanner(System.in);
|
||||
var isRunning = true;
|
||||
|
||||
server.onRequest("helo", (inetAddress, data) -> {
|
||||
var remoteServiceName = new String(data, StandardCharsets.UTF_8);
|
||||
|
||||
if (remoteServiceName.equals("distribution")) {
|
||||
remoteServices.add(new RemoteService(remoteServiceName, inetAddress));
|
||||
|
||||
return Response.EmptyOk;
|
||||
}
|
||||
|
||||
return new Response(Response.Status.ClientFail, "service unknown".getBytes(StandardCharsets.UTF_8));
|
||||
});
|
||||
|
||||
server.onRequest("redy", (inetAddress, data) -> {
|
||||
if (remoteServices.size() >= ServicesExpected) {
|
||||
var bodyBuilder = new StringBuilder();
|
||||
|
||||
for (var remoteService : remoteServices) {
|
||||
var remoteAddress = remoteService.inetAddress();
|
||||
|
||||
bodyBuilder.append(remoteService.name());
|
||||
bodyBuilder.append('\t');
|
||||
bodyBuilder.append(remoteAddress.getHostString());
|
||||
bodyBuilder.append('\t');
|
||||
bodyBuilder.append(remoteAddress.getPort());
|
||||
bodyBuilder.append('\n');
|
||||
}
|
||||
|
||||
return new Response(Response.Status.Ok, bodyBuilder.toString().getBytes(StandardCharsets.UTF_8));
|
||||
}
|
||||
|
||||
return Response.EmptyBusy;
|
||||
});
|
||||
|
||||
server.onQuit((inetAdress, data) -> {
|
||||
remoteServices.removeIf(remoteService -> remoteService.inetAddress().equals(inetAdress));
|
||||
});
|
||||
|
||||
while (isRunning) {
|
||||
if (inScanner.nextLine().equals("quit")) {
|
||||
isRunning = false;
|
||||
} else {
|
||||
System.out.println("Unknown command");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
package net.kayomn;
|
||||
|
||||
import net.kayomn.common.Request;
|
||||
import net.kayomn.common.Response;
|
||||
import net.kayomn.common.Service;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
|
||||
public class DistributionNode {
|
||||
public static void main(String[] args) {
|
||||
var service = new Service("Distribution");
|
||||
|
||||
try (var client = service.connect(new InetSocketAddress(ControlNode.PrimaryIP, ControlNode.PrimaryPort))) {
|
||||
var charset = StandardCharsets.UTF_8;
|
||||
var heloOptionalResponse = client.request(new Request("helo", "distribution".getBytes(charset))).join();
|
||||
|
||||
if (heloOptionalResponse.isPresent()) {
|
||||
var heloResponse = heloOptionalResponse.get();
|
||||
|
||||
switch (heloResponse.status()) {
|
||||
case Ok -> {
|
||||
var redyRequest = new Request("redy");
|
||||
var redyOptionalResponse = client.request(redyRequest).join();
|
||||
|
||||
while (true) {
|
||||
if (redyOptionalResponse.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
|
||||
var redyResponse = redyOptionalResponse.get();
|
||||
|
||||
if (redyResponse.status() != Response.Status.Busy) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (redyOptionalResponse.isPresent()) {
|
||||
var redyResponse = redyOptionalResponse.get();
|
||||
|
||||
if (redyResponse.status() == Response.Status.Ok) {
|
||||
System.out.println(new String(redyResponse.body(), StandardCharsets.UTF_8));
|
||||
} else {
|
||||
service.log(Service.LogLevel.Critical, "Failed to reach remote service");
|
||||
}
|
||||
} else {
|
||||
service.log(Service.LogLevel.Critical, "Server failed to respond");
|
||||
}
|
||||
}
|
||||
|
||||
case ClientFail -> service.log(Service.LogLevel.Critical, "Service unrecognized by control node");
|
||||
default -> service.log(Service.LogLevel.Critical, "Unknown server error");
|
||||
}
|
||||
} else {
|
||||
service.log(Service.LogLevel.Critical, "Failed to reach remote service");
|
||||
}
|
||||
|
||||
client.request(new Request("quit")).join();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
package net.kayomn;
|
||||
|
||||
import net.kayomn.common.*;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Scanner;
|
||||
|
||||
public final class UserNode {
|
||||
public static void main(String[] args) {
|
||||
var service = new Service("User");
|
||||
|
||||
try (var client = service.connect(new InetSocketAddress(ControlNode.PrimaryIP, ControlNode.PrimaryPort))) {
|
||||
var inScanner = new Scanner(System.in);
|
||||
var isRunning = true;
|
||||
|
||||
while (isRunning) {
|
||||
var commandName = inScanner.next();
|
||||
var commandData = inScanner.nextLine();
|
||||
|
||||
var optionalResponse = client.request(new Request(
|
||||
commandName,
|
||||
commandData.getBytes(StandardCharsets.UTF_8)
|
||||
)).join();
|
||||
|
||||
if (optionalResponse.isPresent()) {
|
||||
var response = optionalResponse.get();
|
||||
|
||||
if (response.status() == Response.Status.Ok) {
|
||||
if (commandName.equals("quit")) {
|
||||
isRunning = false;
|
||||
} else {
|
||||
System.out.println(new String(response.body(), StandardCharsets.UTF_8));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
service.log(Service.LogLevel.Critical, "Request to reach remote service");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package net.kayomn.common;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* A data transfer type that can be transformed into a {@link ByteBuffer} for transmission over serial devices like file
|
||||
* systems and networks.
|
||||
*/
|
||||
public interface Encodable {
|
||||
/**
|
||||
* Transforms the contained data into a series of bytes and returns them as a {@link ByteBuffer}.
|
||||
*/
|
||||
ByteBuffer encode();
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package net.kayomn.common;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Data transfer type used by {@link Service}s for requesting {@link Response}s from remote server {@link Service}s.
|
||||
*
|
||||
* {@link Request#name()} marks the utf-8 encoded name of the request, which is used by {@link Service}s to determine
|
||||
* how it is processed on the server-side.
|
||||
*
|
||||
* {@link Request#data()} contains any request-specific argument data that may be acknowledged and used by the
|
||||
* processing {@link Service}.
|
||||
*/
|
||||
public record Request(String name, byte[] data) implements Encodable {
|
||||
private static final Charset NameCharset = StandardCharsets.UTF_8;
|
||||
|
||||
private static final int HeaderSize = (Integer.BYTES + Integer.BYTES);
|
||||
|
||||
/**
|
||||
* Constructs a new {@link Request} instance with {@code name} as the request name and no additional data.
|
||||
*/
|
||||
public Request(String name) {
|
||||
this(name, new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to decode {@code buffer} into a new {@link Request} instance, returning it or nothing wrapped in an
|
||||
* {@link Optional}.
|
||||
*
|
||||
* Nothing is returned if the data contained in {@code buffer} does not follow a supported byte pattern for
|
||||
* decoding.
|
||||
*/
|
||||
public static Optional<Request> decode(ByteBuffer buffer) {
|
||||
if (buffer.capacity() > HeaderSize) {
|
||||
// Request name must be at least one byte long.
|
||||
var nameSize = buffer.getInt(0);
|
||||
var dataSize = buffer.getInt(Integer.BYTES);
|
||||
var name = new byte[nameSize];
|
||||
var data = new byte[dataSize];
|
||||
|
||||
buffer.get(HeaderSize, name, 0, nameSize);
|
||||
buffer.get(HeaderSize + nameSize, data, 0, dataSize);
|
||||
|
||||
return Optional.of(new Request(new String(name, NameCharset), data));
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the {@link Request} into a series of bytes, returning it as a {@link ByteBuffer}.
|
||||
*
|
||||
* The returned {@link ByteBuffer} may be passed to {@link #decode(ByteBuffer)} to be decoded back into a
|
||||
* {@link Request}.
|
||||
*/
|
||||
@Override
|
||||
public ByteBuffer encode() {
|
||||
var nameBytes = this.name.getBytes(NameCharset);
|
||||
var nameSize = nameBytes.length;
|
||||
var dataSize = this.data.length;
|
||||
|
||||
return ByteBuffer.allocate(HeaderSize + nameSize + dataSize)
|
||||
.putInt(nameSize)
|
||||
.putInt(dataSize)
|
||||
.put(nameBytes)
|
||||
.put(this.data)
|
||||
.position(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
package net.kayomn.common;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Data transfer type used by {@link Service}s for responding to received {@link Request}s from remote client
|
||||
* {@link Service}s.
|
||||
*
|
||||
* {@link Response#status()} identifies the success of the response, with {@link Status#Ok} acting as the generic
|
||||
* "success" state. See {@link Status} for more information on the potential status codes that a {@link Response} may
|
||||
* hold.
|
||||
*
|
||||
* {@link Response#body()} contains the data requested as raw bytes.
|
||||
*/
|
||||
public record Response(Status status, byte[] body) implements Encodable {
|
||||
/**
|
||||
* Shortcut constant to avoid constructing another {@link Response} that acts as an empty "Busy" status.
|
||||
*/
|
||||
public static final Response EmptyBusy = new Response(Status.Busy);
|
||||
|
||||
/**
|
||||
* Shortcut constant to avoid constructing another {@link Response} that acts as an empty "Ok" status.
|
||||
*/
|
||||
public static final Response EmptyOk = new Response(Status.Ok);
|
||||
|
||||
private static final int HeaderSize = (Integer.BYTES + Integer.BYTES);
|
||||
|
||||
/**
|
||||
* Strongly typed abstractions over the raw response status code integer values.
|
||||
*/
|
||||
public enum Status {
|
||||
Ok,
|
||||
Busy,
|
||||
ClientFail,
|
||||
ServerFail,
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new {@link Response} with {@code status} as the status code and an empty body.
|
||||
*/
|
||||
public Response(Status status) {
|
||||
this(status, new byte[0]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to decode {@code buffer} into a new {@link Response} instance, returning it or nothing wrapped in an
|
||||
* {@link Optional}.
|
||||
*
|
||||
* Nothing is returned if the data contained in {@code buffer} does not follow a supported byte pattern for
|
||||
* decoding.
|
||||
*/
|
||||
public static Optional<Response> decode(ByteBuffer buffer) {
|
||||
if (buffer.capacity() >= HeaderSize) {
|
||||
var statusCode = buffer.getInt(0);
|
||||
|
||||
if ((statusCode >= 200) && (statusCode < 300)) {
|
||||
var bodySize = buffer.getInt(Integer.BYTES);
|
||||
var bodyBytes = new byte[bodySize];
|
||||
|
||||
buffer.get(HeaderSize, bodyBytes, 0, bodySize);
|
||||
|
||||
return Optional.of(new Response(Status.Ok, bodyBytes));
|
||||
}
|
||||
|
||||
if ((statusCode >= 300) && (statusCode < 400)) {
|
||||
var bodySize = buffer.getInt(Integer.BYTES);
|
||||
var bodyBytes = new byte[bodySize];
|
||||
|
||||
buffer.get(HeaderSize, bodyBytes, 0, bodySize);
|
||||
|
||||
return Optional.of(new Response(Status.Busy, bodyBytes));
|
||||
}
|
||||
|
||||
if ((statusCode >= 400) && (statusCode < 500)) {
|
||||
var bodySize = buffer.getInt(Integer.BYTES);
|
||||
var bodyBytes = new byte[bodySize];
|
||||
|
||||
buffer.get(HeaderSize, bodyBytes, 0, bodySize);
|
||||
|
||||
return Optional.of(new Response(Status.ClientFail, bodyBytes));
|
||||
}
|
||||
|
||||
if ((statusCode >= 500) && (statusCode < 600)) {
|
||||
var bodySize = buffer.getInt(Integer.BYTES);
|
||||
var bodyBytes = new byte[bodySize];
|
||||
|
||||
buffer.get(HeaderSize, bodyBytes, 0, bodySize);
|
||||
|
||||
return Optional.of(new Response(Status.ServerFail, bodyBytes));
|
||||
}
|
||||
}
|
||||
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the {@link Response} into a series of bytes, returning it as a {@link ByteBuffer}.
|
||||
*
|
||||
* The returned {@link ByteBuffer} may be passed to {@link #decode(ByteBuffer)} to be decoded back into a
|
||||
* {@link Response}.
|
||||
*/
|
||||
public ByteBuffer encode() {
|
||||
var bodySize = this.body.length;
|
||||
var buffer = ByteBuffer.allocate(HeaderSize + bodySize);
|
||||
|
||||
buffer.putInt(switch (this.status) {
|
||||
case Ok -> 200;
|
||||
case Busy -> 300;
|
||||
case ClientFail -> 400;
|
||||
case ServerFail -> 500;
|
||||
});
|
||||
|
||||
return buffer.putInt(bodySize).put(this.body).position(0);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,377 @@
|
|||
package net.kayomn.common;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.StandardSocketOptions;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.*;
|
||||
|
||||
/**
|
||||
* General-purpose networking abstraction capable of spawning {@link Client} and {@link Server} instances for
|
||||
* communicating with other remote {@link Service}s over a network.
|
||||
*/
|
||||
public class Service {
|
||||
/**
|
||||
* Client-server connection for sending requests to a remote server.
|
||||
*/
|
||||
public interface Client extends AutoCloseable {
|
||||
/**
|
||||
* Closes the client connection to the server, freeing any associated network resources.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Submits {@code request} as a request to the connected server, returning a {@link CompletableFuture} that will
|
||||
* hold the resulting {@link Response} wrapped in a {@link Optional} at a later point.
|
||||
*
|
||||
* If the request failed to reach the server, the {@link CompletableFuture} will hold an empty {@link Optional}
|
||||
* instead.
|
||||
*
|
||||
* See {@link CompletableFuture} for more information on how to await and query for the future value.
|
||||
*/
|
||||
CompletableFuture<Optional<Response>> request(Request request);
|
||||
}
|
||||
|
||||
/**
|
||||
* Log severity level identifiers used for hinting how their associated message should be logged.
|
||||
*/
|
||||
public enum LogLevel {
|
||||
Info,
|
||||
Warning,
|
||||
Critical,
|
||||
}
|
||||
|
||||
/**
|
||||
* Client-server broadcaster for receiving requests from remote clients.
|
||||
*/
|
||||
public interface Server extends AutoCloseable {
|
||||
/**
|
||||
* Closes the server broadcast from all clients, freeing any associated network resources.
|
||||
*/
|
||||
@Override
|
||||
void close();
|
||||
|
||||
/**
|
||||
* Assigns {@code quitConsumer} as the handling logic for any future client quit requests.
|
||||
*/
|
||||
void onQuit(BiConsumer<InetSocketAddress, byte[]> quitConsumer);
|
||||
|
||||
/**
|
||||
* Assigns {@code requestProcessor} as the handling logic for any future client requests submitted with the name
|
||||
* {@code requestName}.
|
||||
*/
|
||||
void onRequest(String requestName, BiFunction<InetSocketAddress, byte[], Response> requestProcessor);
|
||||
}
|
||||
|
||||
private final String name;
|
||||
|
||||
/**
|
||||
* Constructs a new {@link Service} with {@code serviceName} as its name.
|
||||
*/
|
||||
public Service(String serviceName) {
|
||||
this.name = serviceName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to connect to a service located at {@code socketAddress}, returning a {@link Client} instance
|
||||
* representing the established connection.
|
||||
*
|
||||
* Should any exception occur while establishing the connection to the remote service, an {@link IOException} is
|
||||
* thrown.
|
||||
*
|
||||
* {@link Client#close} must be called to close the connection and release any associated network resources.
|
||||
*/
|
||||
public Client connect(InetSocketAddress socketAddress) {
|
||||
final class ClientImplementation implements Client, Runnable {
|
||||
record Event(Request request, Consumer<Optional<Response>> optionalResponseConsumer) {
|
||||
|
||||
}
|
||||
|
||||
private final ArrayBlockingQueue<Event> eventQueue;
|
||||
|
||||
private final AtomicBoolean isRunning;
|
||||
|
||||
public ClientImplementation() {
|
||||
this.eventQueue = new ArrayBlockingQueue<>(64);
|
||||
this.isRunning = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.isRunning.set(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Optional<Response>> request(Request request) {
|
||||
// The "complete" function which sets the held data of the future is passed to a list of functions to be
|
||||
// called later, once the request has been processed in the event queue by the server and the remote
|
||||
// server has generated a response.
|
||||
var responseFuture = new CompletableFuture<Optional<Response>>();
|
||||
|
||||
this.eventQueue.offer(new Event(request, responseFuture::complete));
|
||||
|
||||
return responseFuture;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
// Attempt to open a connection.
|
||||
try (var socketChannel = SocketChannel.open(socketAddress)) {
|
||||
log(LogLevel.Info, "Client started connection to " + socketAddress);
|
||||
|
||||
var buffer = ByteBuffer.allocateDirect(1024);
|
||||
|
||||
// Handle connection to server.
|
||||
while (this.isRunning.get()) {
|
||||
while (!this.eventQueue.isEmpty()) {
|
||||
try {
|
||||
// Each event is submitted to the remote server one-by-one from the queue.
|
||||
var event = this.eventQueue.poll();
|
||||
var encodedRequest = event.request().encode();
|
||||
|
||||
if (socketChannel.write(encodedRequest) == encodedRequest.capacity()) {
|
||||
if (socketChannel.read(buffer) > 0) {
|
||||
// Send the decoded request to the "complete" function of the future created
|
||||
// earlier in "request".
|
||||
event.optionalResponseConsumer().accept(Response.decode(buffer));
|
||||
}
|
||||
|
||||
buffer.clear();
|
||||
}
|
||||
} catch (IOException exception) {
|
||||
log(
|
||||
LogLevel.Warning,
|
||||
"Failed to communicate with remote service: " + exception.getMessage()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException exception) {
|
||||
log(LogLevel.Critical, "Failed to connect to " + socketAddress + ": " + exception.getMessage());
|
||||
|
||||
Optional<Response> optionalResponse = Optional.empty();
|
||||
|
||||
while (this.isRunning.get()) {
|
||||
// Respond to any future requests after the exception with empty optionals.
|
||||
while (!this.eventQueue.isEmpty()) {
|
||||
this.eventQueue.poll().optionalResponseConsumer().accept(optionalResponse);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var client = new ClientImplementation();
|
||||
|
||||
new Thread(client).start();
|
||||
|
||||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to listen on the port {@code portNumber}, returning a {@link Server} instance representing the running
|
||||
* broadcast.
|
||||
*
|
||||
* A {@code portNumber} value of {@code 0} will make the server choose the first available port it can find.
|
||||
* Otherwise, any integer between {@code 1} and {@code 65535} is a valid port number.
|
||||
*
|
||||
* While most request names are freely programmable, certain ones are hardcoded, namely:
|
||||
*
|
||||
* * "quit" is reserved to disconnect the client connection from the server and respond with
|
||||
* {@link Response#EmptyOk} if successful.
|
||||
*
|
||||
* * "noop" is reserved as a quiet operation with no side effects to server state, responding with
|
||||
* {@link Response#EmptyOk} if successful. NOOP requests are useful for checking the validity of the connection
|
||||
* from the client-side.
|
||||
*
|
||||
* Should any exception occur while starting the broadcast, an {@link IOException} is thrown.
|
||||
*
|
||||
* {@link Server#close} must be called to kill the broadcast and release any associated network resources.
|
||||
*/
|
||||
public Server listen(int portNumber) {
|
||||
final class ServerImplementation implements Runnable, Server {
|
||||
private final HashMap<String, BiFunction<InetSocketAddress, byte[], Response>> requestProcessors;
|
||||
|
||||
private Optional<BiConsumer<InetSocketAddress, byte[]>> optionalQuitConsumer;
|
||||
|
||||
private final AtomicBoolean isRunning;
|
||||
|
||||
public ServerImplementation() {
|
||||
this.requestProcessors = new HashMap<>();
|
||||
this.isRunning = new AtomicBoolean(true);
|
||||
this.optionalQuitConsumer = Optional.empty();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
this.isRunning.set(false);
|
||||
}
|
||||
|
||||
private void handle(SocketChannel clientSocketChannel) {
|
||||
try {
|
||||
var inBuffer = ByteBuffer.allocateDirect(1024);
|
||||
|
||||
if (clientSocketChannel.read(inBuffer) > 0) {
|
||||
var decodedRequest = Request.decode(inBuffer);
|
||||
|
||||
if (decodedRequest.isPresent()) {
|
||||
var request = decodedRequest.get();
|
||||
var requestName = request.name();
|
||||
|
||||
// Some requests are hardcoded into service servers.
|
||||
switch (requestName) {
|
||||
case "quit" -> {
|
||||
// Attempting to acquire the local address after closing the socket channel throws
|
||||
// an unchecked exception.
|
||||
var address = clientSocketChannel.getRemoteAddress();
|
||||
|
||||
this.optionalQuitConsumer.ifPresent(inetSocketAddressBiConsumer -> {
|
||||
inetSocketAddressBiConsumer.accept((InetSocketAddress)address, request.data());
|
||||
});
|
||||
|
||||
clientSocketChannel.write(new Response(Response.Status.Ok).encode());
|
||||
clientSocketChannel.close();
|
||||
log(LogLevel.Info, address + " disconnected");
|
||||
}
|
||||
|
||||
case "noop" -> clientSocketChannel.write(Response.EmptyOk.encode());
|
||||
|
||||
default -> {
|
||||
log(
|
||||
LogLevel.Info,
|
||||
clientSocketChannel.getRemoteAddress() + " requested \"" + requestName + "\""
|
||||
);
|
||||
|
||||
var requestProcessor = Optional.ofNullable(
|
||||
this.requestProcessors.get(requestName)
|
||||
);
|
||||
|
||||
if (requestProcessor.isPresent()) {
|
||||
clientSocketChannel.write(
|
||||
requestProcessor.get().apply(
|
||||
(InetSocketAddress)clientSocketChannel.getRemoteAddress(),
|
||||
request.data()
|
||||
).encode()
|
||||
);
|
||||
} else {
|
||||
clientSocketChannel.write(new Response(
|
||||
Response.Status.ClientFail,
|
||||
"request name unsupported".getBytes(StandardCharsets.UTF_8)
|
||||
).encode());
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
clientSocketChannel.write(new Response(
|
||||
Response.Status.ClientFail,
|
||||
"request corrupt".getBytes(StandardCharsets.UTF_8)
|
||||
).encode());
|
||||
}
|
||||
} else {
|
||||
clientSocketChannel.write(new Response(
|
||||
Response.Status.ClientFail,
|
||||
"request empty".getBytes(StandardCharsets.UTF_8)
|
||||
).encode());
|
||||
}
|
||||
} catch (IOException exception) {
|
||||
log(LogLevel.Warning, "Failed to reach client: " + exception.getMessage());
|
||||
|
||||
try {
|
||||
clientSocketChannel.close();
|
||||
} catch (IOException closeException) {
|
||||
log(LogLevel.Warning, "Failed to close client connection: " + closeException.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onQuit(BiConsumer<InetSocketAddress, byte[]> quitConsumer) {
|
||||
this.optionalQuitConsumer = Optional.of(quitConsumer);
|
||||
}
|
||||
|
||||
public void onRequest(String requestName, BiFunction<InetSocketAddress, byte[], Response> requestProcessor) {
|
||||
this.requestProcessors.put(requestName, requestProcessor);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
var selector = Selector.open();
|
||||
|
||||
var serverSocketChannel = ServerSocketChannel.open()
|
||||
.setOption(StandardSocketOptions.SO_REUSEADDR, true)
|
||||
.bind(new InetSocketAddress(portNumber));
|
||||
|
||||
serverSocketChannel.configureBlocking(false);
|
||||
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
log(LogLevel.Info, "Server started on " + portNumber);
|
||||
|
||||
while (this.isRunning.get()) {
|
||||
try {
|
||||
selector.select(100);
|
||||
|
||||
var selectedKeyIterator = selector.selectedKeys().iterator();
|
||||
|
||||
while (selectedKeyIterator.hasNext()) {
|
||||
var selectedKey = selectedKeyIterator.next();
|
||||
|
||||
selectedKeyIterator.remove();
|
||||
|
||||
if (selectedKey.isAcceptable()) {
|
||||
// Handle incoming connection...
|
||||
SocketChannel clientSocketChannel = serverSocketChannel.accept();
|
||||
|
||||
clientSocketChannel.configureBlocking(false);
|
||||
clientSocketChannel.register(selector, SelectionKey.OP_READ);
|
||||
log(LogLevel.Info, clientSocketChannel.getRemoteAddress() + " connected");
|
||||
}
|
||||
|
||||
if (selectedKey.isReadable()) {
|
||||
// Handle incoming request...
|
||||
this.handle((SocketChannel)selectedKey.channel());
|
||||
}
|
||||
}
|
||||
} catch (IOException exception) {
|
||||
log(LogLevel.Warning, "Failed to handle client: " + exception.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
serverSocketChannel.close();
|
||||
selector.close();
|
||||
|
||||
log(LogLevel.Info, "Closed");
|
||||
} catch (IOException exception) {
|
||||
log(LogLevel.Critical, "Failed to bind service on port: " + exception.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var server = new ServerImplementation();
|
||||
|
||||
new Thread(server).start();
|
||||
|
||||
return server;
|
||||
}
|
||||
|
||||
/**
|
||||
* Logs {@code message} using {@code logLevel} as the log severity.
|
||||
*
|
||||
* See {@link LogLevel} for more information on log severity levels.
|
||||
*/
|
||||
public void log(LogLevel logLevel, String message) {
|
||||
var composedMessage = ("[" + this.name + "] " + message);
|
||||
|
||||
switch (logLevel) {
|
||||
case Info -> System.out.println(composedMessage);
|
||||
case Warning -> System.out.println(composedMessage);
|
||||
case Critical -> System.err.println(composedMessage);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue