CAF
0.17.6
State for a path to an upstream actor (source).
#include <inbound_path.hpp>
Classes | |
struct | stats_t |
Stores statistics for measuring complexity of incoming batches. | |
Public Types | |
using | regular_shutdown = upstream_msg::drop |
Message type for propagating graceful shutdowns. | |
using | irregular_shutdown = upstream_msg::forced_drop |
Message type for propagating errors. | |
Public Member Functions | |
inbound_path (stream_manager_ptr mgr_ptr, stream_slots id, strong_actor_ptr ptr, rtti_pair input_type) | |
Constructs a path for given handle and stream ID. | |
void | handle (downstream_msg::batch &x) |
Updates last_batch_id and assigned_credit before dispatching to the manager. | |
template<class T > | |
void | handle (T &x) |
Dispatches any downstream_msg other than batch directly to the manager. | |
void | emit_ack_open (local_actor *self, actor_addr rebind_from) |
Emits an upstream_msg::ack_open. | |
void | emit_ack_batch (local_actor *self, int32_t queued_items, actor_clock::time_point now, timespan cycle, timespan desired_batch_complexity) |
Sends an upstream_msg::ack_batch for granting new credit. | |
bool | up_to_date () |
Returns whether the path received no input since last emitting ack_batch, i.e., last_acked_batch_id == last_batch_id. | |
void | emit_regular_shutdown (local_actor *self) |
Sends an upstream_msg::drop on this path. | |
void | emit_irregular_shutdown (local_actor *self, error reason) |
Sends an upstream_msg::forced_drop on this path. | |
Static Public Member Functions | |
static void | emit_irregular_shutdown (local_actor *self, stream_slots slots, const strong_actor_ptr &hdl, error reason) |
Sends an upstream_msg::forced_drop . | |
Public Attributes | |
stream_manager_ptr | mgr |
Points to the manager responsible for incoming traffic. | |
strong_actor_ptr | hdl |
Handle to the source. | |
stream_slots | slots |
Stores slot IDs for sender (hdl) and receiver (self). | |
int32_t | desired_batch_size |
Stores the last computed desired batch size. | |
int32_t | assigned_credit |
Amount of credit we have signaled upstream. | |
stream_priority | prio |
Priority of incoming batches from this source. | |
int64_t | last_acked_batch_id |
ID of the last acknowledged batch. | |
int64_t | last_batch_id |
ID of the last received batch. | |
stats_t | stats |
Summarizes how many elements we processed during the last cycle and how much time we spent processing those elements. | |
actor_clock::time_point | last_credit_decision |
Stores the time point of the last credit decision for this source. | |
actor_clock::time_point | next_credit_decision |
Stores the time point of the next credit decision for this source. | |
Static Public Attributes | |
static constexpr int | initial_credit = 50 |
Amount of credit we assign sources after receiving open. | |
Related Functions | |
(Note that these are not member functions.) | |
template<class Inspector > | |
Inspector::return_type | inspect (Inspector &f, inbound_path &x) |
State for a path to an upstream actor (source).
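The interplay between handle, emit_ack_batch, and up_to_date listed above can be illustrated with a small stand-alone model. This is only a sketch: batch and path_model are hypothetical types that do not exist in CAF, and the assumption that handling a batch reduces assigned_credit by the batch size is ours, not something this page states.

```cpp
#include <cstdint>

// Hypothetical, simplified stand-in for a downstream batch (not a CAF type).
struct batch {
  int64_t id;   // batch ID assigned by the source
  int32_t size; // number of elements in the batch
};

// Hypothetical model of the per-path bookkeeping (not caf::inbound_path).
struct path_model {
  int32_t assigned_credit = 0;
  int64_t last_acked_batch_id = 0;
  int64_t last_batch_id = 0;

  // Models handle(batch): remember the batch ID and, by assumption,
  // consume as much credit as the batch has elements.
  void handle(const batch& x) {
    assigned_credit -= x.size;
    last_batch_id = x.id;
  }

  // Models the bookkeeping side of emit_ack_batch: grant new credit and
  // acknowledge everything received so far.
  void ack(int32_t new_credit) {
    assigned_credit += new_credit;
    last_acked_batch_id = last_batch_id;
  }

  // True if no batch arrived since the last acknowledgement.
  bool up_to_date() const {
    return last_acked_batch_id == last_batch_id;
  }
};
```

In this model, a path stops being up to date as soon as a batch arrives and becomes up to date again once the next acknowledgement is emitted.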
void caf::inbound_path::emit_ack_batch(local_actor* self, int32_t queued_items, actor_clock::time_point now, timespan cycle, timespan desired_batch_complexity)
Sends an upstream_msg::ack_batch for granting new credit.
Credit is calculated from sampled batch durations, the cycle duration and the desired batch complexity.
self | Points to the parent actor, i.e., sender of the message. |
queued_items | Accumulated size of all batches that are currently waiting in the mailbox. |
now | Current timestamp. |
cycle | Time between credit rounds. |
desired_batch_complexity | Desired processing time per batch. |
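One plausible way to derive credit from these inputs is to scale the measured processing rate (items per unit of time) to the cycle length and to the desired batch complexity. The sketch below illustrates that idea only; credit_result and calculate_credit are hypothetical names, timespan is modeled as std::chrono::nanoseconds, and the handling of queued_items is left out because this page does not describe it.

```cpp
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <limits>

// Stand-in for a timespan type; an assumption for this sketch.
using timespan = std::chrono::nanoseconds;

// Hypothetical result type (not part of CAF).
struct credit_result {
  int32_t max_throughput;  // items that fit into one cycle
  int32_t items_per_batch; // items that take roughly the desired complexity
};

// Scales the sampled rate (sampled_items / sampled_time) to `cycle` and to
// `desired_batch_complexity`. Both results are clamped to [1, INT32_MAX].
inline credit_result calculate_credit(int64_t sampled_items,
                                      timespan sampled_time, timespan cycle,
                                      timespan desired_batch_complexity) {
  const int64_t total_ns = sampled_time.count();
  if (total_ns <= 0 || sampled_items <= 0)
    return {1, 1}; // no usable measurement yet: fall back to the minimum
  auto clamp = [](int64_t x) {
    const int64_t hi = std::numeric_limits<int32_t>::max();
    return static_cast<int32_t>(std::min<int64_t>(std::max<int64_t>(x, 1), hi));
  };
  // Compute in 64 bits before truncating to the 32-bit result type.
  return {clamp(cycle.count() * sampled_items / total_ns),
          clamp(desired_batch_complexity.count() * sampled_items / total_ns)};
}
```

For example, if 100 items were processed in 1 ms, a 50 ms cycle yields a throughput of 5000 items per cycle, and a desired batch complexity of 500 µs yields batches of 50 items.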