Skip to content

Commit d3a33c8

Browse files
authored
Merge pull request #18 from Arkatufus/simplify-writer-actor
Simplify PersistenceWriterActor by using WithRetryActor
2 parents 162bc95 + 33f0370 commit d3a33c8

File tree

1 file changed

+33
-83
lines changed

1 file changed

+33
-83
lines changed

src/Akka.Persistence.Migration/Actors/PersistenceWriterActor.cs

Lines changed: 33 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -7,29 +7,24 @@
77

88
namespace Akka.Persistence.Migration.Actors;
99

10-
public class PersistenceWriterActor: ReceivePersistentActor, IWithTimers
10+
public class PersistenceWriterActor: ReceivePersistenceWithRetryActor<object, object>
1111
{
12-
private readonly ILoggingAdapter _log;
13-
private readonly List<Exception> _exceptions = [];
14-
private readonly int _maxRetries;
15-
private readonly TimeSpan _retryInterval;
16-
private PersistenceWriterProtocol.IOperation? _currentOperation;
12+
private IActorRef? _sender;
1713

1814
public PersistenceWriterActor(MigrationOptions options, string persistenceId)
1915
{
2016
PersistenceId = persistenceId;
2117

2218
JournalPluginId = options.ToJournalId;
2319
SnapshotPluginId = options.ToSnapshotStoreId;
24-
_maxRetries = options.MaxRetries;
25-
_retryInterval = options.RetryInterval;
26-
_log = Context.GetLogger();
27-
20+
MaxRetries = options.MaxRetries;
21+
RetryInterval = options.RetryInterval;
2822
Become(Idle);
2923
}
3024

3125
public override string PersistenceId { get; }
32-
public ITimerScheduler Timers { get; set; } = null!;
26+
protected override int MaxRetries { get; }
27+
protected override TimeSpan RetryInterval { get; }
3328

3429
private void Idle()
3530
{
@@ -43,109 +38,64 @@ private void Idle()
4338
{
4439
HandleSaveSnapshot(snapshot.Snapshot, Sender);
4540
});
41+
42+
CommandRetry();
43+
4644
CommandAny(HandleUnhandled);
4745
}
4846

4947
private void Writing()
5048
{
51-
Command<PersistenceWriterProtocol.PersistFailed>(fail => HandleFailure(fail.Cause));
52-
Command<SaveSnapshotSuccess>(_ => Self.Tell(PersistenceWriterProtocol.SnapshotWriteCompleted.Instance));
53-
Command<SaveSnapshotFailure>(fail => HandleFailure(fail.Cause));
54-
Command<PersistenceWriterProtocol.Retry>(_ => HandleRetry());
55-
Command<PersistenceWriterProtocol.IWriteSucceeded>(HandleComplete);
49+
CommandRetry();
50+
5651
CommandAny(HandleUnhandled);
5752
}
5853

5954
private void HandlePersist(object evt, IActorRef sender)
6055
{
61-
_currentOperation ??= new PersistenceWriterProtocol.PersistOperation(evt, sender);
56+
_sender = sender;
6257
Become(Writing);
6358

64-
_log.Debug($"Migrating event {evt}");
65-
Persist(evt, _ =>
59+
Log.Debug($"Migrating event {evt}");
60+
PersistWithRetry(evt, _ =>
6661
{
67-
_log.Debug($"Event {evt} migrated");
68-
Self.Tell(PersistenceWriterProtocol.PersistWriteCompleted.Instance);
62+
Log.Debug($"Event {evt} migrated");
63+
_sender.Tell(PersistenceWriterProtocol.PersistWriteCompleted.Instance);
64+
_sender = null;
65+
66+
Become(Idle);
6967
});
7068
}
7169

7270
private void HandleSaveSnapshot(object snapshot, IActorRef sender)
7371
{
74-
_currentOperation ??= new PersistenceWriterProtocol.SnapshotOperation(snapshot, sender);
72+
_sender = sender;
7573
Become(Writing);
7674

77-
SaveSnapshot(snapshot);
75+
SaveSnapshotWithRetry(snapshot);
7876
}
7977

80-
private void HandleComplete(PersistenceWriterProtocol.IWriteSucceeded message)
78+
private void HandleUnhandled(object msg)
8179
{
82-
_exceptions.Clear();
83-
_currentOperation!.ReplyTo.Tell(message);
84-
_currentOperation = null;
85-
86-
Become(Idle);
80+
Log.Error($"Illegal out of band command detected: {msg}");
81+
Unhandled(msg);
8782
}
8883

89-
private void HandleRetry()
84+
protected override void OnPersistFailure(Exception cause)
9085
{
91-
switch (_currentOperation)
92-
{
93-
case PersistenceWriterProtocol.PersistOperation p:
94-
HandlePersist(p.Message, p.ReplyTo);
95-
break;
96-
case PersistenceWriterProtocol.SnapshotOperation s:
97-
HandleSaveSnapshot(s.Message, s.ReplyTo);
98-
break;
99-
default:
100-
throw new AggregateException($"Unknown migration operation: {_currentOperation!.GetType()}", _exceptions);
101-
}
86+
_sender.Tell(new PersistenceWriterProtocol.PersistFailed(cause));
10287
}
10388

104-
private void HandleFailure(Exception cause)
89+
protected override void OnSaveSnapshotSuccess(SaveSnapshotSuccess success)
10590
{
106-
if (_currentOperation is null)
107-
throw new NullReferenceException("_currentOperation should not be null");
91+
_sender.Tell(PersistenceWriterProtocol.SnapshotWriteCompleted.Instance);
92+
_sender = null;
10893

109-
_exceptions.Add(cause);
110-
if (_exceptions.Count < _maxRetries)
111-
{
112-
_log.Info("{0} write operation failed ({1}/{2}), retrying in {3} seconds...",
113-
_currentOperation.Name, _exceptions.Count, _maxRetries, _retryInterval.TotalSeconds);
114-
115-
Timers.StartSingleTimer(PersistenceWriterProtocol.Retry.Instance, PersistenceWriterProtocol.Retry.Instance, _retryInterval);
116-
return;
117-
}
118-
119-
try
120-
{
121-
throw new AggregateException(
122-
message: string.Format(_currentOperation.ErrorMessage, _maxRetries),
123-
innerExceptions: _exceptions);
124-
}
125-
catch (AggregateException ex)
126-
{
127-
_log.Error(ex, _currentOperation.ErrorMessage, _maxRetries);
128-
_currentOperation.ReplyTo.Tell(_currentOperation.FailedMessage(ex));
129-
}
130-
}
131-
132-
private void HandleUnhandled(object msg)
133-
{
134-
_log.Error($"Illegal out of band command detected: {msg}");
135-
Unhandled(msg);
136-
}
137-
138-
protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr)
139-
{
140-
Log.Warning(cause, "Rejected to persist event type [{0}] with sequence number [{1}] for persistenceId [{2}] due to [{3}].",
141-
@event.GetType(), sequenceNr, PersistenceId, cause.Message);
142-
_currentOperation?.ReplyTo.Tell(new PersistenceWriterProtocol.PersistFailed(cause));
94+
Become(Idle);
14395
}
144-
145-
protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
96+
97+
protected override void OnSaveSnapshotFailure(Exception cause)
14698
{
147-
Log.Warning(cause, "Rejected to persist event type [{0}] with sequence number [{1}] for persistenceId [{2}] due to [{3}].",
148-
@event.GetType(), sequenceNr, PersistenceId, cause.Message);
149-
Self.Tell(new PersistenceWriterProtocol.PersistFailed(cause));
99+
_sender.Tell(new PersistenceWriterProtocol.SnapshotFailed(cause));
150100
}
151101
}

0 commit comments

Comments
 (0)