diff --git a/CHANGELOG.md b/CHANGELOG.md index 2cb0126524..9ed3d72306 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## v1.1.4 + +### Fixed + +- DA client falls back to HTTP polling with `Retrieve` when the WebSocket connection fails, instead of trying to use the WS-only `Subscribe` over HTTP. A background goroutine retries WS every 30s so transient outages don't force a permanent downgrade [#3361](https://github.com/evstack/ev-node/pull/3361) + ## v1.1.3 ### Fixed diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index 8b05cad689..df08473d73 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -85,6 +85,8 @@ func (m *mockDA) HasForcedInclusionNamespace() bool { return true } +func (m *mockDA) SupportsSubscribe() bool { return true } + func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil } diff --git a/block/internal/da/async_block_retriever_test.go b/block/internal/da/async_block_retriever_test.go index d9e4613988..6bfcb57fef 100644 --- a/block/internal/da/async_block_retriever_test.go +++ b/block/internal/da/async_block_retriever_test.go @@ -53,6 +53,8 @@ func TestAsyncBlockRetriever_SubscriptionDrivenCaching(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // Create a subscription channel that delivers one event then blocks. subCh := make(chan datypes.SubscriptionEvent, 1) subCh <- datypes.SubscriptionEvent{ @@ -104,6 +106,8 @@ func TestAsyncBlockRetriever_CatchupFillsGaps(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // Subscription delivers height 105 (no blobs — just a signal). subCh := make(chan datypes.SubscriptionEvent, 1) subCh <- datypes.SubscriptionEvent{Height: 105} @@ -153,6 +157,8 @@ func TestAsyncBlockRetriever_HeightFromFuture(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // Subscription delivers height 100 with no blobs. subCh := make(chan datypes.SubscriptionEvent) client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once() @@ -187,6 +193,8 @@ func TestAsyncBlockRetriever_StopGracefully(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + blockCh := make(chan datypes.SubscriptionEvent) client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe() client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{ @@ -211,6 +219,8 @@ func TestAsyncBlockRetriever_ReconnectOnSubscriptionError(t *testing.T) { client := &mocks.MockClient{} fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes() + client.On("SupportsSubscribe").Return(true) + // First subscription closes immediately (simulating error). closedCh := make(chan datypes.SubscriptionEvent) close(closedCh) diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 455d151d9f..0ffd7e41ed 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -32,6 +32,7 @@ type Config struct { type client struct { blobAPI *blobrpc.BlobAPI headerAPI *blobrpc.HeaderAPI + da *blobrpc.Client // kept to read live IsWebSocket after transport upgrades logger zerolog.Logger defaultTimeout time.Duration namespaceBz []byte @@ -131,6 +132,7 @@ func NewClient(cfg Config) FullClient { return &client{ blobAPI: &cfg.DA.Blob, headerAPI: &cfg.DA.Header, + da: cfg.DA, logger: cfg.Logger.With().Str("component", "da_client").Logger(), defaultTimeout: cfg.DefaultTimeout, namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(), @@ -485,6 +487,13 @@ func (c *client) HasForcedInclusionNamespace() bool { return c.hasForcedNamespace } +// SupportsSubscribe reports whether the underlying transport supports +// channel-based subscriptions (WebSocket). Reads the live IsWebSocket flag +// from the jsonrpc client so transport upgrades are visible immediately. +func (c *client) SupportsSubscribe() bool { + return c.da != nil && c.da.IsWebSocket.Load() +} + // Subscribe subscribes to blobs in the given namespace via the celestia-node // Subscribe API. It returns a channel that emits a SubscriptionEvent for every // DA block containing a matching blob. The channel is closed when ctx is diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index f1272087f0..e1ff8d6fcf 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -31,6 +31,11 @@ type Client interface { // GetLatestDAHeight returns the latest height available on the DA layer. GetLatestDAHeight(ctx context.Context) (uint64, error) + // SupportsSubscribe reports whether the underlying transport supports + // channel-based subscriptions (WebSocket). When false, callers must use + // polling-based retrieval via Retrieve instead. + SupportsSubscribe() bool + // Namespace accessors. GetHeaderNamespace() []byte GetDataNamespace() []byte diff --git a/block/internal/da/subscriber.go b/block/internal/da/subscriber.go index 8ff46773ce..cf3dd49084 100644 --- a/block/internal/da/subscriber.go +++ b/block/internal/da/subscriber.go @@ -122,10 +122,14 @@ func (s *Subscriber) Start(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) s.cancel = cancel - s.wg.Add(2) s.lifecycleMu.Unlock() - go s.followLoop(ctx) + s.wg.Add(2) + if s.client.SupportsSubscribe() { + go s.followLoop(ctx) + } else { + go s.pollLoop(ctx) + } go s.catchupLoop(ctx) return nil @@ -167,6 +171,72 @@ func (s *Subscriber) signalCatchup() { } } +// pollLoop periodically queries the latest DA height and triggers +// catchup when new heights are available. The catchup loop fetches blobs +// via Retrieve (which uses GetAll) so each height is fetched exactly once. +// Periodically checks whether the underlying transport has been upgraded +// to WebSocket and switches to followLoop when that happens. +func (s *Subscriber) pollLoop(ctx context.Context) { + defer s.wg.Done() + + s.logger.Info().Msg("starting poll loop") + defer s.logger.Info().Msg("poll loop stopped") + + // Do an immediate poll on startup so we don't wait for the first tick. + s.pollDAHeight(ctx) + + interval := s.daBlockTime + if interval <= 0 { + interval = 2 * time.Second + } + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + // If the transport has been upgraded to WS in the background, + // switch to the subscription-based follow loop. + if s.client.SupportsSubscribe() { + s.logger.Info().Msg("WebSocket available, switching from poll to follow loop") + s.wg.Add(1) + go s.followLoop(ctx) + return + } + + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.pollDAHeight(ctx) + } + } +} + +// pollDAHeight queries GetLatestDAHeight and signals catchup when a new +// height is observed. The actual blob retrieval is done by catchupLoop. +func (s *Subscriber) pollDAHeight(ctx context.Context) { + height, err := s.client.GetLatestDAHeight(ctx) + if err != nil { + if ctx.Err() != nil { + return + } + s.logger.Warn().Err(err).Msg("poll: failed to get latest DA height") + return + } + + cur := s.highestSeenDAHeight.Load() + if height <= cur { + return + } + + s.seenSubscriptionEvent.Store(true) + s.logger.Debug(). + Uint64("new_da_height", height). + Uint64("current_highest_seen", cur). + Msg("poll: observed new DA height") + + s.updateHighest(height) +} + // followLoop subscribes to DA blob events and keeps highestSeenDAHeight up to date. func (s *Subscriber) followLoop(ctx context.Context) { defer s.wg.Done() diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index c41c920494..e8e5ab391a 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -165,6 +165,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte { func (t *tracedClient) HasForcedInclusionNamespace() bool { return t.inner.HasForcedInclusionNamespace() } +func (t *tracedClient) SupportsSubscribe() bool { + return t.inner.SupportsSubscribe() +} func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte, includeTimestamp bool) (<-chan datypes.SubscriptionEvent, error) { return t.inner.Subscribe(ctx, namespace, includeTimestamp) } diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index 9ea344e3ab..74bf3882eb 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -74,6 +74,7 @@ func (m *mockFullClient) GetHeaderNamespace() []byte { func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} } func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} } func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true } +func (m *mockFullClient) SupportsSubscribe() bool { return true } // setup a tracer provider + span recorder func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) { diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index a8f1b01102..3d4a21d634 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -116,6 +116,7 @@ func makeSignedHeaderBytes( func setupMockDAClient(tb testing.TB) (da.Client, chan datypes.SubscriptionEvent) { mockClient := testmocks.NewMockClient(tb) eventCh := make(chan datypes.SubscriptionEvent, 1) + mockClient.EXPECT().SupportsSubscribe().Return(true).Maybe() mockClient.EXPECT().Subscribe(mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(eventCh), nil).Maybe() return mockClient, eventCh } diff --git a/pkg/da/jsonrpc/client.go b/pkg/da/jsonrpc/client.go index 664da4114b..bfd6cb9a26 100644 --- a/pkg/da/jsonrpc/client.go +++ b/pkg/da/jsonrpc/client.go @@ -5,6 +5,9 @@ import ( "fmt" "net/http" "strings" + "sync" + "sync/atomic" + "time" libshare "github.com/celestiaorg/go-square/v3/share" "github.com/filecoin-project/go-jsonrpc" @@ -13,15 +16,30 @@ import ( // Client dials the celestia-node RPC "blob" and "header" namespaces. type Client struct { - Blob BlobAPI - Header HeaderAPI - closer jsonrpc.ClientCloser + Blob BlobAPI + Header HeaderAPI + IsWebSocket atomic.Bool + + mu sync.Mutex + closer jsonrpc.ClientCloser + retryCancel context.CancelFunc // stops the background WS retry loop } -// Close closes the underlying JSON-RPC connection. +// Close closes the underlying JSON-RPC connection and stops any +// background WebSocket retry loop. func (c *Client) Close() { - if c != nil && c.closer != nil { - c.closer() + if c == nil { + return + } + c.mu.Lock() + if c.retryCancel != nil { + c.retryCancel() + c.retryCancel = nil + } + closer := c.closer + c.mu.Unlock() + if closer != nil { + closer() } } @@ -72,18 +90,88 @@ func NewClient(ctx context.Context, addr, token string, authHeaderName string) ( // NewWSClient connects to the DA RPC endpoint over WebSocket. // Automatically converts http:// to ws:// (and https:// to wss://). // Supports channel-based subscriptions (e.g. Subscribe). -// Note: WebSocket connections are eager — they connect at creation time -// if the initial WS dial fails, falls back to HTTP polling for the entire session. +// WebSocket connections are eager — they connect at creation time. +// If the initial WS dial fails, it falls back to HTTP polling and spawns a +// background goroutine that periodically retries the WS connection. When +// the WS endpoint becomes reachable, the transport is transparently upgraded. func NewWSClient(ctx context.Context, logger zerolog.Logger, addr, token string, authHeaderName string) (*Client, error) { client, err := NewClient(ctx, httpToWS(addr), token, authHeaderName) if err != nil { logger.Warn().Err(err).Msg("DA websocket connection failed, falling back to DA polling") - return NewClient(ctx, addr, token, authHeaderName) + client, err = NewClient(ctx, addr, token, authHeaderName) + if err != nil { + return nil, err + } + client.IsWebSocket.Store(false) + + // Retry WS in the background so transient outages don't force a permanent downgrade. + retryCtx, retryCancel := context.WithCancel(context.Background()) + client.retryCancel = retryCancel + go client.retryWSLoop(retryCtx, logger, addr, token, authHeaderName) + + return client, nil } + client.IsWebSocket.Store(true) return client, nil } +const wsRetryInterval = 30 * time.Second + +// retryWSLoop periodically attempts to re-establish a WebSocket connection. +// When successful, it swaps the transport in-place and exits. +func (c *Client) retryWSLoop(ctx context.Context, logger zerolog.Logger, addr, token, authHeaderName string) { + ticker := time.NewTicker(wsRetryInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if c.tryUpgradeWS(ctx, logger, addr, token, authHeaderName) { + return + } + } + } +} + +// tryUpgradeWS attempts to open a WS connection and, if successful, swaps +// the transport internals so subsequent calls use WebSocket. Returns true +// when the upgrade succeeds (or the client is already on WS). +func (c *Client) tryUpgradeWS(ctx context.Context, logger zerolog.Logger, addr, token, authHeaderName string) bool { + wsClient, err := NewClient(ctx, httpToWS(addr), token, authHeaderName) + if err != nil { + return false + } + + c.mu.Lock() + defer c.mu.Unlock() + + // Another goroutine may have already upgraded. + if c.IsWebSocket.Load() { + wsClient.Close() + return true + } + + // Swap function pointers from the new WS client into the active client. + c.Blob.Internal = wsClient.Blob.Internal + c.Header.Internal = wsClient.Header.Internal + + // Close the old HTTP connections and wire the new closer. + oldCloser := c.closer + c.closer = func() { + wsClient.closer() + if oldCloser != nil { + oldCloser() + } + } + + c.IsWebSocket.Store(true) + logger.Info().Msg("DA websocket connection restored, switching back from HTTP polling") + return true +} + // BlobAPI mirrors celestia-node's blob module (nodebuilder/blob/blob.go). // jsonrpc.NewClient wires Internal.* to RPC stubs. type BlobAPI struct { diff --git a/pkg/sequencers/single/sequencer_test.go b/pkg/sequencers/single/sequencer_test.go index a2374bb192..362f265c18 100644 --- a/pkg/sequencers/single/sequencer_test.go +++ b/pkg/sequencers/single/sequencer_test.go @@ -50,6 +50,7 @@ func setupForcedInclusionMockDA(t *testing.T, mockDA *MockFullDAClient, latestDA mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(latestDAHeight, nil).Maybe() @@ -910,6 +911,7 @@ func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 101 — close to sequencer start (100), no catch-up needed. @@ -1019,6 +1021,7 @@ func TestSequencer_GetNextBatch_EmptyDABatch_IncreasesDAHeight(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 100 — same as sequencer start, no catch-up needed @@ -1117,6 +1120,7 @@ func TestSequencer_GetNextBatch_WithGasFiltering(t *testing.T) { mockDA.MockClient.On("GetBlobsAtHeight", mock.Anything, mock.Anything, mock.Anything). Return(forcedTxs, nil).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte("forced")).Maybe() mockDA.MockClient.On("MaxBlobSize", mock.Anything).Return(uint64(1000000), nil).Maybe() @@ -1224,6 +1228,7 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) { mockDA := newMockFullDAClient(t) mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("Retrieve", mock.Anything, mock.Anything, mock.Anything).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture}, @@ -1295,6 +1300,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at height 105 — sequencer starts at 100 with epoch size 1, @@ -1369,6 +1375,7 @@ func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — sequencer starts at 100 with epoch size 1, @@ -1465,6 +1472,7 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — multiple epochs ahead, triggers catch-up @@ -1528,6 +1536,7 @@ func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — multiple epochs ahead, triggers catch-up @@ -1737,6 +1746,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 106 — sequencer starts at 100 with epoch size 1, @@ -1892,6 +1902,7 @@ func TestSequencer_CatchUp_CheckpointAdvancesDuringCatchUp(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is at 105 — multiple epochs ahead, triggers catch-up @@ -1991,6 +2002,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() // DA head is far ahead — triggers catch-up @@ -2121,6 +2133,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps_EmptyEpoch(t *testing.T) { mockDA.MockClient.On("GetDataNamespace").Return([]byte("data")).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return(forcedInclusionNS).Maybe() mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetLatestDAHeight", mock.Anything).Return(uint64(110), nil).Once() @@ -2201,6 +2214,7 @@ func TestSequencer_GetNextBatch_GasFilteringPreservesUnprocessedTxs(t *testing.T mockDA := newMockFullDAClient(t) mockDA.MockClient.On("HasForcedInclusionNamespace").Return(true).Maybe() + mockDA.MockClient.On("SupportsSubscribe").Return(true).Maybe() mockDA.MockClient.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe() mockDA.MockClient.On("GetForcedInclusionNamespace").Return([]byte("forced")).Maybe() mockDA.MockClient.On("MaxBlobSize", mock.Anything).Return(uint64(1000000), nil).Maybe() diff --git a/test/mocks/da.go b/test/mocks/da.go index e1daf70038..1309afc389 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -7,7 +7,7 @@ package mocks import ( "context" - "github.com/evstack/ev-node/pkg/da/types" + da "github.com/evstack/ev-node/pkg/da/types" mock "github.com/stretchr/testify/mock" ) @@ -555,6 +555,50 @@ func (_c *MockClient_Submit_Call) RunAndReturn(run func(ctx context.Context, dat return _c } +// SupportsSubscribe provides a mock function for the type MockClient +func (_mock *MockClient) SupportsSubscribe() bool { + ret := _mock.Called() + + if len(ret) == 0 { + panic("no return value specified for SupportsSubscribe") + } + + var r0 bool + if returnFunc, ok := ret.Get(0).(func() bool); ok { + r0 = returnFunc() + } else { + r0 = ret.Get(0).(bool) + } + return r0 +} + +// MockClient_SupportsSubscribe_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SupportsSubscribe' +type MockClient_SupportsSubscribe_Call struct { + *mock.Call +} + +// SupportsSubscribe is a helper method to define mock.On call +func (_e *MockClient_Expecter) SupportsSubscribe() *MockClient_SupportsSubscribe_Call { + return &MockClient_SupportsSubscribe_Call{Call: _e.mock.On("SupportsSubscribe")} +} + +func (_c *MockClient_SupportsSubscribe_Call) Run(run func()) *MockClient_SupportsSubscribe_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockClient_SupportsSubscribe_Call) Return(b bool) *MockClient_SupportsSubscribe_Call { + _c.Call.Return(b) + return _c +} + +func (_c *MockClient_SupportsSubscribe_Call) RunAndReturn(run func() bool) *MockClient_SupportsSubscribe_Call { + _c.Call.Return(run) + return _c +} + // Subscribe provides a mock function for the type MockClient func (_mock *MockClient) Subscribe(ctx context.Context, namespace []byte, fetchTimestamp bool) (<-chan da.SubscriptionEvent, error) { ret := _mock.Called(ctx, namespace, fetchTimestamp) diff --git a/test/testda/dummy.go b/test/testda/dummy.go index e1f93642c4..04599a8f58 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -243,6 +243,10 @@ func (d *DummyDA) GetForcedInclusionNamespace() []byte { return nil } // HasForcedInclusionNamespace reports whether forced inclusion is configured. func (d *DummyDA) HasForcedInclusionNamespace() bool { return false } +// SupportsSubscribe reports whether the underlying transport supports +// channel-based subscriptions. +func (d *DummyDA) SupportsSubscribe() bool { return true } + // GetLatestDAHeight returns the current DA height (the latest height available). func (d *DummyDA) GetLatestDAHeight(_ context.Context) (uint64, error) { return d.height.Load(), nil