Stopping a loop in one Django Channels consumer from another consumer

I am building a web application with Django and Django Channels, which uses websockets.

When the user clicks a button in the browser, the websocket sends data to my server, and the consumer on the server starts sending messages to the client once a second (in a loop).

I want to add another button that stops this data transfer. When the user clicks the new button, the websocket sends a different message to the server, and the consumer on the server has to somehow stop the loop started by the earlier message. I also need the loop to stop when the client disconnects.

I was tempted to use a global variable. But the Django Channels documentation strongly discourages globals, because the application is supposed to be network-transparent (it is not really clear to me what that means).

I tried using a channel session. I had the second consumer update a value in the channel session, but the first consumer never saw the updated value.

Here is the simplified code. Browser:

var socket = new WebSocket("ws://" + window.location.host + "/socket/");
$('#button1').on('click', function() { 
    socket.send(JSON.stringify({action: 'start_getting_values'}))
});
$('#button2').on('click', function() { 
    socket.send(JSON.stringify({action: 'stop_getting_values'}))
});

      

Server consumer:

@channel_session
def ws_message(message):
    text = json.loads(message.content['text'])

    if text['action'] == 'start_getting_values':
        while True:
            # Getting some data here
            # ...
            message.reply_channel.send({"text": some_data}, immediately=True)
            time.sleep(1)

    if text['action'] == 'stop_getting_values':
        do_something_to_stop_the_loop_above()

      



1 answer


Well, I was able to solve the problem on my own after I contacted the Django Channels developers.

Running a loop inside a consumer is a bad approach: each running loop occupies one worker thread, so the site stops responding as soon as the consumer has been started as many times as there are threads serving it.
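The effect can be reproduced without Channels at all. The following is only an illustrative sketch using the standard library: a thread pool stands in for the worker threads, and `WORKER_THREADS`, `looping_consumer` and `quick_consumer` are hypothetical names, not part of Channels.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

WORKER_THREADS = 2  # hypothetical pool size standing in for the worker's threads

stop = threading.Event()
started = [threading.Event() for _ in range(WORKER_THREADS)]

def looping_consumer(index):
    """Stands in for a consumer that does not return until told to stop."""
    started[index].set()
    stop.wait()  # plays the role of `while True: ...; time.sleep(1)`
    return "loop ended"

def quick_consumer():
    """Stands in for any other message, for example the stop request."""
    return "handled"

pool = ThreadPoolExecutor(max_workers=WORKER_THREADS)
loops = [pool.submit(looping_consumer, i) for i in range(WORKER_THREADS)]
for event in started:
    event.wait()  # every worker thread is now occupied by a loop

# The next message is queued, but no thread is free to run it.
pending = pool.submit(quick_consumer)
starved = not pending.running() and not pending.done()

stop.set()  # only after a loop exits can the queued message be handled
result = pending.result(timeout=5)
pool.shutdown()
```

In the sketch the loops can be released through a shared `Event` because everything lives in one process. Consumers may run in different processes, which is why the loop is moved out of the consumer instead.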

So my approach was this: when the consumer receives the "start_getting_values" signal, it adds the current reply channel to a group and also increments a counter on the Redis server it is connected to (I am using Redis as the channel layer backend, but the idea works with any other backend).

Which counter does it increment? In Redis I keep a single key that holds a hash. Each field of that hash is the name of a Channels group, and its value is the number of reply channels currently in that group.

Then I created a new Python file that connects to the same Redis server. In this file I run an endless loop that loads the hash from Redis as a dict. I then iterate over all the keys in this dict (each key is the name of a channel group) and broadcast the data to every group with a non-zero value. This file runs as a separate process, so it does not block anything on the consumer side.
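As a small sketch of the "non-zero value" filter described above: redis-py returns the hash as a dict of bytes keys and bytes values unless `decode_responses` is enabled, so both have to be converted. The helper name `active_groups` and the sample data are made up for illustration.

```python
def active_groups(raw_counts):
    """Return the names of groups whose subscriber count is above zero.

    raw_counts mimics what HGETALL yields through redis-py without
    decode_responses: both keys and values are bytes.
    """
    names = []
    for raw_name, raw_count in raw_counts.items():
        if int(raw_count) > 0:  # int() accepts ASCII digits given as bytes
            names.append(raw_name.decode("utf-8"))
    return sorted(names)

# Hypothetical snapshot of the hash: group "7" has no subscribers left.
sample = {b"42": b"2", b"7": b"0", b"13": b"1"}
groups = active_groups(sample)
```

If the consumer always deletes a field once its count drops to zero, every remaining key is already active and the check is only a safeguard.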



To stop broadcasting to a user when I receive the corresponding signal from them, I simply remove their reply channel from the group and decrement the corresponding Redis value.
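The bookkeeping behind start and stop amounts to reference counting. Below is a minimal in-memory sketch of that idea, assuming a plain dict and a `threading.Lock` in place of the Redis hash and Redis lock; the class `GroupCounts` and its method names are hypothetical.

```python
import threading

class GroupCounts:
    """In-memory stand-in for the Redis hash of per-group subscriber counts."""

    def __init__(self):
        self._counts = {}
        self._lock = threading.Lock()

    def subscribe(self, group):
        # Mirrors incrementing the hash field by 1.
        with self._lock:
            self._counts[group] = self._counts.get(group, 0) + 1
            return self._counts[group]

    def unsubscribe(self, group):
        # Mirrors decrementing the field and deleting it at zero.
        with self._lock:
            remaining = self._counts.get(group, 0) - 1
            if remaining <= 0:
                self._counts.pop(group, None)
                return 0
            self._counts[group] = remaining
            return remaining

    def snapshot(self):
        with self._lock:
            return dict(self._counts)

counts = GroupCounts()
counts.subscribe("42")
counts.subscribe("42")
after_first_stop = counts.unsubscribe("42")   # one subscriber remains
after_second_stop = counts.unsubscribe("42")  # field is removed
```

The same unsubscribe step is presumably what a disconnect handler would need to perform, so that a client closing the page also stops its broadcast.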

Consumer code:

import json

import redis
from channels import Group
from channels.auth import channel_session_user

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@channel_session_user
def ws_message(message):

    text = json.loads(message.content['text'])

    if text['stream'] == 'start_getting_values':
        value_id = text['value_id']
        # Count one more reply channel in this group.
        redis_client.hincrby('redis_some_key', value_id, 1)
        Group(value_id).add(message.reply_channel)
        # Remember the group so the stop request can find it.
        message.channel_session['value_id'] = value_id
        return 0

    if text['stream'] == 'stop_getting_values':
        value_id = message.channel_session.get('value_id', '')
        if value_id != '':
            Group(value_id).discard(message.reply_channel)

            # Hold the lock so the decrement and the delete happen together.
            with redis_client.lock(name='del_lock'):
                val = redis_client.hincrby('redis_some_key', value_id, -1)
                if val <= 0:
                    redis_client.hdel('redis_some_key', value_id)
        return 0

      

Separate Python file:

import time
import redis
from threading import Thread
import asgi_redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
cl = asgi_redis.RedisChannelLayer()

def some_action(value_id):

    # getting some data based on value_id
    # ....

    cl.send_group(value_id, {
        "text": some_data,
    })


while True:
    value_ids = redis_client.hgetall('redis_some_key')

    ths = []
    for b_value_id in value_ids.keys():
        value_id = b_value_id.decode("utf-8")
        ths.append(Thread(target=some_action, args=(value_id,)))

    for th in ths:
        th.start()
    for th in ths:
        th.join()


    time.sleep(1)

      
