mirror of
https://github.com/sp-tarkov/server.git
synced 2025-02-13 09:50:43 -05:00
Handle websockets asynchronously & Fix up typing on HTTP (#1008)
Should handle the websockets asynchronously, also has some typing updates considering ws's server is not called Server anymore but is now 'WebSocketServer' --------- Co-authored-by: Chomp <27521899+chompDev@users.noreply.github.com>
This commit is contained in:
parent
d1d2adcc78
commit
a87dc96e77
@ -4,7 +4,7 @@ import { ContextVariableType } from "@spt/context/ContextVariableType";
|
|||||||
import { HttpServerHelper } from "@spt/helpers/HttpServerHelper";
|
import { HttpServerHelper } from "@spt/helpers/HttpServerHelper";
|
||||||
import { ConfigTypes } from "@spt/models/enums/ConfigTypes";
|
import { ConfigTypes } from "@spt/models/enums/ConfigTypes";
|
||||||
import { IHttpConfig } from "@spt/models/spt/config/IHttpConfig";
|
import { IHttpConfig } from "@spt/models/spt/config/IHttpConfig";
|
||||||
import { ILogger } from "@spt/models/spt/utils/ILogger";
|
import type { ILogger } from "@spt/models/spt/utils/ILogger";
|
||||||
import { ConfigServer } from "@spt/servers/ConfigServer";
|
import { ConfigServer } from "@spt/servers/ConfigServer";
|
||||||
import { WebSocketServer } from "@spt/servers/WebSocketServer";
|
import { WebSocketServer } from "@spt/servers/WebSocketServer";
|
||||||
import { IHttpListener } from "@spt/servers/http/IHttpListener";
|
import { IHttpListener } from "@spt/servers/http/IHttpListener";
|
||||||
@ -14,7 +14,7 @@ import { inject, injectAll, injectable } from "tsyringe";
|
|||||||
@injectable()
|
@injectable()
|
||||||
export class HttpServer {
|
export class HttpServer {
|
||||||
protected httpConfig: IHttpConfig;
|
protected httpConfig: IHttpConfig;
|
||||||
protected started: boolean;
|
protected started = false;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@inject("PrimaryLogger") protected logger: ILogger,
|
@inject("PrimaryLogger") protected logger: ILogger,
|
||||||
@ -102,7 +102,7 @@ export class HttpServer {
|
|||||||
* @param remoteAddress Address to check
|
* @param remoteAddress Address to check
|
||||||
* @returns True if its local
|
* @returns True if its local
|
||||||
*/
|
*/
|
||||||
protected isLocalRequest(remoteAddress: string): boolean {
|
protected isLocalRequest(remoteAddress: string | undefined): boolean | undefined {
|
||||||
if (!remoteAddress) {
|
if (!remoteAddress) {
|
||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
@ -1,16 +1,17 @@
|
|||||||
import http, { IncomingMessage } from "node:http";
|
import http, { IncomingMessage } from "node:http";
|
||||||
import { HttpServerHelper } from "@spt/helpers/HttpServerHelper";
|
import { HttpServerHelper } from "@spt/helpers/HttpServerHelper";
|
||||||
import { ILogger } from "@spt/models/spt/utils/ILogger";
|
import type { ILogger } from "@spt/models/spt/utils/ILogger";
|
||||||
import { IWebSocketConnectionHandler } from "@spt/servers/ws/IWebSocketConnectionHandler";
|
import { IWebSocketConnectionHandler } from "@spt/servers/ws/IWebSocketConnectionHandler";
|
||||||
import { LocalisationService } from "@spt/services/LocalisationService";
|
import { LocalisationService } from "@spt/services/LocalisationService";
|
||||||
import { JsonUtil } from "@spt/utils/JsonUtil";
|
import { JsonUtil } from "@spt/utils/JsonUtil";
|
||||||
import { RandomUtil } from "@spt/utils/RandomUtil";
|
import { RandomUtil } from "@spt/utils/RandomUtil";
|
||||||
import { inject, injectAll, injectable } from "tsyringe";
|
import { inject, injectAll, injectable } from "tsyringe";
|
||||||
import { Server, WebSocket } from "ws";
|
import { WebSocketServer as Server } from "ws";
|
||||||
|
import { SPTWebSocket } from "./ws/SPTWebsocket";
|
||||||
|
|
||||||
@injectable()
|
@injectable()
|
||||||
export class WebSocketServer {
|
export class WebSocketServer {
|
||||||
protected webSocketServer: Server;
|
protected webSocketServer: Server | undefined;
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
@inject("PrimaryLogger") protected logger: ILogger,
|
@inject("PrimaryLogger") protected logger: ILogger,
|
||||||
@ -21,12 +22,12 @@ export class WebSocketServer {
|
|||||||
@injectAll("WebSocketConnectionHandler") protected webSocketConnectionHandlers: IWebSocketConnectionHandler[],
|
@injectAll("WebSocketConnectionHandler") protected webSocketConnectionHandlers: IWebSocketConnectionHandler[],
|
||||||
) {}
|
) {}
|
||||||
|
|
||||||
public getWebSocketServer(): Server {
|
public getWebSocketServer(): Server | undefined {
|
||||||
return this.webSocketServer;
|
return this.webSocketServer;
|
||||||
}
|
}
|
||||||
|
|
||||||
public setupWebSocket(httpServer: http.Server): void {
|
public setupWebSocket(httpServer: http.Server): void {
|
||||||
this.webSocketServer = new Server({ server: httpServer });
|
this.webSocketServer = new Server({ server: httpServer, WebSocket: SPTWebSocket });
|
||||||
|
|
||||||
this.webSocketServer.addListener("listening", () => {
|
this.webSocketServer.addListener("listening", () => {
|
||||||
this.logger.success(
|
this.logger.success(
|
||||||
@ -37,7 +38,9 @@ export class WebSocketServer {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
this.webSocketServer.addListener("connection", this.wsOnConnection.bind(this));
|
this.webSocketServer.addListener("connection", async (ws: SPTWebSocket, msg) => {
|
||||||
|
await this.wsOnConnection(ws, msg);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected getRandomisedMessage(): string {
|
protected getRandomisedMessage(): string {
|
||||||
@ -50,18 +53,19 @@ export class WebSocketServer {
|
|||||||
: this.localisationService.getText("server_start_success");
|
: this.localisationService.getText("server_start_success");
|
||||||
}
|
}
|
||||||
|
|
||||||
protected wsOnConnection(ws: WebSocket, req: IncomingMessage): void {
|
protected async wsOnConnection(ws: SPTWebSocket, req: IncomingMessage): Promise<void> {
|
||||||
const socketHandlers = this.webSocketConnectionHandlers.filter((wsh) => req.url.includes(wsh.getHookUrl()));
|
const socketHandlers = this.webSocketConnectionHandlers.filter((wsh) => req.url.includes(wsh.getHookUrl()));
|
||||||
if ((socketHandlers?.length ?? 0) === 0) {
|
if ((socketHandlers?.length ?? 0) === 0) {
|
||||||
const message = `Socket connection received for url ${req.url}, but there is not websocket handler configured for it`;
|
const message = `Socket connection received for url ${req.url}, but there is not websocket handler configured for it`;
|
||||||
this.logger.warning(message);
|
this.logger.warning(message);
|
||||||
ws.send(this.jsonUtil.serialize({ error: message }));
|
await ws.sendAsync(this.jsonUtil.serialize({ error: message }));
|
||||||
ws.close();
|
await ws.closeAsync();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
socketHandlers.forEach((wsh) => {
|
|
||||||
wsh.onConnection(ws, req);
|
for (const wsh of socketHandlers) {
|
||||||
|
await wsh.onConnection(ws, req);
|
||||||
this.logger.info(`WebSocketHandler "${wsh.getSocketId()}" connected`);
|
this.logger.info(`WebSocketHandler "${wsh.getSocketId()}" connected`);
|
||||||
});
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -91,7 +91,7 @@ export class SptHttpListener implements IHttpListener {
|
|||||||
sessionID: string,
|
sessionID: string,
|
||||||
req: IncomingMessage,
|
req: IncomingMessage,
|
||||||
resp: ServerResponse,
|
resp: ServerResponse,
|
||||||
body: Buffer,
|
body: Buffer | undefined,
|
||||||
output: string,
|
output: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const bodyInfo = this.getBodyInfo(body);
|
const bodyInfo = this.getBodyInfo(body);
|
||||||
@ -138,7 +138,7 @@ export class SptHttpListener implements IHttpListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public async getResponse(sessionID: string, req: IncomingMessage, body: Buffer): Promise<string> {
|
public async getResponse(sessionID: string, req: IncomingMessage, body: Buffer | undefined): Promise<string> {
|
||||||
const info = this.getBodyInfo(body, req.url);
|
const info = this.getBodyInfo(body, req.url);
|
||||||
if (globalThis.G_LOG_REQUESTS) {
|
if (globalThis.G_LOG_REQUESTS) {
|
||||||
// Parse quest info into object
|
// Parse quest info into object
|
||||||
@ -158,7 +158,7 @@ export class SptHttpListener implements IHttpListener {
|
|||||||
return output;
|
return output;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected getBodyInfo(body: Buffer, requestUrl = undefined): any {
|
protected getBodyInfo(body: Buffer | undefined, requestUrl = undefined): any {
|
||||||
const text = body ? body.toString() : "{}";
|
const text = body ? body.toString() : "{}";
|
||||||
const info = text ? this.jsonUtil.deserialize<any>(text, requestUrl) : {};
|
const info = text ? this.jsonUtil.deserialize<any>(text, requestUrl) : {};
|
||||||
return info;
|
return info;
|
||||||
|
@ -1,8 +1,8 @@
|
|||||||
import { IncomingMessage } from "node:http";
|
import { IncomingMessage } from "node:http";
|
||||||
import { WebSocket } from "ws";
|
import { SPTWebSocket } from "./SPTWebsocket";
|
||||||
|
|
||||||
export interface IWebSocketConnectionHandler {
|
export interface IWebSocketConnectionHandler {
|
||||||
getSocketId(): string;
|
getSocketId(): string;
|
||||||
getHookUrl(): string;
|
getHookUrl(): string;
|
||||||
onConnection(ws: WebSocket, req: IncomingMessage): void;
|
onConnection(ws: SPTWebSocket, req: IncomingMessage): Promise<void>;
|
||||||
}
|
}
|
||||||
|
24
project/src/servers/ws/SPTWebsocket.ts
Normal file
24
project/src/servers/ws/SPTWebsocket.ts
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import WebSocket from "ws";
|
||||||
|
|
||||||
|
export class SPTWebSocket extends WebSocket {
|
||||||
|
// biome-ignore lint/suspicious/noExplicitAny: Any is required here, I dont see any other way considering it will complain if we use BufferLike
|
||||||
|
public sendAsync(data: any): Promise<void> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.send(data, (error) => {
|
||||||
|
if (error) {
|
||||||
|
reject(error);
|
||||||
|
} else {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public closeAsync(): Promise<void> {
|
||||||
|
return new Promise((resolve, reject) => {
|
||||||
|
this.on('close', () => resolve());
|
||||||
|
this.on('error', (err) => reject(err));
|
||||||
|
this.close();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -4,7 +4,7 @@ import { IWsNotificationEvent } from "@spt/models/eft/ws/IWsNotificationEvent";
|
|||||||
import { ConfigTypes } from "@spt/models/enums/ConfigTypes";
|
import { ConfigTypes } from "@spt/models/enums/ConfigTypes";
|
||||||
import { NotificationEventType } from "@spt/models/enums/NotificationEventType";
|
import { NotificationEventType } from "@spt/models/enums/NotificationEventType";
|
||||||
import { IHttpConfig } from "@spt/models/spt/config/IHttpConfig";
|
import { IHttpConfig } from "@spt/models/spt/config/IHttpConfig";
|
||||||
import { ILogger } from "@spt/models/spt/utils/ILogger";
|
import type { ILogger } from "@spt/models/spt/utils/ILogger";
|
||||||
import { ConfigServer } from "@spt/servers/ConfigServer";
|
import { ConfigServer } from "@spt/servers/ConfigServer";
|
||||||
import { IWebSocketConnectionHandler } from "@spt/servers/ws/IWebSocketConnectionHandler";
|
import { IWebSocketConnectionHandler } from "@spt/servers/ws/IWebSocketConnectionHandler";
|
||||||
import { ISptWebSocketMessageHandler } from "@spt/servers/ws/message/ISptWebSocketMessageHandler";
|
import { ISptWebSocketMessageHandler } from "@spt/servers/ws/message/ISptWebSocketMessageHandler";
|
||||||
@ -12,11 +12,12 @@ import { LocalisationService } from "@spt/services/LocalisationService";
|
|||||||
import { JsonUtil } from "@spt/utils/JsonUtil";
|
import { JsonUtil } from "@spt/utils/JsonUtil";
|
||||||
import { inject, injectAll, injectable } from "tsyringe";
|
import { inject, injectAll, injectable } from "tsyringe";
|
||||||
import { WebSocket } from "ws";
|
import { WebSocket } from "ws";
|
||||||
|
import { SPTWebSocket } from "./SPTWebsocket";
|
||||||
|
|
||||||
@injectable()
|
@injectable()
|
||||||
export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandler {
|
export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandler {
|
||||||
protected httpConfig: IHttpConfig;
|
protected httpConfig: IHttpConfig;
|
||||||
protected webSockets: Map<string, WebSocket> = new Map<string, WebSocket>();
|
protected webSockets: Map<string, SPTWebSocket> = new Map<string, SPTWebSocket>();
|
||||||
protected defaultNotification: IWsNotificationEvent = { type: NotificationEventType.PING, eventId: "ping" };
|
protected defaultNotification: IWsNotificationEvent = { type: NotificationEventType.PING, eventId: "ping" };
|
||||||
|
|
||||||
protected websocketPingHandler: NodeJS.Timeout | undefined;
|
protected websocketPingHandler: NodeJS.Timeout | undefined;
|
||||||
@ -39,7 +40,7 @@ export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandle
|
|||||||
return "/notifierServer/getwebsocket/";
|
return "/notifierServer/getwebsocket/";
|
||||||
}
|
}
|
||||||
|
|
||||||
public onConnection(ws: WebSocket, req: IncomingMessage): void {
|
public async onConnection(ws: SPTWebSocket, req: IncomingMessage): Promise<void> {
|
||||||
// Strip request and break it into sections
|
// Strip request and break it into sections
|
||||||
const splitUrl = req.url.substring(0, req.url.indexOf("?")).split("/");
|
const splitUrl = req.url.substring(0, req.url.indexOf("?")).split("/");
|
||||||
const sessionID = splitUrl.pop();
|
const sessionID = splitUrl.pop();
|
||||||
@ -54,18 +55,20 @@ export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandle
|
|||||||
if (this.websocketPingHandler) {
|
if (this.websocketPingHandler) {
|
||||||
clearInterval(this.websocketPingHandler);
|
clearInterval(this.websocketPingHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ws.on("message", async (msg) => {
|
||||||
|
for (const wsmh of this.sptWebSocketMessageHandlers) {
|
||||||
|
await wsmh.onSptMessage(sessionID, this.webSockets.get(sessionID), msg);
|
||||||
|
}
|
||||||
|
|
||||||
ws.on("message", (msg) =>
|
this.logger.info(`WebSocketHandler "${wsmh.getSocketId()}" connected`);
|
||||||
this.sptWebSocketMessageHandlers.forEach((wsmh) =>
|
});
|
||||||
wsmh.onSptMessage(sessionID, this.webSockets.get(sessionID), msg),
|
|
||||||
),
|
|
||||||
);
|
|
||||||
|
|
||||||
this.websocketPingHandler = setInterval(() => {
|
this.websocketPingHandler = setInterval(async () => {
|
||||||
this.logger.debug(this.localisationService.getText("websocket-pinging_player", sessionID));
|
this.logger.debug(this.localisationService.getText("websocket-pinging_player", sessionID));
|
||||||
|
|
||||||
if (ws.readyState === WebSocket.OPEN) {
|
if (ws.readyState === WebSocket.OPEN) {
|
||||||
ws.send(this.jsonUtil.serialize(this.defaultNotification));
|
await ws.sendAsync(this.jsonUtil.serialize(this.defaultNotification));
|
||||||
} else {
|
} else {
|
||||||
this.logger.debug(this.localisationService.getText("websocket-socket_lost_deleting_handle"));
|
this.logger.debug(this.localisationService.getText("websocket-socket_lost_deleting_handle"));
|
||||||
clearInterval(this.websocketPingHandler);
|
clearInterval(this.websocketPingHandler);
|
||||||
@ -74,10 +77,12 @@ export class SptWebSocketConnectionHandler implements IWebSocketConnectionHandle
|
|||||||
}, this.httpConfig.webSocketPingDelayMs);
|
}, this.httpConfig.webSocketPingDelayMs);
|
||||||
}
|
}
|
||||||
|
|
||||||
public sendMessage(sessionID: string, output: IWsNotificationEvent): void {
|
public async sendMessageAsync(sessionID: string, output: IWsNotificationEvent): Promise<void> {
|
||||||
try {
|
try {
|
||||||
if (this.isConnectionWebSocket(sessionID)) {
|
if (this.isConnectionWebSocket(sessionID)) {
|
||||||
this.webSockets.get(sessionID).send(this.jsonUtil.serialize(output));
|
const ws = this.webSockets.get(sessionID);
|
||||||
|
|
||||||
|
await ws.sendAsync(this.jsonUtil.serialize(output));
|
||||||
this.logger.debug(this.localisationService.getText("websocket-message_sent"));
|
this.logger.debug(this.localisationService.getText("websocket-message_sent"));
|
||||||
} else {
|
} else {
|
||||||
this.logger.debug(this.localisationService.getText("websocket-not_ready_message_not_sent", sessionID));
|
this.logger.debug(this.localisationService.getText("websocket-not_ready_message_not_sent", sessionID));
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
import { ILogger } from "@spt/models/spt/utils/ILogger";
|
import type { ILogger } from "@spt/models/spt/utils/ILogger";
|
||||||
import { ISptWebSocketMessageHandler } from "@spt/servers/ws/message/ISptWebSocketMessageHandler";
|
import { ISptWebSocketMessageHandler } from "@spt/servers/ws/message/ISptWebSocketMessageHandler";
|
||||||
import { inject, injectable } from "tsyringe";
|
import { inject, injectable } from "tsyringe";
|
||||||
import { RawData, WebSocket } from "ws";
|
import { RawData } from "ws";
|
||||||
|
import { SPTWebSocket } from "../SPTWebsocket";
|
||||||
|
|
||||||
@injectable()
|
@injectable()
|
||||||
export class DefaultSptWebSocketMessageHandler implements ISptWebSocketMessageHandler {
|
export class DefaultSptWebSocketMessageHandler implements ISptWebSocketMessageHandler {
|
||||||
constructor(@inject("PrimaryLogger") protected logger: ILogger) {}
|
constructor(@inject("PrimaryLogger") protected logger: ILogger) {}
|
||||||
|
|
||||||
public onSptMessage(sessionId: string, client: WebSocket, message: RawData): void {
|
public async onSptMessage(sessionId: string, client: SPTWebSocket, message: RawData): Promise<void> {
|
||||||
this.logger.debug(`[${sessionId}] SPT message received: ${message}`);
|
this.logger.debug(`[${sessionId}] SPT message received: ${message}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import { RawData, WebSocket } from "ws";
|
import { RawData } from "ws";
|
||||||
|
import { SPTWebSocket } from "../SPTWebsocket";
|
||||||
|
|
||||||
export interface ISptWebSocketMessageHandler {
|
export interface ISptWebSocketMessageHandler {
|
||||||
onSptMessage(sessionID: string, client: WebSocket, message: RawData): void;
|
onSptMessage(sessionID: string, client: SPTWebSocket, message: RawData): Promise<void>;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user