-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Refactor BigQuery write error handling and add timeout #4321
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -867,41 +867,52 @@ async def _write_rows_with_retry(self, rows: list[dict[str, Any]]) -> None: | |
| async def requests_iter(): | ||
| yield req | ||
|
|
||
| responses = await self.write_client.append_rows(requests_iter()) | ||
| async for response in responses: | ||
| error = getattr(response, "error", None) | ||
| error_code = getattr(error, "code", None) | ||
| if error_code and error_code != 0: | ||
| error_message = getattr(error, "message", "Unknown error") | ||
| logger.warning( | ||
| "BigQuery Write API returned error code %s: %s", | ||
| error_code, | ||
| error_message, | ||
| ) | ||
| if error_code in [ | ||
| _GRPC_DEADLINE_EXCEEDED, | ||
| _GRPC_INTERNAL, | ||
| _GRPC_UNAVAILABLE, | ||
| ]: # Deadline, Internal, Unavailable | ||
| raise ServiceUnavailable(error_message) | ||
| else: | ||
| if "schema mismatch" in error_message.lower(): | ||
| logger.error( | ||
| "BigQuery Schema Mismatch: %s. This usually means the" | ||
| " table schema does not match the expected schema.", | ||
| error_message, | ||
| ) | ||
| async def perform_write(): | ||
| responses = await self.write_client.append_rows(requests_iter()) | ||
| async for response in responses: | ||
| error = getattr(response, "error", None) | ||
| error_code = getattr(error, "code", None) | ||
| if error_code and error_code != 0: | ||
| error_message = getattr(error, "message", "Unknown error") | ||
| logger.warning( | ||
| "BigQuery Write API returned error code %s: %s", | ||
| error_code, | ||
| error_message, | ||
| ) | ||
| if error_code in [ | ||
| _GRPC_DEADLINE_EXCEEDED, | ||
| _GRPC_INTERNAL, | ||
| _GRPC_UNAVAILABLE, | ||
| ]: # Deadline, Internal, Unavailable | ||
| raise ServiceUnavailable(error_message) | ||
| else: | ||
| logger.error("Non-retryable BigQuery error: %s", error_message) | ||
| row_errors = getattr(response, "row_errors", []) | ||
| if row_errors: | ||
| for row_error in row_errors: | ||
| logger.error("Row error details: %s", row_error) | ||
| logger.error("Row content causing error: %s", rows) | ||
| return | ||
| if "schema mismatch" in error_message.lower(): | ||
| logger.error( | ||
| "BigQuery Schema Mismatch: %s. This usually means the" | ||
| " table schema does not match the expected schema.", | ||
| error_message, | ||
| ) | ||
| else: | ||
| logger.error( | ||
| "Non-retryable BigQuery error: %s", error_message | ||
| ) | ||
| row_errors = getattr(response, "row_errors", []) | ||
| if row_errors: | ||
| for row_error in row_errors: | ||
| logger.error("Row error details: %s", row_error) | ||
| logger.error("Row content causing error: %s", rows) | ||
| return | ||
| return | ||
|
|
||
| await asyncio.wait_for(perform_write(), timeout=30.0) | ||
| return | ||
|
|
||
| except (ServiceUnavailable, TooManyRequests, InternalServerError) as e: | ||
| except ( | ||
| ServiceUnavailable, | ||
| TooManyRequests, | ||
| InternalServerError, | ||
| asyncio.TimeoutError, | ||
| ) as e: | ||
| attempt += 1 | ||
| if attempt > self.retry_config.max_retries: | ||
| logger.error( | ||
|
|
@@ -1625,8 +1636,17 @@ def get_credentials(): | |
| @staticmethod | ||
| def _atexit_cleanup(batch_processor: "BatchProcessor") -> None: | ||
| """Clean up batch processor on script exit.""" | ||
| # Check if the batch_processor object is still alive | ||
| if batch_processor and not batch_processor._shutdown: | ||
| try: | ||
| # Check if the batch_processor object is still alive | ||
| if batch_processor and not batch_processor._shutdown: | ||
| pass | ||
| else: | ||
| return | ||
| except ReferenceError: | ||
| return | ||
|
|
||
| if True: # Indentation anchor, logic continues below | ||
|
Comment on lines
+1639
to
+1648
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Here's a suggestion for a cleaner implementation: try:
# Check if the batch_processor object is still alive and not shut down.
if not batch_processor or batch_processor._shutdown:
return
except ReferenceError:
# The weak reference is no longer valid, so there's nothing to clean up.
return
# Emergency Flush: Rescue any logs remaining in the queue
# ... (rest of the function, unindented) |
||
|
|
||
| # Emergency Flush: Rescue any logs remaining in the queue | ||
| remaining_items = [] | ||
| try: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The timeout for
`perform_write` is hardcoded to 30.0 seconds. It's better to make this value configurable to allow adjustments for different environments without changing the code. I recommend adding a `write_timeout` attribute to the `BigQueryLoggerConfig` class and using it here, similar to other timeout configurations.