Skip to content

Fixed race-condition for streambroker.#126

Merged
s3rius merged 1 commit into
mainfrom
bugfix/race-condition
Jun 23, 2026
Merged

Fixed race-condition for streambroker.#126
s3rius merged 1 commit into
mainfrom
bugfix/race-condition

Conversation

@s3rius

@s3rius s3rius commented Jun 23, 2026

Copy link
Copy Markdown
Member

This PR fixes #123 .

I tried multiple approaches and found this one working.

Details
import asyncio

import redis.asyncio as aioredis

STREAM, GROUP = "taskiq_bug", "taskiq"


async def setup():
    r = await aioredis.from_url("redis://localhost:6379")
    await r.delete(STREAM)
    await r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
    msg_id = await r.xadd(STREAM, {"data": b"test"})
    await r.xreadgroup(GROUP, "worker-seed", {STREAM: ">"}, count=1)
    await r.aclose()
    return msg_id


async def worker(name: str):
    conn = await aioredis.from_url("redis://localhost:6379")
    lock_key = f"autoclaim:{GROUP}:{STREAM}"

    pipe = conn.pipeline()
    lock = pipe.lock(lock_key)
    await lock.acquire()
    pipe.xautoclaim(
        STREAM,
        GROUP,
        name,
        min_idle_time=100,
        count=10,
    )
    await lock.release()
    res = await pipe.execute()
    if res[1][1]:
        msgs = res[1][1]
        print(name, "got", msgs[0][0])

    await conn.aclose()


async def run_once():
    await setup()
    await asyncio.sleep(0.2)
    await asyncio.gather(
        worker("worker-0"),
        worker("worker-1"),
    )


async def main():
    for _ in range(25):
        await run_once()


if __name__ == "__main__":
    asyncio.run(main())

Output:

worker-0 got b'1782170479725-0'
worker-1 got b'1782170479929-0'
worker-0 got b'1782170480133-0'
worker-0 got b'1782170480337-0'
worker-0 got b'1782170480540-0'
worker-0 got b'1782170480744-0'
worker-0 got b'1782170480947-0'
worker-0 got b'1782170481150-0'
worker-0 got b'1782170481354-0'
worker-0 got b'1782170481557-0'
worker-0 got b'1782170481760-0'
worker-0 got b'1782170481963-0'
worker-0 got b'1782170482166-0'
worker-0 got b'1782170482369-0'
worker-1 got b'1782170482572-0'
worker-1 got b'1782170482775-0'
worker-0 got b'1782170482978-0'
worker-0 got b'1782170483180-0'
worker-0 got b'1782170483383-0'
worker-0 got b'1782170483586-0'
worker-0 got b'1782170483790-0'
worker-1 got b'1782170483993-0'
worker-0 got b'1782170484197-0'
worker-0 got b'1782170484402-0'
worker-0 got b'1782170484605-0'

Here's redis monitor output:

1782169992.799451 [0 172.19.0.1:37304] "MULTI"
1782169992.799458 [0 172.19.0.1:37304] "SET" "autoclaim:taskiq:taskiq_bug" "f1015f306e8f11f1ac1603adb8b6c235" "NX"
1782169992.799472 [0 172.19.0.1:37304] "XAUTOCLAIM" "taskiq_bug" "taskiq" "worker-0" "100" "0-0" "COUNT" "10"
1782169992.799474 [0 172.19.0.1:37304] "EVALSHA" "c3f8721cbb97f72bc19e972846bd7aaf91901658" "1" "autoclaim:taskiq:taskiq_bug" "f1015f306e8f11f1ac1603adb8b6c235"
1782169992.799487 [0 lua] "get" "autoclaim:taskiq:taskiq_bug"
1782169992.799491 [0 lua] "del" "autoclaim:taskiq:taskiq_bug"
1782169992.799494 [0 172.19.0.1:37304] "EXEC"
1782169992.799503 [0 172.19.0.1:37316] "MULTI"
1782169992.799506 [0 172.19.0.1:37316] "SET" "autoclaim:taskiq:taskiq_bug" "f1017c546e8f11f1ac1603adb8b6c235" "NX"
1782169992.799509 [0 172.19.0.1:37316] "XAUTOCLAIM" "taskiq_bug" "taskiq" "worker-1" "100" "0-0" "COUNT" "10"
1782169992.799510 [0 172.19.0.1:37316] "EVALSHA" "c3f8721cbb97f72bc19e972846bd7aaf91901658" "1" "autoclaim:taskiq:taskiq_bug" "f1017c546e8f11f1ac1603adb8b6c235"
1782169992.799513 [0 lua] "get" "autoclaim:taskiq:taskiq_bug"
1782169992.799514 [0 lua] "del" "autoclaim:taskiq:taskiq_bug"
1782169992.799515 [0 172.19.0.1:37316] "EXEC"

It seems to do exactly what is needed.

Signed-off-by: Pavel Kirilin <s3riussan@gmail.com>
@s3rius s3rius force-pushed the bugfix/race-condition branch from 0fcfca8 to 5d3d514 Compare June 23, 2026 01:06
@s3rius s3rius merged commit 39c6b7c into main Jun 23, 2026
7 checks passed
@s3rius s3rius deleted the bugfix/race-condition branch June 23, 2026 01:08
@codecov

codecov Bot commented Jun 23, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 0% with 11 lines in your changes missing coverage. Please review.
✅ Project coverage is 92.18%. Comparing base (090a00a) to head (5d3d514).
⚠️ Report is 36 commits behind head on main.

Files with missing lines Patch % Lines
taskiq_redis/redis_broker.py 0.00% 11 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #126      +/-   ##
==========================================
+ Coverage   91.70%   92.18%   +0.48%     
==========================================
  Files           7        8       +1     
  Lines         434      704     +270     
==========================================
+ Hits          398      649     +251     
- Misses         36       55      +19     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[BUG] Race condition inside RedisStreamBroker

1 participant