Messages
Streams and messages are the standard, unified mechanism that the Qbix platform provides for apps and plugins to manage data. This mechanism enables many common needs to be addressed, including:
- Persistence - Storing data on the client or server.
- Consistency - Having everyone see the same messages posted in the same order.
- Realtime - Having updates arrive via sockets in real time for clients currently online.
- Subscriptions - Sending notifications to subscribed users who are offline when something of interest occurs.
- History - Being able to replay what happened in the past in a given stream.
- Efficiency - Sending only the minimum information, and refreshing data when a client comes back online.
Unlike other technologies like Meteor and Firebase, the Qbix approach is not built around data & sync as the primary concept. Instead, the primary concept for managing data in Qbix is streams & messages. We believe this is more consistent with our focus on facilitating multi-user, "social" experiences.
An overview
Here is a short overview of the Streams and Messages concept, with implications for environments with many users, latency, disconnections, and more.
A stream represents a specific object that users can interact with. Sometimes that can be data, such as your Streams/user/firstName, Streams/user/relationshipStatus, and other times it can be more interactive, like a Streams/chat or Chess/game. Here is the flow for how users interact with streams:
- A client (user agent) issues a request to a Qbix server endpoint (such as PUT, POST or DELETE)
- The server's PHP validates the request, checks whether the user has access to the relevant streams, etc., and if not, throws Users_Exception_NotAuthorized
- The PHP does whatever operations were requested
- The PHP may post messages on one or more streams
- Your app's Node.js service then pushes the message out via sockets to all currently connected clients of users who are participating in the stream
- Those users who are subscribed to be notified about certain types of messages in a stream will be sent offline notifications if none of their clients are currently connected.
- The user will receive an email, sms, or notification on their mobile app, that something occurred. This notification may bring them to a certain page in the app.
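The last steps of this flow come down to a per-participant decision: push over a socket if the user has a connected client, otherwise send an offline notification if they subscribed to that message type. The sketch below only illustrates that decision. It is not the code of the Qbix Node.js service, and `routeMessage`, `connectedClients` and `subscribedTypes` are hypothetical names invented for this example.

```javascript
// Hypothetical sketch of the routing decision, not Qbix's implementation.
function routeMessage(message, participants) {
  const pushed = [];   // users with at least one connected client
  const notified = []; // offline users whose subscription matches the message
  for (const p of participants) {
    if (p.connectedClients > 0) {
      pushed.push(p.userId);
    } else if (p.subscribedTypes.includes(message.type)) {
      notified.push(p.userId);
    }
    // offline and not subscribed: nothing is sent
  }
  return { pushed, notified };
}

const result = routeMessage(
  { type: "Streams/chat/message", ordinal: 7 },
  [
    { userId: "alice", connectedClients: 2, subscribedTypes: [] },
    { userId: "bob", connectedClients: 0, subscribedTypes: ["Streams/chat/message"] },
    { userId: "carol", connectedClients: 0, subscribedTypes: [] }
  ]
);
console.log(result);
```

In this example alice gets the message over her socket, bob gets an offline notification, and carol gets nothing until she next loads or refreshes the stream.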
The server enforces consistency, making sure the messages are saved in the same order for everyone, as each message is saved with an ordinal which begins with 1 and increments. On the client, the stream.onMessage event provides an easy way to be notified when a message arrives, and behind the scenes, the system automatically makes sure that messages arrive in the right order. The developer doesn't have to worry about building their own logic to handle all the edge cases. The client side developer of an app or reusable tool just subscribes to the right messages and handles them. On the server side, no code even needs to be written, unless someone wants to implement a custom type of stream.
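To make the ordinal idea concrete, here is a minimal sketch of how a client could hand messages to a handler strictly in ordinal order, even if they arrive out of order or twice. This is an assumption about one possible approach, not a description of what Streams.js does internally, and `createOrderedReceiver` is a hypothetical helper.

```javascript
// Hypothetical helper, not part of Streams.js: delivers messages by ordinal.
function createOrderedReceiver(messageCount, handler) {
  const pending = new Map(); // ordinal -> message waiting for its turn
  let delivered = messageCount; // highest ordinal already handled
  return {
    receive(message) {
      if (message.ordinal <= delivered) return; // duplicate, already handled
      pending.set(message.ordinal, message);
      // deliver every consecutive message that is now available
      while (pending.has(delivered + 1)) {
        const next = pending.get(delivered + 1);
        pending.delete(delivered + 1);
        delivered += 1; // the count is updated before the handler runs
        handler(next);
      }
    },
    get messageCount() {
      return delivered;
    }
  };
}

const seen = [];
const receiver = createOrderedReceiver(0, (m) => seen.push(m.ordinal));
receiver.receive({ ordinal: 2 }); // held back until ordinal 1 shows up
receiver.receive({ ordinal: 1 }); // releases 1, then 2
receiver.receive({ ordinal: 1 }); // duplicate, ignored
receiver.receive({ ordinal: 3 });
console.log(seen, receiver.messageCount);
```

A gap that never fills would stall this sketch. A real client would need a fallback for that case, such as refreshing the stream from the server.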
As you can see, streams are a bit like files in UNIX, in that they provide a lot of standard functionality out of the box. They are also a bit like objects in Smalltalk or Objective C, except that messages have a slightly different purpose: they are used to reconstruct what operations were done on the stream, and in what order. Multiple users may be connected to the stream, and performing various operations on it (usually via the HTTP REST interface). Operations can include creating / editing / joining / leaving streams, etc. and messages are generated by the server in the course of performing those operations. Sometimes, though, a client can also simply request to post a message to a stream (e.g. a chatroom), like so:
stream.post(message, callback);
Consistency
Well-known theoretical results, such as the CAP theorem, say that a distributed system cannot simultaneously guarantee Consistency, Availability, and Partition Tolerance. Qbix's answer is to guarantee consistency on the level of a single stream (so that people can e.g. chat in a chatroom), and to let users and app developers make up their own relations between streams across various countries and domains.
For each stream, one given server is the authority regarding the stream's fields, custom attributes, current participants, which users can access it, the messages posted to it, etc. The clients, however, maintain a cached local representation of the stream and the messages posted to it, via objects of type Streams.Stream, Streams.Message, etc. These Javascript objects on the client constitute a given "state" of the stream, which can be updated via messages being pushed from the server and processed by handlers on the client side. The idea is that these handlers processing the information in the messages are sufficient to update the local representation of the stream to its subsequent state, without having to refresh the stream after every message. The Streams API on the client makes use of Q.getter and Q.Cache to retrieve and hold a cached representation of server-side data, but it relies on messages from the server to keep this representation up to date. (The exception is offline support, described further below.)
Participating
Users can become participants in a stream by calling stream.join() (or the server can add participants with $stream->join()) and then your app's Node.js service will begin sending messages to the user's connected clients in real-time whenever they are posted to the stream. The handlers on the client will then process the messages as they come in, to update the state of the locally cached representation. A user joining a stream results in a Streams/join message being posted to that stream, as well as in the stream being added to the user's private list of streams they are participating in. This list is used by their logged-in clients to know that they should open a socket to the server nodes responsible for each stream, to listen for real-time messages coming in. It is actually itself represented by a Streams/participating stream published by the user, one of the standard set of streams which are auto-created and auto-joined by every new user of the app.
Subscriptions
Participants can also subscribe to offline notifications by calling stream.subscribe() (or the server can subscribe them with $stream->subscribe()) and the app's Node.js service will send them offline notifications whenever a message arrives which matches their subscription filters while they are not online. This is, in fact, how invites are delivered to new and existing users.
Whenever your app's Node.js service is not running, the offline notifications aren't being sent. Keep this in mind if invites to streams aren't arriving, and have a way to restart any Node.js scripts that have crashed for some reason.
You can show the user a standard dialog to manage their subscriptions to a stream, as follows:
Q.Streams.subscriptionDialog( publisherId, streamName, callback );
Creating new streams
You can create new streams in the system by doing the following:
var fields = {
  type: "Chess/game", // provide at least this
  title: "Some title" // you can override defaults
};
Q.Streams.create(fields, callback);
When the client connects to the server that's supposed to host the stream, it will issue a POST request to the Streams/stream action, which will insert a new Streams_Stream row in the database, configure access and subscriptions to it, etc. all based on the templates already in the database for that type of stream, and the user id of the user that's going to be publishing it. When all that is done, it will post a Streams/created message to the stream.
Retaining and Refreshing streams
With your app's Node.js service up and running, the Qbix platform is able to push messages in real time to the client, which updates the local Javascript objects in the streams cache, and lets your tools and code update the view as they need. But sometimes, the Node.js service is not running, perhaps because it crashed (in which case it should be restarted), or perhaps because the Platform is installed on a cheap/simple host which doesn't support Node.js.
In this case, Qbix isn't able to push updates to the client, but the client Streams.js library also has the ability to pull streams and newly posted messages. Even if sockets are connected, it uses the "pull and refresh" ability whenever, say, you were offline for a while and just reconnected, or you are on a mobile device and switch back to the browser or your WebView-based app. While it was in the background, the web sockets were likely not getting messages. So, in these situations, Q.Streams.refresh() is automatically called (unless you have set Q.Streams.refresh.options.preventAutomatic = true).
When Q.Streams.refresh() runs, it only refreshes those streams which were retained by the client. You can call stream.retain(key) and stream.release(key) to mark and unmark streams for being refreshed whenever the interface needs to be updated in one batch. When building custom tools, you can of course pass the tool instead of a key, similarly to how it is done with events. When the tool is removed, the stream is released under that key, and — if it isn't being retained under any other key — it won't be refreshed the next time Q.Streams.refresh() runs.
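The retain/release rule described above (a stream stays refreshable as long as at least one key still retains it) can be pictured as a small registry of keys per stream. The following is a sketch under that assumption only. `createRetainRegistry` and its methods are hypothetical and do not show how Streams.js stores retained streams.

```javascript
// Hypothetical registry illustrating retain/release by key.
function createRetainRegistry() {
  const keysByStream = new Map(); // "publisherId\tstreamName" -> Set of keys
  const id = (publisherId, streamName) => publisherId + "\t" + streamName;
  return {
    retain(publisherId, streamName, key) {
      const k = id(publisherId, streamName);
      if (!keysByStream.has(k)) keysByStream.set(k, new Set());
      keysByStream.get(k).add(key);
    },
    release(publisherId, streamName, key) {
      const k = id(publisherId, streamName);
      const keys = keysByStream.get(k);
      if (!keys) return;
      keys.delete(key);
      // once no key retains the stream, it drops out of future refreshes
      if (keys.size === 0) keysByStream.delete(k);
    },
    // the streams a refresh would fetch, as [publisherId, streamName] pairs
    retained() {
      return Array.from(keysByStream.keys()).map((k) => k.split("\t"));
    }
  };
}

const registry = createRetainRegistry();
registry.retain("alice", "Chess/game/1", "boardTool");
registry.retain("alice", "Chess/game/1", "clockTool");
registry.release("alice", "Chess/game/1", "boardTool");
const stillRetained = registry.retained().length; // clockTool still holds it
registry.release("alice", "Chess/game/1", "clockTool");
const afterAllReleased = registry.retained().length;
console.log(stillRetained, afterAllReleased);
```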
To summarize: under ideal conditions, you have a local representation of the stream in Javascript, messages are being pushed via a socket, and Javascript handlers are processing those messages to, among other things, update the local representation. But, once in a while, a retained stream might simply be refreshed, which typically results in lots of fields being changed and causing the tools observing this stream to refresh also. This might happen because, say, you were offline for a while. Refreshing the stream is better than fetching 500 messages that were posted in the meantime, and trying to "replay" them to "catch up".
Message handlers
Here is how you would implement client-side handlers for other types of messages that are being posted to the stream: The Streams plugin exposes event factories that return events which are triggered when a new message arrives. When designing a new tool, you'd typically add handlers to these events like so:
// given a stream:
stream.onMessage(mType).set(callback, tool);
// which basically calls this:
Streams.Stream.onMessage(
  publisherId, streamName, messageType
).set(callback, tool);
// or for all streams of a certain type:
Streams.onMessage(
  streamType, messageType
).set(callback, tool);
The tool would update its state from the current state of the stream to the new state, depending on the message and the instructions it contains. Messages are guaranteed to arrive in order, so your handlers can rely on it. By the time the event is fired, the stream's messageCount has already been incremented to match the message's ordinal.
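For readers unfamiliar with the "event factory" pattern, here is a rough, self-contained sketch of the idea: a function that returns the same event object for the same (publisherId, streamName, messageType) triple, where handlers are stored under a key so that a tool's handler can later be replaced or removed. Only `.set(callback, key)` appears in the text above. The `remove` and `handle` names, and `createEventFactory` itself, are assumptions made for this illustration rather than a statement of the Qbix API.

```javascript
// Hypothetical sketch of an event factory with keyed handlers.
function createEventFactory() {
  const events = new Map(); // one event object per argument combination
  return function onMessage(publisherId, streamName, messageType) {
    const id = [publisherId, streamName, messageType].join("\t");
    if (!events.has(id)) {
      const handlers = new Map(); // key -> callback
      events.set(id, {
        set(callback, key) {
          handlers.set(key, callback); // same key replaces the old handler
          return key;
        },
        remove(key) {
          handlers.delete(key);
        },
        handle(message) {
          handlers.forEach((callback) => callback(message));
        }
      });
    }
    return events.get(id);
  };
}

const onMessage = createEventFactory();
const calls = [];
const moveEvent = onMessage("alice", "Chess/game/1", "Chess/move");
moveEvent.set((m) => calls.push("board:" + m.ordinal), "boardTool");
moveEvent.set((m) => calls.push("clock:" + m.ordinal), "clockTool");
onMessage("alice", "Chess/game/1", "Chess/move").handle({ ordinal: 5 });
moveEvent.remove("boardTool"); // e.g. when the tool is removed from the page
moveEvent.handle({ ordinal: 6 });
console.log(calls);
```

Keying handlers by tool is what lets a handler be cleaned up together with the tool that registered it.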
Plain Data
So how does Qbix implement mutating and synchronizing plain old data, like Meteor and Firebase do? This is such a common use case that Qbix already implements a standard way of doing it, using the Streams and Messages method:
Q.Streams.get(publisherId, streamName, function (err) {
  if (err) return;
  // We got the Streams.Stream object,
  // now let's modify some fields
  this.pendingFields.title = 'foo';
  this.pendingFields.content = 'bar';
  // let's modify custom attributes
  this.set('progress', 0.5);
  this.set('url', 'http://yahoo.com');
  // this sets pendingFields.attributes
  // this.fields still contains
  // current, not pending, values
  // here is the difference:
  var oldTitle = this.fields.title;
  var newTitle = this.pendingFields.title;
  var oldUrl = this.get('url');
  var newUrl = this.get('url', true);
  // now let's persist it to the database
  this.save();
  // this issues a PUT request to
  // "Streams/stream" action.
});
Meanwhile, various reusable tools, and other places in your app's code may be listening for changes in fields and attributes of certain streams. Here is how they would do it:
var Streams = Q.Streams;
var Stream = Streams.Stream;
Stream.onUpdated(pubId, streamName, 'url')
.set(function (attributes) {
  var oldValue = this.get('url');
  var newValue = attributes.url;
  // Do what you have to do.
  // After all the handlers have run,
  // the stream cache will be automatically
  // updated so this.get('url')
  // will return the new value.
});
Streams.onUpdated(pubId, streamName)
.set(function (attributes, updated, cleared) {
  // this is called whenever any
  // custom attributes on a given stream
  // are either updated or cleared
});
Streams.onUpdated(pubId, '')
.set(function (attributes, updated, cleared) {
  // this is called whenever any
  // custom attributes on any stream
  // published by pubId
  // are either updated or cleared
});
Streams.onFieldChanged(pubId, streamName, 'icon')
.set(function (fields) {
  // refresh the icon
  var oldSrc = this.iconUrl('50.png');
  myTool.imgCool.src = Streams.iconUrl(
    fields.icon, '50.png'
  );
});
Streams.onFieldChanged(pubId, streamName)
.set(function (fields, changed) {
  // this is called whenever any
  // fields of the stream are changed
});
You get the idea. You can implement your own types of streams and your own handlers for various types of messages: chess moves, chatting, file diffs, sorting, adding/removing relations between streams, etc. But when it comes to changing a stream's fields and custom attributes, it's already been implemented. Behind the scenes, a message of type "Streams/changed" is generated in PHP when you call $stream->changed() instead of $stream->save(). The Streams.js code already handles this message by firing the stream.onFieldChanged(field) and stream.onUpdated(attribute) events for each changed field and attribute, before it updates the streams in the local cache.
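The ordering described here (handlers fire first, the cache is updated afterwards) is what lets a handler compare old and new values. Below is a minimal sketch of that ordering, with a plain object standing in for the cached stream fields. `applyChanged` is a hypothetical function written for this illustration and is not the Streams.js handler for "Streams/changed".

```javascript
// Hypothetical sketch: notify handlers of changed fields, then update the cache.
function applyChanged(cachedFields, changes, onFieldChanged) {
  for (const name of Object.keys(changes)) {
    if (cachedFields[name] !== changes[name]) {
      // the cache has not been touched yet, so the old value is still readable
      onFieldChanged(name, cachedFields[name], changes[name]);
    }
  }
  // only after all handlers have run is the cached representation updated
  Object.assign(cachedFields, changes);
  return cachedFields;
}

const cached = { title: "Old title", icon: "default" };
const log = [];
applyChanged(
  cached,
  { title: "New title", icon: "default" },
  (name, oldValue, newValue) => {
    log.push([name, oldValue, newValue, cached[name]]);
  }
);
console.log(log, cached.title);
```

Only `title` triggers the handler, since `icon` did not actually change, and inside the handler the cache still holds the old title.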
Offline Support
Multi-user applications typically require a central server, and indeed, each stream has a server which acts as a "single source of truth". From time to time, various users can experience high latency to this server, or be disconnected from it entirely. (Note, by the way, that having a bad connection to the Internet does not rule out hosting streams on a local network!)
In these cases, it would be very useful if users could continue to try to interact with the stream, but have the resulting operations queued on the client in the order they are attempted. The Streams JS API does, in fact, queue most operations until the server becomes reachable. For example, if the server was unreachable, the following would result in queued operations. Any callbacks would be invoked right away (though asynchronously), with the last parameter as true to indicate that the operation was, in fact, queued:
stream.post(msg);
Q.Streams.Message.post(msg);
Q.Streams.create(fields, callback, related);
stream.relate(relType, fromPubId, fromName);
stream.relateTo(relType, toPubId, toName);
stream.pendingFields.content = 'a';
stream.save(callback);
Besides queuing the operations however, the Streams JS API also simulates the posting of messages that would have been generated if the operations had succeeded. It does this by calling the Q.Streams.Message.simulate method.
If you'd like to simulate the server posting a message to a stream (e.g. because you expect it is about to happen), you can use this method yourself:
Q.Streams.Message.simulate({ "type": "Paint/draw", "instructions": ... });
Just to be clear, this method simply simulates, in the client, that a message was successfully posted to a stream on the server, which then "pushed" the message to the client. The reason for simulating this is to trigger all the handlers registered for this message, which will then mutate the local representation of the stream, update the view, etc.
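As a rough picture of how queuing and simulating fit together, the sketch below queues a post while the server is unreachable, runs the registered handlers as if the message had been pushed, and then invokes the callback asynchronously with a final `true` to signal "queued". Everything here is a stand-in written for illustration: `createOfflineQueue`, `isReachable` and `sendToServer` are hypothetical, and the instructions string is made up.

```javascript
// Hypothetical sketch of queue-and-simulate, not the Streams JS API itself.
function createOfflineQueue(isReachable, sendToServer) {
  const queue = [];    // journal of operations attempted while offline
  const handlers = {}; // message type -> array of handlers

  function simulate(message) {
    (handlers[message.type] || []).forEach((handler) => handler(message));
  }

  return {
    queue,
    onMessage(type, handler) {
      (handlers[type] = handlers[type] || []).push(handler);
    },
    post(message, callback) {
      const queued = !isReachable();
      if (queued) {
        queue.push({ op: "post", message });
        simulate(message); // update local state as if the server pushed it
      } else {
        sendToServer(message);
      }
      // invoked asynchronously; the last parameter says whether it was queued
      if (callback) Promise.resolve().then(() => callback(null, queued));
    }
  };
}

const strokes = [];
const client = createOfflineQueue(() => false, () => {}); // server unreachable
client.onMessage("Paint/draw", (m) => strokes.push(m.instructions));
client.post({ type: "Paint/draw", instructions: "line 0,0 10,10" });
console.log(strokes, client.queue.length);
```

The handler has already updated the local state by the time `post` returns, while the operation itself waits in the queue for the server to become reachable.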
In this way, calling the standard Streams API methods results in the local representation of the stream being updated just as if the server were reachable and this client were the only one interacting with the stream. In most cases, though, we can't actually be sure that other clients aren't also interacting with the stream. The only time we can be totally sure is when the user has created the stream with Streams.create(...) and has never yet flushed it to the server.
When the client connects again to the server, it refreshes all the streams from the server. This normally happens automatically:
Q.Streams.refresh();
This refreshes all the streams that were retained by the client, as well as the ones for which operations were done. This blows away any results of simulated messages, restoring the state of the streams on the client to the latest state on the server. Of course, the newly created streams will not yet be on the server, and thus the refresh will not affect their local representation.
After the refresh, some streams will have a messageCount that is the same as it was on the client (before any simulated messages). These are streams for which no new messages were posted, and for them, the client can automatically "flush" the journal of operations on them to the server. The same goes for streams which were created on the client: they will now be created on the server and the queued operations will be done with them there, as long as the user has the permissions to do so. This is done internally by the following function:
Q.Streams.flush(callback);
After the refresh, streams for which the messageCount on the server became higher than it was on the client (before any simulated messages), are streams to which new messages were posted — either through actions of other clients, or by the server itself. The queued operations for this stream are then presented to the client, to decide what to do. The client might choose to perform operational transformation, re-do some operations, or ask the user, since the underlying stream has changed. To perform these operations, the client simply calls the corresponding methods of the API again. (They will be queued again if the client has lost their connection with the server in the meantime.)
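The two cases above amount to comparing, per stream, the messageCount the client last knew (before any simulated messages) with the messageCount after the refresh. A minimal sketch of that comparison follows. `planFlush`, the journal entry shape and the `countBefore` field are hypothetical names for illustration and do not describe how Q.Streams.flush is implemented.

```javascript
// Hypothetical sketch: split queued operations into "flush" and "review".
function planFlush(journal, serverCounts) {
  const flush = [];  // safe to send to the server automatically
  const review = []; // stream changed meanwhile; let the client decide
  for (const entry of journal) {
    const serverCount = serverCounts[entry.stream];
    // a stream created on the client does not exist on the server yet
    const createdLocally = serverCount === undefined;
    if (createdLocally || serverCount === entry.countBefore) {
      flush.push(entry);
    } else {
      review.push(entry);
    }
  }
  return { flush, review };
}

const plan = planFlush(
  [
    { stream: "alice/Chess/game/1", countBefore: 12, op: "post" },
    { stream: "alice/Streams/chat/2", countBefore: 40, op: "post" },
    { stream: "alice/Notes/new", countBefore: 0, op: "create" }
  ],
  // messageCount per stream as reported by the server after the refresh
  { "alice/Chess/game/1": 12, "alice/Streams/chat/2": 43 }
);
console.log(plan.flush.map((e) => e.stream), plan.review.map((e) => e.stream));
```

Here the chess game and the locally created stream can be flushed, while the chat stream received three messages in the meantime, so its queued post is handed back to the client for a decision.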
Any errors in flushing the queued operations, e.g. because the user actually lacks permissions to carry out those operations, are reported back to the client. Since the callbacks for those operations have long been called, the whole array of errors is reported at once, and the client can choose to do what it wants with them.
(Streams intercom between windows, using 'storage' event.)
Optimistic Updates
A pessimistic approach, which waits for server confirmations for each operation, can appear slow and frustrating to users, and have a serious impact on your bottom line. The Qbix platform enables instantaneous, "optimistic" visual feedback as a side effect of its far-reaching support for offline operations. For example, when the standard stream.save() method is called while the server hosting the stream is unreachable, it queues the operation and calls Q.Streams.Message.simulate to simulate the corresponding "Streams/changed" message, as if the change succeeded. This simulation causes all the corresponding parts of the visual interface to update. When the client reconnects to the server, the stream is refreshed and the visual interface is updated to the latest state on the server. If nothing has changed (and, when latency to the server is low, most of the time nothing has), the queued posting of the message finally occurs, and the visual interface once again reflects the "optimistic" value. Currently this causes a brief flicker as the interface shifts back and forth. Any ideas?