-
Notifications
You must be signed in to change notification settings - Fork 4k
fix(websocket): buffer messages when no listener is attached #26561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Fixes #26560 WebSocket messages sent immediately after handshake were lost when the `onmessage` handler was not set at the time of message arrival. Browsers queue these messages until a handler is attached, but Bun was discarding them. Changes: - Add message queue to WebSocket class for buffering messages - When a message arrives and no listener is attached, queue it instead of dispatching to nothing - When a message listener is first added (via onmessage or addEventListener), flush all queued messages - Support both text and binary message buffering Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
|
Updated 8:46 AM PT - Jan 29th, 2026
❌ @autofix-ci[bot], your commit d799664 has 5 failures in
🧪 To try this PR locally: bunx bun-pr 26561That installs a local version of the PR into your bun-26561 --bun |
WalkthroughIntroduces message buffering for WebSocket to queue incoming messages when no listener is attached, storing them in a pending queue and replaying when a listener is later added. Includes implementation changes and comprehensive regression test coverage for the buffering behavior. Changes
Suggested reviewers
🚥 Pre-merge checks | ✅ 4✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/bun.js/bindings/webcore/WebSocket.cpp`:
- Around line 1269-1280: Guard m_pendingMessages against unbounded growth by
enforcing a max queue size when appending in the branch that checks
m_hasMessageEventListener: before m_pendingMessages.append(QueuedTextMessage {
WTF::move(message) }) check the queue length and either drop oldest or newest
message (or signal backpressure/close) to prevent unbounded memory use; also
ensure the socket close/error paths clear m_pendingMessages to free memory
(update close/error handlers that reference m_pendingMessages to call
clear/reset). Use the symbols m_pendingMessages, m_hasMessageEventListener,
QueuedTextMessage, dispatchEvent, and the existing close/error handlers to
locate where to add the size check and the clearing logic.
In `@test/regression/issue/26560.test.ts`:
- Around line 33-35: Replace the fixed Bun.sleep delays (calls to Bun.sleep)
with condition-based synchronization: create a promise that the test resolves
when the server actually sends/emits the expected message (e.g., resolve inside
the server send/callback) and await that promise before attaching handlers;
after resolving, await a microtask/tick helper (e.g., await queueMicrotask via a
next-tick helper) to ensure ordering, then attach the handler. Do this for the
Bun.sleep occurrences (the initial attach-wait, plus the other places mentioned)
so tests wait on the server-send condition rather than an arbitrary timeout.
- Around line 9-25: The tests currently create servers with const server =
Bun.serve({...}) and rely on manual try/finally cleanup; replace those with the
async-disposable pattern by declaring the server using await using server =
Bun.serve({...}) in each of the five test cases so Bun will be automatically
disposed at scope exit; update each occurrence of the server variable (the
Bun.serve call and any references to server.upgrade or server) to use the new
await using declaration and remove the corresponding manual cleanup/try/finally
blocks.
| if (m_hasMessageEventListener) { | ||
| // Dispatch immediately if we have a listener | ||
| this->incPendingActivityCount(); | ||
| dispatchEvent(MessageEvent::create(WTF::move(message), m_url.string())); | ||
| this->decPendingActivityCount(); | ||
| return; | ||
| } | ||
|
|
||
| if (auto* context = scriptExecutionContext()) { | ||
| this->incPendingActivityCount(); | ||
| context->postTask([this, message_ = WTF::move(message), protectedThis = Ref { *this }](ScriptExecutionContext& context) { | ||
| ASSERT(scriptExecutionContext()); | ||
| protectedThis->dispatchEvent(MessageEvent::create(message_, protectedThis->m_url.string())); | ||
| protectedThis->decPendingActivityCount(); | ||
| }); | ||
| } | ||
| // Queue the message to be delivered when a listener is attached | ||
| // This mimics browser behavior where messages are buffered until onmessage is set | ||
| m_pendingMessages.append(QueuedTextMessage { WTF::move(message) }); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Guard against unbounded growth of m_pendingMessages.
Messages are only dequeued when a message listener is attached. If a consumer never registers one (or defers indefinitely), incoming frames accumulate for the lifetime of the socket, which can blow up memory on chatty connections. Consider draining without listeners, clearing on close/error, or applying a max queue size/backpressure.
🤖 Prompt for AI Agents
In `@src/bun.js/bindings/webcore/WebSocket.cpp` around lines 1269 - 1280, Guard
m_pendingMessages against unbounded growth by enforcing a max queue size when
appending in the branch that checks m_hasMessageEventListener: before
m_pendingMessages.append(QueuedTextMessage { WTF::move(message) }) check the
queue length and either drop oldest or newest message (or signal
backpressure/close) to prevent unbounded memory use; also ensure the socket
close/error paths clear m_pendingMessages to free memory (update close/error
handlers that reference m_pendingMessages to call clear/reset). Use the symbols
m_pendingMessages, m_hasMessageEventListener, QueuedTextMessage, dispatchEvent,
and the existing close/error handlers to locate where to add the size check and
the clearing logic.
| const server = Bun.serve({ | ||
| port: 0, | ||
| fetch(req, server) { | ||
| if (server.upgrade(req)) return; | ||
| return new Response("Not found", { status: 404 }); | ||
| }, | ||
| websocket: { | ||
| open(ws) { | ||
| // Send messages immediately when the connection opens | ||
| ws.send("message1"); | ||
| ws.send("message2"); | ||
| ws.send("message3"); | ||
| }, | ||
| message() {}, | ||
| close() {}, | ||
| }, | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion | 🟠 Major
🧩 Analysis chain
🏁 Script executed:
cat -n test/regression/issue/26560.test.tsRepository: oven-sh/bun
Length of output: 8398
🏁 Script executed:
# Search for Bun.serve type definitions and dispose pattern
rg -A 5 "Symbol\.(async)?Dispose" --type ts | head -50Repository: oven-sh/bun
Length of output: 3459
🏁 Script executed:
# Check if there are existing uses of using with Bun.serve in the test directory
rg "using\s+(.*?)\s*=\s*Bun\.serve" test/ --type tsRepository: oven-sh/bun
Length of output: 20251
🏁 Script executed:
# Look for existing try/finally patterns with server.stop() to see what's standard
rg -B 2 "server\.stop\(\)" test/ --type ts -c | head -20Repository: oven-sh/bun
Length of output: 964
🏁 Script executed:
head -30 test/regression/issue/14338.test.tsRepository: oven-sh/bun
Length of output: 1031
Use await using for Bun.serve cleanup.
Coding guidelines require using the async dispose pattern for Bun.serve. Replace the try/finally blocks with await using server = Bun.serve({...}) to eliminate boilerplate and maintain consistency across tests. This applies to all five test cases in this file.
♻️ Suggested pattern
- const server = Bun.serve({
+ await using server = Bun.serve({📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| const server = Bun.serve({ | |
| port: 0, | |
| fetch(req, server) { | |
| if (server.upgrade(req)) return; | |
| return new Response("Not found", { status: 404 }); | |
| }, | |
| websocket: { | |
| open(ws) { | |
| // Send messages immediately when the connection opens | |
| ws.send("message1"); | |
| ws.send("message2"); | |
| ws.send("message3"); | |
| }, | |
| message() {}, | |
| close() {}, | |
| }, | |
| }); | |
| await using server = Bun.serve({ | |
| port: 0, | |
| fetch(req, server) { | |
| if (server.upgrade(req)) return; | |
| return new Response("Not found", { status: 404 }); | |
| }, | |
| websocket: { | |
| open(ws) { | |
| // Send messages immediately when the connection opens | |
| ws.send("message1"); | |
| ws.send("message2"); | |
| ws.send("message3"); | |
| }, | |
| message() {}, | |
| close() {}, | |
| }, | |
| }); |
🤖 Prompt for AI Agents
In `@test/regression/issue/26560.test.ts` around lines 9 - 25, The tests currently
create servers with const server = Bun.serve({...}) and rely on manual
try/finally cleanup; replace those with the async-disposable pattern by
declaring the server using await using server = Bun.serve({...}) in each of the
five test cases so Bun will be automatically disposed at scope exit; update each
occurrence of the server variable (the Bun.serve call and any references to
server.upgrade or server) to use the new await using declaration and remove the
corresponding manual cleanup/try/finally blocks.
| // Wait a bit before attaching the handler to ensure messages arrive first | ||
| await Bun.sleep(50); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Replace fixed Bun.sleep delays with condition-based synchronization.
The fixed sleeps (50ms/10ms) make ordering dependent on timing and can be flaky under load. Prefer a deterministic barrier (e.g., resolve a promise from the server after it sends, then await a next-tick helper) so the test waits on conditions rather than elapsed time.
As per coding guidelines, "Do not use setTimeout in tests; instead await the condition to be met. Tests validate CONDITIONS, not TIME PASSING."
Also applies to: 82-84, 131-133, 237-248
🤖 Prompt for AI Agents
In `@test/regression/issue/26560.test.ts` around lines 33 - 35, Replace the fixed
Bun.sleep delays (calls to Bun.sleep) with condition-based synchronization:
create a promise that the test resolves when the server actually sends/emits the
expected message (e.g., resolve inside the server send/callback) and await that
promise before attaching handlers; after resolving, await a microtask/tick
helper (e.g., await queueMicrotask via a next-tick helper) to ensure ordering,
then attach the handler. Do this for the Bun.sleep occurrences (the initial
attach-wait, plus the other places mentioned) so tests wait on the server-send
condition rather than an arbitrary timeout.
|
@robobun try this: import { describe, it, expect } from 'bun:test'
import type { ServerWebSocket } from 'bun'
// Test to verify WebSocket early message handling in Bun
describe('WebSocket Early Message Queuing', () => {
it('should test addEventListener message handling with delay', async () => {
let resolveWebsocketOpen = () => { }
const promise = new Promise<void>((res) => { resolveWebsocketOpen = res })
// Create a minimal WebSocket server that sends messages immediately on connection
await using server = Bun.serve({
port: 0,
fetch(req, server) {
if (server.upgrade(req)) {
return // Upgrade succeeded
}
return new Response('WebSocket upgrade failed', { status: 500 })
},
websocket: {
open(ws: ServerWebSocket) {
// Send messages immediately after handshake
ws.send(JSON.stringify({ type: 'session_list', sessions: [] }))
ws.send(
JSON.stringify({
type: 'event_listener_msg',
data: 'Message for addEventListener test',
})
)
resolveWebsocketOpen()
},
message() {
// Handle messages if needed
},
close() {
// Handle close
},
},
})
const wsUrl = `ws://localhost:${server.port}/`
const ws = new WebSocket(wsUrl)
const receivedMessages: any[] = []
await promise
let resolveReceivedMessages = () => { }
const receivedMessagesPromise = new Promise<void>((res) => {
resolveReceivedMessages = res
setTimeout(res, 100)
})
function addWebSocketMessageListener() {
ws.addEventListener('message', (event) => {
const message = JSON.parse(event.data)
receivedMessages.push(message)
if (receivedMessages.length === 2) {
ws.close()
resolveReceivedMessages()
}
})
}
// This will pass, no messages lost
// setImmediate(addWebSocketMessageListener)
// This will fail, messages lost
setImmediate(() => setImmediate(addWebSocketMessageListener))
// This will pass, no messages lost
// Promise.resolve().then(() => {
// addWebSocketMessageListener()
// })
// This will fail, messages lost
// await new Promise<void>(res => setTimeout(res, 0))
// addWebSocketMessageListener()
// This will pass, no messages lost
// await new Promise(res => setImmediate(res))
// addWebSocketMessageListener()
// this will fail, messages lost
// await new Promise(res => setImmediate(res))
// await new Promise(res => setImmediate(res))
// addWebSocketMessageListener()
// This will pass, no messages lost
// queueMicrotask(addWebSocketMessageListener)
// This will pass, no messages lost
// queueMicrotask(() => queueMicrotask(addWebSocketMessageListener))
// This will pass, no messages lost
// queueMicrotask(() => queueMicrotask(() => queueMicrotask(addWebSocketMessageListener)))
// This will pass, no messages lost
// queueMicrotask(() => setImmediate(addWebSocketMessageListener))
// This will pass, no messages lost
// setImmediate(() => queueMicrotask(addWebSocketMessageListener))
// This will fail, messages lost
// setImmediate(() => queueMicrotask(() => setImmediate(addWebSocketMessageListener)))
await receivedMessagesPromise
// Test if addEventListener behaves the same as onmessage (should also lose messages)
expect(receivedMessages.length).toBe(2)
})
}) |
Summary
onmessagehandler is setProblem
WebSocket messages sent immediately after the handshake complete were being lost when the
onmessagehandler was not set at the time of message arrival. Browsers queue these messages until a handler is attached, but Bun was discarding them.Example that failed before this fix:
Solution
didReceiveMessage(text) anddidReceiveBinaryData(binary) to queue messages instead of dropping themonDidChangeListenercallback to detect when a message listener is first attachedflushPendingMessagesto dispatch all queued messages when a listener is addedTest plan
test/regression/issue/26560.test.tswith 5 test cases covering:onmessageis setaddEventListeneris used🤖 Generated with Claude Code