prologin.synchronisation module

Synchronisation client/server library: sends updates to clients via long polling connections. Uses Tornado to be able to support an arbitrary number of clients.

class prologin.synchronisation.AsyncClient(url, pk, pub_secret=None, sub_secret=None)

Bases: prologin.webapi.AsyncClient

poll_updates(watch=None)

Starts polling for updates. Asynchronously yields tuples (state, meta) where:

  • state is a (primary_key -> record) mapping

  • meta is a (primary key changed -> kind of update) mapping, for all records than has undergone a change.

Note that records are yielded even if the watched list of changes is empty. See updated_backlog for the meaning of watch.

async send_update(update)
async send_updates(updates)
class prologin.synchronisation.BasePubSubQueue

Bases: object

Maintain a backlog of updates. Used by the server.

This is a base abstract class. It define common utilities, let subclasses implement required methods and let them add other methods.

get_backlog_message()

Return the backlog that is sent to new subscribers as a JSON object.

post_updates(update_msg)

Publish an update message to all subscribers.

register_subscriber(callback)

Register a new subscriber to the queue. callback will be invoked for each published update message (including the initial backlog).

unregister_subscriber(callback)

Remove a subscriber from the queue.

class prologin.synchronisation.Client(url, pk, pub_secret=None, sub_secret=None)

Bases: prologin.webapi.Client

Synchronisation client.

poll_updates(callback, watch=None)

Call callback for each set of updates.

callback is called with an iterable that contain an up-to-date mapping of records (primary_key -> record), and with a mapping: primary key changed -> kind of update, for all records than has watched changes. Note that the callback is invoked even if the watched list of changes is empty. See updated_backlog for the meaning of watch and for returned watched changes.

Warning: failure to complete any HTTP request to the server will raise SystemExit. It is your responsibility to restart the program.

send_update(update)
send_updates(updates)
class prologin.synchronisation.DefaultPubSubQueue(pk, initial_backlog)

Bases: prologin.synchronisation.BasePubSubQueue

Maintain a backlog of updates for records with a field that is unique.

apply_updates(updates)

Apply updates to the backlog and publish the corresponding update message.

get_backlog_message()

Return the backlog that is sent to new subscribers as a JSON object.

class prologin.synchronisation.PollHandler(application, request, **kwargs)

Bases: tornado.web.RequestHandler

get()
message_callback(msg)
on_connection_close()

Called in async handlers if the client closed the connection.

Override this to clean up resources associated with long-lived connections. Note that this method is called only if the connection was closed during asynchronous processing; if you need to do cleanup after every request override on_finish instead.

Proxies may keep a connection open for a time (perhaps indefinitely) after the client has gone away, so this method may not be called promptly after the end user closes their connection.

class prologin.synchronisation.Server(pk, pub_secret, sub_secret, port, app_name)

Bases: prologin.web.TornadoApp

Synchronisation server. Users must derive from this class and implement required methods.

create_pubsub_queue()

Create and return a brand new pubsub queue, taking care of filling it with an initial backlog.

Override this method if you want to have a custom pubsub queue.

get_handlers()

Return a list of URL/request handlers couples for this server.

get_initial_backlog()

Return the initial state of updates as a list.

Users must give an implementation for this method.

start()

Run the server.

class prologin.synchronisation.UpdateHandler(application, request, **kwargs)

Bases: tornado.web.RequestHandler

post()
class prologin.synchronisation.UpdateSenderTask(updater, *args, **kwargs)

Bases: threading.Thread

STOP_GUARD = <object object>
join(**kwargs)

Wait until the thread terminates.

This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception or until the optional timeout occurs.

When the timeout argument is present and not None, it should be a floating point number specifying a timeout for the operation in seconds (or fractions thereof). As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out.

When the timeout argument is not present or None, the operation will block until the thread terminates.

A thread can be join()ed many times.

join() raises a RuntimeError if an attempt is made to join the current thread as that would cause a deadlock. It is also an error to join() a thread before it has been started and attempts to do so raises the same exception.

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

send(update)
stop()
prologin.synchronisation.apply_updates(pk, backlog, updates, watch=None)

Update backlog with updates using pk as primary key for records.

Return metadata about what have been updated using an encoding like:

{‘obj1’: ‘updated’, ‘obj3’: ‘deleted’, ‘obj6’: ‘created’}

The previous example means: the entity whose primary key is ‘obj1’ have been updated, ‘obj6’ have been created, … Note that deletion and creation of entities always result in returned metadata, whatever watch contains.

If watch is not None, it must be a set of field names. In this case, returned metadata includes only entities whose changes target fields in watch.

For instance:

>>> apply_updates('k', {1: {'k': 1, 'v': 2}},
                [{'type': 'update', 'data': {'k': 1, 'v': 3}}])
{1: 'updated'}
>>> apply_updates(
        'k',
        {
            1: {'k': 1, 'v': 2, 'w': 1},
            2: {'k': 2, 'v': 3, 'w': 4}
        },
        [
            {'type': 'update', 'data': {'k': 1, 'v': 5, 'w': 6}}
            {'type': 'update', 'data': {'k': 1, 'v': 7, 'w': 4}}
        ],
        watch={'w'}
    )
{1: 'updated'}
prologin.synchronisation.items_to_updates(items)