Refactor BigQuery write error handling and add timeout #4321
base: main
Conversation
Refactor error handling in BigQuery write operations and add timeout for the `perform_write` function.
Summary of Changes (from gemini-code-assist)

This pull request significantly enhances the reliability and robustness of BigQuery write operations within the analytics plugin. It refactors the existing error handling to be more structured, introduces a crucial timeout mechanism for write calls to prevent indefinite hangs, and improves the application's shutdown process by making the `_atexit_cleanup` handler resilient to objects that have already been garbage collected.
Response from ADK Triaging Agent

Hello @haiyuan-eng-google, thank you for creating this PR! To help us review your contribution more effectively, could you please fill out the "Testing Plan" section in your PR description? This is required for all PRs that are not small documentation or typo fixes. Additionally, could you please link to an existing issue or provide a more detailed description of the change in the PR body, following the structure of our issue templates? This information will help reviewers better understand and test your changes. Thanks!
Code Review
This pull request refactors the BigQuery write operation to include a timeout and improves the robustness of the `_atexit_cleanup` function. The changes are generally good, but I have a couple of suggestions to improve configurability and code clarity. Specifically, I recommend making the new write timeout configurable instead of hardcoded, and refactoring a confusing `if True:` block in the atexit handler.
```python
await asyncio.wait_for(perform_write(), timeout=30.0)
```
The timeout for `perform_write` is hardcoded to 30.0 seconds. It's better to make this value configurable to allow adjustments for different environments without changing the code. I recommend adding a `write_timeout` attribute to the `BigQueryLoggerConfig` class and using it here, similar to other timeout configurations.
```diff
-await asyncio.wait_for(perform_write(), timeout=30.0)
+await asyncio.wait_for(perform_write(), timeout=self.config.write_timeout)
```
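To make the suggestion concrete, here is a minimal sketch of what such a configuration knob could look like. The `write_timeout` field, its default, and the dataclass shape are assumptions for illustration; only the class name `BigQueryLoggerConfig` comes from the review itself:

```python
from dataclasses import dataclass

@dataclass
class BigQueryLoggerConfig:
    # ... existing configuration fields ...
    # Hypothetical knob (assumption): maximum seconds to wait for a
    # single write before it is treated as failed and retried.
    write_timeout: float = 30.0
```

The call site would then read the value from the active config (as in the suggestion above), so operators can tune the timeout per environment without a code change.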
```python
try:
    # Check if the batch_processor object is still alive
    if batch_processor and not batch_processor._shutdown:
        pass
    else:
        return
except ReferenceError:
    return

if True:  # Indentation anchor, logic continues below
```
The `try...except ReferenceError` block is a good addition for robustness. However, the `if`/`else` with `pass` can be simplified. More importantly, the `if True:` on line 1648 is an anti-pattern used as an 'indentation anchor' which harms readability. It should be removed, and the subsequent code block (lines 1650-1694) should be unindented.
Here's a suggestion for a cleaner implementation:
```python
try:
    # Check if the batch_processor object is still alive and not shut down.
    if not batch_processor or batch_processor._shutdown:
        return
except ReferenceError:
    # The weak reference is no longer valid, so there's nothing to clean up.
    return

# Emergency Flush: Rescue any logs remaining in the queue
# ... (rest of the function, unindented)
```
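For background, a self-contained sketch of why the `ReferenceError` guard is needed: if the cleanup handler holds a `weakref.proxy` to the processor, any attribute access (or even truth-testing) on the proxy raises `ReferenceError` once the referent has been garbage collected. The `BatchProcessor` stub below is illustrative, not the plugin's real class:

```python
import gc
import weakref

class BatchProcessor:
    _shutdown = False

proc = BatchProcessor()
batch_processor = weakref.proxy(proc)  # what an atexit handler might hold

del proc      # referent is collected (CPython frees it immediately)
gc.collect()  # be explicit for other implementations

try:
    if not batch_processor or batch_processor._shutdown:
        print("nothing to clean up")
except ReferenceError:
    # Dead proxy: the object is already gone, so the handler returns quietly.
    print("processor already collected; skipping cleanup")
```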
Please ensure you have read the contribution guide before creating a pull request.
Link to Issue or Description of Change
1. Link to an existing issue (if applicable):
2. Or, if no issue exists, describe the change:
Problem:
The BigQuery `BatchProcessor` worker thread could hang indefinitely during `write_client.append_rows()` calls under certain failure conditions (e.g., silent connection drops or blocked RPCs). Because the worker waits on this call without a timeout, it stops processing new events. The internal queue eventually fills up (defaulting to 1000 items) and subsequent logs are dropped to prevent memory leaks, resulting in a silent cessation of logging even though the application keeps running. Additionally, a `ReferenceError` was occasionally observed during interpreter shutdown (`_atexit_cleanup`) when the `batch_processor` object had already been garbage collected.

Solution:

- Wrapped the `write_client.append_rows` call within `asyncio.wait_for` with a 30-second timeout in `_write_rows_with_retry`.
- Added handling for `asyncio.TimeoutError`. This ensures that if a write hangs, it fails fast, triggers the existing retry mechanism (with backoff), and eventually drops the problematic batch if retries are exhausted, allowing the worker to proceed to the next batch. A sketch of this pattern appears after this list.
- Added a `try-except ReferenceError` block in `_atexit_cleanup` to prevent noisy errors during script termination.
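A minimal sketch of the fail-fast pattern described in the solution above. The function name and the 30-second timeout follow the PR text; the retry budget and exponential backoff constants are illustrative assumptions, not the plugin's actual code:

```python
import asyncio

WRITE_TIMEOUT = 30.0  # 30-second timeout from the PR description
MAX_RETRIES = 3       # assumed retry budget for illustration

async def _write_rows_with_retry(perform_write) -> bool:
    """Attempt a write, failing fast on hangs and retrying with backoff."""
    for attempt in range(MAX_RETRIES):
        try:
            # Fail fast instead of letting the worker block forever.
            await asyncio.wait_for(perform_write(), timeout=WRITE_TIMEOUT)
            return True
        except asyncio.TimeoutError:
            # Back off before retrying; after the last attempt the batch is
            # dropped so the worker can move on to the next one.
            await asyncio.sleep(2 ** attempt)
    return False
```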
Testing Plan

Unit Tests:
Please include a summary of passed `pytest` results.

Manual End-to-End (E2E) Tests:
I verified the changes using a reproduction script that mocks a hanging BigQuery writer:

- Mocked `append_rows` to return a hanging iterator and used `asyncio.wait_for` to simulate a timeout.
- Verified that `TimeoutError` is raised after the simulated timeout; a sketch of such a test follows below.
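A sketch of what the reproduction described above might look like as a unit test (assumes `pytest-asyncio` is installed; the helper name and the shortened one-second timeout are illustrative):

```python
import asyncio
import pytest

async def hanging_append_rows():
    # Simulates a silently hung append_rows RPC that never completes.
    await asyncio.Event().wait()

@pytest.mark.asyncio
async def test_write_times_out_instead_of_hanging():
    with pytest.raises(asyncio.TimeoutError):
        # A short timeout keeps the test fast; production would use 30s.
        await asyncio.wait_for(hanging_append_rows(), timeout=1.0)
```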
Checklist

Additional context
This change prevents a critical failure mode where observability data is silently lost due to transient network issues or service hangs.