Does Django Channels enforce single-threaded per connection execution? #2133

@jmurua14

I have created an application that receives audio every second over a WebSocket connection, and I would like to process the audio in parallel. However, the consumer's receive function seems to be single-threaded: even though I send audio every second, processing each chunk takes more than 2 seconds, and this delays everything. I am using the AsyncWebsocketConsumer consumer class. In the code below I have simulated the audio processing with a 2-second sleep so that you can try it out. I am running Django with Daphne (daphne -b 0.0.0.0 -p 8000 proiektua.asgi:application).

consumers.py

import json
from urllib.parse import parse_qs
from channels.generic.websocket import AsyncWebsocketConsumer
from .models import Room, Message
from channels.db import database_sync_to_async
import io
import time
import asyncio
from asgiref.sync import sync_to_async

class ChatConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f"chat_{self.room_name}"
        
        query_params = parse_qs(self.scope["query_string"].decode())
        self.role = query_params.get("role", [""])[0]  # "sender" or "receiver"
        if self.role == "sender":
            print("SENDER")
        elif self.role == "receiver":
            print("RECEIVER")

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        # Accept the WebSocket connection
        await self.accept()

        self.aurreko_denb = time.time()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    async def receive(self, text_data=None, bytes_data=None):
        
        if bytes_data:
            print(f"Time between receive: {time.time() - self.aurreko_denb}")
            self.aurreko_denb = time.time()
            # Schedule processing as a separate task so receive() can return immediately
            asyncio.create_task(self.process_audio_and_send2(bytes_data))


    async def process_audio_and_send2(self, bytes_data):
        """ Runs the audio processing without blocking receive """
        transcript = await sync_to_async(self.process_audio)(bytes_data)
        
        # Send the transcription back
        await self.send(text_data=transcript)

    def process_audio(self, audio_data):
        """ Process audio synchronously (e.g., speech-to-text) """
        # time is already imported at module level
        time.sleep(2)  # Simulate a slow process (Replace with actual STT)
        return "Transcribed text here"

Logs:

127.0.0.1:43476 - - [06/Feb/2025:09:34:51] "WSCONNECT /ws/chat/1/" - -
Time between receive: 1.0896313190460205
Time between receive: 2.002781391143799
Time between receive: 2.003610134124756
Time between receive: 2.0032858848571777
Time between receive: 2.0033938884735107
Time between receive: 2.0036416053771973
Time between receive: 2.0029494762420654
Time between receive: 2.003593921661377
Time between receive: 2.0041894912719727
127.0.0.1:43476 - - [06/Feb/2025:09:35:08] "WSDISCONNECT /ws/chat/1/" - -

I have tried asyncio.create_task, but it doesn't work the way I want: as the logs show, messages still arrive about 2 seconds apart instead of 1. I would appreciate any help, thanks in advance.
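For comparison, here is a minimal sketch of the behaviour being asked for, written outside Channels with only the standard library. It offloads a blocking function to the default thread pool with `asyncio.to_thread` (Python 3.9+) so that several calls overlap instead of running one after another. The names `handle_chunks` and `run_demo` are hypothetical, and the sleep is shortened to 0.2 seconds. One thing that may be worth checking in the consumer above: asgiref's `sync_to_async` defaults to `thread_sensitive=True`, which runs sync calls in a single shared thread. Whether that is what causes the 2-second spacing in the logs is an assumption, not something confirmed here.

```python
import asyncio
import time


def process_audio(audio_data: bytes) -> str:
    """Stand-in for a slow, blocking speech-to-text call."""
    time.sleep(0.2)  # Simulated processing time
    return f"transcribed {len(audio_data)} bytes"


async def handle_chunks(chunks):
    """Start one task per chunk; each blocking call runs in a pool thread."""
    tasks = [
        asyncio.create_task(asyncio.to_thread(process_audio, chunk))
        for chunk in chunks
    ]
    # gather() returns results in the same order as the tasks were given
    return await asyncio.gather(*tasks)


def run_demo(n_chunks: int = 4):
    """Process n_chunks dummy chunks and report the wall-clock time taken."""
    chunks = [b"\x00" * 16 for _ in range(n_chunks)]
    start = time.monotonic()
    results = asyncio.run(handle_chunks(chunks))
    elapsed = time.monotonic() - start
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = run_demo()
    print(results)
    print(f"elapsed: {elapsed:.2f}s")
```

If the four calls ran one after another they would take about 0.8 seconds in total; when they overlap in separate threads the total stays close to the duration of a single call.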
