@esafwan
Created April 28, 2024 12:46

For use in AI prompts for code related to Frappe realtime and WebSocket

Realtime (socket.io)

Frappe ships with an API for realtime events based on socket.io. Since socket.io needs a Node server to run, we run a Node process in parallel to the main web server.

Client APIs (JavaScript)

frappe.realtime.on

To listen to realtime events on the client (browser), you can use the frappe.realtime.on method:

    frappe.realtime.on('event_name', (data) => {
        console.log(data)
    })

frappe.realtime.off

Stop listening to an event you have subscribed to:

    frappe.realtime.off('event_name')

Server APIs (Python)

frappe.publish_realtime

To publish a realtime event from the server, you can use the frappe.publish_realtime method:

    frappe.publish_realtime('event_name', data={'key': 'value'})

frappe.publish_progress

You can use this method to show a progress bar in a dialog:

    frappe.publish_progress(25, title='Some title', description='Some description')
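As a rough illustration of how progress updates might be driven from a loop, here is a minimal sketch. The helper `run_with_progress` and its `handle` and `publish` parameters are hypothetical names introduced here; the publisher is injected so the sketch runs without Frappe, and inside an app you would presumably pass `frappe.publish_progress` instead.

```python
from typing import Callable, Iterable, List


def run_with_progress(
    items: Iterable[str],
    handle: Callable[[str], None],
    publish: Callable[..., None],
    title: str = "Processing",
) -> None:
    """Process items one by one and report percent complete after each."""
    items = list(items)
    total = len(items)
    for index, item in enumerate(items, start=1):
        handle(item)
        percent = index * 100 / total
        # Assumption: in a Frappe app, `publish` would be frappe.publish_progress.
        publish(percent, title=title, description=f"{index} of {total}")


# Stand-in publisher that records calls instead of talking to a server.
calls: List[tuple] = []
run_with_progress(
    ["a", "b", "c", "d"],
    handle=lambda item: None,
    publish=lambda percent, **kw: calls.append((percent, kw["description"])),
)
```

Injecting the publisher keeps the loop testable outside a running site.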

Custom Event Handlers

Note: This feature is only available in the nightly (v16) version and is considered experimental.

You can implement custom realtime event handlers by creating a handlers.js file in the realtime folder of your app.

This file needs a single export: a function that sets up event handlers on the socket instance. For example, if you are developing an app called "chat", here is what the file should look like:

// bench/apps/chat/realtime/handlers.js

function chat_app_handlers(socket) {
    socket.on("hello_chat", () => {
        console.log("hello world!");
    });
}

module.exports = chat_app_handlers;

You can trigger this event from client-side code with frappe.realtime.emit("hello_chat"). Note that you might have to restart the socketio server for code changes to take effect. Refer to the Socket.IO documentation to learn more about writing custom event handlers: https://socket.io/docs/v4/

Custom Client

You can write a custom client to connect to the socket.io server if you're developing an SPA or a mobile app that doesn't use the Desk interface. Refer to the official socket.io documentation for building your custom client: https://socket.io/docs/v4/client-initialization/

Here are some examples of custom clients:

https://github.com/frappe/gameplan/blob/9f9332cf29496afe5e912e4f1734fbf1142cb18c/frontend/src/socket.js#L13

https://github.com/frappe/frappe/blob/8093a1d0a54900fe4b43b01ae6ffc0adf855da43/frappe/public/js/frappe/socketio_client.js#L49

Authorization in custom clients

There are two ways to authenticate a connection with the socket.io server:

Cookies - if your custom client runs in a browser-like environment, the connection automatically sends cookies and the socketio server authenticates with them.

Authorization header - if cookies are not available in your environment, as in mobile apps, you can use an Authorization header just as you would for API requests. Refer to the REST API authentication documentation to understand this: https://frappeframework.com/docs/user/en/api/rest#authentication and https://socket.io/docs/v4/client-options/#extraheaders
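For a non-browser client, the connection settings could be assembled along these lines. This is a sketch under assumptions: `realtime_connection_settings` is a hypothetical helper, the `token api_key:api_secret` scheme is the one used by Frappe's REST API, and the Origin header is included because the authentication middleware listed later in this document compares Origin with Host. That same middleware also rejects requests that carry no Cookie header, so whether header-only authentication works may depend on the Frappe version you run.

```python
from typing import Dict, Tuple


def realtime_connection_settings(
    base_url: str, sitename: str, api_key: str, api_secret: str
) -> Tuple[str, str, Dict[str, str]]:
    """Return (url, namespace, headers) for a token-authenticated client."""
    base_url = base_url.rstrip("/")
    headers = {
        # Same token scheme as Frappe's REST API.
        "Authorization": f"token {api_key}:{api_secret}",
        # Assumption: the realtime server checks that Origin matches Host.
        "Origin": base_url,
    }
    # Site traffic is namespaced as /{sitename}.
    return base_url, f"/{sitename}", headers
```

The returned values can be handed to any Socket.IO client that supports extra headers, such as python-socketio's `Client.connect`, which accepts `headers` and `namespaces` arguments.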

Implementation Notes

The realtime server uses the socket.io server. It is written in Node.js and can be found in the /realtime directory.

The realtime client is a wrapper around the socket.io client library and can be found in public/js/frappe/socketio_client.js.

Python processes publish events to the Node server over a Redis pub-sub channel. The realtime server subscribes to the Redis channel and forwards each event to all subscribed clients.
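The message that travels over Redis is a small JSON object. The sketch below mirrors the fields used by emit_via_redis in the realtime.py listing in this document; `build_event_payload` is a hypothetical name, and the standard library's `json.dumps` stands in for `frappe.as_json`.

```python
import json
from typing import Any, Dict, Optional


def build_event_payload(
    event: str, message: Dict[str, Any], room: Optional[str], site: str
) -> str:
    """Serialize an event in the shape published on the Redis "events" channel."""
    return json.dumps(
        {"event": event, "message": message, "room": room, "namespace": site}
    )


# Example values are made up for illustration.
payload = build_event_payload(
    "list_update", {"doctype": "ToDo"}, "doctype:ToDo", "site1.local"
)
decoded = json.loads(payload)
```

The `namespace` field carries the site name, which is how one Node process can serve several sites.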

The realtime server is multi-tenant; all site-related traffic is namespaced by sitename. Namespaces are created dynamically in the /{sitename} format, where sitename is the name of the site's folder in the sites directory, or frappe.local.site.
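To make the namespace rule concrete, here is a simplified Python port of the hostname check performed by the Node middleware. It is only a sketch: `namespace_is_valid` is a hypothetical helper, and it derives the site name from the Origin header alone, ignoring the x-frappe-site-name header and default_site fallbacks that the real middleware also considers.

```python
from typing import Optional


def get_hostname(url: Optional[str]) -> Optional[str]:
    """Strip the scheme and port from a Host or Origin header value."""
    if not url:
        return None
    if "://" in url:
        url = url.split("/")[2]
    return url.split(":")[0]


def namespace_is_valid(namespace: str, origin: str) -> bool:
    """Check that a namespace such as "/site1.local" matches the request origin."""
    return namespace.startswith("/") and namespace[1:] == get_hostname(origin)
```

A client connecting from https://site1.local:8000 would therefore be expected to use the /site1.local namespace.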

The realtime server uses the main Frappe web server to authenticate connections. The client sends its SID cookie or authorization header to the realtime server, which passes it on to the web server to confirm that the connection comes from a valid user and to check, based on permissions, whether that user may subscribe to a given DocType or document.

Realtime implementation allows the following rooms:

all - Room accessible and connected by default to all System Users.

website - Room accessible and connected by any user, even Guests.

user:{username} - Dynamic room created for each user. Allowed without any permission checks.

doctype:{doctype} - Dynamic room created for each DocType. Only users with permission to the DocType can join this room. A user is automatically subscribed to this room when they open the list view or form view of the document.

doc:{doctype}/{name} - Dynamic room created for each document. Only users with permission to the document can join this room. A user is automatically subscribed to this room when they open the form view of the document.
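The room rules above can be summarized in a few lines. This sketch follows the joins made in the frappe_handlers.js listing in this document; `default_rooms` and `needs_permission_check` are hypothetical helper names.

```python
from typing import List


def default_rooms(user: str, user_type: str) -> List[str]:
    """Rooms a socket joins on connection, before any explicit subscription."""
    rooms = [f"user:{user}", "website"]
    if user_type == "System User":
        rooms.append("all")
    return rooms


def needs_permission_check(room: str) -> bool:
    """DocType and document rooms are only joined after a permission check."""
    return room.startswith(("doctype:", "doc:"))
```

For example, a Guest ends up in the user:Guest and website rooms only, while a System User additionally joins all.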

PY

frappe/frappe/realtime.py

# Copyright (c) 2015, Frappe Technologies Pvt. Ltd. and contributors
# License: MIT. See LICENSE

from contextlib import suppress

import redis

import frappe
from frappe.utils.data import cstr


def publish_progress(percent, title=None, doctype=None, docname=None, description=None, task_id=None):
  publish_realtime(
    "progress",
    {"percent": percent, "title": title, "description": description},
    user=None if doctype and docname else frappe.session.user,
    doctype=doctype,
    docname=docname,
    task_id=task_id,
  )


def publish_realtime(
  event: str = None,
  message: dict = None,
  room: str = None,
  user: str = None,
  doctype: str = None,
  docname: str = None,
  task_id: str = None,
  after_commit: bool = False,
):
  """Publish real-time updates

  :param event: Event name, like `task_progress` etc. that will be handled by the client (default is `task_progress` if within task or `global`)
  :param message: JSON message object. For async must contain `task_id`
  :param room: Room in which to publish update (default entire site)
  :param user: Transmit to user
  :param doctype: Transmit to doctype, docname
  :param docname: Transmit to doctype, docname
  :param after_commit: (default False) will emit after current transaction is committed"""
  if message is None:
    message = {}

  if not task_id and hasattr(frappe.local, "task_id"):
    task_id = frappe.local.task_id

  if event is None:
    event = "task_progress" if task_id else "global"
  elif event == "msgprint" and not user:
    user = frappe.session.user
  elif event == "list_update":
    doctype = doctype or message.get("doctype")
    room = get_doctype_room(doctype)
  elif event == "docinfo_update":
    room = get_doc_room(doctype, docname)

  if not room:
    if task_id:
      after_commit = False
      if "task_id" not in message:
        message["task_id"] = task_id
      room = get_task_progress_room(task_id)
    elif user:
      # transmit to specific user: System, Website or Guest
      room = get_user_room(user)
    elif doctype and docname:
      room = get_doc_room(doctype, docname)
    else:
      # This will be broadcasted to all Desk users
      room = get_site_room()

  if after_commit:
    if not hasattr(frappe.local, "_realtime_log"):
      frappe.local._realtime_log = []
      frappe.db.after_commit.add(flush_realtime_log)
      frappe.db.after_rollback.add(clear_realtime_log)

    params = [event, message, room]
    if params not in frappe.local._realtime_log:
      frappe.local._realtime_log.append(params)
  else:
    emit_via_redis(event, message, room)


def flush_realtime_log():
  for args in frappe.local._realtime_log:
    frappe.realtime.emit_via_redis(*args)

  clear_realtime_log()


def clear_realtime_log():
  if hasattr(frappe.local, "_realtime_log"):
    del frappe.local._realtime_log


def emit_via_redis(event, message, room):
  """Publish real-time updates via redis

  :param event: Event name, like `task_progress` etc.
  :param message: JSON message object. For async must contain `task_id`
  :param room: name of the room"""
  from frappe.utils.background_jobs import get_redis_connection_without_auth

  with suppress(redis.exceptions.ConnectionError):
    r = get_redis_connection_without_auth()
    r.publish(
      "events",
      frappe.as_json(
        {"event": event, "message": message, "room": room, "namespace": frappe.local.site}
      ),
    )


@frappe.whitelist(allow_guest=True)
def can_subscribe_doc(doctype: str, docname: str) -> bool:
  from frappe.exceptions import PermissionError

  if not frappe.has_permission(doctype=doctype, doc=docname, ptype="read"):
    raise PermissionError()

  return True


@frappe.whitelist(allow_guest=True)
def can_subscribe_doctype(doctype: str) -> bool:
  from frappe.exceptions import PermissionError

  if not frappe.has_permission(doctype=doctype, ptype="read"):
    raise PermissionError()

  return True


@frappe.whitelist(allow_guest=True)
def get_user_info():
  return {
    "user": frappe.session.user,
    "user_type": frappe.session.data.user_type,
  }


def get_doctype_room(doctype):
  return f"doctype:{doctype}"


def get_doc_room(doctype, docname):
  return f"doc:{doctype}/{cstr(docname)}"


def get_user_room(user):
  return f"user:{user}"


def get_site_room():
  return "all"


def get_task_progress_room(task_id):
  return f"task_progress:{task_id}"


def get_website_room():
  return "website"
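The order in which publish_realtime picks a room is easy to miss when reading the listing above, so here is a stripped-down sketch of just that decision. `resolve_room` is a hypothetical function; it leaves out the session-dependent parts (the msgprint default user and reading the doctype from the message) and only models the precedence.

```python
from typing import Optional


def resolve_room(
    event: Optional[str] = None,
    room: Optional[str] = None,
    user: Optional[str] = None,
    doctype: Optional[str] = None,
    docname: Optional[str] = None,
    task_id: Optional[str] = None,
) -> str:
    """Return the room an event would be published to."""
    # Event-specific overrides replace any explicitly passed room.
    if event == "list_update":
        room = f"doctype:{doctype}"
    elif event == "docinfo_update":
        room = f"doc:{doctype}/{docname}"

    if room:
        return room
    if task_id:
        return f"task_progress:{task_id}"
    if user:
        return f"user:{user}"
    if doctype and docname:
        return f"doc:{doctype}/{docname}"
    # Fallback: broadcast to all Desk users of the site.
    return "all"
```

Note that a task id wins over a user, and that an explicit room is ignored for list_update and docinfo_update events.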

frappe/frappe/__init__.py code:

# Realtime-related code only, copied from __init__.py
# frappe/frappe/__init__.py

import frappe

__version__ = "16.0.0-dev"
__title__ = "Frappe Framework"

def local_cache(namespace, key, generator, regenerate_if_none=False):
    """A key value store for caching within a request

    :param namespace: frappe.local.cache[namespace]
    :param key: frappe.local.cache[namespace][key] used to retrieve value
    :param generator: method to generate a value if not found in store

    """
    if namespace not in frappe.local.cache:
        frappe.local.cache[namespace] = {}

    if key not in frappe.local.cache[namespace]:
        frappe.local.cache[namespace][key] = generator()

    elif frappe.local.cache[namespace][key] is None and regenerate_if_none:
        # if key exists but the previous result was None
        frappe.local.cache[namespace][key] = generator()

    return frappe.local.cache[namespace][key]

def enqueue(*args, **kwargs):
    """
    Enqueue method to be executed using a background worker

    :param method: method string or method object
    :param queue: (optional) should be either long, default or short
    :param timeout: (optional) should be set according to the functions
    :param event: this is passed to enable clearing of jobs from queues
    :param is_async: (optional) if is_async=False, the method is executed immediately, else via a worker
    :param job_name: (optional) can be used to name an enqueue call, which can be used to prevent duplicate calls
    :param kwargs: keyword arguments to be passed to the method
    """
    import frappe.utils.background_jobs
    return frappe.utils.background_jobs.enqueue(*args, **kwargs)

def task(**task_kwargs):
    def decorator_task(f):
        f.enqueue = lambda **fun_kwargs: enqueue(f, **task_kwargs, **fun_kwargs)
        return f
    return decorator_task

def enqueue_doc(*args, **kwargs):
    """
    Enqueue method to be executed using a background worker

    :param doctype: DocType of the document on which you want to run the event
    :param name: Name of the document on which you want to run the event
    :param method: method string or method object
    :param queue: (optional) should be either long, default or short
    :param timeout: (optional) should be set according to the functions
    :param kwargs: keyword arguments to be passed to the method
    """
    import frappe.utils.background_jobs
    return frappe.utils.background_jobs.enqueue_doc(*args, **kwargs)

def publish_progress(*args, **kwargs):
    """Show the user progress for a long request

    :param percent: Percent progress
    :param title: Title
    :param doctype: Optional, for document type
    :param docname: Optional, for document name
    :param description: Optional description
    """
    import frappe.realtime
    return frappe.realtime.publish_progress(*args, **kwargs)

def publish_realtime(*args, **kwargs):
    """Publish real-time updates

    :param event: Event name, like `task_progress` etc.
    :param message: JSON message object. For async must contain `task_id`
    :param room: Room in which to publish update (default entire site)
    :param user: Transmit to user
    :param doctype: Transmit to doctype, docname
    :param docname: Transmit to doctype, docname
    :param after_commit: (default False) will emit after current transaction is committed
    """
    import frappe.realtime
    return frappe.realtime.publish_realtime(*args, **kwargs)
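The after_commit option mentioned in the docstring defers an event until the database transaction commits and drops it on rollback. The class below is a minimal standalone sketch of that idea, not Frappe's implementation: `DeferredEvents` and its methods are hypothetical, and the `emit` callable stands in for emit_via_redis.

```python
from typing import Callable, List, Tuple

Event = Tuple[str, dict, str]


class DeferredEvents:
    """Queue events until commit; discard them on rollback."""

    def __init__(self, emit: Callable[[str, dict, str], None]) -> None:
        self._emit = emit
        self._log: List[Event] = []

    def publish(self, event: str, message: dict, room: str, after_commit: bool = False) -> None:
        if not after_commit:
            self._emit(event, message, room)
            return
        entry = (event, message, room)
        # Identical pending events are queued only once.
        if entry not in self._log:
            self._log.append(entry)

    def commit(self) -> None:
        for entry in self._log:
            self._emit(*entry)
        self._log.clear()

    def rollback(self) -> None:
        self._log.clear()
```

Deferring avoids telling clients about a change that the transaction later rolls back.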

NODE:

frappe/realtime/index.js - node code:

const { Server } = require("socket.io");

const { get_conf, get_redis_subscriber } = require("../node_utils");
const conf = get_conf();

let io = new Server({
  cors: {
    // Should be fine since we ensure that hostname and origin match before adding listeners for a socket
    origin: true,
    credentials: true,
  },
  cleanupEmptyChildNamespaces: true,
});

// Multitenancy implementation.
// allow arbitrary sitename as namespaces
// namespaces get validated during authentication.
const realtime = io.of(/^\/.*$/);

// load and register middlewares
const authenticate = require("./middlewares/authenticate");
realtime.use(authenticate);
// =======================

// load and register handlers
const frappe_handlers = require("./handlers/frappe_handlers");
function on_connection(socket) {
  frappe_handlers(realtime, socket);

  // esbuild "open in editor" on error
  socket.on("open_in_editor", async (data) => {
    await subscriber.connect();
    subscriber.publish("open_in_editor", JSON.stringify(data));
  });
}

realtime.on("connection", on_connection);
// =======================

// Consume events sent from python via redis pub-sub channel.
const subscriber = get_redis_subscriber();

(async () => {
  await subscriber.connect();
  subscriber.subscribe("events", (message) => {
    message = JSON.parse(message);
    let namespace = "/" + message.namespace;
    if (message.room) {
      io.of(namespace).to(message.room).emit(message.event, message.message);
    } else {
      // publish to ALL sites only used for things like build event.
      realtime.emit(message.event, message.message);
    }
  });
})();
// =======================

let port = conf.socketio_port;
io.listen(port);
console.log("Realtime service listening on: ", port);
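The routing decision made by the Redis subscriber above can be restated in Python for readers more at home on the server side. This is an illustrative sketch only: `route_event` is a hypothetical function that returns where a message would go instead of emitting it.

```python
import json
from typing import Any, Optional, Tuple


def route_event(raw: str) -> Tuple[Optional[str], Optional[str], str, Any]:
    """Return (namespace, room, event, message) for a raw Redis message.

    The namespace and room are None when the event has no room, which the
    Node server treats as a broadcast to every site.
    """
    data = json.loads(raw)
    room = data.get("room")
    if room:
        return "/" + data["namespace"], room, data["event"], data["message"]
    return None, None, data["event"], data["message"]
```

A message with a room is delivered to that room inside the site's namespace, while a message without one reaches all connected sites.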

frappe/realtime/utils.js

const { get_conf } = require("../node_utils");
const conf = get_conf();
const request = require("superagent");

function get_url(socket, path) {
  if (!path) {
    path = "";
  }
  let url = socket.request.headers.origin;
  if (conf.developer_mode) {
    let [protocol, host, port] = url.split(":");
    if (port != conf.webserver_port) {
      port = conf.webserver_port;
    }
    url = `${protocol}:${host}:${port}`;
  }
  return url + path;
}

// Authenticates a partial request created using superagent
function frappe_request(path, socket) {
  const partial_req = request.get(get_url(socket, path));
  if (socket.sid) {
    return partial_req.query({ sid: socket.sid });
  } else if (socket.authorization_header) {
    return partial_req.set("Authorization", socket.authorization_header);
  }
}

module.exports = {
  get_url,
  frappe_request,
};

frappe/realtime/handlers/frappe_handlers.js code:

const { frappe_request } = require("../utils");
const log = console.log;

const WEBSITE_ROOM = "website";
const SITE_ROOM = "all";

function frappe_handlers(realtime, socket) {
  socket.join(user_room(socket.user));
  socket.join(WEBSITE_ROOM);

  if (socket.user_type == "System User") {
    socket.join(SITE_ROOM);
  }

  socket.on("doctype_subscribe", function (doctype) {
    can_subscribe_doctype({
      socket,
      doctype,
      callback: () => {
        socket.join(doctype_room(doctype));
      },
    });
  });

  socket.on("doctype_unsubscribe", function (doctype) {
    socket.leave(doctype_room(doctype));
  });

  socket.on("task_subscribe", function (task_id) {
    const room = task_room(task_id);
    socket.join(room);
  });

  socket.on("task_unsubscribe", function (task_id) {
    const room = task_room(task_id);
    socket.leave(room);
  });

  socket.on("progress_subscribe", function (task_id) {
    const room = task_room(task_id);
    socket.join(room);
  });

  socket.on("doc_subscribe", function (doctype, docname) {
    can_subscribe_doc({
      socket,
      doctype,
      docname,
      callback: () => {
        let room = doc_room(doctype, docname);
        socket.join(room);
      },
    });
  });

  socket.on("doc_unsubscribe", function (doctype, docname) {
    let room = doc_room(doctype, docname);
    socket.leave(room);
  });

  socket.on("doc_open", function (doctype, docname) {
    can_subscribe_doc({
      socket,
      doctype,
      docname,
      callback: () => {
        let room = open_doc_room(doctype, docname);
        socket.join(room);
        if (!socket.subscribed_documents) socket.subscribed_documents = [];
        socket.subscribed_documents.push([doctype, docname]);

        // show who is currently viewing the form
        notify_subscribed_doc_users({
          socket: socket,
          doctype: doctype,
          docname: docname,
        });
      },
    });
  });

  socket.on("doc_close", function (doctype, docname) {
    // remove this user from the list of 'who is currently viewing the form'
    let room = open_doc_room(doctype, docname);
    socket.leave(room);

    if (socket.subscribed_documents) {
      // keep every entry except the document being closed
      socket.subscribed_documents = socket.subscribed_documents.filter(
        ([dt, dn]) => !(dt == doctype && dn == docname)
      );
    }

    notify_subscribed_doc_users({
      socket: socket,
      doctype: doctype,
      docname: docname,
    });
  });

  socket.on("disconnect", () => {
    notify_disconnected_documents(socket);
  });
}

function notify_disconnected_documents(socket) {
  if (socket.subscribed_documents) {
    socket.subscribed_documents.forEach(([doctype, docname]) => {
      notify_subscribed_doc_users({ socket, doctype, docname });
    });
  }
}

function can_subscribe_doctype(args) {
  if (!args) return;
  if (!args.doctype) return;
  frappe_request("/api/method/frappe.realtime.can_subscribe_doctype", args.socket)
    .type("form")
    .query({
      doctype: args.doctype,
    })
    .end(function (err, res) {
      if (!res || res.status == 403 || err) {
        if (err) {
          log(err);
        }
        return false;
      } else if (res.status == 200) {
        args.callback && args.callback(err, res);
        return true;
      }
      log("ERROR (can_subscribe_doctype): ", err, res);
    });
}

function notify_subscribed_doc_users(args) {
  if (!(args && args.doctype && args.docname)) {
    return;
  }
  const socket = args.socket;
  const room = open_doc_room(args.doctype, args.docname);

  const clients = Array.from(socket.nsp.adapter.rooms.get(room) || []);

  let users = [];

  socket.nsp.sockets.forEach((sock) => {
    if (clients.includes(sock.id)) {
      users.push(sock.user);
    }
  });

  // dont send update to self. meaningless.
  if (users.length == 1 && users[0] == args.socket.user) return;

  // notify
  socket.nsp.to(room).emit("doc_viewers", {
    doctype: args.doctype,
    docname: args.docname,
    users: Array.from(new Set(users)),
  });
}

function can_subscribe_doc(args) {
  if (!args) return;
  if (!args.doctype || !args.docname) return;
  frappe_request("/api/method/frappe.realtime.can_subscribe_doc", args.socket)
    .type("form")
    .query({
      doctype: args.doctype,
      docname: args.docname,
    })
    .end(function (err, res) {
      if (!res) {
        log("No response for doc_subscribe");
      } else if (res.status == 403) {
        return;
      } else if (err) {
        log(err);
      } else if (res.status == 200) {
        args.callback(err, res);
      } else {
        log("Something went wrong", err, res);
      }
    });
}

const doc_room = (doctype, docname) => "doc:" + doctype + "/" + docname;
const open_doc_room = (doctype, docname) => "open_doc:" + doctype + "/" + docname;
const user_room = (user) => "user:" + user;
const doctype_room = (doctype) => "doctype:" + doctype;
const task_room = (task_id) => "task_progress:" + task_id;

module.exports = frappe_handlers;

frappe/realtime/middlewares/authenticate.js

const cookie = require("cookie");
const request = require("superagent");
const { get_url } = require("../utils");

const { get_conf } = require("../../node_utils");
const conf = get_conf();

function authenticate_with_frappe(socket, next) {
  let namespace = socket.nsp.name;
  namespace = namespace.slice(1, namespace.length); // remove leading `/`

  if (namespace != get_site_name(socket)) {
    next(new Error("Invalid namespace"));
    return;
  }

  if (get_hostname(socket.request.headers.host) != get_hostname(socket.request.headers.origin)) {
    next(new Error("Invalid origin"));
    return;
  }

  if (!socket.request.headers.cookie) {
    next(new Error("No cookie transmitted."));
    return;
  }

  let cookies = cookie.parse(socket.request.headers.cookie || "");
  let authorization_header = socket.request.headers.authorization;

  if (!cookies.sid && !authorization_header) {
    next(new Error("No authentication method used. Use cookie or authorization header."));
    return;
  }

  let auth_req = request.get(get_url(socket, "/api/method/frappe.realtime.get_user_info"));
  if (cookies.sid) {
    auth_req = auth_req.query({ sid: cookies.sid });
  } else {
    auth_req = auth_req.set("Authorization", authorization_header);
  }

  auth_req
    .type("form")
    .then((res) => {
      socket.user = res.body.message.user;
      socket.user_type = res.body.message.user_type;
      socket.sid = cookies.sid;
      socket.authorization_header = authorization_header;
      next();
    })
    .catch((e) => {
      next(new Error(`Unauthorized: ${e}`));
    });
}

function get_site_name(socket) {
  if (socket.site_name) {
    return socket.site_name;
  } else if (socket.request.headers["x-frappe-site-name"]) {
    socket.site_name = get_hostname(socket.request.headers["x-frappe-site-name"]);
  } else if (
    conf.default_site &&
    ["localhost", "127.0.0.1"].indexOf(get_hostname(socket.request.headers.host)) !== -1
  ) {
    socket.site_name = conf.default_site;
  } else if (socket.request.headers.origin) {
    socket.site_name = get_hostname(socket.request.headers.origin);
  } else {
    socket.site_name = get_hostname(socket.request.headers.host);
  }
  return socket.site_name;
}

function get_hostname(url) {
  if (!url) return undefined;
  if (url.indexOf("://") > -1) {
    url = url.split("/")[2];
  }
  return url.match(/:/g) ? url.slice(0, url.indexOf(":")) : url;
}

module.exports = authenticate_with_frappe;

Browser Front-end

frappe/frappe/public/js/frappe/socketio_client.js

import { io } from "socket.io-client";

frappe.provide("frappe.realtime");

class RealTimeClient {
  constructor() {
    this.open_tasks = {};
    this.open_docs = new Set();
  }

  on(event, callback) {
    if (this.socket) {
      this.connect();
      this.socket.on(event, callback);
    }
  }

  off(event, callback) {
    if (this.socket) {
      this.socket.off(event, callback);
    }
  }

  connect() {
    if (this.lazy_connect) {
      this.socket.connect();
      this.lazy_connect = false;
    }
  }

  emit(event, ...args) {
    this.connect();
    this.socket.emit(event, ...args);
  }

  init(port = 9000, lazy_connect = false) {
    if (frappe.boot.disable_async) {
      return;
    }

    if (this.socket) {
      return;
    }
    this.lazy_connect = lazy_connect;
    let me = this;

    // Enable secure option when using HTTPS
    if (window.location.protocol == "https:") {
      this.socket = io(this.get_host(port), {
        secure: true,
        withCredentials: true,
        reconnectionAttempts: 3,
        autoConnect: !lazy_connect,
      });
    } else if (window.location.protocol == "http:") {
      this.socket = io(this.get_host(port), {
        withCredentials: true,
        reconnectionAttempts: 3,
        autoConnect: !lazy_connect,
      });
    }

    if (!this.socket) {
      console.log("Unable to connect to " + this.get_host(port));
      return;
    }

    this.socket.on("msgprint", function (message) {
      frappe.msgprint(message);
    });

    this.socket.on("progress", function (data) {
      if (data.progress) {
        data.percent = (flt(data.progress[0]) / data.progress[1]) * 100;
      }
      if (data.percent) {
        frappe.show_progress(
          data.title || __("Progress"),
          data.percent,
          100,
          data.description,
          true
        );
      }
    });

    this.setup_listeners();

    $(document).on("form-load form-rename", function (e, frm) {
      if (!frm.doc || frm.is_new()) {
        return;
      }
      me.doc_subscribe(frm.doctype, frm.docname);
    });

    $(document).on("form-refresh", function (e, frm) {
      if (!frm.doc || frm.is_new()) {
        return;
      }
      me.doc_open(frm.doctype, frm.docname);
    });

    $(document).on("form-unload", function (e, frm) {
      if (!frm.doc || frm.is_new()) {
        return;
      }

      me.doc_close(frm.doctype, frm.docname);
    });
  }

  get_host(port = 3000) {
    let host = window.location.origin;
    if (window.dev_server) {
      let parts = host.split(":");
      port = frappe.boot.socketio_port || port.toString() || "3000";
      if (parts.length > 2) {
        host = parts[0] + ":" + parts[1];
      }
      host = host + ":" + port;
    }
    return host + `/${frappe.boot.sitename}`;
  }

  subscribe(task_id, opts) {
    this.emit("task_subscribe", task_id);
    this.emit("progress_subscribe", task_id);

    this.open_tasks[task_id] = opts;
  }
  task_subscribe(task_id) {
    this.emit("task_subscribe", task_id);
  }
  task_unsubscribe(task_id) {
    this.emit("task_unsubscribe", task_id);
  }
  doctype_subscribe(doctype) {
    this.emit("doctype_subscribe", doctype);
  }
  doctype_unsubscribe(doctype) {
    this.emit("doctype_unsubscribe", doctype);
  }
  doc_subscribe(doctype, docname) {
    if (frappe.flags.doc_subscribe) {
      console.log("throttled");
      return;
    }
    if (this.open_docs.has(`${doctype}:${docname}`)) {
      return;
    }

    frappe.flags.doc_subscribe = true;

    // throttle to 1 per sec
    setTimeout(function () {
      frappe.flags.doc_subscribe = false;
    }, 1000);

    this.emit("doc_subscribe", doctype, docname);
    this.open_docs.add(`${doctype}:${docname}`);
  }
  doc_unsubscribe(doctype, docname) {
    this.emit("doc_unsubscribe", doctype, docname);
    return this.open_docs.delete(`${doctype}:${docname}`);
  }
  doc_open(doctype, docname) {
    this.emit("doc_open", doctype, docname);
  }
  doc_close(doctype, docname) {
    this.emit("doc_close", doctype, docname);
  }
  setup_listeners() {
    // Arrow functions keep `this` bound to the RealTimeClient instance.
    this.socket.on("task_status_change", (data) => {
      this.process_response(data, data.status.toLowerCase());
    });
    this.socket.on("task_progress", (data) => {
      this.process_response(data, "progress");
    });
  }
  process_response(data, method) {
    if (!data) {
      return;
    }

    // success
    let opts = this.open_tasks[data.task_id];
    if (!opts) {
      // no callbacks were registered for this task
      return;
    }
    if (opts[method]) {
      opts[method](data);
    }

    // "callback" is std frappe term
    if (method === "success") {
      if (opts.callback) opts.callback(data);
    }

    // always
    frappe.request.cleanup(opts, data);
    if (opts.always) {
      opts.always(data);
    }

    // error
    if (data.status_code && data.status_code > 400 && opts.error) {
      opts.error(data);
    }
  }

  publish(event, message) {
    if (this.socket) {
      this.emit(event, message);
    }
  }
}

frappe.realtime = new RealTimeClient();

// backward compatibility
frappe.socketio = frappe.realtime;
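The doc_subscribe method above combines two guards: a one-second throttle and a set of already subscribed documents. The Python sketch below models the same idea with an injectable clock so it can be exercised without waiting. `DocSubscriptions` is a hypothetical class, not part of Frappe.

```python
import time
from typing import Callable, Set, Tuple


class DocSubscriptions:
    """Tracks subscribed documents and allows one new subscription per interval."""

    def __init__(
        self,
        emit: Callable[..., None],
        clock: Callable[[], float] = time.monotonic,
        interval: float = 1.0,
    ) -> None:
        self._emit = emit
        self._clock = clock
        self._interval = interval
        self._blocked_until = float("-inf")
        self._open: Set[Tuple[str, str]] = set()

    def subscribe(self, doctype: str, docname: str) -> bool:
        now = self._clock()
        if now < self._blocked_until:
            return False  # throttled
        key = (doctype, docname)
        if key in self._open:
            return False  # already subscribed
        self._blocked_until = now + self._interval
        self._emit("doc_subscribe", doctype, docname)
        self._open.add(key)
        return True

    def unsubscribe(self, doctype: str, docname: str) -> bool:
        self._emit("doc_unsubscribe", doctype, docname)
        key = (doctype, docname)
        was_open = key in self._open
        self._open.discard(key)
        return was_open
```

As in the JavaScript client, a throttled call is dropped rather than queued, so a caller that needs the subscription has to retry.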

Example from another Frappe app, Gameplan:

gameplan/frontend/src/socket.js

import { io } from 'socket.io-client'
import { socketio_port } from '../../../../sites/common_site_config.json'
import { getCachedListResource } from 'frappe-ui/src/resources/listResource'
import { getCachedResource } from 'frappe-ui/src/resources/resources'

export function initSocket() {
  let host = window.location.hostname
  let siteName = window.site_name
  let port = window.location.port ? `:${socketio_port}` : ''
  let protocol = port ? 'http' : 'https'
  let url = `${protocol}://${host}${port}/${siteName}`

  let socket = io(url, {
    withCredentials: true,
    reconnectionAttempts: 5,
  })
  socket.on('refetch_resource', (data) => {
    if (data.cache_key) {
      let resource =
        getCachedResource(data.cache_key) ||
        getCachedListResource(data.cache_key)
      if (resource) {
        resource.reload()
      }
    }
  })
  return socket
}