flowchart TD
    A[Data Sources] --> B[Data Ingestion Layer]
    B --> C[Stream Processing]
    C --> D[Data Storage/Cache]
    D --> E[Shiny Server Logic]
    E --> F[Real-Time UI Updates]
    F --> G[User Interactions]
    G --> H[Feedback Loop]

    I[Update Mechanisms] --> J[Automatic Refresh]
    I --> K[WebSocket Connections]
    I --> L[Server-Sent Events]
    I --> M[Polling Strategies]

    N[Performance Layers] --> O[Memory Management]
    N --> P[Data Sampling]
    N --> Q[Connection Pooling]
    N --> R[Caching Strategies]

    S[Monitoring Systems] --> T[Health Checks]
    S --> U[Alert Management]
    S --> V[Performance Metrics]
    S --> W[Error Recovery]

    style A fill:#e1f5fe
    style F fill:#e8f5e8
    style I fill:#fff3e0
    style N fill:#f3e5f5
    style S fill:#fce4ec
Key Takeaways
- Live Data Intelligence: Real-time applications provide immediate insights into changing business conditions, enabling rapid response to critical situations and opportunities
- Streaming Architecture: Advanced data handling techniques manage continuous data flows efficiently while maintaining application responsiveness and stability
- Performance at Scale: Optimized memory management and intelligent data sampling ensure smooth operation even with high-frequency updates and large data volumes
- Interactive Monitoring: Dynamic visualizations and automated alerting systems create comprehensive monitoring platforms that rival commercial solutions
- Production Reliability: Robust error handling and connection management ensure continuous operation in mission-critical monitoring environments
Introduction
Real-time data applications represent the cutting edge of business intelligence, transforming static reporting into dynamic monitoring systems that respond instantly to changing conditions. While traditional dashboards provide valuable historical insights, real-time applications enable organizations to detect patterns, respond to alerts, and make decisions based on current data streams rather than outdated snapshots.
This comprehensive guide covers the complete spectrum of real-time application development in Shiny, from basic automatic refresh patterns to sophisticated streaming data architectures that handle high-frequency updates with WebSocket connections. You’ll master the techniques needed to build monitoring systems, trading platforms, IoT dashboards, and operational intelligence applications that provide genuine real-time value.
Whether you’re building financial trading dashboards that need millisecond updates, manufacturing monitoring systems that track production metrics, or social media analytics platforms that process live data streams, understanding real-time application architecture is essential for creating tools that provide competitive advantage through timely information.
Understanding Real-Time Application Architecture
Real-time Shiny applications involve sophisticated data flow management that balances immediate responsiveness with system stability and performance optimization.
Core Real-Time Components
Data Ingestion: Efficient systems for receiving continuous data streams from databases, APIs, message queues, or direct sensor feeds.
Stream Processing: Real-time data transformation, filtering, and aggregation that prepares raw data streams for visualization and analysis.
Update Management: Intelligent refresh strategies that balance data freshness with application performance and user experience.
Connection Handling: Robust connection management that maintains data flow integrity even during network interruptions or system stress.
Strategic Architecture Approaches
Polling-Based Updates: Simple timer-based refresh patterns suitable for moderate update frequencies and stable data sources.
Push-Based Streaming: Advanced WebSocket or Server-Sent Event implementations for high-frequency updates and bidirectional communication.
Hybrid Systems: Combined approaches that optimize different data streams based on their characteristics and business requirements.
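Before diving into full applications, it helps to see how small the core of each approach is. The sketch below contrasts a polling observer with a push-style handler; fetch_latest() and the pushed_value input are illustrative placeholders, not part of any package API.

library(shiny)

server <- function(input, output, session) {

  # Polling: the observer re-schedules itself on a timer and pulls new data
  polled_value <- reactiveVal(NULL)
  observe({
    invalidateLater(1000)            # re-run roughly every second
    polled_value(fetch_latest())     # fetch_latest() is your own data accessor
  })

  # Push: client-side JavaScript (e.g. a WebSocket onmessage handler) calls
  # Shiny.setInputValue('pushed_value', ...) and the server simply reacts
  observeEvent(input$pushed_value, {
    message("Server received pushed value: ", input$pushed_value)
  })
}

Either style plugs into the same downstream reactive graph, which is what makes hybrid systems practical: slow-moving streams can poll while latency-sensitive streams push.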
Foundation Real-Time Implementation
Start with core patterns that demonstrate essential real-time concepts while providing the foundation for advanced streaming architectures.
Basic Automatic Refresh Systems
library(shiny)
library(shinydashboard)  # provides valueBoxOutput() / renderValueBox()
library(plotly)
library(DT)
library(dplyr)

ui <- fluidPage(
  titlePanel("Real-Time Monitoring Dashboard"),

  fluidRow(
    column(4,
      wellPanel(
        h4("Update Controls"),
        selectInput("update_frequency", "Update Frequency:",
                    choices = c("Real-time (1s)" = 1000,
                                "Fast (5s)" = 5000,
                                "Normal (10s)" = 10000,
                                "Slow (30s)" = 30000),
                    selected = 5000),
        checkboxInput("auto_update", "Auto Update", value = TRUE),
        actionButton("manual_refresh", "Manual Refresh",
                     class = "btn-primary"),
        br(), br(),
        h5("System Status"),
        textOutput("last_update"),
        textOutput("update_count"),
        textOutput("data_points")
      )
    ),
    column(8,
      # Real-time metrics
      fluidRow(
        valueBoxOutput("current_value", width = 4),
        valueBoxOutput("trend_indicator", width = 4),
        valueBoxOutput("alert_status", width = 4)
      )
    )
  ),

  fluidRow(
    column(8,
      tabsetPanel(
        tabPanel("Live Chart",
          plotlyOutput("realtime_plot", height = "400px")
        ),
        tabPanel("Data Stream",
          DT::dataTableOutput("realtime_table")
        ),
        tabPanel("System Metrics",
          plotlyOutput("system_metrics", height = "400px")
        )
      )
    ),
    column(4,
      wellPanel(
        h4("Live Alerts"),
        div(id = "alerts_container",
          uiOutput("live_alerts")
        )
      ),
      wellPanel(
        h4("Performance Monitor"),
        plotlyOutput("performance_gauge", height = "200px")
      )
    )
  )
)

server <- function(input, output, session) {

  # Reactive values for real-time data
  realtime_data <- reactiveValues(
    current_data = data.frame(),
    update_counter = 0,
    last_update_time = Sys.time(),
    system_metrics = data.frame(),
    alerts = list()
  )

  # Initialize with sample data
  observe({
    initial_data <- generate_initial_data()
    realtime_data$current_data <- initial_data
    realtime_data$system_metrics <- generate_system_metrics()
  })

  # Automatic update mechanism
  observe({
    # Only update if auto-update is enabled
    req(input$auto_update)

    # Get update frequency from input
    frequency <- as.numeric(input$update_frequency)
    invalidateLater(frequency)

    # Generate new data point
    new_data <- generate_realtime_datapoint()

    # Update current data with rolling window
    current_data <- realtime_data$current_data
    updated_data <- rbind(current_data, new_data)

    # Keep only last 100 points for performance
    if (nrow(updated_data) > 100) {
      updated_data <- tail(updated_data, 100)
    }

    realtime_data$current_data <- updated_data
    realtime_data$update_counter <- realtime_data$update_counter + 1
    realtime_data$last_update_time <- Sys.time()

    # Update system metrics
    realtime_data$system_metrics <- rbind(
      realtime_data$system_metrics,
      data.frame(
        timestamp = Sys.time(),
        cpu_usage = runif(1, 20, 80),
        memory_usage = runif(1, 30, 70),
        network_io = runif(1, 10, 90)
      )
    )

    # Keep system metrics window
    if (nrow(realtime_data$system_metrics) > 50) {
      realtime_data$system_metrics <- tail(realtime_data$system_metrics, 50)
    }

    # Check for alerts
    check_and_generate_alerts(new_data)
  })

  # Manual refresh handler
  observeEvent(input$manual_refresh, {
    # Force immediate update
    new_data <- generate_realtime_datapoint()
    current_data <- realtime_data$current_data
    updated_data <- rbind(current_data, new_data)

    if (nrow(updated_data) > 100) {
      updated_data <- tail(updated_data, 100)
    }

    realtime_data$current_data <- updated_data
    realtime_data$update_counter <- realtime_data$update_counter + 1
    realtime_data$last_update_time <- Sys.time()

    showNotification("Data refreshed manually", type = "message", duration = 2)
  })

  # Value boxes
  output$current_value <- renderValueBox({
    current_data <- realtime_data$current_data

    if (nrow(current_data) > 0) {
      latest_value <- tail(current_data$value, 1)

      valueBox(
        value = round(latest_value, 2),
        subtitle = "Current Value",
        icon = icon("tachometer-alt"),
        color = if (latest_value > 75) "red" else if (latest_value > 50) "yellow" else "green"
      )
    } else {
      valueBox(
        value = "N/A",
        subtitle = "Current Value",
        icon = icon("tachometer-alt"),
        color = "blue"
      )
    }
  })

  output$trend_indicator <- renderValueBox({
    current_data <- realtime_data$current_data

    if (nrow(current_data) >= 2) {
      recent_values <- tail(current_data$value, 2)
      trend <- recent_values[2] - recent_values[1]
      trend_pct <- round((trend / recent_values[1]) * 100, 1)

      valueBox(
        value = paste0(ifelse(trend >= 0, "+", ""), trend_pct, "%"),
        subtitle = "Trend",
        icon = icon(if (trend >= 0) "arrow-up" else "arrow-down"),
        color = if (trend >= 0) "green" else "red"
      )
    } else {
      valueBox(
        value = "0%",
        subtitle = "Trend",
        icon = icon("minus"),
        color = "blue"
      )
    }
  })

  output$alert_status <- renderValueBox({
    alert_count <- length(realtime_data$alerts)

    valueBox(
      value = alert_count,
      subtitle = "Active Alerts",
      icon = icon("exclamation-triangle"),
      color = if (alert_count > 0) "red" else "green"
    )
  })

  # Real-time plot
  output$realtime_plot <- renderPlotly({
    data <- realtime_data$current_data

    if (nrow(data) == 0) return(NULL)

    plot_ly(
      data = data,
      x = ~timestamp,
      y = ~value,
      type = "scatter",
      mode = "lines+markers",
      line = list(color = "#2ecc71", width = 3),
      marker = list(color = "#27ae60", size = 6),
      hovertemplate = "<b>%{x}</b><br>Value: %{y:.2f}<extra></extra>"
    ) %>%
      layout(
        title = "Real-Time Data Stream",
        xaxis = list(
          title = "Time",
          type = "date",
          tickformat = "%H:%M:%S"
        ),
        yaxis = list(title = "Value"),
        hovermode = "x",
        showlegend = FALSE
      ) %>%
      config(displayModeBar = FALSE)
  })

  # Real-time data table
  output$realtime_table <- DT::renderDataTable({
    data <- realtime_data$current_data

    if (nrow(data) == 0) return(NULL)

    # Show most recent data first
    display_data <- data %>%
      arrange(desc(timestamp)) %>%
      mutate(
        timestamp = format(timestamp, "%H:%M:%S"),
        value = round(value, 3)
      ) %>%
      head(20)

    DT::datatable(
      display_data,
      options = list(
        pageLength = 10,
        searching = FALSE,
        ordering = FALSE,
        info = FALSE,
        dom = 't',
        scrollY = "300px"
      ),
      colnames = c("Time", "Value", "Status"),
      rownames = FALSE
    ) %>%
      DT::formatStyle(
        "value",
        backgroundColor = DT::styleInterval(
          cuts = c(25, 50, 75),
          values = c("lightgreen", "lightyellow", "orange", "lightcoral")
        )
      )
  })

  # System metrics visualization
  output$system_metrics <- renderPlotly({
    metrics_data <- realtime_data$system_metrics

    if (nrow(metrics_data) == 0) return(NULL)

    plot_ly(metrics_data, x = ~timestamp) %>%
      add_lines(y = ~cpu_usage, name = "CPU Usage",
                line = list(color = "#e74c3c")) %>%
      add_lines(y = ~memory_usage, name = "Memory Usage",
                line = list(color = "#3498db")) %>%
      add_lines(y = ~network_io, name = "Network I/O",
                line = list(color = "#2ecc71")) %>%
      layout(
        title = "System Performance Metrics",
        xaxis = list(title = "Time"),
        yaxis = list(title = "Usage (%)", range = c(0, 100)),
        hovermode = "x unified"
      )
  })

  # Live alerts display
  output$live_alerts <- renderUI({
    alerts <- realtime_data$alerts

    if (length(alerts) == 0) {
      return(div(
        class = "alert alert-success",
        icon("check-circle"),
        " All systems normal"
      ))
    }

    alert_ui <- lapply(alerts, function(alert) {
      div(
        class = paste("alert", paste0("alert-", alert$severity)),
        strong(alert$title),
        br(),
        alert$message,
        br(),
        small(paste("Time:", format(alert$timestamp, "%H:%M:%S")))
      )
    })

    return(do.call(tagList, alert_ui))
  })

  # Performance gauge
  output$performance_gauge <- renderPlotly({
    metrics_data <- realtime_data$system_metrics

    if (nrow(metrics_data) == 0) {
      current_cpu <- 0
    } else {
      current_cpu <- tail(metrics_data$cpu_usage, 1)
    }

    plot_ly(
      type = "indicator",
      mode = "gauge+number",
      value = current_cpu,
      domain = list(x = c(0, 1), y = c(0, 1)),
      title = list(text = "CPU Usage"),
      gauge = list(
        axis = list(range = list(NULL, 100)),
        bar = list(color = "darkblue"),
        steps = list(
          list(range = c(0, 50), color = "lightgray"),
          list(range = c(50, 80), color = "yellow"),
          list(range = c(80, 100), color = "red")
        ),
        threshold = list(
          line = list(color = "red", width = 4),
          thickness = 0.75,
          value = 90
        )
      )
    ) %>%
      layout(
        margin = list(l = 20, r = 20, t = 40, b = 20),
        font = list(color = "darkblue", family = "Arial")
      )
  })

  # Status outputs
  output$last_update <- renderText({
    paste("Last Update:", format(realtime_data$last_update_time, "%H:%M:%S"))
  })

  output$update_count <- renderText({
    paste("Updates:", realtime_data$update_counter)
  })

  output$data_points <- renderText({
    paste("Data Points:", nrow(realtime_data$current_data))
  })

  # Helper functions
  generate_initial_data <- function() {
    n <- 20
    data.frame(
      timestamp = seq(Sys.time() - n, Sys.time(), length.out = n),
      value = cumsum(rnorm(n, 0, 5)) + 50,
      status = sample(c("Normal", "Warning", "Critical"), n,
                      replace = TRUE, prob = c(0.8, 0.15, 0.05))
    )
  }

  generate_realtime_datapoint <- function() {

    # Get last value for trend continuation
    current_data <- realtime_data$current_data
    last_value <- if (nrow(current_data) > 0) tail(current_data$value, 1) else 50

    # Generate new value with some trend
    new_value <- last_value + rnorm(1, 0, 3)
    new_value <- max(0, min(100, new_value))  # Constrain to 0-100

    data.frame(
      timestamp = Sys.time(),
      value = new_value,
      status = if (new_value > 80) "Critical" else if (new_value > 60) "Warning" else "Normal"
    )
  }

  generate_system_metrics <- function() {
    n <- 10
    data.frame(
      timestamp = seq(Sys.time() - n * 10, Sys.time(), length.out = n),
      cpu_usage = runif(n, 20, 60),
      memory_usage = runif(n, 30, 50),
      network_io = runif(n, 10, 40)
    )
  }

  check_and_generate_alerts <- function(new_data) {

    # Clear old alerts (older than 1 minute); guard against an empty list
    current_time <- Sys.time()
    if (length(realtime_data$alerts) > 0) {
      keep <- vapply(realtime_data$alerts, function(x) {
        difftime(current_time, x$timestamp, units = "secs") < 60
      }, logical(1))
      realtime_data$alerts <- realtime_data$alerts[keep]
    }

    # Check for new alerts
    if (new_data$value > 90) {
      new_alert <- list(
        title = "High Value Alert",
        message = paste("Value exceeded threshold:", round(new_data$value, 2)),
        severity = "danger",
        timestamp = Sys.time()
      )

      realtime_data$alerts <- append(realtime_data$alerts, list(new_alert))

      # Show notification
      showNotification(
        paste("Alert: High value detected -", round(new_data$value, 2)),
        type = "error",
        duration = 5
      )
    }
  }
}

shinyApp(ui = ui, server = server)
# Advanced WebSocket implementation for high-frequency updates
library(shiny)
library(websocket)
library(jsonlite)

# WebSocket server setup (would run separately in production)
create_websocket_server <- function(port = 8080) {

  # This would typically be a separate process (e.g. built on httpuv).
  # For demonstration, showing the structure of a connection handler.
  websocket_server <- function(ws) {

    cat("WebSocket connection opened\n")

    # Send initial data
    ws$send(toJSON(list(
      type = "initial_data",
      data = generate_sample_data(50)
    )))

    # Periodic data sending via a self-rescheduling function
    send_update <- function() {
      if (ws$readyState() == 1) {  # Connection open

        # Generate and send new data
        new_data <- generate_realtime_point()

        ws$send(toJSON(list(
          type = "data_update",
          data = new_data,
          timestamp = as.numeric(Sys.time())
        )))

        # Recursive scheduling for continuous updates (1 second intervals)
        later::later(send_update, delay = 1)
      }
    }
    later::later(send_update, delay = 1)

    ws$onMessage(function(binary, message) {
      cat("Received message:", message, "\n")

      # Handle client messages
      parsed_message <- fromJSON(message)

      if (parsed_message$type == "subscribe") {
        # Handle subscription requests
        ws$send(toJSON(list(
          type = "subscription_confirmed",
          channels = parsed_message$channels
        )))
      }
    })

    ws$onClose(function() {
      cat("WebSocket connection closed\n")
    })
  }

  return(websocket_server)
}
# Shiny application with WebSocket integration
library(shinydashboard)  # valueBoxOutput() / renderValueBox()
library(shinyjs)         # runjs() for the reconnect handler
library(plotly)
library(DT)
library(dplyr)

ui <- fluidPage(

  useShinyjs(),

  # WebSocket client logic (browsers provide the WebSocket API natively)
  tags$head(
    tags$script(HTML("
      var websocket;
      var connected = false;

      // WebSocket connection management
      function connectWebSocket() {
        websocket = new WebSocket('ws://localhost:8080');

        websocket.onopen = function(event) {
          console.log('WebSocket connected');
          connected = true;
          Shiny.setInputValue('websocket_status', 'connected');

          // Subscribe to data streams
          websocket.send(JSON.stringify({
            type: 'subscribe',
            channels: ['realtime_data', 'system_metrics']
          }));
        };

        websocket.onmessage = function(event) {
          var message = JSON.parse(event.data);

          // Send data to Shiny
          if(message.type === 'data_update') {
            Shiny.setInputValue('websocket_data', {
              data: message.data,
              timestamp: message.timestamp
            });
          } else if(message.type === 'initial_data') {
            Shiny.setInputValue('websocket_initial', message.data);
          }
        };

        websocket.onclose = function(event) {
          console.log('WebSocket disconnected');
          connected = false;
          Shiny.setInputValue('websocket_status', 'disconnected');

          // Attempt reconnection after delay
          setTimeout(connectWebSocket, 5000);
        };

        websocket.onerror = function(error) {
          console.error('WebSocket error:', error);
          Shiny.setInputValue('websocket_status', 'error');
        };
      }

      // Initialize WebSocket connection when page loads
      $(document).ready(function() {
        connectWebSocket();
      });
    "))
  ),

  titlePanel("WebSocket Real-Time Dashboard"),

  fluidRow(
    column(4,
      wellPanel(
        h4("Connection Status"),
        textOutput("connection_status"),
        br(),
        h4("Stream Controls"),
        checkboxGroupInput("data_streams", "Active Streams:",
                           choices = c("Market Data" = "market",
                                       "System Metrics" = "system",
                                       "User Activity" = "activity"),
                           selected = c("market", "system")),
        actionButton("reconnect_ws", "Reconnect WebSocket",
                     class = "btn-warning")
      )
    ),
    column(8,
      # Real-time metrics cards
      fluidRow(
        valueBoxOutput("ws_current_value", width = 4),
        valueBoxOutput("ws_update_rate", width = 4),
        valueBoxOutput("ws_data_points", width = 4)
      )
    )
  ),

  fluidRow(
    column(8,
      tabsetPanel(
        tabPanel("Live Stream",
          plotlyOutput("websocket_plot", height = "400px")
        ),
        tabPanel("High Frequency",
          plotlyOutput("hf_plot", height = "400px")
        ),
        tabPanel("System Monitor",
          plotlyOutput("system_monitor", height = "400px")
        )
      )
    ),
    column(4,
      wellPanel(
        h4("Stream Statistics"),
        verbatimTextOutput("stream_stats")
      ),
      wellPanel(
        h4("Latest Data"),
        DT::dataTableOutput("latest_data")
      )
    )
  )
)
server <- function(input, output, session) {

  # WebSocket data storage
  websocket_data <- reactiveValues(
    current_data = data.frame(),
    connection_status = "disconnected",
    update_count = 0,
    last_update = NULL,
    data_rate = 0
  )

  # Handle WebSocket status updates
  observeEvent(input$websocket_status, {
    websocket_data$connection_status <- input$websocket_status
  })

  # Handle initial WebSocket data
  observeEvent(input$websocket_initial, {
    initial_data <- input$websocket_initial

    if (!is.null(initial_data)) {
      # Convert from JSON structure to data frame
      df <- data.frame(
        timestamp = as.POSIXct(initial_data$timestamp, origin = "1970-01-01"),
        value = unlist(initial_data$values),
        source = "websocket"
      )
      websocket_data$current_data <- df
    }
  })

  # Handle real-time WebSocket data updates
  observeEvent(input$websocket_data, {
    ws_update <- input$websocket_data

    if (!is.null(ws_update)) {
      # Create new data point
      new_point <- data.frame(
        timestamp = as.POSIXct(ws_update$timestamp, origin = "1970-01-01"),
        value = ws_update$data$value,
        source = "websocket"
      )

      # Add to existing data
      current_data <- websocket_data$current_data
      updated_data <- rbind(current_data, new_point)

      # Maintain rolling window for performance
      if (nrow(updated_data) > 200) {
        updated_data <- tail(updated_data, 200)
      }

      # Calculate update rate from the gap since the *previous* update,
      # captured before last_update is overwritten below
      previous_update <- websocket_data$last_update
      if (!is.null(previous_update)) {
        time_diff <- as.numeric(difftime(Sys.time(), previous_update, units = "secs"))
        websocket_data$data_rate <- 1 / max(time_diff, 0.1)  # Updates per second
      }

      websocket_data$current_data <- updated_data
      websocket_data$update_count <- websocket_data$update_count + 1
      websocket_data$last_update <- Sys.time()
    }
  })

  # Connection status display
  output$connection_status <- renderText({
    status <- websocket_data$connection_status

    status_text <- switch(status,
      "connected" = "🟢 Connected",
      "disconnected" = "🔴 Disconnected",
      "error" = "🟡 Error",
      "🔵 Unknown"
    )

    paste("WebSocket:", status_text)
  })

  # WebSocket value boxes
  output$ws_current_value <- renderValueBox({
    data <- websocket_data$current_data

    if (nrow(data) > 0) {
      current_value <- tail(data$value, 1)

      valueBox(
        value = round(current_value, 2),
        subtitle = "Current Value",
        icon = icon("chart-line"),
        color = "blue"
      )
    } else {
      valueBox(
        value = "N/A",
        subtitle = "Current Value",
        icon = icon("chart-line"),
        color = "light-blue"
      )
    }
  })

  output$ws_update_rate <- renderValueBox({
    rate <- websocket_data$data_rate

    valueBox(
      value = paste0(round(rate, 1), "/s"),
      subtitle = "Update Rate",
      icon = icon("tachometer-alt"),
      color = if (rate > 0.5) "green" else "yellow"
    )
  })

  output$ws_data_points <- renderValueBox({
    data_count <- nrow(websocket_data$current_data)

    valueBox(
      value = data_count,
      subtitle = "Data Points",
      icon = icon("database"),
      color = "purple"
    )
  })

  # WebSocket real-time plot
  output$websocket_plot <- renderPlotly({
    data <- websocket_data$current_data

    if (nrow(data) == 0) {
      return(plot_ly() %>%
               layout(title = "Waiting for WebSocket data...") %>%
               config(displayModeBar = FALSE))
    }

    plot_ly(
      data = data,
      x = ~timestamp,
      y = ~value,
      type = "scatter",
      mode = "lines",
      line = list(color = "#3498db", width = 2),
      hovertemplate = "<b>%{x}</b><br>Value: %{y:.4f}<extra></extra>"
    ) %>%
      layout(
        title = "WebSocket Live Data Stream",
        xaxis = list(
          title = "Time",
          type = "date",
          tickformat = "%H:%M:%S"
        ),
        yaxis = list(title = "Value"),
        hovermode = "x"
      ) %>%
      config(displayModeBar = FALSE)
  })

  # Stream statistics
  output$stream_stats <- renderPrint({
    data <- websocket_data$current_data

    cat("Connection Status:", websocket_data$connection_status, "\n")
    cat("Total Updates:", websocket_data$update_count, "\n")
    cat("Data Points:", nrow(data), "\n")
    cat("Update Rate:", round(websocket_data$data_rate, 2), "updates/sec\n")

    if (!is.null(websocket_data$last_update)) {
      cat("Last Update:", format(websocket_data$last_update, "%H:%M:%S"), "\n")
    }

    if (nrow(data) > 1) {
      cat("\nData Statistics:\n")
      cat("Min Value:", round(min(data$value), 4), "\n")
      cat("Max Value:", round(max(data$value), 4), "\n")
      cat("Mean Value:", round(mean(data$value), 4), "\n")
      cat("Std Dev:", round(sd(data$value), 4), "\n")
    }
  })

  # Latest data table
  output$latest_data <- DT::renderDataTable({
    data <- websocket_data$current_data

    if (nrow(data) == 0) return(NULL)

    latest_data <- data %>%
      arrange(desc(timestamp)) %>%
      head(10) %>%
      mutate(
        timestamp = format(timestamp, "%H:%M:%OS3"),  # fractional seconds
        value = round(value, 4)
      )

    DT::datatable(
      latest_data,
      options = list(
        pageLength = 5,
        searching = FALSE,
        ordering = FALSE,
        info = FALSE,
        dom = 't',
        scrollY = "200px"
      ),
      colnames = c("Time", "Value", "Source"),
      rownames = FALSE
    )
  })

  # Reconnect WebSocket handler
  observeEvent(input$reconnect_ws, {
    runjs("
      if(websocket) {
        websocket.close();
      }
      setTimeout(connectWebSocket, 1000);
    ")

    showNotification("Reconnecting WebSocket...", type = "message", duration = 3)
  })
}
Streaming Data Processing and Analytics
Implement sophisticated real-time data processing that transforms raw streams into actionable insights:
library(shiny)
library(dplyr)
library(plotly)
library(zoo)  # rolling statistics

server <- function(input, output, session) {

  # Fallback operator used below (base R >= 4.4 also provides this)
  `%||%` <- function(a, b) if (is.null(a)) b else a

  # Advanced streaming data processing
  stream_processor <- reactiveValues(
    raw_buffer = list(),
    processed_data = data.frame(),
    analytics_cache = list(),
    processing_stats = list(
      total_processed = 0,
      processing_rate = 0,
      error_count = 0,
      last_error = NULL
    )
  )

  # Real-time data ingestion with buffering
  ingest_streaming_data <- function(new_data_batch) {

    tryCatch({
      # Add to buffer
      stream_processor$raw_buffer <- append(stream_processor$raw_buffer,
                                            list(new_data_batch))

      # Process buffer when it reaches threshold
      if (length(stream_processor$raw_buffer) >= 10) {

        # Combine buffered data
        combined_data <- do.call(rbind, stream_processor$raw_buffer)

        # Apply real-time analytics
        processed_batch <- apply_streaming_analytics(combined_data)

        # Update processed data store
        update_processed_data(processed_batch)

        # Clear buffer
        stream_processor$raw_buffer <- list()

        # Update processing statistics
        stream_processor$processing_stats$total_processed <-
          stream_processor$processing_stats$total_processed + nrow(combined_data)
      }

    }, error = function(e) {
      stream_processor$processing_stats$error_count <-
        stream_processor$processing_stats$error_count + 1
      stream_processor$processing_stats$last_error <- e$message

      # Log error but continue processing
      cat("Streaming processing error:", e$message, "\n")
    })
  }

  # Advanced analytics pipeline
  apply_streaming_analytics <- function(raw_data) {

    # 1. Data cleansing and validation
    clean_data <- raw_data %>%
      filter(!is.na(value), !is.infinite(value)) %>%
      mutate(
        # Outlier detection using rolling statistics
        rolling_mean = zoo::rollmean(value, k = 5, fill = NA, align = "right"),
        rolling_sd = zoo::rollapply(value, width = 5, FUN = sd, fill = NA, align = "right"),
        z_score = (value - rolling_mean) / rolling_sd,
        is_outlier = abs(z_score) > 3
      )

    # 2. Feature engineering
    enriched_data <- clean_data %>%
      mutate(
        # Trend indicators
        momentum = value - lag(value, 1),
        acceleration = momentum - lag(momentum, 1),

        # Statistical features
        percentile_rank = percent_rank(value),
        moving_avg_5 = zoo::rollmean(value, k = 5, fill = NA, align = "right"),
        moving_avg_20 = zoo::rollmean(value, k = 20, fill = NA, align = "right"),

        # Volatility measures
        volatility = zoo::rollapply(value, width = 10, FUN = sd, fill = NA, align = "right"),

        # Signal classification (case_when() takes the first matching rule)
        signal = case_when(
          value > moving_avg_20 + 2 * volatility ~ "strong_buy",
          value > moving_avg_20 + volatility ~ "buy",
          value < moving_avg_20 - 2 * volatility ~ "strong_sell",
          value < moving_avg_20 - volatility ~ "sell",
          TRUE ~ "hold"
        )
      )

    # 3. Pattern detection
    pattern_data <- enriched_data %>%
      mutate(
        # Trend patterns
        trend_direction = case_when(
          moving_avg_5 > moving_avg_20 ~ "uptrend",
          moving_avg_5 < moving_avg_20 ~ "downtrend",
          TRUE ~ "sideways"
        ),

        # Support/resistance levels (is the window's extreme at its center?)
        local_max = zoo::rollapply(value, width = 5,
                                   FUN = function(x) which.max(x) == 3,
                                   fill = FALSE, align = "center"),
        local_min = zoo::rollapply(value, width = 5,
                                   FUN = function(x) which.min(x) == 3,
                                   fill = FALSE, align = "center")
      )

    return(pattern_data)
  }

  # Update processed data with sliding window
  update_processed_data <- function(new_processed_data) {

    current_data <- stream_processor$processed_data
    combined_data <- rbind(current_data, new_processed_data)

    # Maintain sliding window of last N points
    max_points <- 1000
    if (nrow(combined_data) > max_points) {
      combined_data <- tail(combined_data, max_points)
    }

    stream_processor$processed_data <- combined_data

    # Update analytics cache
    update_analytics_cache(combined_data)
  }

  # Real-time analytics caching
  update_analytics_cache <- function(data) {

    if (nrow(data) == 0) return()

    # Calculate key metrics
    stream_processor$analytics_cache <- list(

      # Basic statistics
      current_value = tail(data$value, 1),
      min_value = min(data$value, na.rm = TRUE),
      max_value = max(data$value, na.rm = TRUE),
      mean_value = mean(data$value, na.rm = TRUE),
      median_value = median(data$value, na.rm = TRUE),
      std_dev = sd(data$value, na.rm = TRUE),

      # Trend analysis
      current_trend = tail(data$trend_direction, 1),
      momentum = tail(data$momentum, 1),
      volatility = tail(data$volatility, 1),

      # Signal analysis
      current_signal = tail(data$signal, 1),
      buy_signals = sum(data$signal %in% c("buy", "strong_buy"), na.rm = TRUE),
      sell_signals = sum(data$signal %in% c("sell", "strong_sell"), na.rm = TRUE),

      # Pattern detection
      recent_peaks = sum(tail(data$local_max, 20), na.rm = TRUE),
      recent_troughs = sum(tail(data$local_min, 20), na.rm = TRUE),

      # Quality metrics
      outlier_rate = mean(data$is_outlier, na.rm = TRUE),
      data_quality_score = 1 - mean(is.na(data$value))
    )
  }

  # Real-time alert system
  realtime_alerts <- reactiveValues(
    active_alerts = list(),
    alert_history = data.frame()
  )

  # Monitor for alert conditions
  observe({
    cache <- stream_processor$analytics_cache

    if (length(cache) == 0) return()

    # Check alert conditions
    new_alerts <- list()

    # Volatility alert
    if (!is.null(cache$volatility) && !is.na(cache$volatility) && cache$volatility > 10) {
      new_alerts <- append(new_alerts, list(list(
        id = paste0("volatility_", Sys.time()),
        type = "volatility",
        severity = "warning",
        title = "High Volatility Detected",
        message = paste("Current volatility:", round(cache$volatility, 2)),
        timestamp = Sys.time(),
        value = cache$volatility
      )))
    }

    # Trend change alert
    if (!is.null(cache$current_trend) && cache$current_trend != "sideways") {
      # Check if trend changed recently
      processed_data <- stream_processor$processed_data
      if (nrow(processed_data) > 10) {
        previous_trend <- processed_data$trend_direction[nrow(processed_data) - 5]

        if (!is.na(previous_trend) && previous_trend != cache$current_trend) {
          new_alerts <- append(new_alerts, list(list(
            id = paste0("trend_", Sys.time()),
            type = "trend_change",
            severity = "info",
            title = "Trend Change Detected",
            message = paste("New trend:", cache$current_trend),
            timestamp = Sys.time(),
            value = cache$current_trend
          )))
        }
      }
    }

    # Signal alert
    if (!is.null(cache$current_signal) &&
        cache$current_signal %in% c("strong_buy", "strong_sell")) {
      new_alerts <- append(new_alerts, list(list(
        id = paste0("signal_", Sys.time()),
        type = "trading_signal",
        severity = if (cache$current_signal == "strong_buy") "success" else "danger",
        title = "Strong Trading Signal",
        message = paste("Signal:", toupper(gsub("_", " ", cache$current_signal))),
        timestamp = Sys.time(),
        value = cache$current_signal
      )))
    }

    # Add new alerts to active list
    if (length(new_alerts) > 0) {
      realtime_alerts$active_alerts <- append(realtime_alerts$active_alerts, new_alerts)

      # Show notifications
      for (alert in new_alerts) {
        showNotification(
          ui = div(
            strong(alert$title),
            br(),
            alert$message
          ),
          type = switch(alert$severity,
                        "success" = "message",
                        "info" = "message",
                        "warning" = "warning",
                        "danger" = "error"),
          duration = 8
        )
      }
    }

    # Clean up old alerts (keep only last 50)
    if (length(realtime_alerts$active_alerts) > 50) {
      realtime_alerts$active_alerts <- tail(realtime_alerts$active_alerts, 50)
    }
  })

  # Advanced visualization outputs
  output$streaming_analytics_plot <- renderPlotly({
    data <- stream_processor$processed_data

    if (nrow(data) == 0) return(NULL)

    # Create multi-series plot with analytics
    plot_ly(data, x = ~timestamp) %>%
      # Main value line
      add_lines(y = ~value, name = "Value",
                line = list(color = "#2c3e50", width = 2)) %>%
      # Moving averages
      add_lines(y = ~moving_avg_5, name = "MA(5)",
                line = list(color = "#3498db", width = 1, dash = "dot")) %>%
      add_lines(y = ~moving_avg_20, name = "MA(20)",
                line = list(color = "#e74c3c", width = 1, dash = "dash")) %>%
      # Volatility bands
      add_ribbons(ymin = ~moving_avg_20 - volatility,
                  ymax = ~moving_avg_20 + volatility,
                  name = "Volatility Band",
                  fillcolor = "rgba(52, 152, 219, 0.1)",
                  line = list(color = "transparent")) %>%
      # Signal markers
      add_markers(data = data[data$signal %in% c("strong_buy", "strong_sell"), ],
                  y = ~value, color = ~signal,
                  colors = c("strong_buy" = "green", "strong_sell" = "red"),
                  marker = list(size = 8, symbol = "triangle-up"),
                  name = "Signals") %>%
      layout(
        title = "Real-Time Analytics Dashboard",
        xaxis = list(title = "Time"),
        yaxis = list(title = "Value"),
        hovermode = "x unified",
        showlegend = TRUE
      )
  })

  # Analytics summary output
  output$analytics_summary <- renderUI({
    cache <- stream_processor$analytics_cache

    if (length(cache) == 0) {
      return(div("No analytics data available"))
    }

    div(
      class = "analytics-summary",
      fluidRow(
        column(6,
          h5("Current Metrics"),
          p(strong("Value:"), round(cache$current_value %||% 0, 4)),
          p(strong("Trend:"), cache$current_trend %||% "Unknown"),
          p(strong("Signal:"), toupper(gsub("_", " ", cache$current_signal %||% "None"))),
          p(strong("Volatility:"), round(cache$volatility %||% 0, 2))
        ),
        column(6,
          h5("Statistics"),
          p(strong("Mean:"), round(cache$mean_value %||% 0, 4)),
          p(strong("Std Dev:"), round(cache$std_dev %||% 0, 4)),
          p(strong("Min/Max:"), paste(round(cache$min_value %||% 0, 2), "/",
                                      round(cache$max_value %||% 0, 2))),
          p(strong("Quality:"), paste0(round((cache$data_quality_score %||% 0) * 100, 1), "%"))
        )
      ),
      hr(),
      fluidRow(
        column(12,
          h5("Recent Activity"),
          p(strong("Buy Signals:"), cache$buy_signals %||% 0),
          p(strong("Sell Signals:"), cache$sell_signals %||% 0),
          p(strong("Recent Peaks:"), cache$recent_peaks %||% 0),
          p(strong("Recent Troughs:"), cache$recent_troughs %||% 0)
        )
      )
    )
  })
}
High-Performance Real-Time Architectures
Memory-Efficient Streaming Data Management
# Optimized high-performance real-time data handling
library(shiny)
library(plotly)
library(dplyr)

create_high_performance_realtime_app <- function() {

  server <- function(input, output, session) {

    # High-performance data structures
    performance_manager <- reactiveValues(
      # Circular buffers for memory efficiency
      data_buffer = rep(NA_real_, 10000),
      timestamp_buffer = rep(as.POSIXct(NA), 10000),
      buffer_pointer = 1,
      buffer_size = 10000,

      # Performance metrics
      processing_times = numeric(100),
      memory_usage_history = numeric(100),
      update_frequency = 0,

      # Connection health
      connection_status = "disconnected",
      missed_updates = 0,
      last_heartbeat = NULL
    )

    # Optimized data ingestion
    ingest_high_frequency_data <- function(new_values, timestamps) {

      start_time <- Sys.time()

      tryCatch({
        # Batch processing for efficiency
        n_new <- length(new_values)

        if (n_new > 0) {
          # Calculate insertion positions in circular buffer;
          # the last written slot is start_pos + n_new - 1 (wrapped)
          start_pos <- performance_manager$buffer_pointer
          end_pos <- (start_pos + n_new - 2) %% performance_manager$buffer_size + 1

          # Handle wraparound
          if (end_pos < start_pos) {
            # Split insertion across buffer wrap
            first_chunk <- performance_manager$buffer_size - start_pos + 1

            # Insert first chunk
            performance_manager$data_buffer[start_pos:performance_manager$buffer_size] <-
              new_values[1:first_chunk]
            performance_manager$timestamp_buffer[start_pos:performance_manager$buffer_size] <-
              timestamps[1:first_chunk]

            # Insert remaining chunk
            if (n_new > first_chunk) {
              remaining <- new_values[(first_chunk + 1):n_new]
              remaining_ts <- timestamps[(first_chunk + 1):n_new]

              performance_manager$data_buffer[1:length(remaining)] <- remaining
              performance_manager$timestamp_buffer[1:length(remaining)] <- remaining_ts
            }
          } else {
            # Contiguous insertion
            indices <- start_pos:end_pos
            performance_manager$data_buffer[indices] <- new_values
            performance_manager$timestamp_buffer[indices] <- timestamps
          }

          # Advance buffer pointer past the inserted block
          performance_manager$buffer_pointer <-
            (performance_manager$buffer_pointer + n_new - 1) %%
              performance_manager$buffer_size + 1
        }

        # Record processing time
        processing_time <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
        performance_manager$processing_times <- c(
          tail(performance_manager$processing_times, 99),
          processing_time
        )

      }, error = function(e) {
        cat("High-frequency ingestion error:", e$message, "\n")
        performance_manager$missed_updates <- performance_manager$missed_updates + 1
      })
    }

    # Efficient data extraction from circular buffer
    get_recent_data <- function(n_points = 1000) {

      # Get valid data points from circular buffer
      valid_indices <- !is.na(performance_manager$data_buffer)

      if (!any(valid_indices)) {
        return(data.frame(timestamp = as.POSIXct(character(0)), value = numeric(0)))
      }

      pointer <- performance_manager$buffer_pointer
      buffer_size <- performance_manager$buffer_size

      # Calculate extraction range
      if (sum(valid_indices) >= n_points) {
        # The most recent write sits just behind the pointer
        end_pos <- (pointer - 2 + buffer_size) %% buffer_size + 1
        start_pos <- (end_pos - n_points + buffer_size) %% buffer_size + 1

        if (start_pos <= end_pos) {
          # Contiguous range
          indices <- start_pos:end_pos
        } else {
          # Wraparound range
          indices <- c(start_pos:buffer_size, 1:end_pos)
        }
      } else {
        # Return all valid data
        indices <- which(valid_indices)
      }

      # Create data frame
      data.frame(
        timestamp = performance_manager$timestamp_buffer[indices],
        value = performance_manager$data_buffer[indices]
      ) %>%
        filter(!is.na(timestamp)) %>%
        arrange(timestamp)
    }

    # Optimized plotting for high-frequency data
    output$high_frequency_plot <- renderPlotly({

      # Get recent data efficiently
      plot_data <- get_recent_data(500)  # Last 500 points

      if (nrow(plot_data) == 0) {
        return(plot_ly() %>%
                 layout(title = "Waiting for high-frequency data..."))
      }

      # Use WebGL for performance with large datasets
      plot_ly(
        plot_data,
        x = ~timestamp,
        y = ~value,
        type = "scattergl",  # WebGL for performance
        mode = "lines",
        line = list(color = "#2ecc71", width = 1),
        hovertemplate = "Time: %{x}<br>Value: %{y:.6f}<extra></extra>"
      ) %>%
        layout(
          title = paste("Live High-Frequency Data (", nrow(plot_data), "points)"),
          xaxis = list(
            title = "Time",
            type = "date",
            tickformat = "%H:%M:%S.%L"  # Show milliseconds
          ),
          yaxis = list(title = "Value"),
          hovermode = "x"
        ) %>%
        config(
          displayModeBar = FALSE,
          scrollZoom = TRUE,
          responsive = TRUE
        )
    })

    # Performance monitoring
    output$performance_metrics <- renderUI({

      processing_times <- performance_manager$processing_times
      valid_times <- processing_times[!is.na(processing_times) & processing_times > 0]

      if (length(valid_times) == 0) {
        return(div("No performance data available"))
      }

      # Calculate performance statistics
      avg_processing_time <- mean(valid_times) * 1000  # Convert to milliseconds
      max_processing_time <- max(valid_times) * 1000
      processing_rate <- 1 / mean(valid_times)  # Operations per second

      # Memory usage
      current_memory <- as.numeric(object.size(performance_manager)) / (1024^2)  # MB

      div(
        class = "performance-panel",
        h4("Performance Metrics"),
        fluidRow(
          column(6,
            p(strong("Avg Processing Time:"), paste0(round(avg_processing_time, 2), "ms")),
            p(strong("Max Processing Time:"), paste0(round(max_processing_time, 2), "ms")),
            p(strong("Processing Rate:"), paste0(round(processing_rate, 1), " ops/sec"))
          ),
          column(6,
            p(strong("Memory Usage:"), paste0(round(current_memory, 2), "MB")),
            p(strong("Buffer Utilization:"),
              paste0(round(sum(!is.na(performance_manager$data_buffer)) /
                             performance_manager$buffer_size * 100, 1), "%")),
            p(strong("Missed Updates:"), performance_manager$missed_updates)
          )
        ),

        # Performance indicator
        div(
          class = if (avg_processing_time < 1) "alert alert-success"
                  else if (avg_processing_time < 5) "alert alert-warning"
                  else "alert alert-danger",
          strong("Performance Status: "),
          if (avg_processing_time < 1) "Optimal"
          else if (avg_processing_time < 5) "Good"
          else "Needs Attention"
        )
      )
    })

    # Automatic memory cleanup
    observe({
      invalidateLater(300000)  # Every 5 minutes

      # Force garbage collection
      gc()

      # Reset performance counters periodically
      if (performance_manager$missed_updates > 100) {
        performance_manager$missed_updates <- 0
        showNotification("Performance counters reset", type = "message")
      }
    })

    # Simulated high-frequency data generation
    observe({
      # High frequency updates (multiple times per second)
      invalidateLater(100)  # 10 updates per second

      # Generate batch of data points, continuing from the last buffered value
      batch_size <- sample(1:5, 1)  # Variable batch sizes

      last_value <- tail(performance_manager$data_buffer[
        !is.na(performance_manager$data_buffer)], 1)
      if (length(last_value) == 0) last_value <- 0

      new_values <- cumsum(rnorm(batch_size, 0, 0.01)) + last_value
      new_timestamps <- seq(Sys.time(), by = 0.01, length.out = batch_size)

      # Ingest data using optimized function
      ingest_high_frequency_data(new_values, new_timestamps)
    })
  }

  return(server)
}
WebSocket Performance Optimization
Advanced WebSocket configurations ensure reliable high-frequency data transmission:
# Production-grade WebSocket configuration
optimize_websocket_performance <- function() {

  # Client-side optimization
  websocket_client_config <- list(
    # Connection settings
    reconnect_attempts = 10,
    reconnect_delay = c(1000, 2000, 4000, 8000, 16000),  # Exponential backoff
    heartbeat_interval = 30000,  # 30 seconds

    # Buffer settings
    max_buffer_size = 1000000,  # 1MB
    batch_size = 100,
    flush_interval = 100,  # 100ms

    # Performance settings
    compression = TRUE,
    binary_mode = FALSE,
    keep_alive = TRUE
  )

  # Server-side configuration
  websocket_server_config <- list(
    # Resource management
    max_connections = 1000,
    connection_timeout = 300000,  # 5 minutes
    message_size_limit = 65536,   # 64KB

    # Performance tuning
    enable_compression = TRUE,
    tcp_nodelay = TRUE,
    socket_keepalive = TRUE,

    # Rate limiting
    messages_per_second = 100,
    burst_limit = 500,

    # Memory management
    gc_interval = 60000,     # 1 minute
    memory_threshold = 512   # 512MB
  )

  return(list(
    client = websocket_client_config,
    server = websocket_server_config
  ))
}

# Enhanced WebSocket message handling
create_optimized_websocket_handler <- function() {

  # Message compression and batching
  compress_message_batch <- function(messages) {

    # Batch multiple small messages
    if (length(messages) > 1) {
      batch_message <- list(
        type = "batch",
        messages = messages,
        timestamp = as.numeric(Sys.time()),
        compression = "gzip"
      )

      # Compress JSON
      json_data <- jsonlite::toJSON(batch_message, auto_unbox = TRUE)
      compressed_data <- memCompress(charToRaw(json_data), type = "gzip")

      return(compressed_data)
    } else {
      # Single message - no batching needed
      return(jsonlite::toJSON(messages[[1]], auto_unbox = TRUE))
    }
  }

  # Message decompression and processing
  decompress_message_batch <- function(raw_data) {

    tryCatch({
      # Check if data is compressed
      if (is.raw(raw_data)) {
        # Decompress
        decompressed <- rawToChar(memDecompress(raw_data, type = "gzip"))
        message_data <- jsonlite::fromJSON(decompressed)
      } else {
        # Plain JSON
        message_data <- jsonlite::fromJSON(raw_data)
      }

      # Handle batch messages
      if (!is.null(message_data$type) && message_data$type == "batch") {
        return(message_data$messages)
      } else {
        return(list(message_data))
      }

    }, error = function(e) {
      cat("Message decompression error:", e$message, "\n")
      return(list())
    })
  }

  return(list(
    compress = compress_message_batch,
    decompress = decompress_message_batch
  ))
}
Production Real-Time Deployment Strategies
Scalable Architecture for High-Volume Streams
# Enterprise-grade real-time application architecture
library(shiny)
library(shinydashboard)  # valueBoxOutput() / renderValueBox()
library(shinyjs)         # runjs() for the force-reconnect handler
library(plotly)
library(DT)

create_enterprise_realtime_app <- function() {

  ui <- fluidPage(

    useShinyjs(),

    # Production CSS and JS optimizations
    tags$head(
      tags$style(HTML("
        .realtime-container {
          position: relative;
          min-height: 400px;
        }
        .performance-indicator {
          position: absolute;
          top: 10px;
          right: 10px;
          z-index: 1000;
        }
        .status-green { color: #27ae60; }
        .status-yellow { color: #f39c12; }
        .status-red { color: #e74c3c; }
        .metrics-card {
          background: #f8f9fa;
          border-radius: 8px;
          padding: 15px;
          margin-bottom: 15px;
          border-left: 4px solid #3498db;
        }
      ")),

      # WebSocket client with production features
      tags$script(HTML("
        class ProductionWebSocket {
          constructor(url, options = {}) {
            this.url = url;
            this.options = {
              reconnectAttempts: 10,
              reconnectDelay: [1000, 2000, 4000, 8000, 16000],
              heartbeatInterval: 30000,
              messageQueue: [],
              maxQueueSize: 1000,
              ...options
            };

            this.reconnectCount = 0;
            this.isConnecting = false;
            this.messageBuffer = [];
            this.lastHeartbeat = null;

            this.connect();
            this.startHeartbeat();
          }

          connect() {
            if(this.isConnecting) return;

            this.isConnecting = true;
            this.ws = new WebSocket(this.url);

            this.ws.onopen = (event) => {
              console.log('WebSocket connected');
              this.isConnecting = false;
              this.reconnectCount = 0;

              // Flush message buffer
              this.flushMessageBuffer();

              // Notify Shiny
              Shiny.setInputValue('websocket_status', 'connected');
            };

            this.ws.onmessage = (event) => {
              this.handleMessage(event.data);
            };

            this.ws.onclose = (event) => {
              console.log('WebSocket disconnected');
              this.isConnecting = false;
              Shiny.setInputValue('websocket_status', 'disconnected');

              // Attempt reconnection
              this.scheduleReconnect();
            };

            this.ws.onerror = (error) => {
              console.error('WebSocket error:', error);
              Shiny.setInputValue('websocket_status', 'error');
            };
          }

          handleMessage(data) {
            try {
              // Update last heartbeat
              this.lastHeartbeat = Date.now();

              // Parse message
              const message = JSON.parse(data);

              // Handle different message types
              if(message.type === 'heartbeat') {
                // Heartbeat response - no action needed
                return;
              }

              if(message.type === 'batch') {
                // Process batch messages
                message.messages.forEach(msg => {
                  this.processMessage(msg);
                });
              } else {
                // Single message
                this.processMessage(message);
              }

            } catch(error) {
              console.error('Message processing error:', error);
            }
          }

          processMessage(message) {
            // Send to Shiny based on message type
            if(message.type === 'realtime_data') {
              Shiny.setInputValue('realtime_data_stream', {
                data: message.data,
                timestamp: message.timestamp,
                source: message.source || 'websocket'
              });
            } else if(message.type === 'system_metrics') {
              Shiny.setInputValue('system_metrics_stream', message.data);
            } else if(message.type === 'alert') {
              Shiny.setInputValue('realtime_alert', {
                alert: message.data,
                timestamp: Date.now()
              });
            }
          }

          send(data) {
            if(this.ws && this.ws.readyState === WebSocket.OPEN) {
              this.ws.send(JSON.stringify(data));
            } else {
              // Queue message for later
              if(this.messageBuffer.length < this.options.maxQueueSize) {
                this.messageBuffer.push(data);
              }
            }
          }

          flushMessageBuffer() {
            while(this.messageBuffer.length > 0) {
              const message = this.messageBuffer.shift();
              this.send(message);
            }
          }

          scheduleReconnect() {
            if(this.reconnectCount >= this.options.reconnectAttempts) {
              console.error('Max reconnection attempts reached');
              return;
            }

            const delay = this.options.reconnectDelay[
              Math.min(this.reconnectCount, this.options.reconnectDelay.length - 1)
            ];

            setTimeout(() => {
              this.reconnectCount++;
              this.connect();
            }, delay);
          }

          startHeartbeat() {
            setInterval(() => {
              if(this.ws && this.ws.readyState === WebSocket.OPEN) {
                this.send({ type: 'heartbeat', timestamp: Date.now() });
              }

              // Check for connection health
              if(this.lastHeartbeat && Date.now() - this.lastHeartbeat > 60000) {
                console.warn('Connection appears stale, reconnecting...');
                this.ws.close();
              }
            }, this.options.heartbeatInterval);
          }
        }

        // Initialize production WebSocket
        let productionWS;
        $(document).ready(function() {
          productionWS = new ProductionWebSocket('ws://localhost:8080', {
            reconnectAttempts: 15,
            heartbeatInterval: 20000
          });
        });
      "))
    ),

    titlePanel("Enterprise Real-Time Monitoring System"),

    # Control panel
    fluidRow(
      column(3,
        wellPanel(
          h4("System Control"),
          div(class = "metrics-card",
            h5("Connection Status"),
            textOutput("connection_health"),
            br(),
            actionButton("force_reconnect", "Force Reconnect",
                         class = "btn-warning btn-sm")
          ),
          div(class = "metrics-card",
            h5("Stream Configuration"),
            selectInput("stream_frequency", "Update Frequency:",
                        choices = c("Ultra High (100ms)" = 100,
                                    "High (250ms)" = 250,
                                    "Normal (1s)" = 1000,
                                    "Low (5s)" = 5000),
                        selected = 1000),
            checkboxGroupInput("active_streams", "Active Data Streams:",
                               choices = c("Market Data" = "market",
                                           "System Metrics" = "system",
                                           "User Events" = "events",
                                           "Performance Data" = "performance"),
                               selected = c("market", "system"))
          ),
          div(class = "metrics-card",
            h5("Performance Monitoring"),
            textOutput("throughput_rate"),
            textOutput("latency_stats"),
            textOutput("error_rate")
          )
        )
      ),

      # Main dashboard
      column(9,
        # Status indicators
        fluidRow(
          valueBoxOutput("current_throughput", width = 3),
          valueBoxOutput("avg_latency", width = 3),
          valueBoxOutput("active_connections", width = 3),
          valueBoxOutput("system_health", width = 3)
        ),

        # Real-time visualizations
        div(class = "realtime-container",
          div(class = "performance-indicator",
            uiOutput("realtime_status_indicator")
          ),

          tabsetPanel(
            tabPanel("Live Data Stream",
              plotlyOutput("enterprise_realtime_plot", height = "500px")
            ),
            tabPanel("System Performance",
              fluidRow(
                column(8,
                  plotlyOutput("system_performance_plot", height = "400px")
                ),
                column(4,
                  plotlyOutput("performance_gauge", height = "200px"),
                  br(),
                  plotlyOutput("throughput_gauge", height = "200px")
                )
              )
            ),
            tabPanel("Analytics Dashboard",
              fluidRow(
                column(6,
                  plotlyOutput("analytics_trend", height = "300px")
                ),
                column(6,
                  plotlyOutput("distribution_plot", height = "300px")
                )
              ),
              fluidRow(
                column(12,
                  h4("Real-Time Analytics Summary"),
                  DT::dataTableOutput("analytics_summary_table")
                )
              )
            ),
            tabPanel("Alert Management",
              fluidRow(
                column(8,
                  h4("Active Alerts"),
                  uiOutput("active_alerts_display")
                ),
                column(4,
                  wellPanel(
                    h5("Alert Configuration"),
                    numericInput("alert_threshold", "Alert Threshold:",
                                 value = 85, min = 0, max = 100),
                    selectInput("alert_severity", "Minimum Severity:",
                                choices = c("Info" = "info",
                                            "Warning" = "warning",
                                            "Critical" = "critical"),
                                selected = "warning"),
                    checkboxInput("enable_notifications", "Enable Notifications", TRUE),
                    actionButton("clear_alerts", "Clear All Alerts",
                                 class = "btn-danger btn-sm")
                  )
                )
              )
            )
          )
        )
      )
    )
  )

  server <- function(input, output, session) {

    # Enterprise-grade reactive values
    enterprise_data <- reactiveValues(
      # Data streams
      realtime_stream = data.frame(),
      system_metrics = data.frame(),
      performance_data = data.frame(),

      # Performance tracking
      throughput_history = numeric(100),
      latency_history = numeric(100),
      error_count = 0,
      total_messages = 0,

      # Connection management
      connection_status = "disconnected",
      last_message_time = NULL,
      connection_quality = 100,

      # Alert system
      active_alerts = list(),
      alert_history = data.frame()
    )

    # Handle WebSocket status updates
    observeEvent(input$websocket_status, {
      enterprise_data$connection_status <- input$websocket_status

      if (input$websocket_status == "connected") {
        enterprise_data$connection_quality <- 100
        showNotification("WebSocket connected successfully",
                         type = "message", duration = 3)
      } else if (input$websocket_status == "disconnected") {
        enterprise_data$connection_quality <- max(0, enterprise_data$connection_quality - 20)
        showNotification("WebSocket disconnected - attempting reconnection",
                         type = "warning", duration = 5)
      }
    })

    # Process real-time data stream
    observeEvent(input$realtime_data_stream, {
      stream_data <- input$realtime_data_stream

      if (!is.null(stream_data)) {
        # Update message tracking
        enterprise_data$total_messages <- enterprise_data$total_messages + 1
        enterprise_data$last_message_time <- Sys.time()

        # Calculate latency
        if (!is.null(stream_data$timestamp)) {
          latency <- as.numeric(Sys.time()) - stream_data$timestamp
          enterprise_data$latency_history <- c(
            tail(enterprise_data$latency_history, 99),
            latency * 1000  # Convert to milliseconds
          )
        }

        # Process data point
        new_data_point <- data.frame(
          timestamp = Sys.time(),
          value = stream_data$data$value,
          source = stream_data$source,
          latency_ms = tail(enterprise_data$latency_history, 1)
        )

        # Update data stream
        current_stream <- enterprise_data$realtime_stream
        updated_stream <- rbind(current_stream, new_data_point)

        # Maintain sliding window
        if (nrow(updated_stream) > 2000) {
          updated_stream <- tail(updated_stream, 2000)
        }

        enterprise_data$realtime_stream <- updated_stream

        # Update throughput calculation
        calculate_throughput()
      }
    })

    # Throughput calculation
    calculate_throughput <- function() {

      current_time <- Sys.time()

      # Calculate messages per second over last 10 seconds
      recent_data <- enterprise_data$realtime_stream

      if (nrow(recent_data) > 1) {
        recent_window <- recent_data[recent_data$timestamp > (current_time - 10), ]

        if (nrow(recent_window) > 0) {
          time_span <- as.numeric(difftime(max(recent_window$timestamp),
                                           min(recent_window$timestamp),
                                           units = "secs"))
          throughput <- if (time_span > 0) nrow(recent_window) / time_span else 0

          enterprise_data$throughput_history <- c(
            tail(enterprise_data$throughput_history, 99),
            throughput
          )
        }
      }
    }

    # Value boxes for enterprise metrics
    output$current_throughput <- renderValueBox({
      current_throughput <- tail(enterprise_data$throughput_history, 1)
      if (length(current_throughput) == 0) current_throughput <- 0

      valueBox(
        value = paste0(round(current_throughput, 1), "/s"),
        subtitle = "Message Throughput",
        icon = icon("tachometer-alt"),
        color = if (current_throughput > 10) "green"
                else if (current_throughput > 5) "yellow" else "red"
      )
    })

    output$avg_latency <- renderValueBox({
      recent_latency <- tail(enterprise_data$latency_history, 10)
      avg_latency <- if (length(recent_latency) > 0) mean(recent_latency, na.rm = TRUE) else 0

      valueBox(
        value = paste0(round(avg_latency, 1), "ms"),
        subtitle = "Average Latency",
        icon = icon("clock"),
        color = if (avg_latency < 100) "green" else if (avg_latency < 500) "yellow" else "red"
      )
    })

    output$active_connections <- renderValueBox({
      connection_count <- if (enterprise_data$connection_status == "connected") 1 else 0

      valueBox(
        value = connection_count,
        subtitle = "Active Connections",
        icon = icon("plug"),
        color = if (connection_count > 0) "green" else "red"
      )
    })

    output$system_health <- renderValueBox({
      health_score <- enterprise_data$connection_quality

      valueBox(
        value = paste0(health_score, "%"),
        subtitle = "System Health",
        icon = icon("heartbeat"),
        color = if (health_score > 80) "green" else if (health_score > 50) "yellow" else "red"
      )
    })

    # Enterprise real-time plot
    output$enterprise_realtime_plot <- renderPlotly({
      plot_data <- enterprise_data$realtime_stream

      if (nrow(plot_data) == 0) {
        return(plot_ly() %>%
                 layout(title = "Waiting for enterprise data stream...") %>%
                 config(displayModeBar = FALSE))
      }

      # Use last 500 points for optimal performance
      display_data <- tail(plot_data, 500)

      plot_ly(
        display_data,
        x = ~timestamp,
        y = ~value,
        type = "scattergl",
        mode = "lines",
        line = list(color = "#2c3e50", width = 2),
        hovertemplate = paste(
          "<b>Time:</b> %{x}<br>",
          "<b>Value:</b> %{y:.4f}<br>",
          "<b>Latency:</b> %{customdata:.1f}ms<extra></extra>"
        ),
        customdata = ~latency_ms
      ) %>%
        layout(
          title = "Enterprise Real-Time Data Stream",
          xaxis = list(
            title = "Time",
            type = "date",
            tickformat = "%H:%M:%S"
          ),
          yaxis = list(title = "Value"),
          hovermode = "x",
          showlegend = FALSE
        ) %>%
        config(
          displayModeBar = TRUE,
          displaylogo = FALSE,
          modeBarButtonsToRemove = c("pan2d", "lasso2d", "select2d", "autoScale2d")
        )
    })

    # Status indicator
    output$realtime_status_indicator <- renderUI({
      status <- enterprise_data$connection_status
      throughput <- tail(enterprise_data$throughput_history, 1)
      if (length(throughput) == 0) throughput <- 0

      indicator_class <- switch(status,
        "connected" = if (throughput > 5) "status-green" else "status-yellow",
        "disconnected" = "status-red",
        "error" = "status-red",
        "status-yellow"
      )

      indicator_text <- switch(status,
        "connected" = "● LIVE",
        "disconnected" = "● OFFLINE",
        "error" = "● ERROR",
        "● UNKNOWN"
      )

      span(indicator_text, class = paste("h5", indicator_class))
    })

    # Performance monitoring outputs
    output$throughput_rate <- renderText({
      current_throughput <- tail(enterprise_data$throughput_history, 1)
      if (length(current_throughput) == 0) current_throughput <- 0
      paste("Throughput:", round(current_throughput, 2), "msg/sec")
    })

    output$latency_stats <- renderText({
      recent_latency <- tail(enterprise_data$latency_history, 20)

      if (length(recent_latency) > 0) {
        avg_latency <- mean(recent_latency, na.rm = TRUE)
        paste("Avg Latency:", round(avg_latency, 1), "ms")
      } else {
        "Latency: N/A"
      }
    })

    output$error_rate <- renderText({
      total_messages <- enterprise_data$total_messages
      error_count <- enterprise_data$error_count

      error_rate <- if (total_messages > 0) (error_count / total_messages) * 100 else 0
      paste("Error Rate:", round(error_rate, 2), "%")
    })

    # Connection health display
    output$connection_health <- renderText({
      status <- enterprise_data$connection_status
      quality <- enterprise_data$connection_quality

      switch(status,
        "connected" = paste0("🟢 Connected (", quality, "%)"),
        "disconnected" = "🔴 Disconnected",
        "error" = "🟡 Connection Error",
        "🔵 Unknown Status"
      )
    })

    # Force reconnect handler
    observeEvent(input$force_reconnect, {
      runjs("
        if(typeof productionWS !== 'undefined') {
          productionWS.ws.close();
          setTimeout(() => productionWS.connect(), 1000);
        }
      ")

      showNotification("Forcing WebSocket reconnection...", type = "message", duration = 3)
    })
  }

  return(list(ui = ui, server = server))
}
Common Issues and Solutions
Issue 1: Memory Leaks in Long-Running Applications
Problem: Real-time applications accumulate data over time, leading to memory exhaustion and performance degradation.
Solution:
Implement proper memory management with sliding windows and garbage collection:
# Memory-efficient data management
manage_memory_efficiently <- function(reactive_values, max_points = 1000) {

  # Sliding window for time series data
  if (nrow(reactive_values$data) > max_points) {
    reactive_values$data <- tail(reactive_values$data, max_points)
  }

  # Periodic cleanup of old objects
  observe({
    invalidateLater(300000)  # Every 5 minutes

    # Force garbage collection
    gc()

    # Clear old cached calculations
    if (length(reactive_values$cache) > 100) {
      reactive_values$cache <- tail(reactive_values$cache, 50)
    }
  })
}
Issue 2: WebSocket Connection Drops and Recovery
Problem: Network interruptions cause WebSocket disconnections, leading to data loss and poor user experience.
Solution:
Implement robust reconnection logic with exponential backoff:
# Robust WebSocket reconnection strategy
implement_connection_recovery <- function() {

  reconnection_config <- list(
    max_attempts = 10,
    base_delay = 1000,     # 1 second
    max_delay = 30000,     # 30 seconds
    backoff_factor = 1.5,
    jitter = TRUE
  )

  # JavaScript reconnection logic (the `config` object mirrors
  # reconnection_config above and would be injected client-side)
  reconnection_js <- "
    function attemptReconnection(attempt = 1) {
      if(attempt > config.max_attempts) {
        console.error('Max reconnection attempts reached');
        return;
      }

      const delay = Math.min(
        config.base_delay * Math.pow(config.backoff_factor, attempt - 1),
        config.max_delay
      );

      const jitteredDelay = config.jitter ?
        delay + (Math.random() * 1000) : delay;

      setTimeout(() => {
        console.log(`Reconnection attempt ${attempt}`);
        connectWebSocket();
      }, jitteredDelay);
    }
  "

  return(reconnection_js)
}
Issue 3: High-Frequency Update Performance
Problem: Very frequent updates (multiple per second) cause UI lag and poor responsiveness.
Solution:
Implement intelligent update throttling and batching:
# Performance optimization for high-frequency updates
optimize_high_frequency_updates <- function() {

  # Throttling mechanism
  last_update <- reactiveVal(Sys.time())
  update_throttle <- 100  # Minimum 100ms between UI updates

  # Batched update processing
  update_queue <- reactiveValues(
    pending_updates = list(),
    last_flush = Sys.time()
  )

  # Process updates in batches
  observe({
    invalidateLater(update_throttle)

    current_time <- Sys.time()
    time_since_last <- difftime(current_time, last_update(), units = "secs")

    # Only update if enough time has passed
    if (time_since_last >= (update_throttle / 1000)) {

      # Process all pending updates at once
      if (length(update_queue$pending_updates) > 0) {

        # Combine updates
        combined_updates <- do.call(rbind, update_queue$pending_updates)

        # Update display (process_combined_updates() is application-specific)
        process_combined_updates(combined_updates)

        # Clear queue
        update_queue$pending_updates <- list()
        last_update(current_time)
      }
    }
  })
}
When deploying real-time applications to production, ensure proper resource allocation, implement connection pooling, configure load balancing for WebSocket traffic, and establish comprehensive monitoring and alerting systems.
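As a concrete example of connection pooling, the pool package lets every polling observer share a fixed set of database connections instead of opening a new one per refresh. A minimal sketch, assuming a PostgreSQL backend and a metrics table (both hypothetical):
library(shiny)
library(pool)
library(DBI)
library(RPostgres)

# One pool per R process, shared by all sessions; the connection
# details and the `metrics` table below are illustrative only.
app_pool <- dbPool(
  Postgres(),
  dbname   = "monitoring",
  host     = "db.example.com",
  user     = Sys.getenv("DB_USER"),
  password = Sys.getenv("DB_PASS")
)
onStop(function() poolClose(app_pool))  # release connections on shutdown

server <- function(input, output, session) {
  latest_metrics <- reactive({
    invalidateLater(5000)  # poll every 5 seconds
    dbGetQuery(app_pool, "SELECT * FROM metrics ORDER BY ts DESC LIMIT 100")
  })

  output$metrics_table <- renderTable(latest_metrics())
}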
Common Questions About Real-Time Shiny Applications
How can different parts of my application update at different frequencies?
You can implement multiple invalidation timers for different parts of your application. Fast-updating components (like live charts) can refresh every second, while summary statistics might update every 30 seconds. Use invalidateLater() with different intervals in separate observe() blocks, and consider using reactive expressions to share data processing between components with different update schedules.
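A minimal sketch of this multi-frequency pattern; fetch_latest_points() and compute_summaries() stand in for your own data access code:
server <- function(input, output, session) {
  # Fast loop: the live chart refreshes every second
  live_data <- reactive({
    invalidateLater(1000)
    fetch_latest_points()   # hypothetical fast data source
  })

  # Slow loop: summary statistics refresh every 30 seconds
  summary_data <- reactive({
    invalidateLater(30000)
    compute_summaries()     # hypothetical aggregation step
  })

  output$live_chart   <- renderPlot(plot(live_data(), type = "l"))
  output$summary_text <- renderText(paste("Observations:", nrow(summary_data())))
}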
When should I choose polling versus WebSockets?
Polling uses invalidateLater() to periodically fetch new data from databases or APIs; it’s simple to implement but creates constant server load. WebSockets establish persistent connections for bidirectional communication, offering lower latency and reduced server overhead for high-frequency updates. Choose polling for moderate update frequencies (every few seconds) and WebSockets for sub-second updates or when you need server-initiated communications.
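For database polling specifically, shiny’s reactivePoll() separates a cheap change check from the full fetch, so the expensive query runs only when the data actually changed. A sketch, reusing the hypothetical app_pool from the pooling example and an assumed events table:
server <- function(input, output, session) {
  events <- reactivePoll(
    intervalMillis = 2000, session,
    # Cheap check: only a row count is queried each cycle
    checkFunc = function() {
      dbGetQuery(app_pool, "SELECT COUNT(*) AS n FROM events")$n
    },
    # Full fetch: runs only when the check value changes
    valueFunc = function() {
      dbGetQuery(app_pool, "SELECT * FROM events ORDER BY ts DESC LIMIT 500")
    }
  )

  output$event_table <- renderTable(events())
}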
How do I keep memory usage under control in long-running applications?
Implement sliding window data storage using circular buffers or by limiting data frames to a maximum number of rows. Use gc() periodically to force garbage collection, clear old reactive values and cached calculations, and monitor memory usage with object.size(). Consider storing only essential data in reactive values and moving historical data to databases or files for long-term storage.
Can I build real-time applications without external WebSocket infrastructure?
Yes, you can create effective real-time applications using only Shiny’s built-in reactivity with invalidateLater(), reactiveTimer(), and database polling. This approach works well for update frequencies of 1-10 seconds and is much simpler to deploy. For sub-second updates or when you need to handle thousands of concurrent users, external WebSocket infrastructure becomes necessary.
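Since reactiveTimer() appears less often than invalidateLater(), here is a minimal sketch of one shared timer driving two outputs; query_current_status() is a placeholder for your data fetch:
server <- function(input, output, session) {
  tick <- reactiveTimer(2000)  # one shared 2-second heartbeat

  snapshot <- reactive({
    tick()                     # take a dependency on the timer
    query_current_status()     # hypothetical data fetch
  })

  output$status_table <- renderTable(snapshot())
  output$updated_at <- renderText({
    tick()                     # same timer re-runs this output too
    paste("Last refresh:", format(Sys.time(), "%H:%M:%S"))
  })
}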
How do I ensure data integrity when connections are unreliable?
Implement exponential backoff reconnection strategies, queue messages during disconnections for later transmission, use heartbeat/ping mechanisms to detect stale connections, and implement data checksums or sequence numbers to detect missing updates. Always include error handling in your data processing pipelines and provide clear user feedback about connection status and data freshness.
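As an illustration of the sequence-number idea, a sketch of server-side gap detection; the msg$seq field and the handler function are assumptions about your stream’s message format:
# Track sequence numbers to detect missing updates.
# Each incoming message is assumed to carry a `seq` field (hypothetical).
stream_state <- reactiveValues(last_seq = 0, gaps_detected = 0)

handle_stream_message <- function(msg) {
  expected <- stream_state$last_seq + 1
  if (msg$seq > expected) {
    missed <- msg$seq - expected
    stream_state$gaps_detected <- stream_state$gaps_detected + missed
    warning(sprintf("Missed %d message(s) before seq %d", missed, msg$seq))
  }
  stream_state$last_seq <- msg$seq
}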
Test Your Understanding
You’re building a financial trading dashboard that needs to display stock prices updating multiple times per second. Which architectural approach would provide the best performance and user experience?
A) Use invalidateLater(100) to poll a REST API every 100ms
B) Implement WebSocket connections with client-side data buffering
C) Use reactiveTimer() with database queries every 500ms
D) Poll multiple APIs simultaneously using invalidateLater(50)
Hints:
- Consider the frequency of updates needed (multiple times per second)
- Think about server load and network efficiency
- Consider the user experience and responsiveness requirements
- Remember the scalability implications of each approach
Correct answer: B) Implement WebSocket connections with client-side data buffering
For high-frequency financial data (multiple updates per second), WebSocket connections provide the optimal solution:
Why WebSockets are ideal:
- Real-time bidirectional communication eliminates polling overhead
- Lower latency for time-critical financial data
- Efficient bandwidth usage compared to repeated HTTP requests
- Server-initiated updates ensure immediate data delivery
Why other options are suboptimal:
- Option A: 100ms polling creates excessive server load and network traffic
- Option C: 500ms intervals are too slow for high-frequency trading data
- Option D: 50ms polling would overwhelm servers and networks
Implementation considerations: Use message batching, implement connection recovery, and buffer data client-side to handle temporary network issues without losing critical price updates.
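A sketch of the client-side buffering idea, using the same JS-in-R string pattern as earlier examples; the onmessage wiring and the price_batch input name are assumptions:
client_buffer_js <- "
// Buffer incoming price ticks and flush them to Shiny in batches,
// so the UI redraws at most every 250ms rather than on every tick.
let priceBuffer = [];

function onPriceMessage(event) {          // attach as ws.onmessage
  priceBuffer.push(JSON.parse(event.data));
}

setInterval(() => {
  if (priceBuffer.length > 0) {
    Shiny.setInputValue('price_batch', priceBuffer, {priority: 'event'});
    priceBuffer = [];
  }
}, 250);
"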
Your real-time monitoring application runs 24/7 and accumulates data continuously. After 6 hours, the application becomes slow and eventually crashes. Complete this memory management solution:
server <- function(input, output, session) {

  realtime_data <- reactiveValues(
    current_data = data.frame(),
    update_counter = 0
  )

  # Memory management strategy
  observe({
    _______(______)  # How often to clean up?

    # Maintain sliding window
    if(nrow(realtime_data$current_data) > ______) {
      realtime_data$current_data <- ______(realtime_data$current_data, ______)
    }

    # Force garbage collection
    ______()
  })
}
Hints:
- Consider how frequently cleanup should occur (balance performance vs. memory)
- Think about appropriate data window sizes for real-time applications
- Remember R’s garbage collection function
- Consider what function keeps the most recent data
Solution:
server <- function(input, output, session) {

  realtime_data <- reactiveValues(
    current_data = data.frame(),
    update_counter = 0
  )

  # Memory management strategy
  observe({
    invalidateLater(300000)  # Clean up every 5 minutes

    # Maintain sliding window
    if(nrow(realtime_data$current_data) > 1000) {
      realtime_data$current_data <- tail(realtime_data$current_data, 1000)
    }

    # Force garbage collection
    gc()
  })
}
Key principles:
- Regular cleanup intervals: 5-minute intervals balance memory management with performance
- Sliding window size: 1000 points provides sufficient history without excessive memory usage
- Forced garbage collection: gc() ensures memory is actually freed
- Proactive management: Clean up before memory issues occur, not after
Your WebSocket connection frequently drops due to network issues. Design a robust reconnection strategy that handles temporary outages gracefully while preventing overwhelming the server during extended outages.
What elements should your reconnection strategy include, and in what order should they be implemented?
A) Immediate reconnection attempts with fixed 1-second intervals
B) Exponential backoff with maximum delay limits and jitter
C) Linear backoff with connection health monitoring
D) Random delay intervals with no maximum attempt limits
Hints:
- Consider what happens if many clients reconnect simultaneously
- Think about server load during network outages
- Consider how to handle both brief and extended outages
- Remember the importance of preventing connection storms
Correct answer: B) Exponential backoff with maximum delay limits and jitter
A robust reconnection strategy should include:
1. Exponential Backoff Pattern:
const delays = [1000, 2000, 4000, 8000, 16000, 30000]; // 1s to 30s max
const attempt_delay = delays[Math.min(attempt - 1, delays.length - 1)];
2. Jitter Addition:
const jittered_delay = attempt_delay + (Math.random() * 1000);
3. Maximum Attempt Limits:
if(reconnect_attempts > 10) {
console.error('Max reconnection attempts reached');
return;
}
4. Connection Health Monitoring:
// Heartbeat mechanism
setInterval(() => {
  if(websocket.readyState === WebSocket.OPEN) {
    websocket.send(JSON.stringify({type: 'heartbeat'}));
  }
}, 30000);
Why this approach works:
- Exponential backoff reduces server load during outages
- Jitter prevents connection storms when networks recover
- Maximum delays ensure eventual reconnection attempts
- Attempt limits prevent infinite retry loops
- Health monitoring detects stale connections proactively
Conclusion
Real-time data applications represent the pinnacle of interactive analytics, transforming static dashboards into dynamic monitoring systems that provide immediate insights and enable rapid decision-making. Through this comprehensive guide, you’ve mastered the complete spectrum of real-time application development, from basic automatic refresh patterns to sophisticated WebSocket architectures that handle high-frequency data streams with enterprise-grade reliability.
The techniques you’ve learned—streaming data processing, memory-efficient data management, robust connection handling, and performance optimization—form the foundation for building monitoring systems that rival commercial solutions. Whether you’re creating financial trading platforms, IoT monitoring dashboards, or operational intelligence systems, these real-time capabilities enable you to deliver applications that provide genuine competitive advantage through timely information delivery.
Your understanding of real-time architecture patterns, combined with practical implementation experience, positions you to tackle the most demanding interactive application requirements while maintaining the analytical power and flexibility that makes R-based solutions superior for data-driven organizations.
Next Steps
Based on your mastery of real-time data handling, here are the recommended paths for continuing your advanced Shiny development journey:
Immediate Next Steps (Complete These First)
- Advanced Shiny Modules for Scalable Apps - Build modular real-time systems that scale across enterprise applications
- Production Deployment Strategies - Deploy your real-time applications to production environments with proper scaling and monitoring
- Practice Exercise: Extend your real-time application to include multiple data streams with different update frequencies and implement a comprehensive alerting system
Building on Your Foundation (Choose Your Path)
For High-Performance Applications:
For Enterprise Integration:
For Advanced Interactivity:
Long-term Goals (2-4 Weeks)
- Build a complete real-time monitoring system for your domain (financial, IoT, operational, etc.)
- Implement a multi-user real-time collaboration platform using Shiny
- Create a high-frequency data processing pipeline that scales to thousands of concurrent users
- Contribute to the Shiny community by sharing your real-time application patterns and optimizations
Citation
@online{kassambara2025,
author = {Kassambara, Alboukadel},
title = {Real-Time {Data} and {Live} {Updates} in {Shiny:} {Build}
{Dynamic} {Monitoring} {Systems}},
date = {2025-05-23},
url = {https://www.datanovia.com/learn/tools/shiny-apps/interactive-features/real-time-updates.html},
langid = {en}
}