⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content

Conversation

@tac0turtle
Copy link
Contributor

Overview

this pr add sync modes allowing us to move to subscriptions when at the head of the chain

@claude
Copy link
Contributor

claude bot commented Jan 8, 2026

Claude finished @tac0turtle's task in 2m 12s —— View job


Comprehensive Code Review

I've completed a thorough review of the sync mode implementation. This is a significant architectural improvement that introduces dynamic synchronization modes for the DA worker. Below are my findings across multiple dimensions:


1. Architecture & Design

Strengths

Well-designed state machine: The dual-mode approach (catchup vs follow) is architecturally sound and addresses a real performance need. The transition logic in determineSyncMode() (syncer.go:452-472) with the catchupThreshold provides appropriate hysteresis.

Clean separation of concerns: The introduction of AsyncBlockRetriever for prefetching and the subscription mechanism are properly abstracted and testable.

Appropriate use of interfaces: The DARetriever.ProcessBlobs() method (da_retriever.go:156) allows both polling and subscription modes to reuse the same blob processing logic, which is excellent for consistency.

⚠️ Concerns

Interface violation (syncer.go:603): The syncer calls s.daRetriever.ProcessBlobs() directly, but this method is not part of the DARetriever interface defined in da_retriever.go:22-27. While this works because the concrete type is used, it breaks the abstraction.

Fix interface abstraction →

Confusing naming (as noted by reviewer randygrok): The LocalHead() method name in client.go:458 is ambiguous. It's actually querying the DA layer's local head, not the rollup node's local head. Consider renaming to DALocalHead() or GetDANodeLocalHead() for clarity.

Clarify LocalHead naming →


2. Concurrency & Thread Safety

Strengths

Proper atomic operations: The use of atomic.Int32 for currentSyncMode (syncer.go:158, 434) and atomic.Uint64 for height tracking is correct and race-free.

Context cancellation: The subscription handling properly uses context cancellation (syncer.go:524-525) and cleanup is well-structured.

⚠️ Concerns

Potential race in async_block_retriever.go:114-126: The compare-and-swap loop for UpdateCurrentHeight is correct, but the infinite loop without a bounded retry could theoretically spin forever under extreme contention. Consider adding a maximum retry count or using atomic.Max() if available in your Go version.

Channel blocking in follow mode (syncer.go:608-618): When the heightInCh is full, events are cached to pending. However, if the channel is consistently full during follow mode, this could lead to increased memory usage in the cache. Consider monitoring cache.PendingEventCount() and falling back to catchup mode if the pending queue grows too large.


3. Error Handling & Resilience

Strengths

Graceful degradation: The determineSyncMode() method (syncer.go:458-463) correctly defaults to catchup mode on error, which is the safer fallback behavior.

Watchdog mechanism: The subscription watchdog timeout (syncer.go:574-580) prevents the system from getting stuck in follow mode if DA updates stop arriving.

⚠️ Concerns

Silent subscription failures: In processSubscriptionResponse() (syncer.go:586-627), blob processing errors are logged but don't trigger a mode switch or subscription restart. If blobs are malformed, the node could miss blocks silently. Consider accumulating error counts and switching modes after N consecutive failures.

Missing error handling for forced inclusion subscription: The forced_inclusion_retriever.go:97-113 handles subscription responses but doesn't validate or report errors. If forced inclusion blobs are corrupted, they'll be silently dropped.


4. Performance & Resource Efficiency

Strengths

Reduced polling overhead: The follow mode subscription eliminates wasteful polling when caught up, which is the primary goal of this PR.

Prefetching strategy: The AsyncBlockRetriever with prefetch window (async_block_retriever.go:58, 73-74) is well-designed for smooth catchup.

Cache cleanup: The cleanupOldBlocks() method (async_block_retriever.go:356-395) prevents unbounded memory growth.

⚠️ Concerns

Double caching: Both AsyncBlockRetriever (async_block_retriever.go) and the existing cache manager cache DA blocks. This seems redundant and wastes memory. Consider consolidating or clarifying the caching strategy.

Prefetch window sizing: The forced inclusion retriever uses daEpochSize*2 as the prefetch window (forced_inclusion_retriever.go:74), but there's no justification for why 2x is optimal. This could be excessive for long epochs.

Synchronous cleanup query (async_block_retriever.go:366-371): The datastore query in cleanupOldBlocks() uses f.ctx which could be cancelled during shutdown, leaving stale entries. Consider using context.Background() with a timeout for cleanup operations.


5. Security

Strengths

No new security vulnerabilities identified in the sync mode switching logic.

Proper validation: Blob processing reuses existing validation logic from daRetriever, maintaining security properties.

⚠️ Concerns

Subscription trust model: The subscription mechanism trusts the DA node to send correct blobs. If a malicious DA node sends crafted blobs at high frequency, the node could process garbage data. While existing validation should catch this, consider adding rate limiting or subscription source validation.


6. Testing & Code Coverage

⚠️ Critical Gaps

Low test coverage: Codecov reports only 30.76% patch coverage with 198 lines missing coverage. Key gaps include:

  • syncer.go:619 (subscription event handling) - 78 missing lines
  • async_block_retriever.go - 41 missing lines
  • forced_inclusion_retriever.go - 28 missing lines
  • subscription.go - 12 missing lines (new file, completely untested)
  • client.go Subscribe/LocalHead methods - 12 missing lines

Missing integration tests: No tests verify the catchup→follow→catchup mode transitions work correctly end-to-end.

No subscription failure tests: The runFollowMode() error paths (syncer.go:502-513) aren't tested.

Add mode transition tests →


7. Code Quality & Maintainability

Strengths

Clear logging: Structured logging with appropriate levels throughout (e.g., syncer.go:436-439 for mode switches).

Well-documented constants: catchupThreshold and followWatchdogMultiplier (syncer.go:55-61) are clearly explained.

Idiomatic Go: Code follows Go conventions and is consistent with the existing codebase style.

⚠️ Concerns

Magic numbers: The prefetch window of 10 (async_block_retriever.go:74) and watchdog multiplier of 3 (syncer.go:61) lack justification. Document why these values were chosen or make them configurable.

Dead code (as noted by julienrbrt): The nil channel check in syncer.go:565 is unnecessary because Go's select already handles nil channels correctly. Remove for clarity.

Duplicate tracking (as noted by julienrbrt): The forced inclusion retriever's tracking logic (forced_inclusion_retriever.go:36-38) may be redundant with the async fetcher's internal tracking. Needs cleanup.


8. Metrics & Observability

Strengths

Comprehensive metrics: New metrics for SyncMode, SubscribeErrors, and ModeSwitches (metrics.go:74-76) provide excellent visibility into the new behavior.

Mode transition logging: Clear log messages when switching modes (syncer.go:436-439).

⚠️ Suggestions

Missing metric: No metric for subscription uptime/duration. Consider adding SubscriptionDurationSeconds to track how long the node stays in follow mode before switching.

Missing metric: No metric for catchup lag (how far behind the DA head the node is). This would help operators identify sync issues.


9. Documentation

⚠️ Gaps

No design doc: This is a significant architectural change but lacks documentation explaining the mode switching logic, thresholds, and operational implications.

Incomplete interface documentation: The Subscribe method (interface.go:27-30) doesn't document what happens when the subscription fails or the channel closes.

No migration guide: Existing operators need guidance on how this change affects their nodes and whether any configuration changes are needed.


Summary & Recommendations

Priority Issues to Address

  1. Add ProcessBlobs to DARetriever interface to fix the abstraction violation
  2. Significantly improve test coverage, especially for mode transitions and subscription handling
  3. Clarify LocalHead naming to avoid confusion
  4. Remove dead code (nil channel check, redundant tracking)
  5. Add error accumulation in subscription processing to trigger mode switches on repeated failures
  6. Consolidate caching strategy to eliminate redundancy between AsyncBlockRetriever and existing cache

Nice to Have

  • Make thresholds (catchup threshold, prefetch window, watchdog multiplier) configurable
  • Add catchup lag and subscription duration metrics
  • Document the design and operational implications
  • Add rate limiting for subscription events

Overall Assessment: This is a well-architected feature that addresses a real performance need. The core logic is sound, but the implementation needs better test coverage and some cleanup around interface abstraction and redundant caching. The security and concurrency aspects are generally solid.

CI is currently running with 3 workflows pending. I recommend addressing the interface abstraction issue and test coverage before merging.

@github-actions
Copy link
Contributor

github-actions bot commented Jan 8, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

BuildFormatLintBreakingUpdated (UTC)
✅ passed⏩ skipped✅ passed✅ passedJan 20, 2026, 11:29 AM

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @tac0turtle, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the data availability (DA) worker's synchronization capabilities by introducing dynamic sync modes. It allows the system to efficiently transition between an aggressive polling strategy when catching up to the DA chain and a more resource-optimized, subscription-based approach once it reaches the chain head. This change aims to improve the overall performance and responsiveness of DA data processing.

Highlights

  • Dynamic Sync Modes: Introduced two dynamic synchronization modes for the DA worker: 'Catchup' for aggressive polling when behind the chain head, and 'Follow' for efficient, subscription-based real-time updates when caught up.
  • Subscription-based Updates: Implemented a new Subscribe method in the DA client, enabling the 'Follow' mode to receive real-time blob notifications, significantly reducing polling overhead.
  • Local Head Tracking: Added a LocalHead method to the DA client and a corresponding RPC endpoint, allowing the syncer to determine its current position relative to the DA chain head and facilitate mode switching.
  • New Metrics for Observability: Integrated new Prometheus metrics (SyncMode, SubscribeErrors, ModeSwitches) to provide visibility into the current sync mode, subscription failures, and transitions between modes.
  • Refactored DA Worker Loop: The daWorkerLoop has been refactored to intelligently determine and switch between 'Catchup' and 'Follow' modes based on the node's synchronization status, including a watchdog mechanism for 'Follow' mode.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a dual-mode synchronization mechanism, allowing the node to switch between an aggressive polling 'catchup' mode and a more efficient subscription-based 'follow' mode. This is a significant enhancement for nodes that are at the head of the chain. The changes are well-structured, introducing new DA client methods, metrics, and the core state machine logic in the daWorkerLoop. My review identified two critical bugs related to incorrect loop variable capturing that could lead to data corruption, and a couple of medium-severity design and style issues. Once these points are addressed, the implementation will be much more robust.

@codecov
Copy link

codecov bot commented Jan 12, 2026

Codecov Report

❌ Patch coverage is 38.48684% with 187 lines in your changes missing coverage. Please review.
✅ Project coverage is 58.47%. Comparing base (140b24a) to head (e706ba6).

Files with missing lines Patch % Lines
block/internal/syncing/syncer.go 47.61% 65 Missing and 1 partial ⚠️
block/internal/da/async_block_retriever.go 0.00% 41 Missing ⚠️
tools/local-da/rpc.go 0.00% 22 Missing ⚠️
block/internal/da/forced_inclusion_retriever.go 74.32% 17 Missing and 2 partials ⚠️
block/internal/common/subscription.go 0.00% 12 Missing ⚠️
block/internal/da/client.go 0.00% 12 Missing ⚠️
block/internal/syncing/da_retriever_tracing.go 0.00% 11 Missing ⚠️
block/internal/da/tracing.go 0.00% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2961      +/-   ##
==========================================
- Coverage   59.26%   58.47%   -0.80%     
==========================================
  Files         108      109       +1     
  Lines       10103    10319     +216     
==========================================
+ Hits         5988     6034      +46     
- Misses       3484     3656     +172     
+ Partials      631      629       -2     
Flag Coverage Δ
combined 58.47% <38.48%> (-0.80%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@tac0turtle tac0turtle force-pushed the marko/sync_subscribe branch from 95aeea4 to ecfcf83 Compare January 12, 2026 08:22
@tac0turtle tac0turtle marked this pull request as ready for review January 12, 2026 08:37
@julienrbrt
Copy link
Member

CI is not so glad.

@tac0turtle
Copy link
Contributor Author

CI is not so glad.

fixed

}

// Subscribe to forced inclusion namespace if configured
var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to follow the force included namespace. The retriever itself does the caching itself. Maybe we should align this logic in the force inclusion retriever as well instead of using the async block fetching (in da)

s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
}

case resp, ok := <-forcedInclusionCh:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto, this is dead code

daStartHeight uint64
asyncFetcher AsyncBlockRetriever

mu sync.Mutex
Copy link
Member

@julienrbrt julienrbrt Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to track all of this the tracking is done in the retrieve function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

working on some cleanup yea

}

// LocalHead returns the height of the locally synced DA head.
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for me while reading, appears a little confusing localHead in this context.

Is it querying the local node? is the last header that the DA has of my chain? is the last header that the DA layer has synced?

maybe this thought changes once all the review is done

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ill amend to make more clear

Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error)

// LocalHead returns the height of the locally synced DA head.
// Used to determine if the node is caught up with the DA layer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, not sure if LocalHead is clear enough.

}

// HandleSubscriptionResponse caches forced inclusion blobs from subscription updates.
func (r *ForcedInclusionRetriever) HandleSubscriptionResponse(resp *blobrpc.SubscriptionResponse) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer if subscribing was internal to the async block retriever, and sequencer can make use of this too.

@tac0turtle tac0turtle marked this pull request as draft January 20, 2026 11:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants