Skip to content

Commit

Permalink
feat(adb): change how to close a socket
Browse files Browse the repository at this point in the history
  • Loading branch information
yume-chan committed Oct 16, 2023
1 parent 1aa7a92 commit e45fb2e
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 131 deletions.
5 changes: 5 additions & 0 deletions libraries/adb/src/commands/sync/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ export class AdbSyncSocketLocked implements AsyncExactReadable {
this.#combiner.flush();
this.#socketLock.notifyOne();
}

async close() {
await this.#readable.cancel();
}
}

export class AdbSyncSocket {
Expand All @@ -94,6 +98,7 @@ export class AdbSyncSocket {
}

async close() {
await this.#locked.close();
await this.#socket.close();
}
}
125 changes: 70 additions & 55 deletions libraries/adb/src/daemon/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { AsyncOperationManager, PromiseResolver } from "@yume-chan/async";
import {
AsyncOperationManager,
PromiseResolver,
delay,
} from "@yume-chan/async";
import type {
Consumable,
ReadableWritablePair,
Expand All @@ -12,7 +16,7 @@ import {
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct";

import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js";
import { decodeUtf8, encodeUtf8, unreachable } from "../utils/index.js";
import { decodeUtf8, encodeUtf8 } from "../utils/index.js";

import type { AdbPacketData, AdbPacketInit } from "./packet.js";
import { AdbCommand, calculateChecksum } from "./packet.js";
Expand Down Expand Up @@ -80,18 +84,18 @@ export class AdbPacketDispatcher implements Closeable {
new WritableStream({
write: async (packet) => {
switch (packet.command) {
case AdbCommand.OK:
this.#handleOk(packet);
break;
case AdbCommand.Close:
await this.#handleClose(packet);
break;
case AdbCommand.Write:
await this.#handleWrite(packet);
case AdbCommand.Okay:
this.#handleOkay(packet);
break;
case AdbCommand.Open:
await this.#handleOpen(packet);
break;
case AdbCommand.Write:
await this.#handleWrite(packet);
break;
default:
// Junk data may only appear in the authentication phase,
// since the dispatcher only works after authentication,
Expand Down Expand Up @@ -125,24 +129,6 @@ export class AdbPacketDispatcher implements Closeable {
this.#writer = connection.writable.getWriter();
}

#handleOk(packet: AdbPacketData) {
if (this.#initializers.resolve(packet.arg1, packet.arg0)) {
// Device successfully created the socket
return;
}

const socket = this.#sockets.get(packet.arg1);
if (socket) {
// Device has received last `WRTE` to the socket
socket.ack();
return;
}

// Maybe the device is responding to a packet of last connection
// Tell the device to close the socket
void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
}

async #handleClose(packet: AdbPacketData) {
// If the socket is still pending
if (
Expand Down Expand Up @@ -170,15 +156,8 @@ export class AdbPacketDispatcher implements Closeable {
// Ignore `arg0` and search for the socket
const socket = this.#sockets.get(packet.arg1);
if (socket) {
// The device want to close the socket
if (!socket.closed) {
await this.sendPacket(
AdbCommand.Close,
packet.arg1,
packet.arg0,
);
}
await socket.dispose();
await socket.close();
socket.dispose();
this.#sockets.delete(packet.arg1);
return;
}
Expand All @@ -188,27 +167,22 @@ export class AdbPacketDispatcher implements Closeable {
// the device may also respond with two `CLSE` packets.
}

async #handleWrite(packet: AdbPacketData) {
const socket = this.#sockets.get(packet.arg1);
if (!socket) {
throw new Error(`Unknown local socket id: ${packet.arg1}`);
#handleOkay(packet: AdbPacketData) {
if (this.#initializers.resolve(packet.arg1, packet.arg0)) {
// Device successfully created the socket
return;
}

await socket.enqueue(packet.payload);
await this.sendPacket(AdbCommand.OK, packet.arg1, packet.arg0);
return;
}

addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) {
this.#incomingSocketHandlers.set(service, handler);
}

removeReverseTunnel(address: string) {
this.#incomingSocketHandlers.delete(address);
}
const socket = this.#sockets.get(packet.arg1);
if (socket) {
// Device has received last `WRTE` to the socket
socket.ack();
return;
}

clearReverseTunnels() {
this.#incomingSocketHandlers.clear();
// Maybe the device is responding to a packet of last connection
// Tell the device to close the socket
void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
}

async #handleOpen(packet: AdbPacketData) {
Expand Down Expand Up @@ -240,12 +214,41 @@ export class AdbPacketDispatcher implements Closeable {
try {
await handler(controller.socket);
this.#sockets.set(localId, controller);
await this.sendPacket(AdbCommand.OK, localId, remoteId);
await this.sendPacket(AdbCommand.Okay, localId, remoteId);
} catch (e) {
await this.sendPacket(AdbCommand.Close, 0, remoteId);
}
}

async #handleWrite(packet: AdbPacketData) {
const socket = this.#sockets.get(packet.arg1);
if (!socket) {
throw new Error(`Unknown local socket id: ${packet.arg1}`);
}

let handled = false;
await Promise.race([
delay(5000).then(() => {
if (!handled) {
throw new Error(
`packet for \`${socket.service}\` not handled in 5 seconds`,
);
}
}),
(async () => {
await socket.enqueue(packet.payload);
await this.sendPacket(
AdbCommand.Okay,
packet.arg1,
packet.arg0,
);
handled = true;
})(),
]);

return;
}

async createSocket(service: string): Promise<AdbSocket> {
if (this.options.appendNullToServiceString) {
service += "\0";
Expand All @@ -268,6 +271,18 @@ export class AdbPacketDispatcher implements Closeable {
return controller.socket;
}

addReverseTunnel(service: string, handler: AdbIncomingSocketHandler) {
this.#incomingSocketHandlers.set(service, handler);
}

removeReverseTunnel(address: string) {
this.#incomingSocketHandlers.delete(address);
}

clearReverseTunnels() {
this.#incomingSocketHandlers.clear();
}

async sendPacket(
command: AdbCommand,
arg0: number,
Expand Down Expand Up @@ -306,7 +321,7 @@ export class AdbPacketDispatcher implements Closeable {
this.#closed = true;

this.#readAbortController.abort();
if (this.options.preserveConnection ?? false) {
if (this.options.preserveConnection) {
this.#writer.releaseLock();
} else {
await this.#writer.close();
Expand All @@ -317,7 +332,7 @@ export class AdbPacketDispatcher implements Closeable {

#dispose() {
for (const socket of this.#sockets.values()) {
socket.dispose().catch(unreachable);
socket.dispose();
}

this.#disconnected.resolve();
Expand Down
2 changes: 1 addition & 1 deletion libraries/adb/src/daemon/packet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ export enum AdbCommand {
Auth = 0x48545541, // 'AUTH'
Close = 0x45534c43, // 'CLSE'
Connect = 0x4e584e43, // 'CNXN'
OK = 0x59414b4f, // 'OKAY'
Okay = 0x59414b4f, // 'OKAY'
Open = 0x4e45504f, // 'OPEN'
Write = 0x45545257, // 'WRTE'
}
Expand Down
Loading

0 comments on commit e45fb2e

Please sign in to comment.