data_publisher behaviour (kernel v10.6.2)

Copy Markdown View Source

A behaviour for building eventually consistent, replicated data stores across distributed nodes.

This module provides the infrastructure for:

  • Replicating data across all nodes running the same scope
  • Automatic peer discovery
  • Subscription-based notifications for data changes
  • Version translation for rolling upgrades

Implementing modules define how data is stored and updated, and how changes are calculated and propagated to peers. This behaviour is a generalization of the approach used by the pg module.

Summary

Types

global_view()

(not exported) (since OTP @OTP-20055@)
-type global_view() :: dynamic().

local_data()

(not exported) (since OTP @OTP-20055@)
-type local_data() :: dynamic().

options()

(not exported) (since OTP @OTP-20055@)
-type options() :: #{standalone => boolean(), dynamic() => dynamic()}.

scope()

(not exported) (since OTP @OTP-20055@)
-type scope() :: atom().

state()

(not exported) (since OTP @OTP-20055@)
-type state() ::
          #state{scope :: atom(),
                 module :: module(),
                 options :: options(),
                 version :: version(),
                 global_view :: global_view(),
                 local_data :: local_data(),
                 peers :: #{pid() => {reference(), version(), local_data()}},
                 subscriptions :: subscriptions(),
                 subscribe_refs :: #{reference() => subscription()}}.

subscribe_result()

(not exported) (since OTP @OTP-20055@)
-type subscribe_result() :: dynamic().

subscription()

(not exported) (since OTP @OTP-20055@)
-type subscription() :: dynamic().

subscriptions()

(not exported) (since OTP @OTP-20055@)
-type subscriptions() :: #{subscription() => #{reference() => pid()}}.

update()

(not exported) (since OTP @OTP-20055@)
-type update() :: dynamic().

version()

(not exported) (since OTP @OTP-20055@)
-type version() :: dynamic().

Callbacks

data_diff(Old, New)

(since OTP @OTP-20055@)
-callback data_diff(Old :: local_data(), New :: local_data()) -> update().

init_global_view/2

(since OTP @OTP-20055@)
-callback init_global_view(scope(), options()) -> global_view().

init_local_data/1

(since OTP @OTP-20055@)
-callback init_local_data(options()) -> local_data().

new_subscription/2

(since OTP @OTP-20055@)
-callback new_subscription(subscription(), global_view()) -> subscribe_result().

stop_global_view/1

(since OTP @OTP-20055@)
-callback stop_global_view(global_view()) -> term().

translate_local_data/3

(since OTP @OTP-20055@)
-callback translate_local_data(MyVersion :: version(), PeerVersion :: version(), local_data()) ->
                                  local_data() | {'$plain_message', dynamic()}.

translate_message/2

(since OTP @OTP-20055@) (optional)
-callback translate_message(dynamic(), global_view()) ->
                               {update, update(), global_view()} |
                               {drop, global_view()} |
                               {update, pid(), update(), global_view()} |
                               {local_data, pid(), version(), local_data(), global_view()} |
                               {discover, pid(), version(), global_view()}.

translate_update/3

(since OTP @OTP-20055@)
-callback translate_update(MyVersion :: version(), PeerVersion :: version(), update()) ->
                              update() | {'$plain_messages', [dynamic()]}.

update_global_view_and_notify/4

(since OTP @OTP-20055@)
-callback update_global_view_and_notify(node(), update(), subscriptions(), global_view()) -> global_view().

update_local_data/2

(since OTP @OTP-20055@)
-callback update_local_data(update(), local_data()) -> local_data().

version()

(since OTP @OTP-20055@)
-callback version() -> version().

Functions

handle_call/3

(since OTP @OTP-20055@)
-spec handle_call({update, update()}, gen_server:from(), state()) -> {reply, ok, state()};
                 ({subscribe, subscription()}, gen_server:from(), state()) ->
                     {reply, {reference(), subscribe_result()}, state()};
                 ({unsubscribe, reference()}, gen_server:from(), state()) -> {reply, ok, state()}.

handle_cast(Message, State)

(since OTP @OTP-20055@)
-spec handle_cast(Message, state()) -> {noreply, state()}
                     when Message :: {local_data, pid(), version(), local_data()} | dynamic().

handle_info/2

(since OTP @OTP-20055@)
-spec handle_info(Message, state()) -> {noreply, state()}
                     when
                         Message ::
                             {discover, pid(), version()} |
                             {local_data, pid(), version(), local_data()} |
                             {update, pid(), update()} |
                             {nodeup, node()} |
                             {nodedown, node()} |
                             {{'DOWN', subscribe}, reference(), process, pid(), dynamic()} |
                             {{'DOWN', peer}, reference(), process, pid(), dynamic()} |
                             dynamic().

init/1

(since OTP @OTP-20055@)
-spec init({scope(), module(), options()}) -> {ok, state()}.

start(Scope, Module)

(since OTP @OTP-20055@)
-spec start(scope(), module()) -> {ok, pid()} | {error, term()}.

start(Scope, Module, Options)

(since OTP @OTP-20055@)
-spec start(scope(), module(), options()) -> {ok, pid()} | {error, term()}.

start_link(Scope, Module)

(since OTP @OTP-20055@)
-spec start_link(scope(), module()) -> {ok, pid()} | {error, term()}.

start_link(Scope, Module, Options)

(since OTP @OTP-20055@)
-spec start_link(scope(), module(), options()) -> {ok, pid()} | {error, term()}.

subscribe(Scope, Subscription)

(since OTP @OTP-20055@)
-spec subscribe(scope(), subscription()) -> {reference(), subscribe_result()}.

terminate/2

(since OTP @OTP-20055@)
-spec terminate(dynamic(), state()) -> term().

unsubscribe(Scope, Ref)

(since OTP @OTP-20055@)
-spec unsubscribe(scope(), reference()) -> ok.

update(Scope, Update)

(since OTP @OTP-20055@)
-spec update(scope(), update()) -> ok.