flowchart TD
A[Data Sources] --> B[Data Ingestion Layer]
B --> C[Stream Processing]
C --> D[Data Storage/Cache]
D --> E[Shiny Server Logic]
E --> F[Real-Time UI Updates]
F --> G[User Interactions]
G --> H[Feedback Loop]
I[Update Mechanisms] --> J[Automatic Refresh]
I --> K[WebSocket Connections]
I --> L[Server-Sent Events]
I --> M[Polling Strategies]
N[Performance Layers] --> O[Memory Management]
N --> P[Data Sampling]
N --> Q[Connection Pooling]
N --> R[Caching Strategies]
S[Monitoring Systems] --> T[Health Checks]
S --> U[Alert Management]
S --> V[Performance Metrics]
S --> W[Error Recovery]
style A fill:#e1f5fe
style F fill:#e8f5e8
style I fill:#fff3e0
style N fill:#f3e5f5
style S fill:#fce4ec
Key Takeaways
- Live Data Intelligence: Real-time applications provide immediate insights into changing business conditions, enabling rapid response to critical situations and opportunities
- Streaming Architecture: Advanced data handling techniques manage continuous data flows efficiently while maintaining application responsiveness and stability
- Performance at Scale: Optimized memory management and intelligent data sampling ensure smooth operation even with high-frequency updates and large data volumes
- Interactive Monitoring: Dynamic visualizations and automated alerting systems create comprehensive monitoring platforms that rival commercial solutions
- Production Reliability: Robust error handling and connection management ensure continuous operation in mission-critical monitoring environments
Introduction
Real-time data applications represent the cutting edge of business intelligence, transforming static reporting into dynamic monitoring systems that respond instantly to changing conditions. While traditional dashboards provide valuable historical insights, real-time applications enable organizations to detect patterns, respond to alerts, and make decisions based on current data streams rather than outdated snapshots.
This comprehensive guide covers the complete spectrum of real-time application development in Shiny, from basic automatic refresh patterns to sophisticated streaming data architectures that handle high-frequency updates with WebSocket connections. You’ll master the techniques needed to build monitoring systems, trading platforms, IoT dashboards, and operational intelligence applications that provide genuine real-time value.
Whether you’re building financial trading dashboards that need millisecond updates, manufacturing monitoring systems that track production metrics, or social media analytics platforms that process live data streams, understanding real-time application architecture is essential for creating tools that provide competitive advantage through timely information.
Understanding Real-Time Application Architecture
Real-time Shiny applications involve sophisticated data flow management that balances immediate responsiveness with system stability and performance optimization.
Core Real-Time Components
Data Ingestion: Efficient systems for receiving continuous data streams from databases, APIs, message queues, or direct sensor feeds.
Stream Processing: Real-time data transformation, filtering, and aggregation that prepares raw data streams for visualization and analysis.
Update Management: Intelligent refresh strategies that balance data freshness with application performance and user experience.
Connection Handling: Robust connection management that maintains data flow integrity even during network interruptions or system stress.
Strategic Architecture Approaches
Polling-Based Updates: Simple timer-based refresh patterns suitable for moderate update frequencies and stable data sources (a minimal sketch follows this list).
Push-Based Streaming: Advanced WebSocket or Server-Sent Event implementations for high-frequency updates and bidirectional communication.
Hybrid Systems: Combined approaches that optimize different data streams based on their characteristics and business requirements.
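For reference, the simplest polling approach takes only a few lines of built-in Shiny reactivity. This is a minimal sketch, not a complete application: fetch_latest_metrics() is a placeholder for whatever database query or API call supplies your data:
library(shiny)

server <- function(input, output, session) {
  # Polling-based updates: the reactive re-executes on a fixed timer
  live_data <- reactive({
    invalidateLater(5000, session)  # invalidate (and re-run) every 5 seconds
    fetch_latest_metrics()          # placeholder for your query or API call
  })

  output$latest <- renderTable({
    live_data()  # re-renders whenever the poll fires
  })
}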
Foundation Real-Time Implementation
Start with core patterns that demonstrate essential real-time concepts while providing the foundation for advanced streaming architectures.
Basic Automatic Refresh Systems
library(shiny)
library(shinydashboard) # provides valueBox(), renderValueBox(), valueBoxOutput()
library(plotly)
library(DT)
library(dplyr)
ui <- fluidPage(
titlePanel("Real-Time Monitoring Dashboard"),
fluidRow(
column(4,
wellPanel(
h4("Update Controls"),
selectInput("update_frequency", "Update Frequency:",
choices = c("Real-time (1s)" = 1000,
"Fast (5s)" = 5000,
"Normal (10s)" = 10000,
"Slow (30s)" = 30000),
selected = 5000),
checkboxInput("auto_update", "Auto Update", value = TRUE),
actionButton("manual_refresh", "Manual Refresh",
class = "btn-primary"),
br(), br(),
h5("System Status"),
textOutput("last_update"),
textOutput("update_count"),
textOutput("data_points")
)
),
column(8,
# Real-time metrics
fluidRow(
valueBoxOutput("current_value", width = 4),
valueBoxOutput("trend_indicator", width = 4),
valueBoxOutput("alert_status", width = 4)
)
)
),
fluidRow(
column(8,
tabsetPanel(
tabPanel("Live Chart",
plotlyOutput("realtime_plot", height = "400px")
),
tabPanel("Data Stream",
DT::dataTableOutput("realtime_table")
),
tabPanel("System Metrics",
plotlyOutput("system_metrics", height = "400px")
)
)
),
column(4,
wellPanel(
h4("Live Alerts"),
div(id = "alerts_container",
uiOutput("live_alerts")
)
),
wellPanel(
h4("Performance Monitor"),
plotlyOutput("performance_gauge", height = "200px")
)
)
)
)
server <- function(input, output, session) {
# Reactive values for real-time data
realtime_data <- reactiveValues(
current_data = data.frame(),
update_counter = 0,
last_update_time = Sys.time(),
system_metrics = data.frame(),
alerts = list()
)
# Initialize with sample data
observe({
initial_data <- generate_initial_data()
realtime_data$current_data <- initial_data
realtime_data$system_metrics <- generate_system_metrics()
})
# Automatic update mechanism
observe({
  # Only update if auto-update is enabled
  req(input$auto_update)
  # Get update frequency from input
  frequency <- as.numeric(input$update_frequency)
  invalidateLater(frequency)
  # isolate() keeps the reads and writes below from re-triggering this
  # observer on every reactive flush; without it the observer would loop
  # continuously instead of waiting for the timer
  isolate({
    # Generate new data point
    new_data <- generate_realtime_datapoint()
    # Update current data with a rolling window
    updated_data <- rbind(realtime_data$current_data, new_data)
    # Keep only the last 100 points for performance
    if(nrow(updated_data) > 100) {
      updated_data <- tail(updated_data, 100)
    }
    realtime_data$current_data <- updated_data
    realtime_data$update_counter <- realtime_data$update_counter + 1
    realtime_data$last_update_time <- Sys.time()
    # Update system metrics
    realtime_data$system_metrics <- rbind(
      realtime_data$system_metrics,
      data.frame(
        timestamp = Sys.time(),
        cpu_usage = runif(1, 20, 80),
        memory_usage = runif(1, 30, 70),
        network_io = runif(1, 10, 90)
      )
    )
    # Keep a bounded window of system metrics
    if(nrow(realtime_data$system_metrics) > 50) {
      realtime_data$system_metrics <- tail(realtime_data$system_metrics, 50)
    }
    # Check for alerts
    check_and_generate_alerts(new_data)
  })
})
# Manual refresh handler
observeEvent(input$manual_refresh, {
# Force immediate update
new_data <- generate_realtime_datapoint()
current_data <- realtime_data$current_data
updated_data <- rbind(current_data, new_data)
if(nrow(updated_data) > 100) {
updated_data <- tail(updated_data, 100)
}
realtime_data$current_data <- updated_data
realtime_data$update_counter <- realtime_data$update_counter + 1
realtime_data$last_update_time <- Sys.time()
showNotification("Data refreshed manually", type = "message", duration = 2)
})
# Value boxes
output$current_value <- renderValueBox({
current_data <- realtime_data$current_data
if(nrow(current_data) > 0) {
latest_value <- tail(current_data$value, 1)
valueBox(
value = round(latest_value, 2),
subtitle = "Current Value",
icon = icon("tachometer-alt"),
color = if(latest_value > 75) "red" else if(latest_value > 50) "yellow" else "green"
)
} else {
valueBox(
value = "N/A",
subtitle = "Current Value",
icon = icon("tachometer-alt"),
color = "blue"
)
}
})
output$trend_indicator <- renderValueBox({
current_data <- realtime_data$current_data
if(nrow(current_data) >= 2) {
recent_values <- tail(current_data$value, 2)
trend <- recent_values[2] - recent_values[1]
trend_pct <- round((trend / recent_values[1]) * 100, 1)
valueBox(
value = paste0(ifelse(trend >= 0, "+", ""), trend_pct, "%"),
subtitle = "Trend",
icon = icon(if(trend >= 0) "arrow-up" else "arrow-down"),
color = if(trend >= 0) "green" else "red"
)
} else {
valueBox(
value = "0%",
subtitle = "Trend",
icon = icon("minus"),
color = "blue"
)
}
})
output$alert_status <- renderValueBox({
alert_count <- length(realtime_data$alerts)
valueBox(
value = alert_count,
subtitle = "Active Alerts",
icon = icon("exclamation-triangle"),
color = if(alert_count > 0) "red" else "green"
)
})
# Real-time plot
output$realtime_plot <- renderPlotly({
data <- realtime_data$current_data
if(nrow(data) == 0) return(NULL)
plot_ly(
data = data,
x = ~timestamp,
y = ~value,
type = "scatter",
mode = "lines+markers",
line = list(color = "#2ecc71", width = 3),
marker = list(color = "#27ae60", size = 6),
hovertemplate = "<b>%{x}</b><br>Value: %{y:.2f}<extra></extra>"
) %>%
layout(
title = "Real-Time Data Stream",
xaxis = list(
title = "Time",
type = "date",
tickformat = "%H:%M:%S"
),
yaxis = list(title = "Value"),
hovermode = "x",
showlegend = FALSE
) %>%
config(displayModeBar = FALSE)
})
# Real-time data table
output$realtime_table <- DT::renderDataTable({
data <- realtime_data$current_data
if(nrow(data) == 0) return(NULL)
# Show most recent data first
display_data <- data %>%
arrange(desc(timestamp)) %>%
mutate(
timestamp = format(timestamp, "%H:%M:%S"),
value = round(value, 3)
) %>%
head(20)
DT::datatable(
display_data,
options = list(
pageLength = 10,
searching = FALSE,
ordering = FALSE,
info = FALSE,
dom = 't',
scrollY = "300px"
),
colnames = c("Time", "Value", "Status"),
rownames = FALSE
) %>%
DT::formatStyle(
"value",
backgroundColor = DT::styleInterval(
cuts = c(25, 50, 75),
values = c("lightgreen", "lightyellow", "orange", "lightcoral")
)
)
})
# System metrics visualization
output$system_metrics <- renderPlotly({
metrics_data <- realtime_data$system_metrics
if(nrow(metrics_data) == 0) return(NULL)
plot_ly(metrics_data, x = ~timestamp) %>%
add_lines(y = ~cpu_usage, name = "CPU Usage",
line = list(color = "#e74c3c")) %>%
add_lines(y = ~memory_usage, name = "Memory Usage",
line = list(color = "#3498db")) %>%
add_lines(y = ~network_io, name = "Network I/O",
line = list(color = "#2ecc71")) %>%
layout(
title = "System Performance Metrics",
xaxis = list(title = "Time"),
yaxis = list(title = "Usage (%)", range = c(0, 100)),
hovermode = "x unified"
)
})
# Live alerts display
output$live_alerts <- renderUI({
alerts <- realtime_data$alerts
if(length(alerts) == 0) {
return(div(
class = "alert alert-success",
icon("check-circle"),
" All systems normal"
))
}
alert_ui <- lapply(alerts, function(alert) {
div(
class = paste("alert", paste0("alert-", alert$severity)),
strong(alert$title),
br(),
alert$message,
br(),
small(paste("Time:", format(alert$timestamp, "%H:%M:%S")))
)
})
return(do.call(tagList, alert_ui))
})
# Performance gauge
output$performance_gauge <- renderPlotly({
metrics_data <- realtime_data$system_metrics
if(nrow(metrics_data) == 0) {
current_cpu <- 0
} else {
current_cpu <- tail(metrics_data$cpu_usage, 1)
}
plot_ly(
type = "indicator",
mode = "gauge+number",
value = current_cpu,
domain = list(x = c(0, 1), y = c(0, 1)),
title = list(text = "CPU Usage"),
gauge = list(
axis = list(range = list(NULL, 100)),
bar = list(color = "darkblue"),
steps = list(
list(range = c(0, 50), color = "lightgray"),
list(range = c(50, 80), color = "yellow"),
list(range = c(80, 100), color = "red")
),
threshold = list(
line = list(color = "red", width = 4),
thickness = 0.75,
value = 90
)
)
) %>%
layout(
margin = list(l = 20, r = 20, t = 40, b = 20),
font = list(color = "darkblue", family = "Arial")
)
})
# Status outputs
output$last_update <- renderText({
paste("Last Update:", format(realtime_data$last_update_time, "%H:%M:%S"))
})
output$update_count <- renderText({
paste("Updates:", realtime_data$update_counter)
})
output$data_points <- renderText({
paste("Data Points:", nrow(realtime_data$current_data))
})
# Helper functions
generate_initial_data <- function() {
n <- 20
data.frame(
timestamp = seq(Sys.time() - n, Sys.time(), length.out = n),
value = cumsum(rnorm(n, 0, 5)) + 50,
status = sample(c("Normal", "Warning", "Critical"), n, replace = TRUE, prob = c(0.8, 0.15, 0.05))
)
}
generate_realtime_datapoint <- function() {
# Get last value for trend continuation
current_data <- realtime_data$current_data
last_value <- if(nrow(current_data) > 0) tail(current_data$value, 1) else 50
# Generate new value with some trend
new_value <- last_value + rnorm(1, 0, 3)
new_value <- max(0, min(100, new_value)) # Constrain to 0-100
data.frame(
timestamp = Sys.time(),
value = new_value,
status = if(new_value > 80) "Critical" else if(new_value > 60) "Warning" else "Normal"
)
}
generate_system_metrics <- function() {
n <- 10
data.frame(
timestamp = seq(Sys.time() - n*10, Sys.time(), length.out = n),
cpu_usage = runif(n, 20, 60),
memory_usage = runif(n, 30, 50),
network_io = runif(n, 10, 40)
)
}
check_and_generate_alerts <- function(new_data) {
# Clear old alerts (older than 1 minute)
current_time <- Sys.time()
realtime_data$alerts <- realtime_data$alerts[
sapply(realtime_data$alerts, function(x) difftime(current_time, x$timestamp, units = "secs") < 60)
]
# Check for new alerts
if(new_data$value > 90) {
new_alert <- list(
title = "High Value Alert",
message = paste("Value exceeded threshold:", round(new_data$value, 2)),
severity = "danger",
timestamp = Sys.time()
)
realtime_data$alerts <- append(realtime_data$alerts, list(new_alert))
# Show notification
showNotification(
paste("Alert: High value detected -", round(new_data$value, 2)),
type = "error",
duration = 5
)
}
}
}
shinyApp(ui = ui, server = server)

# Advanced WebSocket implementation for high-frequency updates
library(shiny)
library(shinydashboard) # valueBox widgets used below
library(shinyjs)        # runjs() for the manual reconnect control
library(websocket)
library(jsonlite)
library(plotly)
library(DT)
library(dplyr)
# WebSocket server setup (would run separately in production)
create_websocket_server <- function(port = 8080) {
# This would typically run as a separate process; it is shown here to
# illustrate the message structure (generate_sample_data() and
# generate_realtime_point() are placeholders for your own data generators)
websocket_server <- function(ws) {
cat("WebSocket connection opened\n")
# Send initial data
ws$send(toJSON(list(
type = "initial_data",
data = generate_sample_data(50)
)))
# Set up periodic data sending: a self-rescheduling callback keeps
# updates flowing without blocking the event loop
send_update <- function() {
  if(ws$readyState() == 1L) { # 1 = connection open
    # Generate and send new data
    new_data <- generate_realtime_point()
    ws$send(toJSON(list(
      type = "data_update",
      data = new_data,
      timestamp = as.numeric(Sys.time())
    )))
    # Reschedule for continuous updates
    later::later(send_update, delay = 1) # 1 second intervals
  }
}
later::later(send_update, delay = 1)
ws$onMessage(function(binary, message) {
cat("Received message:", message, "\n")
# Handle client messages
parsed_message <- fromJSON(message)
if(parsed_message$type == "subscribe") {
# Handle subscription requests
ws$send(toJSON(list(
type = "subscription_confirmed",
channels = parsed_message$channels
)))
}
})
ws$onClose(function() {
cat("WebSocket connection closed\n")
})
}
return(websocket_server)
}
# Shiny application with WebSocket integration
ui <- fluidPage(
  useShinyjs(), # enables runjs() calls from the server
  # Native WebSocket client (no external client library needed)
  tags$head(
    tags$script(HTML("
var websocket;
var connected = false;
// WebSocket connection management
function connectWebSocket() {
websocket = new WebSocket('ws://localhost:8080');
websocket.onopen = function(event) {
console.log('WebSocket connected');
connected = true;
Shiny.setInputValue('websocket_status', 'connected');
// Subscribe to data streams
websocket.send(JSON.stringify({
type: 'subscribe',
channels: ['realtime_data', 'system_metrics']
}));
};
websocket.onmessage = function(event) {
var message = JSON.parse(event.data);
// Send data to Shiny
if(message.type === 'data_update') {
Shiny.setInputValue('websocket_data', {
data: message.data,
timestamp: message.timestamp
});
} else if(message.type === 'initial_data') {
Shiny.setInputValue('websocket_initial', message.data);
}
};
websocket.onclose = function(event) {
console.log('WebSocket disconnected');
connected = false;
Shiny.setInputValue('websocket_status', 'disconnected');
// Attempt reconnection after delay
setTimeout(connectWebSocket, 5000);
};
websocket.onerror = function(error) {
console.error('WebSocket error:', error);
Shiny.setInputValue('websocket_status', 'error');
};
}
// Initialize WebSocket connection when page loads
$(document).ready(function() {
connectWebSocket();
});
"))
),
titlePanel("WebSocket Real-Time Dashboard"),
fluidRow(
column(4,
wellPanel(
h4("Connection Status"),
textOutput("connection_status"),
br(),
h4("Stream Controls"),
checkboxGroupInput("data_streams", "Active Streams:",
choices = c("Market Data" = "market",
"System Metrics" = "system",
"User Activity" = "activity"),
selected = c("market", "system")),
actionButton("reconnect_ws", "Reconnect WebSocket",
class = "btn-warning")
)
),
column(8,
# Real-time metrics cards
fluidRow(
valueBoxOutput("ws_current_value", width = 4),
valueBoxOutput("ws_update_rate", width = 4),
valueBoxOutput("ws_data_points", width = 4)
)
)
),
fluidRow(
column(8,
tabsetPanel(
tabPanel("Live Stream",
plotlyOutput("websocket_plot", height = "400px")
),
tabPanel("High Frequency",
plotlyOutput("hf_plot", height = "400px")
),
tabPanel("System Monitor",
plotlyOutput("system_monitor", height = "400px")
)
)
),
column(4,
wellPanel(
h4("Stream Statistics"),
verbatimTextOutput("stream_stats")
),
wellPanel(
h4("Latest Data"),
DT::dataTableOutput("latest_data")
)
)
)
)
server <- function(input, output, session) {
# WebSocket data storage
websocket_data <- reactiveValues(
current_data = data.frame(),
connection_status = "disconnected",
update_count = 0,
last_update = NULL,
data_rate = 0
)
# Handle WebSocket status updates
observeEvent(input$websocket_status, {
websocket_data$connection_status <- input$websocket_status
})
# Handle initial WebSocket data
observeEvent(input$websocket_initial, {
initial_data <- input$websocket_initial
if(!is.null(initial_data)) {
# Convert from JSON structure to data frame
df <- data.frame(
timestamp = as.POSIXct(initial_data$timestamp, origin = "1970-01-01"),
value = unlist(initial_data$values),
source = "websocket"
)
websocket_data$current_data <- df
}
})
# Handle real-time WebSocket data updates
observeEvent(input$websocket_data, {
ws_update <- input$websocket_data
if(!is.null(ws_update)) {
# Create new data point
new_point <- data.frame(
timestamp = as.POSIXct(ws_update$timestamp, origin = "1970-01-01"),
value = ws_update$data$value,
source = "websocket"
)
# Add to existing data
current_data <- websocket_data$current_data
updated_data <- rbind(current_data, new_point)
# Maintain rolling window for performance
if(nrow(updated_data) > 200) {
updated_data <- tail(updated_data, 200)
}
websocket_data$current_data <- updated_data
websocket_data$update_count <- websocket_data$update_count + 1
# Calculate update rate from the gap since the previous update
# (capture the old timestamp before overwriting it)
previous_update <- websocket_data$last_update
websocket_data$last_update <- Sys.time()
if(!is.null(previous_update)) {
  time_diff <- as.numeric(difftime(Sys.time(), previous_update, units = "secs"))
  websocket_data$data_rate <- 1 / max(time_diff, 0.1) # Updates per second
}
}
})
# Connection status display
output$connection_status <- renderText({
status <- websocket_data$connection_status
status_text <- switch(status,
"connected" = "🟢 Connected",
"disconnected" = "🔴 Disconnected",
"error" = "🟡 Error",
"🔵 Unknown"
)
paste("WebSocket:", status_text)
})
# WebSocket value boxes
output$ws_current_value <- renderValueBox({
data <- websocket_data$current_data
if(nrow(data) > 0) {
current_value <- tail(data$value, 1)
valueBox(
value = round(current_value, 2),
subtitle = "Current Value",
icon = icon("chart-line"),
color = "blue"
)
} else {
valueBox(
value = "N/A",
subtitle = "Current Value",
icon = icon("chart-line"),
color = "light-blue"
)
}
})
output$ws_update_rate <- renderValueBox({
rate <- websocket_data$data_rate
valueBox(
value = paste0(round(rate, 1), "/s"),
subtitle = "Update Rate",
icon = icon("tachometer-alt"),
color = if(rate > 0.5) "green" else "yellow"
)
})
output$ws_data_points <- renderValueBox({
data_count <- nrow(websocket_data$current_data)
valueBox(
value = data_count,
subtitle = "Data Points",
icon = icon("database"),
color = "purple"
)
})
# WebSocket real-time plot
output$websocket_plot <- renderPlotly({
data <- websocket_data$current_data
if(nrow(data) == 0) {
return(plot_ly() %>%
layout(title = "Waiting for WebSocket data...") %>%
config(displayModeBar = FALSE))
}
plot_ly(
data = data,
x = ~timestamp,
y = ~value,
type = "scatter",
mode = "lines",
line = list(color = "#3498db", width = 2),
hovertemplate = "<b>%{x}</b><br>Value: %{y:.4f}<extra></extra>"
) %>%
layout(
title = "WebSocket Live Data Stream",
xaxis = list(
title = "Time",
type = "date",
tickformat = "%H:%M:%S"
),
yaxis = list(title = "Value"),
hovermode = "x"
) %>%
config(displayModeBar = FALSE)
})
# Stream statistics
output$stream_stats <- renderPrint({
data <- websocket_data$current_data
cat("Connection Status:", websocket_data$connection_status, "\n")
cat("Total Updates:", websocket_data$update_count, "\n")
cat("Data Points:", nrow(data), "\n")
cat("Update Rate:", round(websocket_data$data_rate, 2), "updates/sec\n")
if(!is.null(websocket_data$last_update)) {
cat("Last Update:", format(websocket_data$last_update, "%H:%M:%S"), "\n")
}
if(nrow(data) > 1) {
cat("\nData Statistics:\n")
cat("Min Value:", round(min(data$value), 4), "\n")
cat("Max Value:", round(max(data$value), 4), "\n")
cat("Mean Value:", round(mean(data$value), 4), "\n")
cat("Std Dev:", round(sd(data$value), 4), "\n")
}
})
# Latest data table
output$latest_data <- DT::renderDataTable({
data <- websocket_data$current_data
if(nrow(data) == 0) return(NULL)
latest_data <- data %>%
arrange(desc(timestamp)) %>%
head(10) %>%
mutate(
timestamp = format(timestamp, "%H:%M:%OS3"), # %OS3 = seconds with milliseconds
value = round(value, 4)
)
DT::datatable(
latest_data,
options = list(
pageLength = 5,
searching = FALSE,
ordering = FALSE,
info = FALSE,
dom = 't',
scrollY = "200px"
),
colnames = c("Time", "Value", "Source"),
rownames = FALSE
)
})
# Reconnect WebSocket handler
observeEvent(input$reconnect_ws, {
runjs("
if(websocket) {
websocket.close();
}
setTimeout(connectWebSocket, 1000);
")
showNotification("Reconnecting WebSocket...", type = "message", duration = 3)
})
}

shinyApp(ui = ui, server = server)
Streaming Data Processing and Analytics
Implement sophisticated real-time data processing that transforms raw streams into actionable insights:
library(shiny)
library(dplyr)
library(zoo)    # rolling means and rolling standard deviations
library(plotly)
# Null-coalescing helper used below (also exported by rlang)
`%||%` <- function(a, b) if(is.null(a)) b else a

server <- function(input, output, session) {
# Advanced streaming data processing
stream_processor <- reactiveValues(
raw_buffer = list(),
processed_data = data.frame(),
analytics_cache = list(),
processing_stats = list(
total_processed = 0,
processing_rate = 0,
error_count = 0,
last_error = NULL
)
)
# Real-time data ingestion with buffering
ingest_streaming_data <- function(new_data_batch) {
tryCatch({
# Add to buffer
stream_processor$raw_buffer <- append(stream_processor$raw_buffer, list(new_data_batch))
# Process buffer when it reaches threshold
if(length(stream_processor$raw_buffer) >= 10) {
# Combine buffered data
combined_data <- do.call(rbind, stream_processor$raw_buffer)
# Apply real-time analytics
processed_batch <- apply_streaming_analytics(combined_data)
# Update processed data store
update_processed_data(processed_batch)
# Clear buffer
stream_processor$raw_buffer <- list()
# Update processing statistics
stream_processor$processing_stats$total_processed <-
stream_processor$processing_stats$total_processed + nrow(combined_data)
}
}, error = function(e) {
stream_processor$processing_stats$error_count <-
stream_processor$processing_stats$error_count + 1
stream_processor$processing_stats$last_error <- e$message
# Log error but continue processing
cat("Streaming processing error:", e$message, "\n")
})
}
# Advanced analytics pipeline
apply_streaming_analytics <- function(raw_data) {
# 1. Data cleansing and validation
clean_data <- raw_data %>%
filter(!is.na(value), !is.infinite(value)) %>%
mutate(
# Outlier detection using rolling statistics
rolling_mean = zoo::rollmean(value, k = 5, fill = NA, align = "right"),
rolling_sd = zoo::rollapply(value, width = 5, FUN = sd, fill = NA, align = "right"),
z_score = (value - rolling_mean) / rolling_sd,
is_outlier = abs(z_score) > 3
)
# 2. Feature engineering
enriched_data <- clean_data %>%
mutate(
# Trend indicators
momentum = value - lag(value, 1),
acceleration = momentum - lag(momentum, 1),
# Statistical features
percentile_rank = percent_rank(value),
moving_avg_5 = zoo::rollmean(value, k = 5, fill = NA, align = "right"),
moving_avg_20 = zoo::rollmean(value, k = 20, fill = NA, align = "right"),
# Volatility measures
volatility = zoo::rollapply(value, width = 10, FUN = sd, fill = NA, align = "right"),
# Signal classification
signal = case_when(
value > moving_avg_20 + 2*volatility ~ "strong_buy",
value > moving_avg_20 + volatility ~ "buy",
value < moving_avg_20 - 2*volatility ~ "strong_sell",
value < moving_avg_20 - volatility ~ "sell",
TRUE ~ "hold"
)
)
# 3. Pattern detection
pattern_data <- enriched_data %>%
mutate(
# Trend patterns
trend_direction = case_when(
moving_avg_5 > moving_avg_20 ~ "uptrend",
moving_avg_5 < moving_avg_20 ~ "downtrend",
TRUE ~ "sideways"
),
# Support/resistance levels
local_max = zoo::rollapply(value, width = 5, FUN = function(x) which.max(x) == 3,
fill = FALSE, align = "center"),
local_min = zoo::rollapply(value, width = 5, FUN = function(x) which.min(x) == 3,
fill = FALSE, align = "center")
)
return(pattern_data)
}
# Update processed data with sliding window
update_processed_data <- function(new_processed_data) {
current_data <- stream_processor$processed_data
combined_data <- rbind(current_data, new_processed_data)
# Maintain sliding window of last N points
max_points <- 1000
if(nrow(combined_data) > max_points) {
combined_data <- tail(combined_data, max_points)
}
stream_processor$processed_data <- combined_data
# Update analytics cache
update_analytics_cache(combined_data)
}
# Real-time analytics caching
update_analytics_cache <- function(data) {
if(nrow(data) == 0) return()
# Calculate key metrics
stream_processor$analytics_cache <- list(
# Basic statistics
current_value = tail(data$value, 1),
min_value = min(data$value, na.rm = TRUE),
max_value = max(data$value, na.rm = TRUE),
mean_value = mean(data$value, na.rm = TRUE),
median_value = median(data$value, na.rm = TRUE),
std_dev = sd(data$value, na.rm = TRUE),
# Trend analysis
current_trend = tail(data$trend_direction, 1),
momentum = tail(data$momentum, 1),
volatility = tail(data$volatility, 1),
# Signal analysis
current_signal = tail(data$signal, 1),
buy_signals = sum(data$signal %in% c("buy", "strong_buy"), na.rm = TRUE),
sell_signals = sum(data$signal %in% c("sell", "strong_sell"), na.rm = TRUE),
# Pattern detection
recent_peaks = sum(tail(data$local_max, 20), na.rm = TRUE),
recent_troughs = sum(tail(data$local_min, 20), na.rm = TRUE),
# Quality metrics
outlier_rate = mean(data$is_outlier, na.rm = TRUE),
data_quality_score = 1 - mean(is.na(data$value))
)
}
# Real-time alert system
realtime_alerts <- reactiveValues(
active_alerts = list(),
alert_history = data.frame()
)
# Monitor for alert conditions
observe({
cache <- stream_processor$analytics_cache
if(length(cache) == 0) return()
# Check alert conditions
new_alerts <- list()
# Volatility alert
if(!is.null(cache$volatility) && !is.na(cache$volatility) && cache$volatility > 10) {
new_alerts <- append(new_alerts, list(list(
id = paste0("volatility_", Sys.time()),
type = "volatility",
severity = "warning",
title = "High Volatility Detected",
message = paste("Current volatility:", round(cache$volatility, 2)),
timestamp = Sys.time(),
value = cache$volatility
)))
}
# Trend change alert
if(!is.null(cache$current_trend) && cache$current_trend != "sideways") {
# Check if trend changed recently
processed_data <- stream_processor$processed_data
if(nrow(processed_data) > 10) {
previous_trend <- processed_data$trend_direction[nrow(processed_data) - 5]
if(!is.na(previous_trend) && previous_trend != cache$current_trend) {
new_alerts <- append(new_alerts, list(list(
id = paste0("trend_", Sys.time()),
type = "trend_change",
severity = "info",
title = "Trend Change Detected",
message = paste("New trend:", cache$current_trend),
timestamp = Sys.time(),
value = cache$current_trend
)))
}
}
}
# Signal alert
if(!is.null(cache$current_signal) && cache$current_signal %in% c("strong_buy", "strong_sell")) {
new_alerts <- append(new_alerts, list(list(
id = paste0("signal_", Sys.time()),
type = "trading_signal",
severity = if(cache$current_signal == "strong_buy") "success" else "danger",
title = "Strong Trading Signal",
message = paste("Signal:", toupper(gsub("_", " ", cache$current_signal))),
timestamp = Sys.time(),
value = cache$current_signal
)))
}
# Add new alerts to the active list; isolate() prevents the reads and
# writes of active_alerts from re-triggering this observer on its own
# output (which would spam duplicate alerts)
if(length(new_alerts) > 0) {
  isolate({
    realtime_alerts$active_alerts <- append(realtime_alerts$active_alerts, new_alerts)
  })
  # Show notifications
  for(alert in new_alerts) {
    showNotification(
      ui = div(
        strong(alert$title),
        br(),
        alert$message
      ),
      type = switch(alert$severity,
                    "success" = "message",
                    "info" = "message",
                    "warning" = "warning",
                    "danger" = "error"),
      duration = 8
    )
  }
}
# Clean up old alerts (keep only the last 50)
isolate({
  if(length(realtime_alerts$active_alerts) > 50) {
    realtime_alerts$active_alerts <- tail(realtime_alerts$active_alerts, 50)
  }
})
})
# Advanced visualization outputs
output$streaming_analytics_plot <- renderPlotly({
data <- stream_processor$processed_data
if(nrow(data) == 0) return(NULL)
# Create multi-series plot with analytics
plot_ly(data, x = ~timestamp) %>%
# Main value line
add_lines(y = ~value, name = "Value",
line = list(color = "#2c3e50", width = 2)) %>%
# Moving averages
add_lines(y = ~moving_avg_5, name = "MA(5)",
line = list(color = "#3498db", width = 1, dash = "dot")) %>%
add_lines(y = ~moving_avg_20, name = "MA(20)",
line = list(color = "#e74c3c", width = 1, dash = "dash")) %>%
# Volatility bands
add_ribbons(ymin = ~moving_avg_20 - volatility,
ymax = ~moving_avg_20 + volatility,
name = "Volatility Band",
fillcolor = "rgba(52, 152, 219, 0.1)",
line = list(color = "transparent")) %>%
# Signal markers
add_markers(data = data[data$signal %in% c("strong_buy", "strong_sell"), ],
y = ~value, color = ~signal,
colors = c("strong_buy" = "green", "strong_sell" = "red"),
marker = list(size = 8, symbol = "triangle-up"),
name = "Signals") %>%
layout(
title = "Real-Time Analytics Dashboard",
xaxis = list(title = "Time"),
yaxis = list(title = "Value"),
hovermode = "x unified",
showlegend = TRUE
)
})
# Analytics summary output
output$analytics_summary <- renderUI({
cache <- stream_processor$analytics_cache
if(length(cache) == 0) {
return(div("No analytics data available"))
}
div(
class = "analytics-summary",
fluidRow(
column(6,
h5("Current Metrics"),
p(strong("Value:"), round(cache$current_value %||% 0, 4)),
p(strong("Trend:"), cache$current_trend %||% "Unknown"),
p(strong("Signal:"), toupper(gsub("_", " ", cache$current_signal %||% "None"))),
p(strong("Volatility:"), round(cache$volatility %||% 0, 2))
),
column(6,
h5("Statistics"),
p(strong("Mean:"), round(cache$mean_value %||% 0, 4)),
p(strong("Std Dev:"), round(cache$std_dev %||% 0, 4)),
p(strong("Min/Max:"), paste(round(cache$min_value %||% 0, 2), "/", round(cache$max_value %||% 0, 2))),
p(strong("Quality:"), paste0(round((cache$data_quality_score %||% 0) * 100, 1), "%"))
)
),
hr(),
fluidRow(
column(12,
h5("Recent Activity"),
p(strong("Buy Signals:"), cache$buy_signals %||% 0),
p(strong("Sell Signals:"), cache$sell_signals %||% 0),
p(strong("Recent Peaks:"), cache$recent_peaks %||% 0),
p(strong("Recent Troughs:"), cache$recent_troughs %||% 0)
)
)
)
})
}
High-Performance Real-Time Architectures
Memory-Efficient Streaming Data Management
# Optimized high-performance real-time data handling
create_high_performance_realtime_app <- function() {
server <- function(input, output, session) {
# High-performance data structures
performance_manager <- reactiveValues(
# Circular buffers for memory efficiency
data_buffer = rep(NA_real_, 10000),
timestamp_buffer = rep(as.POSIXct(NA), 10000),
buffer_pointer = 1,
buffer_size = 10000,
# Performance metrics
processing_times = numeric(100),
memory_usage_history = numeric(100),
update_frequency = 0,
# Connection health
connection_status = "disconnected",
missed_updates = 0,
last_heartbeat = NULL
)
# Optimized data ingestion
ingest_high_frequency_data <- function(new_values, timestamps) {
start_time <- Sys.time()
tryCatch({
# Batch processing for efficiency
n_new <- length(new_values)
if(n_new > 0) {
# Calculate insertion positions in the circular buffer
# (0-indexed arithmetic, then shift back to 1-indexed)
start_pos <- performance_manager$buffer_pointer
end_pos <- (start_pos + n_new - 2) %% performance_manager$buffer_size + 1
# Handle wraparound
if(end_pos < start_pos) {
# Split insertion across buffer wrap
first_chunk <- performance_manager$buffer_size - start_pos + 1
# Insert first chunk
performance_manager$data_buffer[start_pos:performance_manager$buffer_size] <-
new_values[1:first_chunk]
performance_manager$timestamp_buffer[start_pos:performance_manager$buffer_size] <-
timestamps[1:first_chunk]
# Insert remaining chunk
if(n_new > first_chunk) {
remaining <- new_values[(first_chunk + 1):n_new]
remaining_ts <- timestamps[(first_chunk + 1):n_new]
performance_manager$data_buffer[1:length(remaining)] <- remaining
performance_manager$timestamp_buffer[1:length(remaining)] <- remaining_ts
}
} else {
# Contiguous insertion
indices <- start_pos:end_pos
performance_manager$data_buffer[indices] <- new_values
performance_manager$timestamp_buffer[indices] <- timestamps
}
# Update buffer pointer
performance_manager$buffer_pointer <-
(performance_manager$buffer_pointer + n_new - 1) %% performance_manager$buffer_size + 1
}
# Record processing time
processing_time <- as.numeric(difftime(Sys.time(), start_time, units = "secs"))
performance_manager$processing_times <- c(
tail(performance_manager$processing_times, 99),
processing_time
)
}, error = function(e) {
cat("High-frequency ingestion error:", e$message, "\n")
performance_manager$missed_updates <- performance_manager$missed_updates + 1
})
}
# Efficient data extraction from circular buffer
get_recent_data <- function(n_points = 1000) {
# Get valid data points from circular buffer
valid_indices <- !is.na(performance_manager$data_buffer)
if(!any(valid_indices)) {
return(data.frame(timestamp = as.POSIXct(character(0)), value = numeric(0)))
}
# Extract most recent n_points
pointer <- performance_manager$buffer_pointer
buffer_size <- performance_manager$buffer_size
# Calculate extraction range (the pointer marks the NEXT write slot,
# so the most recent sample sits at pointer - 1)
if(sum(valid_indices) >= n_points) {
  # Get last n_points worth of data
  end_pos <- (pointer - 2 + buffer_size) %% buffer_size + 1
  start_pos <- (end_pos - n_points + buffer_size) %% buffer_size + 1
if(start_pos <= end_pos) {
# Contiguous range
indices <- start_pos:end_pos
} else {
# Wraparound range
indices <- c(start_pos:buffer_size, 1:end_pos)
}
} else {
# Return all valid data
indices <- which(valid_indices)
}
# Create data frame
data.frame(
timestamp = performance_manager$timestamp_buffer[indices],
value = performance_manager$data_buffer[indices]
) %>%
filter(!is.na(timestamp)) %>%
arrange(timestamp)
}
# Optimized plotting for high-frequency data
output$high_frequency_plot <- renderPlotly({
# Get recent data efficiently
plot_data <- get_recent_data(500) # Last 500 points
if(nrow(plot_data) == 0) {
return(plot_ly() %>%
layout(title = "Waiting for high-frequency data..."))
}
# Use WebGL for performance with large datasets
plot_ly(
plot_data,
x = ~timestamp,
y = ~value,
type = "scattergl", # WebGL for performance
mode = "lines",
line = list(color = "#2ecc71", width = 1),
hovertemplate = "Time: %{x}<br>Value: %{y:.6f}<extra></extra>"
) %>%
layout(
title = paste("Live High-Frequency Data (", nrow(plot_data), "points)"),
xaxis = list(
title = "Time",
type = "date",
tickformat = "%H:%M:%S.%L" # Show milliseconds
),
yaxis = list(title = "Value"),
hovermode = "x"
) %>%
config(
displayModeBar = FALSE,
scrollZoom = TRUE,
responsive = TRUE
)
})
# Performance monitoring
output$performance_metrics <- renderUI({
processing_times <- performance_manager$processing_times
valid_times <- processing_times[!is.na(processing_times) & processing_times > 0]
if(length(valid_times) == 0) {
return(div("No performance data available"))
}
# Calculate performance statistics
avg_processing_time <- mean(valid_times) * 1000 # Convert to milliseconds
max_processing_time <- max(valid_times) * 1000
processing_rate <- 1 / mean(valid_times) # Operations per second
# Approximate memory usage of the buffered data
# (reactiveValuesToList() exposes the contents to object.size())
current_memory <- as.numeric(object.size(reactiveValuesToList(performance_manager))) / (1024^2) # MB
div(
class = "performance-panel",
h4("Performance Metrics"),
fluidRow(
column(6,
p(strong("Avg Processing Time:"), paste0(round(avg_processing_time, 2), "ms")),
p(strong("Max Processing Time:"), paste0(round(max_processing_time, 2), "ms")),
p(strong("Processing Rate:"), paste0(round(processing_rate, 1), " ops/sec"))
),
column(6,
p(strong("Memory Usage:"), paste0(round(current_memory, 2), "MB")),
p(strong("Buffer Utilization:"), paste0(round(sum(!is.na(performance_manager$data_buffer)) / performance_manager$buffer_size * 100, 1), "%")),
p(strong("Missed Updates:"), performance_manager$missed_updates)
)
),
# Performance indicator
div(
class = if(avg_processing_time < 1) "alert alert-success" else if(avg_processing_time < 5) "alert alert-warning" else "alert alert-danger",
strong("Performance Status: "),
if(avg_processing_time < 1) "Optimal" else if(avg_processing_time < 5) "Good" else "Needs Attention"
)
)
})
# Automatic memory cleanup
observe({
  invalidateLater(300000) # Every 5 minutes
  # Force garbage collection
  gc()
  # Reset performance counters periodically; isolate() so this observer
  # does not depend on (and re-fire with) every counter increment
  if(isolate(performance_manager$missed_updates) > 100) {
    performance_manager$missed_updates <- 0
    showNotification("Performance counters reset", type = "message")
  }
})
# Simulated high-frequency data generation
observe({
  # High-frequency updates (10 invalidations per second)
  invalidateLater(100)
  # Generate a batch of data points
  batch_size <- sample(1:5, 1) # Variable batch sizes
  # isolate() so reading and writing the buffer does not re-trigger
  # this observer ahead of the timer
  isolate({
    buffer <- performance_manager$data_buffer
    last_value <- tail(buffer[!is.na(buffer)], 1)
    if(length(last_value) == 0) last_value <- 0
    new_values <- last_value + cumsum(rnorm(batch_size, 0, 0.01))
    new_timestamps <- Sys.time() + 0.01 * (seq_len(batch_size) - 1)
    # Ingest data using the optimized function
    ingest_high_frequency_data(new_values, new_timestamps)
  })
})
}
return(server)
}
WebSocket Performance Optimization
Advanced WebSocket configurations ensure reliable high-frequency data transmission:
# Production-grade WebSocket configuration
optimize_websocket_performance <- function() {
# Client-side optimization
websocket_client_config <- list(
# Connection settings
reconnect_attempts = 10,
reconnect_delay = c(1000, 2000, 4000, 8000, 16000), # Exponential backoff
heartbeat_interval = 30000, # 30 seconds
# Buffer settings
max_buffer_size = 1000000, # 1MB
batch_size = 100,
flush_interval = 100, # 100ms
# Performance settings
compression = TRUE,
binary_mode = FALSE,
keep_alive = TRUE
)
# Server-side configuration
websocket_server_config <- list(
# Resource management
max_connections = 1000,
connection_timeout = 300000, # 5 minutes
message_size_limit = 65536, # 64KB
# Performance tuning
enable_compression = TRUE,
tcp_nodelay = TRUE,
socket_keepalive = TRUE,
# Rate limiting
messages_per_second = 100,
burst_limit = 500,
# Memory management
gc_interval = 60000, # 1 minute
memory_threshold = 512 # 512MB
)
return(list(
client = websocket_client_config,
server = websocket_server_config
))
}
# Enhanced WebSocket message handling
create_optimized_websocket_handler <- function() {
# Message compression and batching
compress_message_batch <- function(messages) {
# Batch multiple small messages
if(length(messages) > 1) {
batch_message <- list(
type = "batch",
messages = messages,
timestamp = as.numeric(Sys.time()),
compression = "gzip"
)
# Compress JSON
json_data <- jsonlite::toJSON(batch_message, auto_unbox = TRUE)
compressed_data <- memCompress(charToRaw(json_data), type = "gzip")
return(compressed_data)
} else {
# Single message - no batching needed
return(jsonlite::toJSON(messages[[1]], auto_unbox = TRUE))
}
}
# Message decompression and processing
decompress_message_batch <- function(raw_data) {
tryCatch({
# Check if data is compressed
if(is.raw(raw_data)) {
# Decompress
decompressed <- rawToChar(memDecompress(raw_data, type = "gzip"))
message_data <- jsonlite::fromJSON(decompressed)
} else {
# Plain JSON
message_data <- jsonlite::fromJSON(raw_data)
}
# Handle batch messages
if(!is.null(message_data$type) && message_data$type == "batch") {
return(message_data$messages)
} else {
return(list(message_data))
}
}, error = function(e) {
cat("Message decompression error:", e$message, "\n")
return(list())
})
}
return(list(
compress = compress_message_batch,
decompress = decompress_message_batch
))
}
Production Real-Time Deployment Strategies
Scalable Architecture for High-Volume Streams
# Enterprise-grade real-time application architecture
library(shiny)
library(shinydashboard) # valueBox widgets
library(shinyjs)        # runjs() for the force-reconnect control
library(plotly)
library(DT)

create_enterprise_realtime_app <- function() {
  ui <- fluidPage(
    useShinyjs(), # enables runjs() from the server
# Production CSS and JS optimizations
tags$head(
tags$style(HTML("
.realtime-container {
position: relative;
min-height: 400px;
}
.performance-indicator {
position: absolute;
top: 10px;
right: 10px;
z-index: 1000;
}
.status-green { color: #27ae60; }
.status-yellow { color: #f39c12; }
.status-red { color: #e74c3c; }
.metrics-card {
background: #f8f9fa;
border-radius: 8px;
padding: 15px;
margin-bottom: 15px;
border-left: 4px solid #3498db;
}
")),
# WebSocket client with production features
tags$script(HTML("
class ProductionWebSocket {
constructor(url, options = {}) {
this.url = url;
this.options = {
reconnectAttempts: 10,
reconnectDelay: [1000, 2000, 4000, 8000, 16000],
heartbeatInterval: 30000,
messageQueue: [],
maxQueueSize: 1000,
...options
};
this.reconnectCount = 0;
this.isConnecting = false;
this.messageBuffer = [];
this.lastHeartbeat = null;
this.connect();
this.startHeartbeat();
}
connect() {
if(this.isConnecting) return;
this.isConnecting = true;
this.ws = new WebSocket(this.url);
this.ws.onopen = (event) => {
console.log('WebSocket connected');
this.isConnecting = false;
this.reconnectCount = 0;
// Flush message buffer
this.flushMessageBuffer();
// Notify Shiny
Shiny.setInputValue('websocket_status', 'connected');
};
this.ws.onmessage = (event) => {
this.handleMessage(event.data);
};
this.ws.onclose = (event) => {
console.log('WebSocket disconnected');
this.isConnecting = false;
Shiny.setInputValue('websocket_status', 'disconnected');
// Attempt reconnection
this.scheduleReconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket error:', error);
Shiny.setInputValue('websocket_status', 'error');
};
}
handleMessage(data) {
try {
// Update last heartbeat
this.lastHeartbeat = Date.now();
// Parse message
const message = JSON.parse(data);
// Handle different message types
if(message.type === 'heartbeat') {
// Heartbeat response - no action needed
return;
}
if(message.type === 'batch') {
// Process batch messages
message.messages.forEach(msg => {
this.processMessage(msg);
});
} else {
// Single message
this.processMessage(message);
}
} catch(error) {
console.error('Message processing error:', error);
}
}
processMessage(message) {
// Send to Shiny based on message type
if(message.type === 'realtime_data') {
Shiny.setInputValue('realtime_data_stream', {
data: message.data,
timestamp: message.timestamp,
source: message.source || 'websocket'
});
} else if(message.type === 'system_metrics') {
Shiny.setInputValue('system_metrics_stream', message.data);
} else if(message.type === 'alert') {
Shiny.setInputValue('realtime_alert', {
alert: message.data,
timestamp: Date.now()
});
}
}
send(data) {
if(this.ws && this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
} else {
// Queue message for later
if(this.messageBuffer.length < this.options.maxQueueSize) {
this.messageBuffer.push(data);
}
}
}
flushMessageBuffer() {
while(this.messageBuffer.length > 0) {
const message = this.messageBuffer.shift();
this.send(message);
}
}
scheduleReconnect() {
if(this.reconnectCount >= this.options.reconnectAttempts) {
console.error('Max reconnection attempts reached');
return;
}
const delay = this.options.reconnectDelay[
Math.min(this.reconnectCount, this.options.reconnectDelay.length - 1)
];
setTimeout(() => {
this.reconnectCount++;
this.connect();
}, delay);
}
startHeartbeat() {
setInterval(() => {
if(this.ws && this.ws.readyState === WebSocket.OPEN) {
this.send({ type: 'heartbeat', timestamp: Date.now() });
}
// Check for connection health
if(this.lastHeartbeat && Date.now() - this.lastHeartbeat > 60000) {
console.warn('Connection appears stale, reconnecting...');
this.ws.close();
}
}, this.options.heartbeatInterval);
}
}
// Initialize production WebSocket
let productionWS;
$(document).ready(function() {
productionWS = new ProductionWebSocket('ws://localhost:8080', {
reconnectAttempts: 15,
heartbeatInterval: 20000
});
});
"))
),
titlePanel("Enterprise Real-Time Monitoring System"),
# Control panel
fluidRow(
column(3,
wellPanel(
h4("System Control"),
div(class = "metrics-card",
h5("Connection Status"),
textOutput("connection_health"),
br(),
actionButton("force_reconnect", "Force Reconnect",
class = "btn-warning btn-sm")
),
div(class = "metrics-card",
h5("Stream Configuration"),
selectInput("stream_frequency", "Update Frequency:",
choices = c("Ultra High (100ms)" = 100,
"High (250ms)" = 250,
"Normal (1s)" = 1000,
"Low (5s)" = 5000),
selected = 1000),
checkboxGroupInput("active_streams", "Active Data Streams:",
choices = c("Market Data" = "market",
"System Metrics" = "system",
"User Events" = "events",
"Performance Data" = "performance"),
selected = c("market", "system"))
),
div(class = "metrics-card",
h5("Performance Monitoring"),
textOutput("throughput_rate"),
textOutput("latency_stats"),
textOutput("error_rate")
)
)
),
# Main dashboard
column(9,
# Status indicators
fluidRow(
valueBoxOutput("current_throughput", width = 3),
valueBoxOutput("avg_latency", width = 3),
valueBoxOutput("active_connections", width = 3),
valueBoxOutput("system_health", width = 3)
),
# Real-time visualizations
div(class = "realtime-container",
div(class = "performance-indicator",
uiOutput("realtime_status_indicator")
),
tabsetPanel(
tabPanel("Live Data Stream",
plotlyOutput("enterprise_realtime_plot", height = "500px")
),
tabPanel("System Performance",
fluidRow(
column(8,
plotlyOutput("system_performance_plot", height = "400px")
),
column(4,
plotlyOutput("performance_gauge", height = "200px"),
br(),
plotlyOutput("throughput_gauge", height = "200px")
)
)
),
tabPanel("Analytics Dashboard",
fluidRow(
column(6,
plotlyOutput("analytics_trend", height = "300px")
),
column(6,
plotlyOutput("distribution_plot", height = "300px")
)
),
fluidRow(
column(12,
h4("Real-Time Analytics Summary"),
DT::dataTableOutput("analytics_summary_table")
)
)
),
tabPanel("Alert Management",
fluidRow(
column(8,
h4("Active Alerts"),
uiOutput("active_alerts_display")
),
column(4,
wellPanel(
h5("Alert Configuration"),
numericInput("alert_threshold", "Alert Threshold:",
value = 85, min = 0, max = 100),
selectInput("alert_severity", "Minimum Severity:",
choices = c("Info" = "info",
"Warning" = "warning",
"Critical" = "critical"),
selected = "warning"),
checkboxInput("enable_notifications", "Enable Notifications", TRUE),
actionButton("clear_alerts", "Clear All Alerts",
class = "btn-danger btn-sm")
)
)
)
)
)
)
)
)
)
server <- function(input, output, session) {
# Enterprise-grade reactive values
enterprise_data <- reactiveValues(
# Data streams
realtime_stream = data.frame(),
system_metrics = data.frame(),
performance_data = data.frame(),
# Performance tracking
throughput_history = numeric(100),
latency_history = numeric(100),
error_count = 0,
total_messages = 0,
# Connection management
connection_status = "disconnected",
last_message_time = NULL,
connection_quality = 100,
# Alert system
active_alerts = list(),
alert_history = data.frame()
)
# Handle WebSocket status updates
observeEvent(input$websocket_status, {
enterprise_data$connection_status <- input$websocket_status
if(input$websocket_status == "connected") {
enterprise_data$connection_quality <- 100
showNotification("WebSocket connected successfully",
type = "success", duration = 3)
} else if(input$websocket_status == "disconnected") {
enterprise_data$connection_quality <- max(0, enterprise_data$connection_quality - 20)
showNotification("WebSocket disconnected - attempting reconnection",
type = "warning", duration = 5)
}
})
# Process real-time data stream
observeEvent(input$realtime_data_stream, {
stream_data <- input$realtime_data_stream
if(!is.null(stream_data)) {
# Update message tracking
enterprise_data$total_messages <- enterprise_data$total_messages + 1
enterprise_data$last_message_time <- Sys.time()
# Calculate latency
if(!is.null(stream_data$timestamp)) {
latency <- as.numeric(Sys.time()) - stream_data$timestamp
enterprise_data$latency_history <- c(
tail(enterprise_data$latency_history, 99),
latency * 1000 # Convert to milliseconds
)
}
# Process data point
new_data_point <- data.frame(
timestamp = Sys.time(),
value = stream_data$data$value,
source = stream_data$source,
latency_ms = tail(enterprise_data$latency_history, 1)
)
# Update data stream
current_stream <- enterprise_data$realtime_stream
updated_stream <- rbind(current_stream, new_data_point)
# Maintain sliding window
if(nrow(updated_stream) > 2000) {
updated_stream <- tail(updated_stream, 2000)
}
enterprise_data$realtime_stream <- updated_stream
# Update throughput calculation
calculate_throughput()
}
})
# Throughput calculation
calculate_throughput <- function() {
current_time <- Sys.time()
# Calculate messages per second over last 10 seconds
recent_data <- enterprise_data$realtime_stream
if(nrow(recent_data) > 1) {
recent_window <- recent_data[recent_data$timestamp > (current_time - 10), ]
if(nrow(recent_window) > 0) {
time_span <- as.numeric(difftime(max(recent_window$timestamp),
min(recent_window$timestamp),
units = "secs"))
throughput <- if(time_span > 0) nrow(recent_window) / time_span else 0
enterprise_data$throughput_history <- c(
tail(enterprise_data$throughput_history, 99),
throughput
)
}
}
}
# Value boxes for enterprise metrics
output$current_throughput <- renderValueBox({
current_throughput <- tail(enterprise_data$throughput_history, 1)
if(length(current_throughput) == 0) current_throughput <- 0
valueBox(
value = paste0(round(current_throughput, 1), "/s"),
subtitle = "Message Throughput",
icon = icon("tachometer-alt"),
color = if(current_throughput > 10) "green" else if(current_throughput > 5) "yellow" else "red"
)
})
output$avg_latency <- renderValueBox({
recent_latency <- tail(enterprise_data$latency_history, 10)
avg_latency <- if(length(recent_latency) > 0) mean(recent_latency, na.rm = TRUE) else 0
valueBox(
value = paste0(round(avg_latency, 1), "ms"),
subtitle = "Average Latency",
icon = icon("clock"),
color = if(avg_latency < 100) "green" else if(avg_latency < 500) "yellow" else "red"
)
})
output$active_connections <- renderValueBox({
connection_count <- if(enterprise_data$connection_status == "connected") 1 else 0
valueBox(
value = connection_count,
subtitle = "Active Connections",
icon = icon("plug"),
color = if(connection_count > 0) "green" else "red"
)
})
output$system_health <- renderValueBox({
health_score <- enterprise_data$connection_quality
valueBox(
value = paste0(health_score, "%"),
subtitle = "System Health",
icon = icon("heartbeat"),
color = if(health_score > 80) "green" else if(health_score > 50) "yellow" else "red"
)
})
# Enterprise real-time plot
output$enterprise_realtime_plot <- renderPlotly({
plot_data <- enterprise_data$realtime_stream
if(nrow(plot_data) == 0) {
return(plot_ly() %>%
layout(title = "Waiting for enterprise data stream...") %>%
config(displayModeBar = FALSE))
}
# Use last 500 points for optimal performance
display_data <- tail(plot_data, 500)
plot_ly(
display_data,
x = ~timestamp,
y = ~value,
type = "scattergl",
mode = "lines",
line = list(color = "#2c3e50", width = 2),
hovertemplate = paste(
"<b>Time:</b> %{x}<br>",
"<b>Value:</b> %{y:.4f}<br>",
"<b>Latency:</b> %{customdata:.1f}ms<extra></extra>"
),
customdata = ~latency_ms
) %>%
layout(
title = "Enterprise Real-Time Data Stream",
xaxis = list(
title = "Time",
type = "date",
tickformat = "%H:%M:%S"
),
yaxis = list(title = "Value"),
hovermode = "x",
showlegend = FALSE
) %>%
config(
displayModeBar = TRUE,
displaylogo = FALSE,
modeBarButtonsToRemove = c("pan2d", "lasso2d", "select2d", "autoScale2d")
)
})
# Status indicator
output$realtime_status_indicator <- renderUI({
status <- enterprise_data$connection_status
throughput <- tail(enterprise_data$throughput_history, 1)
if(length(throughput) == 0) throughput <- 0
indicator_class <- switch(status,
"connected" = if(throughput > 5) "status-green" else "status-yellow",
"disconnected" = "status-red",
"error" = "status-red",
"status-yellow"
)
indicator_text <- switch(status,
"connected" = "● LIVE",
"disconnected" = "● OFFLINE",
"error" = "● ERROR",
"● UNKNOWN"
)
span(indicator_text, class = paste("h5", indicator_class))
})
# Performance monitoring outputs
output$throughput_rate <- renderText({
current_throughput <- tail(enterprise_data$throughput_history, 1)
if(length(current_throughput) == 0) current_throughput <- 0
paste("Throughput:", round(current_throughput, 2), "msg/sec")
})
output$latency_stats <- renderText({
recent_latency <- tail(enterprise_data$latency_history, 20)
if(length(recent_latency) > 0) {
avg_latency <- mean(recent_latency, na.rm = TRUE)
paste("Avg Latency:", round(avg_latency, 1), "ms")
} else {
"Latency: N/A"
}
})
output$error_rate <- renderText({
total_messages <- enterprise_data$total_messages
error_count <- enterprise_data$error_count
error_rate <- if(total_messages > 0) (error_count / total_messages) * 100 else 0
paste("Error Rate:", round(error_rate, 2), "%")
})
# Connection health display
output$connection_health <- renderText({
status <- enterprise_data$connection_status
quality <- enterprise_data$connection_quality
health_text <- switch(status,
  "connected" = paste0("🟢 Connected (", quality, "%)"),
  "disconnected" = "🔴 Disconnected",
  "error" = "🟡 Connection Error",
  "🔵 Unknown Status"
)
health_text
})
# Force reconnect handler
observeEvent(input$force_reconnect, {
runjs("
if(typeof productionWS !== 'undefined') {
productionWS.ws.close();
setTimeout(() => productionWS.connect(), 1000);
}
")
showNotification("Forcing WebSocket reconnection...", type = "message", duration = 3)
})
}
return(list(ui = ui, server = server))
}
Common Issues and Solutions
Issue 1: Memory Leaks in Long-Running Applications
Problem: Real-time applications accumulate data over time, leading to memory exhaustion and performance degradation.
Solution:
Implement proper memory management with sliding windows and garbage collection:
# Memory-efficient data management (register inside your server function)
manage_memory_efficiently <- function(reactive_values, max_points = 1000) {
  # Sliding window for time series data: trim whenever the store grows
  # (reactive values can only be read inside a reactive context)
  observe({
    if(nrow(reactive_values$data) > max_points) {
      reactive_values$data <- isolate(tail(reactive_values$data, max_points))
    }
  })
# Periodic cleanup of old objects
observe({
invalidateLater(300000) # Every 5 minutes
# Force garbage collection
gc()
# Clear old cached calculations
if(length(reactive_values$cache) > 100) {
reactive_values$cache <- tail(reactive_values$cache, 50)
}
})
}
Issue 2: WebSocket Connection Drops and Recovery
Problem: Network interruptions cause WebSocket disconnections, leading to data loss and poor user experience.
Solution:
Implement robust reconnection logic with exponential backoff:
# Robust WebSocket reconnection strategy
implement_connection_recovery <- function() {
reconnection_config <- list(
max_attempts = 10,
base_delay = 1000, # 1 second
max_delay = 30000, # 30 seconds
backoff_factor = 1.5,
jitter = TRUE
)
  # JavaScript reconnection logic; the config constant mirrors
  # reconnection_config above, and the socket's onclose handler should
  # call attemptReconnection(attempt + 1) after each failed attempt
  reconnection_js <- "
  const config = {
    max_attempts: 10,
    base_delay: 1000,     // 1 second
    max_delay: 30000,     // 30 seconds
    backoff_factor: 1.5,
    jitter: true
  };

  function attemptReconnection(attempt = 1) {
    if(attempt > config.max_attempts) {
      console.error('Max reconnection attempts reached');
      return;
    }
    const delay = Math.min(
      config.base_delay * Math.pow(config.backoff_factor, attempt - 1),
      config.max_delay
    );
    const jitteredDelay = config.jitter ?
      delay + (Math.random() * 1000) : delay;
    setTimeout(() => {
      console.log(`Reconnection attempt ${attempt}`);
      connectWebSocket();
    }, jitteredDelay);
  }
  "
return(reconnection_js)
}
Issue 3: High-Frequency Update Performance
Problem: Very frequent updates (multiple per second) cause UI lag and poor responsiveness.
Solution:
Implement intelligent update throttling and batching:
# Performance optimization for high-frequency updates
optimize_high_frequency_updates <- function() {
  # Throttling mechanism
  last_update <- reactiveVal(Sys.time())
  update_throttle <- 100  # Minimum 100 ms between UI updates

  # Batched update processing
  update_queue <- reactiveValues(
    pending_updates = list(),
    last_flush = Sys.time()
  )

  # Process updates in batches; the timer is the only trigger, so the
  # body runs inside isolate() to avoid self-invalidation on writes
  observe({
    invalidateLater(update_throttle)
    isolate({
      current_time <- Sys.time()
      time_since_last <- difftime(current_time, last_update(), units = "secs")

      # Only update if enough time has passed
      if (time_since_last >= (update_throttle / 1000)) {
        # Process all pending updates at once
        if (length(update_queue$pending_updates) > 0) {
          # Combine updates
          combined_updates <- do.call(rbind, update_queue$pending_updates)

          # Update display (placeholder for your app's rendering logic)
          process_combined_updates(combined_updates)

          # Clear queue and record the flush time
          update_queue$pending_updates <- list()
          last_update(current_time)
        }
      }
    })
  })
}

When deploying real-time applications to production, ensure proper resource allocation, implement connection pooling, configure load balancing for WebSocket traffic, and establish comprehensive monitoring and alerting systems.
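For connection pooling specifically, a minimal sketch using the pool package might look like the following; the driver, database name, host, and credential values are placeholder assumptions:

library(shiny)
library(pool)

# Open one shared pool at app startup instead of one connection per session
app_pool <- dbPool(
  drv = RPostgres::Postgres(),
  dbname = "metrics",
  host = "db.example.com",
  user = "shiny_app",
  password = Sys.getenv("DB_PASSWORD")
)

# Return all pooled connections when the app shuts down
onStop(function() {
  poolClose(app_pool)
})

Queries then borrow a connection from app_pool transparently, which avoids connection exhaustion when many real-time sessions poll the database concurrently.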
Common Questions About Real-Time Shiny Applications
How can I update different parts of my dashboard at different frequencies?
You can implement multiple invalidation timers for different parts of your application. Fast-updating components (like live charts) can refresh every second, while summary statistics might update every 30 seconds. Use invalidateLater() with different intervals in separate observe() blocks, and consider using reactive expressions to share data processing between components with different update schedules, as in the sketch below.
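A minimal sketch of two components on independent schedules; the data-access helpers fetch_latest_points() and compute_summaries() are hypothetical:

library(shiny)

server <- function(input, output, session) {
  # Fast path: refresh the live chart data every second
  live_data <- reactive({
    invalidateLater(1000)
    fetch_latest_points()  # hypothetical data-access helper
  })

  # Slow path: recompute summary statistics every 30 seconds
  summary_stats <- reactive({
    invalidateLater(30000)
    compute_summaries(fetch_latest_points())  # hypothetical helper
  })

  output$live_chart <- renderPlot({ plot(live_data()) })
  output$summary <- renderText({ summary_stats() })
}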
Should I use polling or WebSockets?
Polling uses invalidateLater() to periodically fetch new data from databases or APIs; it's simple to implement but creates constant server load. WebSockets establish persistent connections for bidirectional communication, offering lower latency and reduced server overhead for high-frequency updates. Choose polling for moderate update frequencies (every few seconds) and WebSockets for sub-second updates or when you need server-initiated communications.
How do I keep memory under control in long-running applications?
Implement sliding window data storage using circular buffers or by limiting data frames to a maximum number of rows. Use gc() periodically to force garbage collection, clear old reactive values and cached calculations, and monitor memory usage with object.size(). Consider storing only essential data in reactive values and moving historical data to databases or files for long-term storage.
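A minimal monitoring sketch to place inside your server function, assuming a reactiveValues store named realtime_data with a current_data data frame; the one-minute interval and 5 MB threshold are illustrative:

observe({
  invalidateLater(60000)  # check once per minute
  size_bytes <- object.size(isolate(realtime_data$current_data))
  message("Buffer size: ", format(size_bytes, units = "MB"))
  if (size_bytes > 5 * 1024^2) {
    # Keep only the newest rows once the buffer passes ~5 MB
    realtime_data$current_data <- isolate(tail(realtime_data$current_data, 500))
  }
})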
Can I build real-time applications with only Shiny's built-in features?
Yes, you can create effective real-time applications using only Shiny's built-in reactivity with invalidateLater(), reactiveTimer(), and database polling. This approach works well for update frequencies of 1-10 seconds and is much simpler to deploy. For sub-second updates or when you need to handle thousands of concurrent users, external WebSocket infrastructure becomes necessary.
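For built-in database polling, shiny::reactivePoll() is a natural fit because it separates a cheap change check from the full data fetch. A minimal sketch, assuming a DBI connection named con and an events table with a ts timestamp column:

library(shiny)
library(DBI)

server <- function(input, output, session) {
  events <- reactivePoll(
    intervalMillis = 5000,
    session = session,
    # Cheap check: only re-query when the row count changes
    checkFunc = function() {
      dbGetQuery(con, "SELECT COUNT(*) AS n FROM events")$n
    },
    # Full fetch: runs only when checkFunc's value changes
    valueFunc = function() {
      dbGetQuery(con, "SELECT * FROM events ORDER BY ts DESC LIMIT 500")
    }
  )

  output$event_table <- renderTable(events())
}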
How do I handle connection failures and prevent data loss?
Implement exponential backoff reconnection strategies, queue messages during disconnections for later transmission, use heartbeat/ping mechanisms to detect stale connections, and implement data checksums or sequence numbers to detect missing updates. Always include error handling in your data processing pipelines and provide clear user feedback about connection status and data freshness.
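A minimal sketch of sequence-number gap detection inside the server function; the message structure (msg$seq) is an illustrative assumption:

stream_state <- reactiveValues(last_seq = 0, gaps_detected = 0)

handle_message <- function(msg) {
  # isolate() keeps these reads from creating reactive dependencies
  expected <- isolate(stream_state$last_seq) + 1
  if (msg$seq > expected) {
    # One or more messages were lost in transit
    stream_state$gaps_detected <-
      isolate(stream_state$gaps_detected) + (msg$seq - expected)
    warning("Missed sequence numbers ", expected, " through ", msg$seq - 1)
  }
  stream_state$last_seq <- msg$seq
}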
Test Your Understanding
You’re building a financial trading dashboard that needs to display stock prices updating multiple times per second. Which architectural approach would provide the best performance and user experience?
- A) Use invalidateLater(100) to poll a REST API every 100ms
- B) Implement WebSocket connections with client-side data buffering
- C) Use reactiveTimer() with database queries every 500ms
- D) Poll multiple APIs simultaneously using invalidateLater(50)
Hints:
- Consider the frequency of updates needed (multiple times per second)
- Think about server load and network efficiency
- Consider the user experience and responsiveness requirements
- Remember the scalability implications of each approach
Answer: B) Implement WebSocket connections with client-side data buffering
For high-frequency financial data (multiple updates per second), WebSocket connections provide the optimal solution:
Why WebSockets are ideal:
- Real-time bidirectional communication eliminates polling overhead
- Lower latency for time-critical financial data
- Efficient bandwidth usage compared to repeated HTTP requests
- Server-initiated updates ensure immediate data delivery
Why other options are suboptimal:
- Option A: 100ms polling creates excessive server load and network traffic
- Option C: 500ms intervals are too slow for high-frequency trading data
- Option D: 50ms polling would overwhelm servers and networks
Implementation considerations: Use message batching, implement connection recovery, and buffer data client-side to handle temporary network issues without losing critical price updates.
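A sketch of the client-side buffering idea, flushing batched messages to the server via Shiny.setInputValue(); the existing WebSocket object ws, the input name price_batch, and the 250 ms flush interval are assumptions:

# Buffer WebSocket messages in the browser and flush them in batches,
# so the Shiny server sees one input event per batch instead of per tick
buffering_js <- "
  var priceBuffer = [];
  ws.onmessage = function(event) {
    priceBuffer.push(JSON.parse(event.data));
  };
  // Flush at most four times per second, regardless of message rate
  setInterval(function() {
    if (priceBuffer.length > 0) {
      Shiny.setInputValue('price_batch', priceBuffer, {priority: 'event'});
      priceBuffer = [];
    }
  }, 250);
"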
Your real-time monitoring application runs 24/7 and accumulates data continuously. After 6 hours, the application becomes slow and eventually crashes. Complete this memory management solution:
server <- function(input, output, session) {
  realtime_data <- reactiveValues(
    current_data = data.frame(),
    update_counter = 0
  )

  # Memory management strategy
  observe({
    _______(______)  # How often to clean up?

    # Maintain sliding window
    if (nrow(realtime_data$current_data) > ______) {
      realtime_data$current_data <- ______(realtime_data$current_data, ______)
    }

    # Force garbage collection
    ______()
  })
}

Hints:
- Consider how frequently cleanup should occur (balance performance vs memory)
- Think about appropriate data window sizes for real-time applications
- Remember R’s garbage collection function
- Consider what function keeps the most recent data
Answer:

server <- function(input, output, session) {
  realtime_data <- reactiveValues(
    current_data = data.frame(),
    update_counter = 0
  )

  # Memory management strategy
  observe({
    invalidateLater(300000)  # Clean up every 5 minutes

    # Maintain sliding window
    if (nrow(realtime_data$current_data) > 1000) {
      realtime_data$current_data <- tail(realtime_data$current_data, 1000)
    }

    # Force garbage collection
    gc()
  })
}

Key principles:
- Regular cleanup intervals: 5-minute intervals balance memory management with performance
- Sliding window size: 1000 points provides sufficient history without excessive memory usage
- Forced garbage collection: gc() ensures memory is actually freed
- Proactive management: Clean up before memory issues occur, not after
Your WebSocket connection frequently drops due to network issues. Design a robust reconnection strategy that handles temporary outages gracefully while preventing overwhelming the server during extended outages.
What elements should your reconnection strategy include, and in what order should they be implemented?
- A) Immediate reconnection attempts with fixed 1-second intervals
- B) Exponential backoff with maximum delay limits and jitter
- C) Linear backoff with connection health monitoring
- D) Random delay intervals with no maximum attempt limits
Hints:
- Consider what happens if many clients reconnect simultaneously
- Think about server load during network outages
- Consider how to handle both brief and extended outages
- Remember the importance of preventing connection storms
Answer: B) Exponential backoff with maximum delay limits and jitter
A robust reconnection strategy should include:
1. Exponential Backoff Pattern:

const delays = [1000, 2000, 4000, 8000, 16000, 30000]; // 1s to 30s max
const attempt_delay = delays[Math.min(attempt - 1, delays.length - 1)];

2. Jitter Addition:

const jittered_delay = attempt_delay + (Math.random() * 1000);

3. Maximum Attempt Limits:

if (reconnect_attempts > 10) {
  console.error('Max reconnection attempts reached');
  return;
}

4. Connection Health Monitoring:

// Heartbeat mechanism
setInterval(() => {
  if (websocket.readyState === WebSocket.OPEN) {
    websocket.send(JSON.stringify({type: 'heartbeat'}));
  }
}, 30000);

Why this approach works:
- Exponential backoff reduces server load during outages
- Jitter prevents connection storms when networks recover
- Maximum delays ensure eventual reconnection attempts
- Attempt limits prevent infinite retry loops
- Health monitoring detects stale connections proactively
Conclusion
Real-time data applications represent the pinnacle of interactive analytics, transforming static dashboards into dynamic monitoring systems that provide immediate insights and enable rapid decision-making. Through this comprehensive guide, you’ve mastered the complete spectrum of real-time application development, from basic automatic refresh patterns to sophisticated WebSocket architectures that handle high-frequency data streams with enterprise-grade reliability.
The techniques you’ve learned—streaming data processing, memory-efficient data management, robust connection handling, and performance optimization—form the foundation for building monitoring systems that rival commercial solutions. Whether you’re creating financial trading platforms, IoT monitoring dashboards, or operational intelligence systems, these real-time capabilities enable you to deliver applications that provide genuine competitive advantage through timely information delivery.
Your understanding of real-time architecture patterns, combined with practical implementation experience, positions you to tackle the most demanding interactive application requirements while maintaining the analytical power and flexibility that makes R-based solutions superior for data-driven organizations.
Next Steps
Based on your mastery of real-time data handling, here are the recommended paths for continuing your advanced Shiny development journey:
Immediate Next Steps (Complete These First)
- Advanced Shiny Modules for Scalable Apps - Build modular real-time systems that scale across enterprise applications
- Production Deployment Strategies - Deploy your real-time applications to production environments with proper scaling and monitoring
- Practice Exercise: Extend your real-time application to include multiple data streams with different update frequencies and implement a comprehensive alerting system
Building on Your Foundation (Choose Your Path)
For High-Performance Applications:
For Enterprise Integration:
For Advanced Interactivity:
Long-term Goals (2-4 Weeks)
- Build a complete real-time monitoring system for your domain (financial, IoT, operational, etc.)
- Implement a multi-user real-time collaboration platform using Shiny
- Create a high-frequency data processing pipeline that scales to thousands of concurrent users
- Contribute to the Shiny community by sharing your real-time application patterns and optimizations
Citation
@online{kassambara2025,
author = {Kassambara, Alboukadel},
title = {Real-Time {Data} and {Live} {Updates} in {Shiny:} {Build}
{Dynamic} {Monitoring} {Systems}},
date = {2025-05-23},
url = {https://www.datanovia.com/learn/tools/shiny-apps/interactive-features/real-time-updates.html},
langid = {en}
}
