/// <summary>
/// Invokes <paramref name="asyncAction"/> once and dispatches on the outcome of the returned task.
/// An <see cref="IOException"/>, a cancelled task, or a result whose status is
/// <c>AsyncOperationResultStatus.IOException</c> triggers a retry through <paramref name="mainAction"/>;
/// any other exception is routed to <paramref name="failedAction"/>; otherwise
/// <paramref name="successAction"/> receives the result.
/// </summary>
/// <remarks>
/// This method does not loop by itself. A retry calls <paramref name="mainAction"/> with
/// <c>retryTimes + 1</c>; the caller's <paramref name="mainAction"/> is expected to call this method
/// again with that value. While the retry count is below <c>_immediatelyRetryTimes</c> the retry runs
/// inline; once it reaches that threshold the retry is scheduled through
/// <c>Task.Factory.StartDelayedTask</c> using <c>_retryIntervalForIOException</c>.
/// </remarks>
/// <typeparam name="TAsyncResult">Result type produced by the asynchronous action.</typeparam>
/// <param name="asyncActionName">Name of the action; used only in log messages.</param>
/// <param name="asyncAction">Factory that starts the asynchronous operation.</param>
/// <param name="mainAction">Callback invoked with the next retry count when a retry is required.</param>
/// <param name="successAction">Optional callback invoked with the result when no retry is required.</param>
/// <param name="getContextInfoFunc">Supplies context text that is embedded in log messages.</param>
/// <param name="failedAction">Optional callback invoked with the exception for non-retryable failures.</param>
/// <param name="retryTimes">Retry count of the current attempt.</param>
public void TryAsyncActionRecursively<TAsyncResult>(
    string asyncActionName,
    Func<Task<TAsyncResult>> asyncAction,
    Action<int> mainAction,
    Action<TAsyncResult> successAction,
    Func<string> getContextInfoFunc,
    Action<Exception> failedAction,
    int retryTimes) where TAsyncResult : AsyncOperationResult
{
    // Re-enters mainAction with the incremented retry count: inline while below the
    // immediate-retry threshold, deferred through StartDelayedTask afterwards.
    var retryAction = new Action<int>(currentRetryTimes =>
    {
        if (currentRetryTimes >= _immediatelyRetryTimes)
        {
            Task.Factory.StartDelayedTask(_retryIntervalForIOException, () => mainAction(currentRetryTimes + 1));
        }
        else
        {
            mainAction(currentRetryTimes + 1);
        }
    });

    // Invokes the optional failure callback; an exception thrown by the callback itself is logged
    // here instead of propagating.
    var executeFailedAction = new Action<Exception>(ex =>
    {
        try
        {
            if (failedAction != null)
            {
                failedAction(ex);
            }
        }
        catch (Exception unknownEx)
        {
            _logger.Error(string.Format("Failed to execute the failedCallbackAction of asyncAction:{0}, contextInfo:{1}", asyncActionName, getContextInfoFunc()), unknownEx);
        }
    });

    // Classifies an exception taken from a faulted task: IOException => retry, anything else => fail.
    // The name must match the call site below exactly; C# identifiers are case-sensitive.
    var processTaskException = new Action<Exception, int>((ex, currentRetryTimes) =>
    {
        if (ex is IOException)
        {
            _logger.Error(string.Format("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}", asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            // Retry with the same count that was just logged.
            retryAction(currentRetryTimes);
        }
        else
        {
            _logger.Error(string.Format("Async task '{0}' has unknown exception, contextInfo:{1}, current retryTimes:{2}", asyncActionName, getContextInfoFunc(), currentRetryTimes), ex);
            executeFailedAction(ex);
        }
    });

    // Continuation that runs once the task returned by asyncAction has finished.
    var completeAction = new Action<Task<TAsyncResult>>(t =>
    {
        if (t.Exception != null)
        {
            // Task.Exception is an AggregateException; only its InnerException is inspected.
            processTaskException(t.Exception.InnerException, retryTimes);
            return;
        }

        if (t.IsCanceled)
        {
            _logger.ErrorFormat("Async task '{0}' was cancelled, contextInfo:{1}, current retryTimes:{2}", asyncActionName, getContextInfoFunc(), retryTimes);
            retryAction(retryTimes);
            return;
        }

        var result = t.Result;
        if (result.Status == AsyncOperationResultStatus.IOException)
        {
            // The operation reported an I/O failure through its result object rather than by throwing.
            _logger.ErrorFormat("Async task '{0}' has io exception, contextInfo:{1}, current retryTimes:{2}, errorMsg:{3}", asyncActionName, getContextInfoFunc(), retryTimes, result.ErrorMessage);
            retryAction(retryTimes);
            return;
        }

        if (successAction != null)
        {
            successAction(result);
        }
    });

    try
    {
        asyncAction().ContinueWith(completeAction);
    }
    catch (IOException ex)
    {
        // asyncAction threw synchronously, before any task was returned.
        _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}", asyncActionName, getContextInfoFunc(), retryTimes), ex);
        retryAction(retryTimes);
    }
    catch (Exception ex)
    {
        _logger.Error(string.Format("Execute async action '{0}' failed, contextInfo:{1}, current retryTimes:{2}", asyncActionName, getContextInfoFunc(), retryTimes), ex);
        executeFailedAction(ex);
    }
}
該函數的功能是:執行一個異步任務(返回Task的方法),如果執行出現IO異常,則重試當前主函數(mainAction);用戶的mainAction中會再次調用TryAsyncActionRecursively方法,從而實現當遇到IO異常時,能做到不斷重試。另外,重試只立即重試指定的次數,超過指定次數,則不立即重試,而是暫停一定間隔后再次執行。該函數還提供當asyncAction執行成功或失敗后的回調函數,以及允許傳入當前上下文的一些說明信息,以便記錄有意義的錯誤日志信息。
下面是使用示例:
/// <summary>
/// Publishes <paramref name="eventStream"/> through <c>_eventPublisher</c> via
/// <c>TryAsyncActionRecursively</c>, completing <paramref name="processingCommand"/> with a
/// success or failed <c>CommandResult</c> depending on which callback is invoked.
/// </summary>
/// <param name="processingCommand">Command that is completed once publishing succeeds or fails.</param>
/// <param name="eventStream">Event stream to publish.</param>
/// <param name="retryTimes">Retry count of the current attempt.</param>
private void PublishEventAsync(ProcessingCommand processingCommand, EventStream eventStream, int retryTimes)
{
    // The asynchronous operation itself.
    Func<Task<AsyncOperationResult>> publish = () => _eventPublisher.PublishAsync(eventStream);

    // Retry hook: re-enter this method with the retry count handed back by the helper.
    Action<int> retry = nextRetryTimes => PublishEventAsync(processingCommand, eventStream, nextRetryTimes);

    Action<AsyncOperationResult> onSuccess = publishResult =>
    {
        _logger.DebugFormat("Publish events success, {0}", eventStream);
        var succeeded = new CommandResult(CommandStatus.Success, processingCommand.Command.Id);
        processingCommand.Complete(succeeded);
    };

    // Context text embedded in the helper's log messages.
    Func<string> describeContext = () => string.Format("[eventStream:{0}]", eventStream);

    Action<Exception> onFailed = error =>
    {
        var failed = new CommandResult(CommandStatus.Failed, processingCommand.Command.Id);
        processingCommand.Complete(failed);
    };

    TryAsyncActionRecursively<AsyncOperationResult>(
        "PublishEventAsync",
        publish,
        retry,
        onSuccess,
        describeContext,
        onFailed,
        retryTimes);
}
// Start the first attempt; 0 is passed as the initial retryTimes value.
PublishEventAsync(processingCommand, eventStream, 0);
新聞熱點
疑難解答