diff --git a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py index 84fa66eb66..e5da176dd2 100644 --- a/src/google/adk/plugins/bigquery_agent_analytics_plugin.py +++ b/src/google/adk/plugins/bigquery_agent_analytics_plugin.py @@ -867,41 +867,52 @@ async def _write_rows_with_retry(self, rows: list[dict[str, Any]]) -> None: async def requests_iter(): yield req - responses = await self.write_client.append_rows(requests_iter()) - async for response in responses: - error = getattr(response, "error", None) - error_code = getattr(error, "code", None) - if error_code and error_code != 0: - error_message = getattr(error, "message", "Unknown error") - logger.warning( - "BigQuery Write API returned error code %s: %s", - error_code, - error_message, - ) - if error_code in [ - _GRPC_DEADLINE_EXCEEDED, - _GRPC_INTERNAL, - _GRPC_UNAVAILABLE, - ]: # Deadline, Internal, Unavailable - raise ServiceUnavailable(error_message) - else: - if "schema mismatch" in error_message.lower(): - logger.error( - "BigQuery Schema Mismatch: %s. This usually means the" - " table schema does not match the expected schema.", - error_message, - ) + async def perform_write(): + responses = await self.write_client.append_rows(requests_iter()) + async for response in responses: + error = getattr(response, "error", None) + error_code = getattr(error, "code", None) + if error_code and error_code != 0: + error_message = getattr(error, "message", "Unknown error") + logger.warning( + "BigQuery Write API returned error code %s: %s", + error_code, + error_message, + ) + if error_code in [ + _GRPC_DEADLINE_EXCEEDED, + _GRPC_INTERNAL, + _GRPC_UNAVAILABLE, + ]: # Deadline, Internal, Unavailable + raise ServiceUnavailable(error_message) else: - logger.error("Non-retryable BigQuery error: %s", error_message) - row_errors = getattr(response, "row_errors", []) - if row_errors: - for row_error in row_errors: - logger.error("Row error details: %s", row_error) - logger.error("Row content causing error: %s", rows) - return + if "schema mismatch" in error_message.lower(): + logger.error( + "BigQuery Schema Mismatch: %s. This usually means the" + " table schema does not match the expected schema.", + error_message, + ) + else: + logger.error( + "Non-retryable BigQuery error: %s", error_message + ) + row_errors = getattr(response, "row_errors", []) + if row_errors: + for row_error in row_errors: + logger.error("Row error details: %s", row_error) + logger.error("Row content causing error: %s", rows) + return + return + + await asyncio.wait_for(perform_write(), timeout=30.0) return - except (ServiceUnavailable, TooManyRequests, InternalServerError) as e: + except ( + ServiceUnavailable, + TooManyRequests, + InternalServerError, + asyncio.TimeoutError, + ) as e: attempt += 1 if attempt > self.retry_config.max_retries: logger.error( @@ -1625,8 +1636,17 @@ def get_credentials(): @staticmethod def _atexit_cleanup(batch_processor: "BatchProcessor") -> None: """Clean up batch processor on script exit.""" - # Check if the batch_processor object is still alive - if batch_processor and not batch_processor._shutdown: + try: + # Check if the batch_processor object is still alive + if batch_processor and not batch_processor._shutdown: + pass + else: + return + except ReferenceError: + return + + if True: # Indentation anchor, logic continues below + # Emergency Flush: Rescue any logs remaining in the queue remaining_items = [] try: