66from awslambdaric import __version__
77from .lambda_runtime_exception import FaultException
88from .lambda_runtime_marshaller import to_json
9+ import logging
10+ import time
911
1012ERROR_TYPE_HEADER = "Lambda-Runtime-Function-Error-Type"
13+ # Retry config constants
14+ DEFAULT_RETRY_MAX_ATTEMPTS = 5
15+ DEFAULT_RETRY_INITIAL_DELAY = 0.1 # seconds
16+ DEFAULT_RETRY_BACKOFF_FACTOR = 2.0
1117
1218
1319def _user_agent ():
@@ -46,13 +52,17 @@ def __init__(self, endpoint, response_code, response_body):
4652 )
4753
4854
49- class LambdaRuntimeClient (object ):
55+ class BaseLambdaRuntimeClient (object ):
5056 marshaller = LambdaMarshaller ()
5157 """marshaller is a class attribute that determines the unmarshalling and marshalling logic of a function's event
5258 and response. It allows for function authors to override the the default implementation, LambdaMarshaller which
5359 unmarshals and marshals JSON, to an instance of a class that implements the same interface."""
5460
55- def __init__ (self , lambda_runtime_address , use_thread_for_polling_next = False ):
61+ def __init__ (
62+ self ,
63+ lambda_runtime_address ,
64+ use_thread_for_polling_next = False ,
65+ ):
5666 self .lambda_runtime_address = lambda_runtime_address
5767 self .use_thread_for_polling_next = use_thread_for_polling_next
5868 if self .use_thread_for_polling_next :
@@ -94,9 +104,16 @@ def post_init_error(self, error_response_data, error_type_override=None):
94104 else error_response_data ["errorType" ]
95105 )
96106 }
97- self .call_rapid (
98- "POST" , endpoint , http .HTTPStatus .ACCEPTED , error_response_data , headers
99- )
107+ try :
108+ self .call_rapid (
109+ "POST" , endpoint , http .HTTPStatus .ACCEPTED , error_response_data , headers
110+ )
111+ except Exception as e :
112+ self .handle_init_error (e )
113+
114+ def handle_init_error (self , exc ):
115+ """Override in subclasses to customize init error handling."""
116+ raise NotImplementedError
100117
101118 def restore_next (self ):
102119 import http
@@ -113,14 +130,24 @@ def report_restore_error(self, restore_error_data):
113130 "POST" , endpoint , http .HTTPStatus .ACCEPTED , restore_error_data , headers
114131 )
115132
133+ def handle_exception (self , exc , func_to_retry = None , use_backoff = False ):
134+ """Override in subclasses to customize error handling."""
135+ raise NotImplementedError
136+
137+ def _get_next (self ):
138+ try :
139+ return runtime_client .next ()
140+ except Exception as e :
141+ return self .handle_exception (e , runtime_client .next , True )
142+
116143 def wait_next_invocation (self ):
117144 # Calling runtime_client.next() from a separate thread unblocks the main thread,
118145 # which can then process signals.
119146 if self .use_thread_for_polling_next :
120147 try :
121148 # TPE class is supposed to be registered at construction time and be ready to use.
122149 with self .ThreadPoolExecutor (max_workers = 1 ) as executor :
123- future = executor .submit (runtime_client . next )
150+ future = executor .submit (self . _get_next )
124151 response_body , headers = future .result ()
125152 except Exception as e :
126153 raise FaultException (
@@ -145,17 +172,66 @@ def wait_next_invocation(self):
145172 def post_invocation_result (
146173 self , invoke_id , result_data , content_type = "application/json"
147174 ):
148- runtime_client .post_invocation_result (
149- invoke_id ,
150- (
151- result_data
152- if isinstance (result_data , bytes )
153- else result_data .encode ("utf-8" )
154- ),
155- content_type ,
156- )
175+ try :
176+ runtime_client .post_invocation_result (
177+ invoke_id ,
178+ (
179+ result_data
180+ if isinstance (result_data , bytes )
181+ else result_data .encode ("utf-8" )
182+ ),
183+ content_type ,
184+ )
185+ except Exception as e :
186+ self .handle_exception (e )
157187
158188 def post_invocation_error (self , invoke_id , error_response_data , xray_fault ):
159- max_header_size = 1024 * 1024 # 1MiB
160- xray_fault = xray_fault if len (xray_fault .encode ()) < max_header_size else ""
161- runtime_client .post_error (invoke_id , error_response_data , xray_fault )
189+ try :
190+ max_header_size = 1024 * 1024
191+ xray_fault = (
192+ xray_fault if len (xray_fault .encode ()) < max_header_size else ""
193+ )
194+ runtime_client .post_error (invoke_id , error_response_data , xray_fault )
195+ except Exception as e :
196+ self .handle_exception (e )
197+
198+
199+ class LambdaRuntimeClient (BaseLambdaRuntimeClient ):
200+ def handle_exception (self , exc , func_to_retry = None , use_backoff = False ):
201+ raise exc
202+
203+ def handle_init_error (self , exc ):
204+ raise exc
205+
206+
207+ class LambdaElevatorRuntimeClient (BaseLambdaRuntimeClient ):
208+ def _get_next_with_backoff (self , e , func_to_retry ):
209+ logging .warning (f"Initial runtime_client.next() failed: { e } " )
210+ delay = DEFAULT_RETRY_INITIAL_DELAY
211+ latest_exception = None
212+ for attempt in range (1 , DEFAULT_RETRY_MAX_ATTEMPTS ):
213+ try :
214+ logging .info (
215+ f"Retrying runtime_client.next() [attempt { attempt + 1 } ]..."
216+ )
217+ time .sleep (delay )
218+ return func_to_retry ()
219+ except Exception as e :
220+ logging .warning (f"Attempt { attempt + 1 } failed: { e } " )
221+ delay *= DEFAULT_RETRY_BACKOFF_FACTOR
222+ latest_exception = e
223+
224+ raise latest_exception
225+
226+ # In elevator we don't want to raises unhandled exception and crash the worker on non-2xx responses from RAPID
227+ def handle_exception (self , exc , func_to_retry = None , use_backoff = False ):
228+ if use_backoff :
229+ return self ._get_next_with_backoff (exc , func_to_retry )
230+ # We retry if getting next invoke failed, but if posting response to RAPID failed we just log it and continue
231+ logging .warning (f"{ exc } : This won't kill the Runtime loop" )
232+
233+ def handle_init_error (self , exc ):
234+ if isinstance (exc , LambdaRuntimeClientError ) and exc .response_code == 403 :
235+ # Suppress 403 errors from RAPID during init - indicates another runtime worker has already posted init error
236+ return
237+ raise exc
0 commit comments