Previous
Query data
Data pipelines automatically transform raw sensor readings into summaries and insights at scheduled intervals. Precomputing these results makes subsequent queries more efficient.
For example, you might often query the average temperature across multiple sensors for each hour of the day. To make these queries faster, you can use a data pipeline to precalculate the results, saving significant computational resources.
Data pipelines work with all available data, even when the data is incomplete.
If a machine goes offline, data collection continues but sync pauses:
`viam-server` stores the data locally and syncs it later, when the machine reconnects to Viam.
Once the machine reconnects and syncs the stored data, Viam automatically re-runs any pipeline whose results would change based on the new data.
To define a data pipeline, specify a name, the associated organization, a schedule, a data source type, and the query:
Use datapipelines create
:
# Create a pipeline named "sensor-counts" in your organization.
# --schedule "0 * * * *" runs it hourly (cron, UTC), each run covering the previous hour.
# --data-source-type "standard" reads from standard storage (use hotstorage for the hot data store).
# The MQL keeps documents from the component named "sensor", groups them by location_id,
# averages data.readings.temperature, counts documents per group, and then projects the
# group key into "location" while dropping _id.
# --enable-backfill=True turns on the pipeline's backfill option.
viam datapipelines create \
--org-id=<org-id> \
--name=sensor-counts \
--schedule="0 * * * *" \
--data-source-type="standard" \
--mql='[{"$match": {"component_name": "sensor"}}, {"$group": {"_id": "$location_id", "avg_temp": {"$avg": "$data.readings.temperature"}, "count": {"$sum": 1}}}, {"$project": {"location": "$_id", "avg_temp": 1, "count": 1, "_id": 0}}]' \
--enable-backfill=True
To pass your query as a file instead of specifying it inline, pass the --mql-path
flag instead of --mql
.
To create a pipeline that reads data from the hot data store, specify --data-source-type hotstorage
.
Use DataClient.CreateDataPipeline
:
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings


async def connect() -> ViamClient:
    """Establish a connection to the Viam client using API credentials."""
    dial_options = DialOptions(
        credentials=Credentials(
            type="api-key",
            payload=API_KEY,
        ),
        auth_entity=API_KEY_ID,
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def main() -> int:
    """Create an hourly pipeline that averages temperature per location and print its ID.

    The pipeline keeps readings from "temperature-sensor", groups them by
    location_id, and then projects the group key into "location" so the output
    documents carry no ``_id`` field.
    """
    viam_client = await connect()
    try:
        data_client = viam_client.data_client
        pipeline_id = await data_client.create_data_pipeline(
            name="test-pipeline",
            organization_id=ORG_ID,
            mql_binary=[
                {"$match": {"component_name": "temperature-sensor"}},
                {
                    "$group": {
                        "_id": "$location_id",
                        "avg_temp": {"$avg": "$data.readings.temperature"},
                        "count": {"$sum": 1},
                    }
                },
                {
                    "$project": {
                        "location": "$_id",
                        "avg_temp": 1,
                        "count": 1,
                        "_id": 0,
                    }
                },
            ],
            schedule="0 * * * *",  # cron (UTC): hourly
            data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_STANDARD,
            enable_backfill=False,
        )
        print(f"Pipeline created with ID: {pipeline_id}")
    finally:
        # Close the client even if the request raises, so the connection is not leaked.
        viam_client.close()
    return 0


if __name__ == "__main__":
    asyncio.run(main())
To create a pipeline that reads data from the hot data store, set the `data_source_type` argument
to `TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE`.
Use DataClient.CreateDataPipeline
:
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main creates an hourly data pipeline over standard tabular storage that
// averages temperature readings per location, then prints the new pipeline's ID.
func main() {
	// Replace these placeholders with your API key, API key ID, and organization ID.
	apiKey := ""
	apiKeyID := ""
	orgID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()

	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()

	// Create MQL stages as map slices
	mqlStages := []map[string]interface{}{
		{"$match": map[string]interface{}{"component_name": "temperature-sensor"}},
		{
			"$group": map[string]interface{}{
				"_id":      "$location_id",
				"avg_temp": map[string]interface{}{"$avg": "$data.readings.temperature"},
				"count":    map[string]interface{}{"$sum": 1},
			},
		},
		{
			// Rename the group key and drop _id so repeated runs do not produce
			// duplicate _id values.
			"$project": map[string]interface{}{
				"location": "$_id",
				"avg_temp": 1,
				"count":    1,
				"_id":      0,
			},
		},
	}

	pipelineID, err := dataClient.CreateDataPipeline(
		ctx,
		orgID,
		"test-pipeline",
		mqlStages,
		"0 * * * *", // cron (UTC): hourly
		false,       // enable backfill
		&app.CreateDataPipelineOptions{
			// Name the data source explicitly instead of passing the raw value 0,
			// matching the "standard" data source used by the CLI and Python examples.
			TabularDataSourceType: app.TabularDataSourceTypeStandard,
		},
	)
	if err != nil {
		logger.Fatal(err)
	}
	fmt.Printf("Pipeline created with ID: %s\n", pipelineID)
}
To create a pipeline that reads data from the hot data store, set the `TabularDataSourceType`
field of `app.CreateDataPipelineOptions` to `app.TabularDataSourceTypeHotStorage`.
Use dataClient.CreateDataPipeline
:
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = ""; // API key, find or create in your organization settings
const API_KEY_ID = ""; // API key ID, find or create in your organization settings
const ORG_ID = ""; // Organization ID, find or create in your organization settings

/**
 * Creates an hourly data pipeline that averages temperature readings per
 * location and logs the new pipeline's ID.
 */
async function main(): Promise<void> {
  // Create Viam client
  const client = await createViamClient({
    credentials: {
      type: "api-key",
      authEntity: API_KEY_ID,
      payload: API_KEY,
    },
  });

  const pipelineId = await client.dataClient.createDataPipeline(
    ORG_ID,
    "test-pipeline",
    [
      { "$match": { "component_name": "temperature-sensor" } },
      { "$group": { "_id": "$location_id", "avg_temp": { "$avg": "$data.readings.temperature" }, "count": { "$sum": 1 } } },
      // Rename the group key and drop _id so repeated runs do not produce duplicate _id values.
      { "$project": { "location": "$_id", "avg_temp": 1, "count": 1, "_id": 0 } },
    ],
    "0 * * * *", // cron (UTC): hourly
    0,
    false,
  );
  console.log(`Pipeline created with ID: ${pipelineId}`);
}

// Run the script
main().catch((error) => {
  console.error("Script failed:", error);
  process.exit(1);
});
To create a pipeline that reads data from the hot data store, pass
`TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_HOT_STORAGE` as the pipeline's data source type instead of the standard data source type.
Once configured, the pipeline will be run based on the defined schedule.
To create a schedule for your pipeline, specify a cron expression in UTC. The schedule determines both execution frequency and the range of time queried by each execution. The following table contains some common schedules:
Schedule | Frequency | Query Time Range |
---|---|---|
0 * * * * | Hourly | Previous hour |
0 0 * * * | Daily | Previous day |
*/15 * * * * | Every 15 minutes | Previous 15 minutes |
Data pipeline queries only support a subset of MQL aggregation operators. For more information, see Supported aggregation operators.
Non-unique `_id` values in pipeline output trigger duplicate key errors, which prevent the pipeline from saving subsequent results.
Avoid returning an _id
value in your pipeline’s final group stage unless you can guarantee its uniqueness across all pipeline runs.
The $group
stage returns an _id
value by default.
To remove it, follow any final $group
stage with a $project
stage that renames the _id
field to a different name.
For optimal performance when querying large datasets, see query optimization and performance best practices.
Once the pipeline has run at least once, you can query its results.
To query the processed results of your data pipeline, call DataClient.TabularDataByMQL
, using the following parameters:
tabular_data_source_type
: TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK
pipeline_id
: your pipeline ID
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings
PIPELINE_ID = ""  # ID of the pipeline whose results you want to read


async def connect() -> ViamClient:
    """Establish a connection to the Viam client using API credentials."""
    dial_options = DialOptions(
        credentials=Credentials(
            type="api-key",
            payload=API_KEY,
        ),
        auth_entity=API_KEY_ID,
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def main() -> int:
    """Print up to 10 result documents with a ``location`` field from the pipeline sink."""
    viam_client = await connect()
    try:
        data_client = viam_client.data_client
        tabular_data = await data_client.tabular_data_by_mql(
            organization_id=ORG_ID,
            query=[
                {"$match": {"location": {"$exists": True}}},
                {"$limit": 10},
            ],
            # Read the pipeline's output instead of raw tabular data.
            tabular_data_source_type=TabularDataSourceType.TABULAR_DATA_SOURCE_TYPE_PIPELINE_SINK,
            pipeline_id=PIPELINE_ID,
        )
        print(f"Tabular Data: {tabular_data}")
    finally:
        # Close the client even if the query raises, so the connection is not leaked.
        viam_client.close()
    return 0


if __name__ == "__main__":
    asyncio.run(main())
To query the processed results of your data pipeline, call DataClient.TabularDataByMQL
, using the following parameters:
TabularDataSourceType
: app.TabularDataSourceTypePipelineSink
PipelineID
: your pipeline ID
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// Fill in your credentials, your organization ID, and the ID of the pipeline
// whose output you want to read.
const (
	apiKey     = ""
	apiKeyID   = ""
	orgID      = ""
	pipelineID = ""
)

// main reads up to 10 documents that have a "location" field from the
// pipeline's sink and prints them.
func main() {
	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	client, err := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer client.Close()

	query := []map[string]interface{}{
		{"$match": map[string]interface{}{
			"location": map[string]interface{}{"$exists": true},
		}},
		{"$limit": 10},
	}
	// Point the query at the pipeline's output rather than raw tabular data.
	opts := &app.TabularDataByMQLOptions{
		TabularDataSourceType: app.TabularDataSourceTypePipelineSink,
		PipelineID:            pipelineID,
	}

	results, err := client.DataClient().TabularDataByMQL(ctx, orgID, query, opts)
	if err != nil {
		logger.Fatal(err)
	}
	fmt.Printf("Tabular Data: %v\n", results)
}
To query the processed results of your data pipeline, call dataClient.TabularDataByMQL
, using the following parameters:
type
: 3 (the pipeline sink data source type)
pipelineId
: your pipeline ID
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = ""; // API key, find or create in your organization settings
const API_KEY_ID = ""; // API key ID, find or create in your organization settings
const ORG_ID = ""; // Organization ID, find or create in your organization settings
const PIPELINE_ID = ""; // ID of the pipeline whose results you want to read

/**
 * Reads up to 10 documents that have a `location` field from the results of
 * the pipeline identified by PIPELINE_ID, and logs them.
 */
async function main(): Promise<void> {
  // Create Viam client
  const client = await createViamClient({
    credentials: {
      type: "api-key",
      authEntity: API_KEY_ID,
      payload: API_KEY,
    },
  });

  const tabularData = await client.dataClient.tabularDataByMQL(
    ORG_ID,
    [
      { "$match": { "location": { "$exists": true } } },
      { "$limit": 10 },
    ],
    false,
    {
      type: 3, // pipeline sink: query the pipeline's output
      pipelineId: PIPELINE_ID,
    },
  );
  console.log(tabularData);
}

// Run the script
main().catch((error) => {
  console.error("Script failed:", error);
  process.exit(1);
});
Use datapipelines list
to fetch a list of pipeline configurations in an organization:
# List the data pipeline configurations in the given organization.
viam datapipelines list --org-id=<org-id>
Use DataClient.ListDataPipelines
to fetch a list of pipeline configurations in an organization:
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
ORG_ID = ""  # Organization ID, find or create in your organization settings


async def connect() -> ViamClient:
    """Establish a connection to the Viam client using API credentials."""
    dial_options = DialOptions(
        credentials=Credentials(
            type="api-key",
            payload=API_KEY,
        ),
        auth_entity=API_KEY_ID,
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def main() -> int:
    """Print the name, ID, schedule, and data source type of every pipeline in ORG_ID."""
    viam_client = await connect()
    try:
        data_client = viam_client.data_client
        pipelines = await data_client.list_data_pipelines(ORG_ID)
        for pipeline in pipelines:
            print(f"Pipeline: {pipeline.name}, ID: {pipeline.id}, schedule: {pipeline.schedule}, data_source_type: {pipeline.data_source_type}")
    finally:
        # Close the client even if the request raises, so the connection is not leaked.
        viam_client.close()
    return 0


if __name__ == "__main__":
    asyncio.run(main())
Use DataClient.ListDataPipelines
to fetch a list of pipeline configurations in an organization:
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main prints the name, ID, schedule, and data source type of every data
// pipeline in the organization.
func main() {
	// Replace these placeholders with your API key, API key ID, and organization ID.
	apiKey := ""
	apiKeyID := ""
	orgID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()

	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()
	pipelines, err := dataClient.ListDataPipelines(ctx, orgID)
	if err != nil {
		logger.Fatal(err)
	}
	for _, pipeline := range pipelines {
		// One verb per argument. The previous format had an extra "enable_backfill: %t"
		// with no argument, which printed %!t(MISSING). %v formats the
		// DataSourceType enum correctly whether or not it implements Stringer.
		fmt.Printf("Pipeline: %s, ID: %s, schedule: %s, data_source_type: %v\n",
			pipeline.Name, pipeline.ID, pipeline.Schedule, pipeline.DataSourceType)
	}
}
Use dataClient.ListDataPipelines
to fetch a list of pipeline configurations in an organization:
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = ""; // API key, find or create in your organization settings
const API_KEY_ID = ""; // API key ID, find or create in your organization settings
const ORG_ID = ""; // Organization ID, find or create in your organization settings

/**
 * Logs the name, ID, schedule, and data source type of every data pipeline in
 * the organization.
 */
async function main(): Promise<void> {
  // Create Viam client
  const client = await createViamClient({
    credentials: {
      type: "api-key",
      authEntity: API_KEY_ID,
      payload: API_KEY,
    },
  });

  const pipelines = await client.dataClient.listDataPipelines(ORG_ID);
  for (const pipeline of pipelines) {
    console.log(`Pipeline: ${pipeline.name}, ID: ${pipeline.id}, schedule: ${pipeline.schedule}, data_source_type: ${pipeline.dataSourceType}`);
  }
}

// Run the script
main().catch((error) => {
  console.error("Script failed:", error);
  process.exit(1);
});
Use datapipelines enable
to enable a disabled data pipeline:
# Enable a previously disabled data pipeline by its ID.
viam datapipelines enable --id=<pipeline-id>
Use DataClient.EnableDataPipeline
to enable a disabled data pipeline:
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// Fill in your credentials and the ID of the disabled pipeline to enable.
const (
	apiKey     = ""
	apiKeyID   = ""
	pipelineID = ""
)

// main enables the data pipeline identified by pipelineID.
func main() {
	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	client, err := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer client.Close()

	if err := client.DataClient().EnableDataPipeline(ctx, pipelineID); err != nil {
		logger.Fatal(err)
	}
	fmt.Printf("Pipeline enabled with ID: %s\n", pipelineID)
}
Disabling a data pipeline lets you pause data pipeline execution without fully deleting the pipeline configuration from your organization. The pipeline immediately stops aggregating data.
You can re-enable the pipeline at any time to resume execution. When you re-enable a pipeline, Viam does not backfill the time windows that were missed while the pipeline was disabled.
Use datapipelines disable
to disable a data pipeline:
# Pause a data pipeline by its ID without deleting its configuration.
viam datapipelines disable --id=<pipeline-id>
Use DataClient.DisableDataPipeline
to disable a data pipeline:
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// Fill in your credentials and the ID of the pipeline to disable.
const (
	apiKey     = ""
	apiKeyID   = ""
	pipelineID = ""
)

// main disables the data pipeline identified by pipelineID.
func main() {
	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	client, err := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer client.Close()

	if err := client.DataClient().DisableDataPipeline(ctx, pipelineID); err != nil {
		logger.Fatal(err)
	}
	fmt.Printf("Pipeline disabled with ID: %s\n", pipelineID)
}
Use datapipelines delete
to delete a data pipeline, its execution history, and all output generated by that pipeline:
# Delete a data pipeline by its ID, together with its execution history and all of its output.
viam datapipelines delete --id=<pipeline-id>
Use DataClient.DeleteDataPipeline
to delete a data pipeline:
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
PIPELINE_ID = ""  # ID of the pipeline to delete


async def connect() -> ViamClient:
    """Establish a connection to the Viam client using API credentials."""
    dial_options = DialOptions(
        credentials=Credentials(
            type="api-key",
            payload=API_KEY,
        ),
        auth_entity=API_KEY_ID,
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def main() -> int:
    """Delete the data pipeline identified by PIPELINE_ID."""
    viam_client = await connect()
    try:
        data_client = viam_client.data_client
        await data_client.delete_data_pipeline(PIPELINE_ID)
        print(f"Pipeline deleted with ID: {PIPELINE_ID}")
    finally:
        # Close the client even if the request raises, so the connection is not leaked.
        viam_client.close()
    return 0


if __name__ == "__main__":
    asyncio.run(main())
Use DataClient.DeleteDataPipeline
to delete a data pipeline:
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// Fill in your credentials and the ID of the pipeline to delete.
const (
	apiKey     = ""
	apiKeyID   = ""
	pipelineID = ""
)

// main deletes the data pipeline identified by pipelineID.
func main() {
	ctx := context.Background()
	logger := logging.NewDebugLogger("client")

	client, err := app.CreateViamClientWithAPIKey(ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer client.Close()

	if err := client.DataClient().DeleteDataPipeline(ctx, pipelineID); err != nil {
		logger.Fatal(err)
	}
	fmt.Printf("Pipeline deleted with ID: %s\n", pipelineID)
}
Use dataClient.DeleteDataPipeline
to delete a data pipeline:
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = ""; // API key, find or create in your organization settings
const API_KEY_ID = ""; // API key ID, find or create in your organization settings
const PIPELINE_ID = ""; // ID of the pipeline to delete

/** Deletes the data pipeline identified by PIPELINE_ID. */
async function main(): Promise<void> {
  // Create Viam client
  const client = await createViamClient({
    credentials: {
      type: "api-key",
      authEntity: API_KEY_ID,
      payload: API_KEY,
    },
  });

  await client.dataClient.deleteDataPipeline(PIPELINE_ID);
  console.log(`Pipeline deleted with ID: ${PIPELINE_ID}`);
}

// Run the script
main().catch((error) => {
  console.error("Script failed:", error);
  process.exit(1);
});
Data pipeline executions may have any of the following statuses:
- SCHEDULED: pending execution
- STARTED: currently processing
- COMPLETED: successfully finished
- FAILED: execution error

Use DataClient.ListDataPipelineRuns
to view information about past and in-progress executions of a pipeline:
import asyncio
from viam.rpc.dial import DialOptions, Credentials
from viam.app.viam_client import ViamClient
from viam.gen.app.data.v1.data_pb2 import TabularDataSourceType

# Configuration constants – replace with your actual values
API_KEY = ""  # API key, find or create in your organization settings
API_KEY_ID = ""  # API key ID, find or create in your organization settings
PIPELINE_ID = ""  # ID of the pipeline whose runs you want to list


async def connect() -> ViamClient:
    """Establish a connection to the Viam client using API credentials."""
    dial_options = DialOptions(
        credentials=Credentials(
            type="api-key",
            payload=API_KEY,
        ),
        auth_entity=API_KEY_ID,
    )
    return await ViamClient.create_from_dial_options(dial_options)


async def main() -> int:
    """Print status and timing information for runs of the pipeline PIPELINE_ID."""
    viam_client = await connect()
    try:
        data_client = viam_client.data_client
        pipeline_runs = await data_client.list_data_pipeline_runs(PIPELINE_ID, 10)
        for run in pipeline_runs.runs:
            print(f"Run: ID: {run.id}, status: {run.status}, start_time: {run.start_time}, end_time: {run.end_time}, data_start_time: {run.data_start_time}, data_end_time: {run.data_end_time}")
    finally:
        # Close the client even if the request raises, so the connection is not leaked.
        viam_client.close()
    return 0


if __name__ == "__main__":
    asyncio.run(main())
Use DataClient.ListDataPipelineRuns
to view information about past and in-progress executions of a pipeline:
package main

import (
	"context"
	"fmt"

	"go.viam.com/rdk/app"
	"go.viam.com/rdk/logging"
)

// main prints the status and timing information for runs of the data
// pipeline identified by pipelineID.
func main() {
	// Replace these placeholders with your API key, API key ID, and pipeline ID.
	apiKey := ""
	apiKeyID := ""
	pipelineID := ""

	logger := logging.NewDebugLogger("client")
	ctx := context.Background()

	viamClient, err := app.CreateViamClientWithAPIKey(
		ctx, app.Options{}, apiKey, apiKeyID, logger)
	if err != nil {
		logger.Fatal(err)
	}
	defer viamClient.Close()

	dataClient := viamClient.DataClient()
	pipelineRuns, err := dataClient.ListDataPipelineRuns(ctx, pipelineID, 10)
	if err != nil {
		logger.Fatal(err)
	}
	for _, run := range pipelineRuns.Runs {
		// %v formats the status enum and timestamp fields correctly whether or not
		// they implement fmt.Stringer. %s misprints non-string, non-Stringer values.
		fmt.Printf("Run: ID: %s, status: %v, start_time: %v, end_time: %v, data_start_time: %v, data_end_time: %v\n",
			run.ID, run.Status, run.StartTime, run.EndTime, run.DataStartTime, run.DataEndTime)
	}
}
Use dataClient.ListDataPipelineRuns
to view information about past and in-progress executions of a pipeline:
import { createViamClient } from "@viamrobotics/sdk";

// Configuration constants – replace with your actual values
const API_KEY = ""; // API key, find or create in your organization settings
const API_KEY_ID = ""; // API key ID, find or create in your organization settings
const PIPELINE_ID = ""; // ID of the pipeline whose runs you want to list

/** Logs status and timing information for runs of the pipeline PIPELINE_ID. */
async function main(): Promise<void> {
  // Create Viam client
  const client = await createViamClient({
    credentials: {
      type: "api-key",
      authEntity: API_KEY_ID,
      payload: API_KEY,
    },
  });

  const pipelineRuns = await client.dataClient.listDataPipelineRuns(PIPELINE_ID, 10);
  for (const run of pipelineRuns.runs) {
    console.log(
      `Run: ID: ${run.id}, status: ${run.status}, start_time: ${run.startTime}, ` +
      `end_time: ${run.endTime}, data_start_time: ${run.dataStartTime}, data_end_time: ${run.dataEndTime}`
    );
  }
}

// Run the script
main().catch((error) => {
  console.error("Script failed:", error);
  process.exit(1);
});
Was this page helpful?
Glad to hear it! If you have any other feedback please let us know:
We're sorry about that. To help us improve, please tell us what we can do better:
Thank you!