fix: fs event emissions (#3153)

This commit is contained in:
Daniel Salazar
2026-05-25 14:25:27 -07:00
committed by GitHub
parent 8be115096f
commit 0ff0ea7743
9 changed files with 450 additions and 127 deletions
@@ -434,6 +434,119 @@ describe('FSController.completeBatchWrites', () => {
expect(finalized?.wasOverwrite).toBe(true);
});
it('emits updated events with GUI metadata when overwriting via batch completion', async () => {
const { actor, userId } = await makeUser();
const username = actor.user!.username!;
const target = `/${username}/Documents/overwrite-event.js`;
const firstStart = makeRes();
await withActor(actor, () =>
controller.startBatchWrites(
makeReq<SignedWriteRequest[]>({
body: [{ fileMetadata: { path: target, size: 1 } }],
actor,
}),
firstStart.res,
),
);
const [firstResponse] = firstStart.captured
.body as SignedWriteResponse[];
await withActor(actor, () =>
controller.completeBatchWrites(
makeReq<CompleteWriteRequest[]>({
body: [{ uploadId: firstResponse!.sessionId }],
actor,
}),
makeRes().res,
),
);
const targetEntry = await server.stores.fsEntry.getEntryByPath(target);
expect(targetEntry).not.toBeNull();
await server.stores.subdomain.create({
userId,
subdomain: `workers.puter.${username}-worker`,
rootDirId: targetEntry!.id,
});
const secondStart = makeRes();
await withActor(actor, () =>
controller.startBatchWrites(
makeReq<SignedWriteRequest[]>({
body: [
{
fileMetadata: {
path: target,
size: 2,
overwrite: true,
},
},
],
actor,
}),
secondStart.res,
),
);
const [secondResponse] = secondStart.captured
.body as SignedWriteResponse[];
const emitSpy = vi.spyOn(server.clients.event, 'emit');
let updatedCall:
| (typeof emitSpy.mock.calls)[number]
| undefined;
try {
await withActor(actor, () =>
controller.completeBatchWrites(
makeReq<CompleteWriteRequest[]>({
body: [
{
uploadId: secondResponse!.sessionId,
guiMetadata: {
operationId: 'op-123',
itemUploadId: 'item-456',
socketId: 'socket-789',
originalClientSocketId: 'socket-789',
},
},
],
actor,
}),
makeRes().res,
),
);
updatedCall = emitSpy.mock.calls.find(
([eventName]) => eventName === 'outer.gui.item.updated',
);
} finally {
emitSpy.mockRestore();
}
expect(updatedCall).toBeTruthy();
const payload = updatedCall?.[1] as {
user_id_list?: number[];
response?: Record<string, unknown>;
};
expect(payload.user_id_list).toEqual([userId]);
expect(payload.response).toMatchObject({
uid: expect.any(String),
uuid: expect.any(String),
id: expect.any(String),
path: target,
name: 'overwrite-event.js',
is_dir: false,
type: expect.stringMatching(/^application\/javascript/),
workers: [
expect.objectContaining({
subdomain: `workers.puter.${username}-worker`,
address: expect.stringContaining(`${username}-worker`),
}),
],
from_new_service: true,
operation_id: 'op-123',
item_upload_id: 'item-456',
socket_id: 'socket-789',
original_client_socket_id: 'socket-789',
});
});
it("rejects another user's session ids with a 4xx", async () => {
const a = await makeUser();
const b = await makeUser();
+12 -58
View File
@@ -59,6 +59,7 @@ import type {
ThumbnailUploadPrepareItem,
ThumbnailUploadPreparePayload,
} from './types.js';
import { toLegacyEntry } from './legacyFsHelpers.js';
class UploadProgressTracker implements UploadProgressTrackerLike {
total = 0;
progress = 0;
@@ -423,7 +424,8 @@ export class FSController extends PuterController {
busboy.on('field', (fieldName, value, info) => {
if (
info.fieldnameTruncated ||
(info as unknown as { filenameTruncated: string })
.filenameTruncated ||
info.nameTruncated ||
info.valueTruncated
) {
@@ -2022,56 +2024,7 @@ export class FSController extends PuterController {
}
async #toGuiFsEntry(entry: FSEntry): Promise<Record<string, unknown>> {
const dirpath = pathPosix.dirname(entry.path);
const extension = pathPosix.extname(entry.name).slice(1).toLowerCase();
const response = {
id: entry.uuid,
uid: entry.uuid,
uuid: entry.uuid,
parent_id: entry.parentUid,
parent_uid: entry.parentUid,
path: entry.path,
dirname: dirpath,
dirpath,
name: entry.name,
is_dir: entry.isDir,
is_shortcut: entry.isShortcut ? 1 : 0,
shortcut_to: entry.shortcutTo,
type: entry.isDir ? 'folder' : extension,
writable: true,
is_public: entry.isPublic,
thumbnail: entry.thumbnail,
immutable: entry.immutable,
metadata: entry.metadata,
modified: entry.modified,
created: entry.created,
accessed: entry.accessed,
size: entry.size,
};
if (
typeof response.thumbnail === 'string' &&
response.thumbnail.length > 0
) {
const thumbnailEntry = {
uuid: entry.uuid,
thumbnail: response.thumbnail,
};
// emitAndWait — listener rewrites s3:// / legacy URLs into
// time-limited signed URLs on the payload object.
await this.clients.event.emitAndWait(
'thumbnail.read',
thumbnailEntry,
{},
);
response.thumbnail =
typeof thumbnailEntry.thumbnail === 'string' &&
thumbnailEntry.thumbnail.length > 0
? thumbnailEntry.thumbnail
: null;
}
return response;
return toLegacyEntry(this.clients.event, entry);
}
async #emitGuiWriteEvent(
@@ -2081,7 +2034,7 @@ export class FSController extends PuterController {
): Promise<void> {
const response = {
...(await this.#toGuiFsEntry(fsEntry)),
...this.#toEventGuiMetadata(guiMetadata, false),
...this.#toEventGuiMetadata(guiMetadata),
from_new_service: true,
};
await this.clients.event.emit(
@@ -2239,7 +2192,7 @@ export class FSController extends PuterController {
return null;
}
return { index, contentType, size };
return { index, contentType, size } as ThumbnailUploadPrepareItem;
}
async #attachSignedThumbnailUploadTargets(
@@ -2259,11 +2212,12 @@ export class FSController extends PuterController {
const payload: ThumbnailUploadPreparePayload = {
items: prepareItems.map(
(item): ThumbnailUploadPrepareItem => ({
index: item.index,
contentType: item.contentType,
...(item.size !== undefined ? { size: item.size } : {}),
}),
(item): ThumbnailUploadPrepareItem =>
({
index: item.index,
contentType: item.contentType,
...(item.size !== undefined ? { size: item.size } : {}),
}) as ThumbnailUploadPrepareItem,
),
};
// emitAndWait — listeners populate `uploadUrl` / `thumbnailUrl` on
+1
View File
@@ -52,6 +52,7 @@ export interface ThumbnailUploadPrepareItem {
size?: number;
uploadUrl?: string;
thumbnailUrl?: string;
item_uid: string;
}
export interface ThumbnailUploadPreparePayload {
@@ -1,10 +1,13 @@
// This suite tests basic features of puter webdav. it is not a comprehensive webdav test suite unlike litmus
// but rather it performs some common sense checks to ensure that WebDAV support isn't irrevocably broken in puter
import type { Request, Response } from 'express';
import { Readable } from 'node:stream';
import { afterAll, beforeAll, describe, expect, it, vi } from 'vitest';
import { v4 as uuidv4 } from 'uuid';
import { PuterRouter } from '../../core/http/PuterRouter.js';
import { PuterServer } from '../../server.js';
import { setupTestServer } from '../../testUtil.js';
import { generateDefaultFsentries } from '../../util/userProvisioning.js';
import type { WebDAVController } from './WebDAVController.js';
let server: PuterServer;
@@ -101,6 +104,35 @@ const basicAuth = (user: string, pass: string) =>
const noop = vi.fn();
const makeUser = async () => {
const username = `webdav-${Math.random().toString(36).slice(2, 10)}`;
const created = await server.stores.user.create({
username,
uuid: uuidv4(),
password: null,
email: `${username}@test.local`,
free_storage: 100 * 1024 * 1024,
requires_email_confirmation: false,
});
await generateDefaultFsentries(
server.clients.db,
server.stores.user,
created,
);
const refreshed = (await server.stores.user.getById(created.id))!;
return {
userId: refreshed.id,
username: refreshed.username,
actor: {
user: {
id: refreshed.id,
uuid: refreshed.uuid,
username: refreshed.username,
},
},
};
};
describe('WebDAVController', () => {
describe('route registration', () => {
it('registers a single catch-all use() route', () => {
@@ -396,6 +428,53 @@ describe('WebDAVController', () => {
);
expect(captured.statusCode).toBe(422);
});
it('emits GUI-safe item events without leaking numeric fsentry ids', async () => {
const { actor, userId, username } = await makeUser();
const target = `/${username}/Documents/webdav-event.txt`;
const { res, captured } = makeRes();
const req = Object.assign(
Readable.from(['hello']),
makeReq({
method: 'PUT',
path: target,
headers: { 'content-length': '5' },
actor,
}),
) as Request;
const emitSpy = vi.spyOn(server.clients.event, 'emit');
let addedCall:
| (typeof emitSpy.mock.calls)[number]
| undefined;
try {
await dispatchMiddleware(req, res, noop);
await new Promise((resolve) => setTimeout(resolve, 0));
addedCall = emitSpy.mock.calls.find(
([eventName]) => eventName === 'outer.gui.item.added',
);
} finally {
emitSpy.mockRestore();
}
expect(captured.statusCode).toBe(201);
expect(addedCall).toBeTruthy();
const payload = addedCall?.[1] as {
user_id_list?: number[];
response?: Record<string, unknown>;
};
expect(payload.user_id_list).toEqual([userId]);
expect(payload.response).toMatchObject({
id: expect.any(String),
uid: expect.any(String),
uuid: expect.any(String),
path: target,
from_new_service: true,
});
expect(typeof payload.response?.id).toBe('string');
expect(payload.response?.id).toBe(payload.response?.uuid);
expect(payload.response).not.toHaveProperty('userId');
});
});
describe('DELETE', () => {
@@ -27,6 +27,7 @@ import type { PuterRouter } from '../../core/http/PuterRouter.js';
import { verify as verifyOtp } from '../../services/auth/OTPUtil.js';
import { expandTildePath } from '../../services/fs/resolveNode.js';
import type { FSEntry } from '../../stores/fs/FSEntry.js';
import { toLegacyEntry } from '../fs/legacyFsHelpers.js';
import { PuterController } from '../types.js';
import {
createLock,
@@ -772,19 +773,23 @@ export class WebDAVController extends PuterController {
entry: FSEntry,
extra?: Record<string, unknown>,
): void {
const payload = {
user_id_list: [entry.userId],
response: { ...entry, ...extra, from_new_service: true },
};
const meta = {};
void Promise.resolve()
.then(() =>
.then(async () => {
const response = {
...(await toLegacyEntry(this.clients.event, entry)),
...extra,
from_new_service: true,
};
this.clients.event.emit(
eventName,
payload as unknown as EventMap[T],
{
user_id_list: [entry.userId],
response,
} as unknown as EventMap[T],
meta,
),
)
);
})
.catch(() => {
// non-critical
});
+171 -57
View File
@@ -25,7 +25,8 @@ import { PuterDriver } from '../types.js';
import { loadFileInput } from '../util/fileInput.js';
import type { Actor } from '../../core/actor.js';
import path from 'node:path';
import { EventMetadata } from '../../clients/event/types.js';
import type { EventMetadata } from '../../clients/event/types.js';
import type { FSEntry } from '../../stores/fs/FSEntry.js';
const CF_BASE_URL = 'https://api.cloudflare.com/client/v4/accounts';
const WORKER_NAME_REGEX = /^[a-zA-Z0-9_-]+$/;
@@ -497,38 +498,49 @@ export class WorkerDriver extends PuterDriver {
);
}
// ── Hot-reload: auto-redeploy on file write ─────────────────────
// ── Hot-reload: auto-redeploy on source file write ──────────────
//
// When a user saves a JS file that's tied to a worker subdomain,
// we redeploy it to Cloudflare automatically. This is what makes
// "save file → live in prod" instant.
//
// The FS layer emits `outer.gui.item.added` and
// `outer.gui.item.updated` after a write commits. We subscribe to
// those — the payload carries `{ user_id_list, response }` where
// `response` is the entry shape (uuid, path, user_id, etc.). We
// match against worker subdomain `root_dir_id` to decide whether
// to re-deploy.
// This listens to backend FS lifecycle events rather than `outer.gui.*`
// socket events. GUI events intentionally expose public UUID-shaped ids,
// while worker subdomains are keyed to the numeric fsentries.id.
#subscribeHotReload(): void {
if (!this.#cfBaseUrl) return; // CF not configured — skip
for (const eventName of [
'outer.gui.item.added',
'outer.gui.item.updated',
] as const) {
this.clients.event.on(
eventName,
(_key: string, data: unknown, meta: EventMetadata) => {
void this.#handleFileWrite(data, meta).catch((err) => {
console.error('[workers] hot-reload error', err);
});
},
);
}
this.clients.event.on(
'fs.write.file',
(_key: string, data: unknown, meta: EventMetadata) => {
void this.#handleSourceWrite(data, meta).catch((err) => {
console.error('[workers] hot-reload error', err);
});
},
);
this.clients.event.on(
'fs.remove.node',
(_key: string, data: unknown, meta: EventMetadata) => {
void this.#handleSourceRemove(data, meta).catch((err) => {
console.error('[workers] source remove error', err);
});
},
);
this.clients.event.on(
'fs.move.node',
(_key: string, data: unknown, meta: EventMetadata) => {
void this.#handleSourceMove(data, meta).catch((err) => {
console.error('[workers] source move error', err);
});
},
);
}
async #handleFileWrite(data: unknown, meta: EventMetadata): Promise<void> {
async #handleSourceWrite(
data: unknown,
meta: EventMetadata,
): Promise<void> {
const metaObj =
meta && typeof meta === 'object'
? (meta as Record<string, unknown>)
@@ -536,37 +548,10 @@ export class WorkerDriver extends PuterDriver {
// Only run on the local node — incoming broadcast writes shouldn't trigger a re-deploy
if (metaObj.from_outside) return;
const d = data as Record<string, unknown> | undefined;
if (!d) return;
// `outer.gui.item.*` events carry `{ user_id_list, response }`
// where `response` is the FS entry shape. Extract what we need.
const response = (d.response ?? d) as Record<string, unknown>;
const userIdList = d.user_id_list as Array<number | string> | undefined;
const uuid = (response.uuid ?? response.uid) as string | undefined;
const userId = (userIdList?.[0] ?? response.user_id) as
| number
| undefined;
const path = response.path as string | undefined;
// Only files trigger hot-reload (not directories)
if (response.is_dir || response.isDir) return;
if (!uuid || !userId) return;
// Check if any worker subdomain points at this file
const workerSubs = await this.stores.subdomain.listByUserIdAndPrefix(
userId,
WORKER_SUBDOMAIN_PREFIX,
);
const matched = workerSubs.filter((r: Record<string, unknown>) => {
// root_dir_id can be the FS entry id or uuid depending on how it was stored
return (
String(r.root_dir_id) === String(uuid) ||
String(r.root_dir_id) === String(response.id)
);
});
const entry = this.#extractFsEntryFromEvent(data);
if (!entry || entry.isDir) return;
const matched = await this.#listWorkerRowsForEntry(entry);
if (matched.length === 0) return;
for (const row of matched) {
@@ -577,7 +562,7 @@ export class WorkerDriver extends PuterDriver {
);
try {
const ownerUser = await this.stores.user.getById(userId);
const ownerUser = await this.stores.user.getById(entry.userId);
if (!ownerUser) continue;
const ownerActor = { user: ownerUser } as Actor;
@@ -591,7 +576,7 @@ export class WorkerDriver extends PuterDriver {
},
this.services.fs,
ownerActor,
path ?? uuid, // prefer path, fall back to uuid
entry.path ?? entry.uuid, // prefer path, fall back to uuid
{ maxBytes: MAX_SOURCE_SIZE },
);
const sourceCode = loaded.buffer.toString('utf-8');
@@ -625,18 +610,18 @@ export class WorkerDriver extends PuterDriver {
await this.stores.subdomain.update(
String(row.uuid),
{ preamble_version: preambleVersion },
{ userId },
{ userId: entry.userId },
);
}
// Notify the user
await this.#notifyUser(userId, workerName, cfResult);
await this.#notifyUser(entry.userId, workerName, cfResult);
} catch (err) {
console.warn(
`[workers] hot-reload deploy failed for ${workerName}`,
err,
);
await this.#notifyUser(userId, workerName, {
await this.#notifyUser(entry.userId, workerName, {
success: false,
errors: [String(err)],
});
@@ -644,6 +629,135 @@ export class WorkerDriver extends PuterDriver {
}
}
async #handleSourceRemove(
data: unknown,
meta: EventMetadata,
): Promise<void> {
const metaObj =
meta && typeof meta === 'object'
? (meta as Record<string, unknown>)
: {};
if (metaObj.from_outside) return;
const entry = this.#extractFsEntryFromEvent(data);
if (!entry || entry.isDir) return;
const matched = await this.#listWorkerRowsForEntry(entry);
for (const row of matched) {
await this.#deleteWorkerForSourceRow(row, entry.userId);
}
}
async #handleSourceMove(data: unknown, meta: EventMetadata): Promise<void> {
const metaObj =
meta && typeof meta === 'object'
? (meta as Record<string, unknown>)
: {};
if (metaObj.from_outside) return;
const entry = this.#extractFsEntryFromEvent(data);
if (!entry || !this.#isTrashPath(entry.path)) return;
const matched = entry.isDir
? await this.#listWorkerRowsUnderPath(entry.userId, entry.path)
: await this.#listWorkerRowsForEntry(entry);
for (const row of matched) {
await this.#deleteWorkerForSourceRow(row, entry.userId);
}
}
#extractFsEntryFromEvent(data: unknown): FSEntry | undefined {
if (!data || typeof data !== 'object') return undefined;
const event = data as Record<string, unknown>;
for (const key of ['node', 'entry', 'target']) {
const value = event[key];
if (this.#isFsEntry(value)) {
return value;
}
}
return undefined;
}
#isFsEntry(value: unknown): value is FSEntry {
if (!value || typeof value !== 'object') return false;
const entry = value as Partial<FSEntry>;
return (
typeof entry.id === 'number' &&
typeof entry.uuid === 'string' &&
typeof entry.userId === 'number' &&
typeof entry.path === 'string' &&
typeof entry.isDir === 'boolean'
);
}
async #listWorkerRowsForEntry(
entry: FSEntry,
): Promise<Array<Record<string, unknown>>> {
const workerSubs = await this.stores.subdomain.listByUserIdAndPrefix(
entry.userId,
WORKER_SUBDOMAIN_PREFIX,
);
return workerSubs.filter((r: Record<string, unknown>) => {
return (
String(r.root_dir_id) === String(entry.id) ||
String(r.root_dir_id) === String(entry.uuid) ||
String(r.root_dir_id) === String(entry.uid)
);
});
}
async #listWorkerRowsUnderPath(
userId: number,
parentPath: string,
): Promise<Array<Record<string, unknown>>> {
const workerSubs = await this.stores.subdomain.listByUserIdAndPrefix(
userId,
WORKER_SUBDOMAIN_PREFIX,
);
const rootDirIds = workerSubs
.map((r: Record<string, unknown>) => r.root_dir_id)
.filter((id): id is number => typeof id === 'number');
const entriesById =
await this.stores.fsEntry.getEntriesByIds(rootDirIds);
return workerSubs.filter((row: Record<string, unknown>) => {
const rootDirId = row.root_dir_id;
if (typeof rootDirId !== 'number') return false;
const entry = entriesById.get(rootDirId);
return (
entry?.path === parentPath ||
entry?.path.startsWith(`${parentPath}/`)
);
});
}
#isTrashPath(entryPath: string): boolean {
const parts = entryPath.split('/').filter(Boolean);
return parts[1] === 'Trash';
}
async #deleteWorkerForSourceRow(
row: Record<string, unknown>,
userId: number,
): Promise<void> {
const workerFullName = String(row.subdomain ?? '');
if (!workerFullName.startsWith(WORKER_SUBDOMAIN_PREFIX)) return;
const workerName = workerFullName.slice(WORKER_SUBDOMAIN_PREFIX.length);
try {
await this.#cfDelete(workerName);
if (row.uuid) {
await this.stores.subdomain.deleteByUuid(String(row.uuid), {
userId,
});
}
} catch (err) {
console.warn(
`[workers] source cleanup failed for ${workerName}`,
err,
);
}
}
async #notifyUser(
userId: number,
workerName: string,
+3
View File
@@ -3033,6 +3033,9 @@ export class FSService extends PuterService {
await this.#removeDescendantsStorage(descendants);
if (descendants.length > 0) {
await this.stores.fsEntry.deleteEntries(descendants);
for (const descendant of descendants) {
this.#emitRemoveEvent(descendant);
}
}
if (!input.descendantsOnly) {
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2024-present Puter Technologies Inc.
*
* This file is part of Puter.
*
* Puter is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
import { describe, expect, it, vi } from 'vitest';
import { EventClient } from '../../clients/event/EventClient.js';
import type { FSEntryStore } from '../../stores/fs/FSEntryStore.js';
import { FSEntryCacheInvalidationEventHandler } from './cacheInvalidation.js';
describe('FSEntryCacheInvalidationEventHandler', () => {
it('reads exact outer GUI event payloads from the EventClient data argument', async () => {
const eventClient = new EventClient({} as never);
const fsEntryStore = {
invalidateEntryCacheByPathForUser: vi.fn(async () => undefined),
invalidateEntryCacheByUuid: vi.fn(async () => undefined),
} as unknown as FSEntryStore;
new FSEntryCacheInvalidationEventHandler(fsEntryStore, eventClient);
await eventClient.emitAndWait(
'outer.gui.item.updated',
{
user_id_list: [123],
response: {
path: '/alice/Documents/file.txt',
uuid: 'entry-uuid',
},
},
{},
);
expect(
fsEntryStore.invalidateEntryCacheByPathForUser,
).toHaveBeenCalledWith(123, '/alice/Documents/file.txt');
expect(fsEntryStore.invalidateEntryCacheByUuid).toHaveBeenCalledWith(
'entry-uuid',
);
});
});
+4 -4
View File
@@ -38,7 +38,7 @@ export class FSEntryCacheInvalidationEventHandler {
#registerHandlers(): void {
this.#eventClient.on(
'outer.gui.item.added',
async (event: OuterGuiItemEventPayload) => {
async (_key, event: OuterGuiItemEventPayload) => {
await this.#runSafely(
() => this.#handleOuterGuiItemEvent(event),
'outer.gui.item.added',
@@ -47,7 +47,7 @@ export class FSEntryCacheInvalidationEventHandler {
);
this.#eventClient.on(
'outer.gui.item.updated',
async (event: OuterGuiItemEventPayload) => {
async (_key, event: OuterGuiItemEventPayload) => {
await this.#runSafely(
() => this.#handleOuterGuiItemEvent(event),
'outer.gui.item.updated',
@@ -56,7 +56,7 @@ export class FSEntryCacheInvalidationEventHandler {
);
this.#eventClient.on(
'outer.gui.item.moved',
async (event: OuterGuiItemEventPayload) => {
async (_key, event: OuterGuiItemEventPayload) => {
await this.#runSafely(
() => this.#handleOuterGuiItemEvent(event),
'outer.gui.item.moved',
@@ -65,7 +65,7 @@ export class FSEntryCacheInvalidationEventHandler {
);
this.#eventClient.on(
'fs.remove.node',
async (event: FsRemoveNodeEventPayload) => {
async (_key, event: FsRemoveNodeEventPayload) => {
await this.#runSafely(
() => this.#handleRemoveNodeEvent(event),
'fs.remove.node',