Socket.io, Node.js and rethinkdb

I am using a simple application in Socket.io, Node.js and rethinkdb. On the server, my server.js file runs for about 20 minutes. After about 20 minutes, my code stops. my server .js:

var express = require('express'),
path = require('path'),
http = require('http'),
io = require('socket.io'),
r = require('rethinkdb');


var app = express();

app.configure(function() {
app.set('port', process.env.PORT || 3000);
app.use(express.logger('dev'));
app.use(express.bodyParser())
app.use(express.static(path.join(__dirname, 'public')));
});

var server = http.createServer(app);
io = io.listen(server);


server.listen(app.get('port'), function() {
console.log("Express server listening on port " + app.get('port'));
});

var count = 0;
var say = 0;
io.sockets.on('connection', function(socket) {

count++;
socket.on('message', function(message) {
    url = message;
    //socket.join(url);
    r.connect({
        host: 'localhost',
        port: 28015
    }, function(err, conn) {
        if (err) throw err;
        r.db('test').table('online').insert({
            socket_id: socket.id,
            news_id: message
        }).run(conn, function(err) {
            if (err) throw err;
            r.db('test').table("online").filter({
                news_id: message
            }).count().run(conn, function(err, res) {
                if (err) throw err;
                say = res;


                console.log("connect say" + say);
                // ip = socket.handshake.address.address;              
                io.sockets.emit('pageview', {
                    'say': say,
                    'count': count,
                    'news_id': message
                });

            });

        });




    });


});



socket.on('disconnect', function() {

    var news_id = 0;
    console.log("Socket disconnected:" + socket.id);

    r.connect({
        host: 'localhost',
        port: 28015
    }, function(err, conn) {

        if (err) throw err;

        r.db('test').table("online").filter({
            socket_id: socket.id
        })("news_id").run(conn).then(function(cursor) {


            cursor.each(function(err, item) {

                news_id = item;


                r.db('test').table("online").filter({
                    socket_id: socket.id
                }).delete().run(conn, function() {

                    r.db('test').table("online").filter({
                        news_id: news_id
                    }).count().run(conn, function(err, res) {
                        if (err) throw err;
                        say = res;
                        count--;
                        console.log("disconnect say" + say);
                        io.sockets.emit('pageview', {
                            'say': say,
                            'count': count,
                            'news_id': news_id
                        });

                    });

                });


                //console.log("news_id"+news_id);




            });
        });




    });




});

});

      

my client.js:

 var socket = io.connect("http://192.168.1.198:3000");


    socket.on('connect', function () {

        console.log('Socket connected');
        socket.send(news_id);
        socket.on('pageview', function (msg) {


            if(msg.news_id == news_id)
                        {
                                //$('.count_icon').html(msg.say);
                                console.log(msg.say);

                        }


        });

    });

      

While server.js is running I am getting this error:

Unhandled rejection RqlDriverError: First argument to `run` must be an open connection.
at new RqlDriverError (/root/node_modules/rethinkdb/errors.js:14:13)
at Bracket.TermBase.run (/root/node_modules/rethinkdb/ast.js:129:29)
at /home/node1/server.js:82:90
at tryCatcher (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/util.js:24:31)
at Promise.errorAdapter (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/nodeify.js:35:34)
at Promise._settlePromiseAt (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/promise.js:528:21)
at Promise._settlePromises (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/promise.js:646:14)
at Async._drainQueue (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/async.js:177:16)
at Async._drainQueues (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/async.js:187:10)
at Async.drainQueues (/root/node_modules/rethinkdb/node_modules/bluebird/js/main/async.js:15:14)
at process._tickCallback (node.js:442:13)

      

+3


source to share


1 answer


The problem is that you are opening too many connections and RethinkDB may be closing some of those connections. You can add logic to close these connections every time you open them, you can use one global connection, or you can use something like rethinkdbdash that handles this for you using a connection pool.

This is where your code looks like using a global connection (and using promises).



var express = require('express'),
path = require('path'),
http = require('http'),
io = require('socket.io'),
r = require('rethinkdb');


var app = express();

app.configure(function() {
    app.set('port', process.env.PORT || 3000);
    app.use(express.logger('dev'));
    app.use(express.bodyParser())
    app.use(express.static(path.join(__dirname, 'public')));
});

var server = http.createServer(app);
io = io.listen(server);

server.listen(app.get('port'), function() {
    console.log("Express server listening on port " + app.get('port'));
});

var count = 0;
var say = 0;
r.connect({
    host: 'localhost',
    port: 28015
})
.then(function (conn) {
    io.sockets.on('connection', function(socket) {

        count++;
        socket.on('message', function(message) {
            url = message;
            //socket.join(url);
            Promise.resolve()
            .then(function() {
                return r.db('test').table('online').insert({
                    socket_id: socket.id,
                    news_id: message
                }).run(conn)
            })
            .then(function() {
                return r.db('test').table("online").filter({
                    news_id: message
                }).count().run(conn);
            })
            .then(function (res) {
                say = res;
                console.log("connect say" + say);
                io.sockets.emit('pageview', {
                    'say': say,
                    'count': count,
                    'news_id': message
                });
            });
        });

        socket.on('disconnect', function() {
            var news_id = 0;
            console.log("Socket disconnected:" + socket.id);
            Promise.resolve()
                .then(function () {
                    return r.db('test').table("online").filter({
                        socket_id: socket.id
                    })("news_id").run(conn);
                })
                .then(function (cursor) {
                    cursor.each(function(err, item) {
                        news_id = item;
                        Promise.resolve()
                            .then(function () {
                                return r.db('test').table("online").filter({
                                    socket_id: socket.id
                                }).delete().run(conn);
                            })
                            .then(function () {
                                return r.db('test').table("online").filter({
                                    news_id: news_id
                                }).count().run(conn);
                            })
                            .then(function (res) {
                                say = res;
                                count--;
                                console.log("disconnect say" + say);
                                io.sockets.emit('pageview', {
                                    'say': say,
                                    'count': count,
                                    'news_id': news_id
                                });
                            });
                    });
                });
        });
    });
})

      

+5


source







All Articles