Examples

To run the example below, first start a Celery broker, for example Redis via Docker:

docker run -p 6379:6379 redis:alpine
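
Before moving on, it can help to confirm that the broker is accepting connections. The snippet below is a minimal sketch using only the standard library. The helper name broker_is_reachable is hypothetical (it is not part of Celery or Flask-WebSub), and it only checks that the TCP port from the docker command above is open, not that Redis itself is healthy.

```python
import socket


def broker_is_reachable(host="localhost", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds.

    Hypothetical helper for this walkthrough; the defaults match the
    port published by the docker command above.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unresolvable hosts.
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_is_reachable())
```

If this reports that the broker is not reachable, the Celery worker started later will most likely fail to connect as well.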

Next, install the dependencies. We use a virtualenv to isolate them from the rest of the system:

python3 -m venv venv # create the virtualenv
source venv/bin/activate # activate the virtualenv
pip install Flask-WebSub[celery,redis] # install the dependencies

Now that our environment is set up, we can create the server and client files, plus the template the server renders:

#!/usr/bin/env python3
# server_example.py

from flask import Flask, render_template, url_for
# The publisher and hub are combined in the same process because it's easier.
# There's no need to do so, though.
from flask_websub.publisher import publisher, init_publisher
from flask_websub.hub import Hub, SQLite3HubStorage
from celery import Celery

# app & celery
app = Flask(__name__)
app.config['SERVER_NAME'] = 'localhost:8080'
celery = Celery('server_example', broker='redis://localhost:6379')

# initialise publisher
init_publisher(app)

# initialise hub
#
# PUBLISH_SUPPORTED is not recommended in production, as it just accepts any
# link without validation, but it's nice for testing.
app.config['PUBLISH_SUPPORTED'] = True
# we could also have passed in just PUBLISH_SUPPORTED, but this is probably a
# nice pattern for your app:
hub = Hub(SQLite3HubStorage('server_data.sqlite3'), celery, **app.config)
app.register_blueprint(hub.build_blueprint(url_prefix='/hub'))


def validate_topic_existence(callback_url, topic_url, *args):
    with app.app_context():
        if topic_url.startswith('https://websub.rocks/'):
            return  # pass validation
        if topic_url != url_for('topic', _external=True):
            return "Topic not allowed"


hub.register_validator(validate_topic_existence)
hub.schedule_cleanup()  # cleanup expired subscriptions once a day, by default


@app.before_first_request
def cleanup():
    # or just cleanup manually at some point
    # (note: before_first_request was removed in Flask 2.3)
    hub.cleanup_expired_subscriptions.delay()


@app.route('/')
@publisher()
def topic():
    return render_template('server_example.html')


@app.route('/update_now')
def update_now():
    hub.send_change_notification.delay(url_for('topic', _external=True))
    return "Notification sent!"


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)

#!/usr/bin/env python3
# client_example.py

from flask import Flask, url_for
from flask_websub.subscriber import Subscriber, SQLite3TempSubscriberStorage, \
                                    SQLite3SubscriberStorage, discover

import sys

app = Flask(__name__)
app.config['SERVER_NAME'] = 'localhost:8081'

subscriber = Subscriber(SQLite3SubscriberStorage('client_data.sqlite3'),
                        SQLite3TempSubscriberStorage('client_data.sqlite3'))
app.register_blueprint(subscriber.build_blueprint(url_prefix='/callbacks'))


@subscriber.add_success_handler
def on_success(topic_url, callback_id, mode):
    print("SUCCESS!", topic_url, callback_id, mode)


@subscriber.add_error_handler
def on_error(topic_url, callback_id, msg):
    print("ERROR!", topic_url, callback_id, msg)


@subscriber.add_listener
def on_topic_change(topic_url, callback_id, body):
    print('TOPIC CHANGED!', topic_url, callback_id, body)


if len(sys.argv) == 2:
    published_url = sys.argv[1]
else:
    published_url = 'http://localhost:8080/'


@app.route('/subscribe')
def subscribe_route():
    id = subscriber.subscribe(**discover(published_url))
    return 'Subscribed. ' + url_for('renew_route', id=id, _external=True)


@app.route('/renew/<id>')
def renew_route(id):
    new_id = subscriber.renew(id)
    return 'Renewed: ' + url_for('unsubscribe_route', id=new_id,
                                 _external=True)


@app.route('/unsubscribe/<id>')
def unsubscribe_route(id):
    subscriber.unsubscribe(id)
    return 'Unsubscribed: ' + url_for('cleanup_and_renew_all', _external=True)


@app.route('/cleanup_and_renew_all')
def cleanup_and_renew_all():
    subscriber.cleanup()
    # 100 days, to make sure every single subscription is renewed
    subscriber.renew_close_to_expiration(24 * 60 * 60 * 100)
    return 'Done!'


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8081)

<!DOCTYPE html>
<!-- templates/server_example.html -->
<html>
  <head>
    <title>WebSub test</title>
    {{ websub_self_link }}
    {{ websub_hub_link }}
  </head>
  <body>
    Hello World!
  </body>
</html>
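
The validator in server_example.py follows a simple pattern: returning nothing lets the subscription request through, while returning a string rejects it with that string as the reason. The sketch below reproduces the same check without Flask so the behaviour is easy to try in isolation. make_topic_validator is a hypothetical helper, own_topic_url stands in for url_for('topic', _external=True), and the callback URL used in the calls is made up.

```python
def make_topic_validator(own_topic_url, allowed_prefixes=()):
    """Build a validator shaped like validate_topic_existence.

    Hypothetical helper: own_topic_url replaces the url_for() lookup
    done in server_example.py.
    """
    def validate(callback_url, topic_url, *args):
        if any(topic_url.startswith(prefix) for prefix in allowed_prefixes):
            return None  # pass validation
        if topic_url != own_topic_url:
            return "Topic not allowed"  # reject, with a reason
        return None  # our own topic: pass validation

    return validate


validate = make_topic_validator(
    "http://localhost:8080/",
    allowed_prefixes=("https://websub.rocks/",),
)

# The callback URL below is an illustrative placeholder.
print(validate("http://localhost:8081/callbacks/x", "http://localhost:8080/"))
print(validate("http://localhost:8081/callbacks/x", "http://example.com/feed"))
```

The first call prints None (accepted) and the second prints the rejection message.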

Don’t forget to update the SERVER_NAME config variable in server_example.py and client_example.py when creating those files. Set each one to the host name and port the application will be reachable at (the host can just be localhost).
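
One way to avoid editing the files for every host is to read the value from the environment. This is only a sketch: the helper server_name_from_env and the variable name WEBSUB_SERVER_NAME are made up for illustration and are not part of Flask or Flask-WebSub.

```python
import os


def server_name_from_env(default, env=os.environ):
    """Return the SERVER_NAME override from the environment, else the default.

    WEBSUB_SERVER_NAME is a made-up variable name for this sketch.
    """
    return env.get("WEBSUB_SERVER_NAME") or default


# In server_example.py this could replace the hard-coded value:
# app.config['SERVER_NAME'] = server_name_from_env('localhost:8080')
print(server_name_from_env("localhost:8080", env={}))
```

With no override set, the helper falls back to the default passed in, so the example files keep working unchanged on localhost.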

Finally, it’s time to start the applications. Run each line in a different terminal (with the virtualenv active in each):

celery -A server_example.celery worker -l info  # starts the celery worker
./server_example.py  # runs the server flask application
./client_example.py  # runs the client flask application

You can now see the published page by navigating to the root URL of the server (port 8080), which acts as both publisher and hub. As it’s a static page, you can simulate updating it by navigating to /update_now. Of course, for this to do something, you first need to subscribe to the URL. The subscriber server (as defined in client_example.py, port 8081) offers a /subscribe endpoint to help you with this. Its response also tells you which other URLs you can visit to control the subscription side of the process.
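
The same walkthrough can be scripted instead of clicked through in a browser. The following is a rough sketch using only the standard library. The function names are hypothetical, and the base URLs assume the default SERVER_NAME values from the two example files.

```python
from urllib.parse import urljoin
from urllib.request import urlopen

SERVER = "http://localhost:8080/"  # server_example.py (publisher and hub)
CLIENT = "http://localhost:8081/"  # client_example.py (subscriber)


def walkthrough_urls(server=SERVER, client=CLIENT):
    """Return the URLs to visit, in order: subscribe, then trigger an update."""
    return [urljoin(client, "subscribe"), urljoin(server, "update_now")]


def run_walkthrough():
    """Request each URL and print the response body.

    Needs the Celery worker and both Flask applications to be running.
    """
    for url in walkthrough_urls():
        with urlopen(url, timeout=10) as response:
            print(url, "->", response.read().decode())


# Call run_walkthrough() once everything is up.
```

Because the subscription is confirmed in the background, a short pause between the two requests may be needed before the client prints its TOPIC CHANGED! line.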