Skip to content

fix: use the closed window in the eof response#360

Open
vaibhavtiwari33 wants to merge 2 commits into
mainfrom
accumulator-key-tracking-eof
Open

fix: use the closed window in the eof response#360
vaibhavtiwari33 wants to merge 2 commits into
mainfrom
accumulator-key-tracking-eof

Conversation

@vaibhavtiwari33

@vaibhavtiwari33 vaibhavtiwari33 commented Jun 16, 2026

Copy link
Copy Markdown
Contributor

Based off of numaproj/numaflow-rs#177

Testing

Updated the stream sorter example's handler to the following:

    async def handler(
        self,
        datums: AsyncIterable[Datum],
        output: NonBlockingIterator,
    ):
        _LOGGER.info("StreamSorter handler started")
        async for datum in datums:
            _LOGGER.info(
                f"Received datum with event time: {datum.event_time}, "
                f"Current latest watermark: {self.latest_wm}, "
                f"Datum watermark: {datum.watermark}"
            )

Created a jitter source, which pauses emission of events with certain keys for 45 seconds. Total 5 keys in the system. Timeout set for the accumulator is 30s

Logged into stream sorter pod to track disk usage across 4 hours (I should've probably installed prometheus and metrics server to gather this detail in my local cluster but I was too lazy to do that for some reason) :

Disk usage

root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
15M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
11M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
14M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
17M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
17M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
29M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
29M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
29M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
23M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
11M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
30M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
27M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
31M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
31M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
18M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
23M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
27M     .
root@jitter-stream-sorter-sorter-0:/var/numaflow/pbq# du -sh
13M     .

Memory usage

root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
29995008
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
27566080
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
33546240
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
35954688
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
45420544
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
45256704
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
38699008
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
39211008
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
44568576
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
50184192
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
51757056
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
52154368
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
41869312
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
46641152
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
46379008
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
51281920
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
33849344
root@jitter-stream-sorter-sorter-0:/# cat /sys/fs/cgroup/memory.current
37580800

Overall values seem mostly consistent across time.

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@codecov

codecov Bot commented Jun 16, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 87.87879% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.68%. Comparing base (e3d3afd) to head (e6434ba).

Files with missing lines Patch % Lines
...kages/pynumaflow/pynumaflow/accumulator/_dtypes.py 75.00% 1 Missing and 1 partial ⚠️
.../pynumaflow/accumulator/servicer/async_servicer.py 77.77% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #360      +/-   ##
==========================================
- Coverage   92.74%   92.68%   -0.06%     
==========================================
  Files          67       67              
  Lines        3514     3540      +26     
  Branches      228      232       +4     
==========================================
+ Hits         3259     3281      +22     
- Misses        190      192       +2     
- Partials       65       67       +2     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Vaibhav Tiwari <vaibhav.tiwari33@gmail.com>
@vaibhavtiwari33 vaibhavtiwari33 enabled auto-merge (squash) June 18, 2026 03:23
generator_response = accumulator_stub.AccumulateFn(
request_iterator=request_generator(count=5, request=request, send_close=False)
)
except grpc.RpcError as e:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't ignore these errors. If there's an error test should fail right?

_result_queue: NonBlockingIterator
_consumer_future: Task
_latest_watermark: datetime
_close_window: KeyedWindow | None

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do like this:

_close_window: KeyedWindow | None = field(default=None, init=False)

This value is not known/set at initialization of AccumulatorResult right? With above, it's not part of the constructor.

"""
return self._close_window

def set_close_window(self, window: KeyedWindow):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close_window is already a read-only property. Let's make this as well property setter ?

@close_window.setter
def close_window(self, window: KeyedWindow):
    if not isinstance(window, KeyedWindow):
        raise TypeError("window must be a KeyedWindow object")
    self._close_window = window

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants