Instead of relying on Replication Monitor to assess subscriber health and identify pending commands, we can query the msrepl_commands table in the distribution database directly. This approach allows us to analyze undistributed commands by various attributes, providing a deeper understanding of the throughput across the entire replication environment. Furthermore, we can port these metrics into a custom monitoring tool like Grafana to centralize key data, analyze long-term trends, and provide self-service monitoring for non-DBAs.
This is the third post in our Replication Monitoring series, where we share tips to help database teams manage replication environments at scale.
Our objective in this post is to develop a dashboard that analyzes undistributed commands from multiple angles. These insights help answer critical questions, such as:
- How far behind is a subscriber in terms of processing commands?
- Which subscriber is performing slowly and might need some tuning?
- Is a Distribution Agent stalled or failing to process commands?
- Is there a Publication that needs to be broken up into smaller publications?

To generate the dashboard above and demonstrate these metrics, I intentionally disabled the subscriber’s Distribution Agent. This caused commands to accumulate in the distribution database rather than being processed by the subscriber. Additionally, I inserted new data into the Publisher database to increase the volume of pending transactions.
After some time, I re-enabled the Distribution Agent, allowing replication to resume. Consequently, the monitoring metrics dropped significantly as the backlog of undistributed commands cleared.
While this demo features only one publisher, one distributor, and one subscriber, the true value of this dashboard lies in its scalability. In a production environment, it provides a centralized view, allowing teams to assess the health of the entire replication topology at a glance.
How to implement this centralized Undistributed Commands dashboard
There are two main components involved in extracting undistributed commands from a distribution database.
The First Component: msrepl_commands
We begin with the msrepl_commands table, focusing specifically on the xact_seqno column. This blog post explores the importance of this column in detail. To summarize, the xact_seqno column provides a sequential transaction ID for each replication command read from the Publisher’s log file and stored in msrepl_commands for processing by Distribution Agents. The “sequential” nature of this ID is key; it allows us to identify exactly which transactions have been processed by the Distribution Agents and which remain pending.
The Second Component: The Watermark
The second component involves identifying the “watermark,” or the specific position where each Subscriber’s Distribution Agent is currently processing commands. This information is found in the msdistribution_history table. This table tracks the activity history for each Distribution Agent, including the xact_seqno of the last successfully processed transaction.
Building the Query
With both components identified, we can build a query to extract the undistributed commands. First, we define a query to pull data from the msdistribution_history table along with the metadata for the Publisher and Publication.
select
    rsp.srvname as publisher,
    da.publisher_database_id,
    da.publisher_db,
    da.publication,
    dh.agent_id,
    da.name as distribution_agent,
    rss.srvname as subscriber,
    max(dh.xact_seqno) as max_xact_seqno
from MSdistribution_agents da
join MSdistribution_history dh on da.id = dh.agent_id
join MSreplservers rsp on da.publisher_id = rsp.srvid
join MSreplservers rss on da.subscriber_id = rss.srvid
group by
    rsp.srvname,
    da.publisher_database_id,
    da.publisher_db,
    da.publication,
    dh.agent_id,
    da.name,
    rss.srvname

As shown in this query, we take the MAX xact_seqno from each Distribution Agent’s history records, which corresponds to the most recent transaction that agent has recorded as processed.
From here, we can wrap this logic into a Common Table Expression (CTE) and join it to the msrepl_commands table. The critical join condition is t.xact_seqno > c.max_xact_seqno, where t is msrepl_commands and c is the CTE. By filtering for any xact_seqno greater than the maximum sequence number identified in our first query, we isolate the commands that have not yet been processed. Additionally, we are pulling article information to provide deeper insight into these undistributed commands.
;with cte as (
    select
        rsp.srvname as publisher,
        da.publisher_database_id,
        da.publisher_db,
        da.publication,
        dh.agent_id,
        da.name as distribution_agent,
        rss.srvname as subscriber,
        max(dh.xact_seqno) as max_xact_seqno
    from MSdistribution_agents da
    join MSdistribution_history dh on da.id = dh.agent_id
    join MSreplservers rsp on da.publisher_id = rsp.srvid
    join MSreplservers rss on da.subscriber_id = rss.srvid
    group by
        rsp.srvname,
        da.publisher_database_id,
        da.publisher_db,
        da.publication,
        dh.agent_id,
        da.name,
        rss.srvname
)
select
    c.publisher,
    c.publisher_db,
    c.publication,
    c.distribution_agent,
    c.subscriber,
    a.article,
    count(t.xact_seqno) as total_undistributed_commands
from cte c
join MSrepl_commands t on c.publisher_database_id = t.publisher_database_id
    and t.xact_seqno > c.max_xact_seqno
join MSpublisher_databases pd on t.publisher_database_id = pd.id
join MSarticles a on t.article_id = a.article_id
    and pd.publisher_db = a.publisher_db
group by
    c.publisher,
    c.publisher_db,
    c.publication,
    c.distribution_agent,
    c.subscriber,
    a.article
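To make the watermark comparison concrete, here is a minimal Python sketch of the same idea outside SQL Server. The sequence numbers, the agent name, and the count_undistributed helper are all invented for illustration. It assumes xact_seqno values are equal-length binary strings that can be compared byte by byte.

```python
from collections import Counter

def count_undistributed(commands, watermarks):
    """Count commands whose xact_seqno is above each agent's watermark.

    commands:   list of (publisher_database_id, xact_seqno, article) tuples
    watermarks: dict of agent name -> (publisher_database_id, max_xact_seqno)
    Returns a Counter keyed by (agent, article).
    """
    pending = Counter()
    for agent, (db_id, max_seqno) in watermarks.items():
        for cmd_db_id, seqno, article in commands:
            # Same predicate as the join: same publisher database and a
            # sequence number strictly greater than the watermark.
            if cmd_db_id == db_id and seqno > max_seqno:
                pending[(agent, article)] += 1
    return pending

# Hypothetical sample data: four commands in publisher database 1.
commands = [
    (1, bytes.fromhex("0000002A000001100001"), "Orders"),
    (1, bytes.fromhex("0000002A000001180001"), "Orders"),
    (1, bytes.fromhex("0000002A000001200001"), "Customers"),
    (1, bytes.fromhex("0000002A000001280001"), "Orders"),
]
# The hypothetical agent last recorded the first command's sequence number.
watermarks = {"agent_sub1": (1, bytes.fromhex("0000002A000001100001"))}

pending = count_undistributed(commands, watermarks)
for (agent, article), total in sorted(pending.items()):
    print(agent, article, total)
```

With this sample data, the three commands above the watermark are reported as pending, split by article, which mirrors the per-article counts the query returns.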

I intentionally stopped the subscriber’s Distribution Agent so that we would have commands queued up in the msrepl_commands table.
This is a great result: we now have valuable information about the undistributed commands without relying on the Replication Monitor tool. We can use these insights to answer the questions shared at the start of this post, along with many others. Below are the attributes by which we can slice and dice the Undistributed Commands metric:
- Publisher
- Publication
- Article
- Subscriber
- Distribution Agent
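As a rough illustration of what slicing by these attributes means, the sketch below rolls per-article counts up to any single attribute. The rows are hypothetical and only shaped like the query output; roll_up is a helper name chosen for this example, not something from the post.

```python
from collections import defaultdict

def roll_up(rows, attribute):
    """Sum undistributed commands per distinct value of one attribute."""
    totals = defaultdict(int)
    for row in rows:
        totals[row[attribute]] += row["total_undistributed_commands"]
    return dict(totals)

# Hypothetical rows shaped like the output of the undistributed commands query.
rows = [
    {"publisher": "PUB1", "publication": "Sales", "article": "Orders",
     "subscriber": "SUB1", "distribution_agent": "PUB1-Sales-SUB1",
     "total_undistributed_commands": 1200},
    {"publisher": "PUB1", "publication": "Sales", "article": "Customers",
     "subscriber": "SUB1", "distribution_agent": "PUB1-Sales-SUB1",
     "total_undistributed_commands": 300},
    {"publisher": "PUB1", "publication": "Sales", "article": "Orders",
     "subscriber": "SUB2", "distribution_agent": "PUB1-Sales-SUB2",
     "total_undistributed_commands": 50},
]

by_subscriber = roll_up(rows, "subscriber")
by_article = roll_up(rows, "article")
print(by_subscriber)
print(by_article)
```

Rolling up by subscriber points at the slow subscriber, while rolling up by article shows which table is generating most of the backlog.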
Let’s send these metrics to a monitoring tool
If you are interested in following along and would like to get started with InfluxDB and Grafana, check out the guide, “Get started with Grafana and InfluxDB,” for assistance.
In this walkthrough, we will use PowerShell to run the undistributed commands query we developed previously, prepare the metrics for InfluxDB consumption, and then send them to InfluxDB. While we won’t go into detail on how the script works, we are sharing it to help you and others get a head start. Gemini was used to assist with the InfluxDB steps. Below is a quick highlight of what the script does:
- Defines the InfluxDB connection/tokens
- Defines the SQL Server Distributor connections and SQL queries
- Executes a loop every 10 seconds to:
  - Identify all distribution databases
  - Run the ‘Undistributed Commands’ query against each distribution database
  - Format metrics for InfluxDB
  - Send the metrics to InfluxDB
#Requires -Modules dbatools
# ==============================================================================
# 1. INFLUXDB 2.x CONFIGURATION (MUST BE UPDATED)
# ==============================================================================
$InfluxUrl = "http://localhost:8086/api/v2/write"
$Bucket = "sqlmonitoring" # Must match the bucket name in your telegraf.conf
$Org = "" # Must match the organization name in your telegraf.conf
# This is a REQUIRED token generated in the InfluxDB UI with 'write' permissions for the bucket.
$Token = ""
$Measurement = "sql_replication_undistributed_commands"
# ==============================================================================
# 2. SQL SERVER CONFIGURATION
# ==============================================================================
$SqlInstance = "distributor server"
$distributionDbQuery = "
select name db_name
from sys.databases
where is_distributor = 1
"
$sqlQuery = "
;with cte as (
select rsp.srvname publisher, da.publisher_database_id, da.publisher_db, da.publication, dh.agent_id, da.name distribution_agent, rss.srvname subscriber, max(dh.xact_seqno) max_xact_seqno
from MSdistribution_agents da
join MSdistribution_history dh on da.id = dh.agent_id
join MSreplservers rsp on da.publisher_id = rsp.srvid
join MSreplservers rss on da.subscriber_id = rss.srvid
group by rsp.srvname, da.publisher_database_id, da.publisher_db, da.publication, dh.agent_id, da.name, rss.srvname
)
select
c.publisher,
c.publisher_db,
c.publication,
c.distribution_agent,
c.subscriber,
a.article,
count(t.xact_seqno) total_undistributed_commands
from cte c
join MSrepl_commands t on c.publisher_database_id = t.publisher_database_id
and t.xact_seqno > c.max_xact_seqno
join MSpublisher_databases pd on t.publisher_database_id = pd.id
join MSarticles a on t.article_id = a.article_id
and pd.publisher_db = a.publisher_db
group by
c.publisher,
c.publisher_db,
c.publication,
c.distribution_agent,
c.subscriber,
a.article"
# ==============================================================================
# 3. SCRIPT EXECUTION AND DATA PREPARATION
# ==============================================================================
$count = 0
while ($count -lt 10000)
{
try {
# Find every distribution database on the Distributor.
# -EnableException makes dbatools throw on failure so the catch block below can handle it.
$DistributionDBs = Invoke-DbaQuery -SqlInstance $SqlInstance -Database master -Query $distributionDbQuery -EnableException
$UndistributedCommandResults = @(foreach($DistributionDB in $DistributionDBs) {
# Every object produced here is "emitted" to the variable above
Invoke-DbaQuery -SqlInstance $SqlInstance -Database $DistributionDB.db_name -Query $sqlQuery -EnableException |
Select-Object @{n='PublisherServer'; e={$_.publisher}},
@{n='PublisherDatabase'; e={$_.publisher_db}},
@{n='Publication'; e={$_.publication}},
@{n='DistributionAgent'; e={$_.distribution_agent}},
@{n='Subscriber'; e={$_.subscriber}},
@{n='Article'; e={$_.article}},
@{n='TotalUndistributedCommands'; e={$_.total_undistributed_commands}}
})
$LineProtocolBatch = @()
# Process results and format as InfluxDB Line Protocol
foreach ($Row in $UndistributedCommandResults) {
# --- 1. Retrieve Values ---
# We ensure these are strings and handle potential nulls
$PublisherServer = [string]$Row.PublisherServer
$PublisherDatabase = [string]$Row.PublisherDatabase
$Publication = [string]$Row.Publication
$DistributionAgent = [string]$Row.DistributionAgent
$Subscriber = [string]$Row.Subscriber
$Article = [string]$Row.Article
$TotalCommands = [double]$Row.TotalUndistributedCommands
# --- 2. Format Tags (Identifiers) ---
# InfluxDB Line Protocol requires escaping spaces, commas, and equals signs in tags
$TagPublisher = "publisher=" + $PublisherServer.Replace(" ", "\ ").Replace(",", "\,").Replace("=", "\=")
$TagPubDB = "publisher_db=" + $PublisherDatabase.Replace(" ", "\ ").Replace(",", "\,").Replace("=", "\=")
$TagPublication = "publication=" + $Publication.Replace(" ", "\ ").Replace(",", "\,").Replace("=", "\=")
$TagAgent = "dist_agent=" + $DistributionAgent.Replace(" ", "\ ").Replace(",", "\,").Replace("=", "\=")
$TagSubscriber = "subscriber=" + $Subscriber.Replace(" ", "\ ").Replace(",", "\,").Replace("=", "\=")
$TagArticle = "article=" + $Article.Replace(" ", "\ ").Replace(",", "\,").Replace("=", "\=")
# Combine all tags into the TagSet (Comma separated, NO spaces)
$TagSet = "$TagPublisher,$TagPubDB,$TagPublication,$TagAgent,$TagSubscriber,$TagArticle"
# --- 3. Format Fields (The Measured Metric) ---
# Fields are separated from tags by a single SPACE.
$FieldSet = "undistributed_commands=$TotalCommands"
# --- 4. Timestamp ---
$Timestamp = [System.DateTimeOffset]::UtcNow.ToUnixTimeMilliseconds() * 1000000
# --- 5. Construct Line Protocol String ---
# Format: measurement,tagset fieldset timestamp
$Line = "$Measurement,$TagSet $FieldSet $Timestamp"
$LineProtocolBatch += $Line
}
# ==========================================================================
# 4. HTTP API POST REQUEST TO INFLUXDB
# ==========================================================================
if ($LineProtocolBatch.Count -gt 0) {
# Join all Line Protocol strings with a newline character for a single batch write
$Body = $LineProtocolBatch -join "`n"
# Headers for InfluxDB 2.x API write
$Headers = @{
"Authorization" = "Token $Token"
"Content-Type" = "text/plain"
}
# Log the target so connection problems are easy to spot
Write-Host "URL: $InfluxUrl"
Write-Host "Bucket: $Bucket"
Write-Host "Org: $Org"
# Build the write URI; bucket and org are URL-encoded in case they contain special characters
$FullUri = $InfluxUrl + "?bucket=" + [uri]::EscapeDataString($Bucket) + "&org=" + [uri]::EscapeDataString($Org) + "&precision=ns"
Write-Host "DEBUG: Constructed URI is: $FullUri"
# If the URI is invalid, output the error and stop the script early
if ($FullUri -notmatch "^http(s)?://[^\s/$.?#].[^\s]*$") {
Write-Error "DEBUG: URI validation FAILED for: $FullUri"
exit 1
}
# Send the data to InfluxDB using the Write API
Invoke-RestMethod `
-Uri $FullUri `
-Method Post `
-Headers $Headers `
-Body $Body | Out-Null
Write-Host "SUCCESS: Sent $($LineProtocolBatch.Count) metrics to InfluxDB."
}
else {
Write-Host "INFO: SQL query returned no metrics to send."
}
}
catch {
# Output the error to the standard error stream for robust logging
Write-Error "CRITICAL ERROR during SQL or InfluxDB operation: $($_.Exception.Message)"
exit 1 # Exit with non-zero code to indicate failure if run via scheduler
}
Start-Sleep -Seconds 10
$count++
}
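The formatting step is the easiest part of the script to get subtly wrong, so it can help to check it in isolation. Below is a small Python sketch of the same line protocol construction; escape_tag and to_line are hypothetical helper names, and the tag values and timestamp are made up for the example.

```python
def escape_tag(value):
    """Escape the characters that are special in line protocol tag values."""
    return (str(value)
            .replace(",", "\\,")
            .replace("=", "\\=")
            .replace(" ", "\\ "))

def to_line(measurement, tags, field, value, timestamp_ns):
    """Build one line: measurement,tagset fieldset timestamp."""
    tag_set = ",".join(f"{key}={escape_tag(val)}" for key, val in tags.items())
    # float() keeps the field a float, like the [double] cast in the script.
    return f"{measurement},{tag_set} {field}={float(value)} {timestamp_ns}"

# Hypothetical tag values containing a space and a comma.
tags = {
    "publisher": "PUB1",
    "publication": "Sales Orders",
    "article": "dbo.Orders,v2",
}
line = to_line(
    "sql_replication_undistributed_commands",
    tags,
    "undistributed_commands",
    1200,
    1700000000000000000,  # fixed nanosecond timestamp for a repeatable example
)
print(line)
```

In a real sender, the fixed timestamp would be replaced with the current time in nanoseconds, for example from time.time_ns().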

Great, we can see from the output message above that we have successfully sent a metric to InfluxDB.
Next, we can jump over to Grafana to pull in the new data. To set up the graph, I used the InfluxDB query builder to generate the code, which I then copied and pasted into the query editor of a Grafana panel that uses the InfluxDB data source.
from(bucket: "sqlmonitoring")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sql_replication_undistributed_commands")
|> filter(fn: (r) => r["_field"] == "undistributed_commands")
|> group(columns: ["article"])
|> aggregateWindow(every: v.windowPeriod, fn: sum, createEmpty: false)
|> yield(name: "total_per_article")

For this query, I am analyzing undistributed commands by Article. The group(columns: ["article"]) line is where we specify the attributes used to aggregate the metrics; swapping in "subscriber" or "publication", for example, groups by that tag instead.

And that is it, we now can build our own custom dashboard.
Summary
When managing a large replication environment, centralizing Undistributed Commands metrics from every distribution database provides a significant boost to monitoring and troubleshooting efficiency. Additionally, we were able to share a simplified version of the dashboard with non-DBAs, empowering them to self-monitor their own replication environments. For example, if an engineering team performs a high-volume data update, they can use the dashboard to confirm in real time when all changes have been successfully propagated downstream.
