Google Cloud Dataflow
Author: t | 2025-04-25
Using the Google Cloud Dataflow Runner (applies to the Java SDK and the Python SDK). The Google Cloud Dataflow Runner uses the Cloud Dataflow managed service. When you run your pipeline with the Cloud Dataflow service, the runner uploads your executable code and dependencies to a Google Cloud Storage bucket and creates a Cloud Dataflow job, which then runs your pipeline on managed resources in Google Cloud.
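As a hedged illustration of that flow in Java, a pipeline can be pointed at the Dataflow service through its pipeline options. The project ID, region, and bucket below are placeholders, not values taken from this page:

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunOnDataflow {
  public static void main(String[] args) {
    // Build options for the Dataflow managed service; all values are placeholders.
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);         // run on the Cloud Dataflow service
    options.setProject("my-project-id");             // placeholder Google Cloud project
    options.setRegion("us-central1");                // regional endpoint for the job
    options.setTempLocation("gs://my-bucket/temp");  // bucket used to stage code and temp files

    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms here ...
    pipeline.run();
  }
}

Submitting with these options stages the pipeline artifacts to the temp bucket and creates the Dataflow job, matching the behavior described above.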
This page describes how to use the Dataflow connector for Spanner to import, export, and modify data in Spanner GoogleSQL-dialect databases and PostgreSQL-dialect databases. Dataflow is a managed service for transforming and enriching data. The Dataflow connector for Spanner lets you read data from and write data to Spanner in a Dataflow pipeline, optionally transforming or modifying the data. You can also create pipelines that transfer data between Spanner and other Google Cloud products.

The Dataflow connector is the recommended method for efficiently moving data into and out of Spanner in bulk. It's also the recommended method for performing large transformations to a database which are not supported by Partitioned DML, such as table moves and bulk deletes that require a JOIN. When working with individual databases, there are other methods you can use to import and export data:

- Use the Google Cloud console to export an individual database from Spanner to Cloud Storage in Avro format.
- Use the Google Cloud console to import a database back into Spanner from files you exported to Cloud Storage.
- Use the REST API or Google Cloud CLI to run export or import jobs from Spanner to Cloud Storage and back, also using Avro format.

The Dataflow connector for Spanner is part of the Apache Beam Java SDK, and it provides an API for performing the previous actions. For more information about some of the concepts discussed in this page, such as PCollection objects and transforms, see the Apache Beam programming guide.

Add the connector to your Maven project

To add the Google Cloud Dataflow connector to a Maven project, add the beam-sdks-java-io-google-cloud-platform Maven artifact to your pom.xml file as a dependency. For example, assuming that your pom.xml file sets beam.version to the appropriate version number, you would add the following dependency:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Read data from Spanner

To read from Spanner, apply the SpannerIO.read transform. Configure the read using the methods in the SpannerIO.Read class. Applying the transform returns a PCollection<Struct>, where each element in the collection represents an individual row returned by the read operation.
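A minimal read sketch along those lines, assuming a hypothetical Singers table and placeholder instance and database IDs (none of these values come from this page):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import com.google.cloud.spanner.Struct;

public class ReadSingers {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Each element of the resulting collection is a Struct representing one row.
    PCollection<Struct> rows = pipeline.apply(
        SpannerIO.read()
            .withInstanceId("my-instance")   // placeholder instance ID
            .withDatabaseId("my-database")   // placeholder database ID
            .withQuery("SELECT SingerId, FirstName, LastName FROM Singers"));

    // ... apply further transforms to "rows" here ...
    pipeline.run().waitUntilFinish();
  }
}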
Replace the following:
REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
VERSION: the version of the template that you want to use. You can use the following values: latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/, or the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH: the Cloud Storage path that is used to store datastream events. For example: gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE: your Spanner instance.
CLOUDSPANNER_DATABASE: your Spanner database.
DLQ: the Cloud Storage path for the error queue directory.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST

{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
    "parameters": {
      "inputFilePattern": "GCS_FILE_PATH",
      "streamName": "STREAM_NAME",
      "instanceId": "CLOUDSPANNER_INSTANCE",
      "databaseId": "CLOUDSPANNER_DATABASE",
      "deadLetterQueueDirectory": "DLQ"
    }
  }
}

Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
JOB_NAME: a unique job name of your choice
LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
VERSION: the version of the template that you want to use (see the values described above)
GCS_FILE_PATH: the Cloud Storage path that is used to store datastream events. For example: gs://bucket/path/to/data/
CLOUDSPANNER_INSTANCE: your Spanner instance.
CLOUDSPANNER_DATABASE: your Spanner database.
DLQ: the Cloud Storage path for the error queue directory.

Template source code: Java

What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.
The resulting transaction view can be passed to a read operation using SpannerIO.Read.withTransaction.

Read data from all available tables

You can read data from all available tables in a Spanner database.

Troubleshoot unsupported queries

The Dataflow connector only supports Spanner SQL queries where the first operator in the query execution plan is a Distributed Union. If you attempt to read data from Spanner using a query and you get an exception stating that the query does not have a DistributedUnion at the root, follow the steps in Understand how Spanner executes queries to retrieve an execution plan for your query using the Google Cloud console. If your SQL query isn't supported, simplify it to a query that has a distributed union as the first operator in the query execution plan. Remove aggregate functions, table joins, and the operators DISTINCT, GROUP BY, and ORDER BY, as they are the operators that are most likely to prevent the query from working.

Create mutations for a write

Use the Mutation class's newInsertOrUpdateBuilder method instead of the newInsertBuilder method unless absolutely necessary for Java pipelines. For Python pipelines, use SpannerInsertOrUpdate instead of SpannerInsert. Dataflow provides at-least-once guarantees, meaning that a mutation might be written several times. As a result, INSERT-only mutations might generate com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errors that cause the pipeline to fail. To prevent this error, use the INSERT_OR_UPDATE mutation instead, which adds a new row or updates column values if the row already exists. The INSERT_OR_UPDATE mutation can be applied more than once.

Write to Spanner and transform data

You can write data to Spanner with the Dataflow connector by using a SpannerIO.write transform to execute a collection of input row mutations. The Dataflow connector groups mutations into batches for efficiency. If a transform unexpectedly stops before completion, mutations that have already been applied aren't rolled back.
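A minimal sketch of that write path, assuming a hypothetical Singers table and placeholder instance and database IDs (none of these values come from this page):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import com.google.cloud.spanner.Mutation;

public class WriteSingers {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // INSERT_OR_UPDATE is safe to apply more than once, which matches
    // Dataflow's at-least-once delivery of mutations.
    Mutation mutation = Mutation.newInsertOrUpdateBuilder("Singers") // hypothetical table
        .set("SingerId").to(1L)
        .set("FirstName").to("Marc")
        .set("LastName").to("Richards")
        .build();

    pipeline
        .apply(Create.of(mutation))            // a one-element PCollection of mutations
        .apply(SpannerIO.write()               // the connector batches mutations for efficiency
            .withInstanceId("my-instance")     // placeholder instance ID
            .withDatabaseId("my-database"));   // placeholder database ID

    pipeline.run().waitUntilFinish();
  }
}

In practice the PCollection of mutations would usually be produced by upstream transforms rather than Create.of; the choice of INSERT_OR_UPDATE here follows the guidance above about retried writes.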
transformationCustomParameters: String containing any custom parameters to be passed to the custom transformation class. Defaults to empty.
filteredEventsDirectory: The file path to store the events filtered via custom transformation. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
shardingContextFilePath: Sharding context file path in Cloud Storage, used to populate the shard id in the Spanner database for each source shard. It is of the format Map<stream_name, Map<db_name, shard_id>>.
tableOverrides: The table name overrides from source to Spanner. They are written in the following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]. This example maps the Singers table to Vocalists and the Albums table to Records: [{Singers, Vocalists}, {Albums, Records}]. Defaults to empty.
columnOverrides: The column name overrides from source to Spanner. They are written in the following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]. Note that the SourceTableName should remain the same in both the source and Spanner pair; to override table names, use tableOverrides. This example maps SingerName to TalentName and AlbumName to RecordName in the Singers and Albums tables, respectively: [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. Defaults to empty.
schemaOverridesFilePath: A file which specifies the table and the column name overrides from source to Spanner. Defaults to empty.
shadowTableSpannerDatabaseId: Optional separate database for shadow tables. If not specified, shadow tables are created in the main database. If specified, ensure shadowTableSpannerInstanceId is specified as well. Defaults to empty.
shadowTableSpannerInstanceId: Optional separate instance for shadow tables. If not specified, shadow tables are created in the main instance. If specified, ensure shadowTableSpannerDatabaseId is specified as well. Defaults to empty.

Run the template

Console
Go to the Dataflow Create job from template page.
In the Job name field, enter a unique job name.
Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
From the Dataflow template drop-down menu, select the Cloud Datastream to Spanner template.
In the provided parameter fields, enter your parameter values.
Click Run job.

gcloud
In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
  --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ

Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
JOB_NAME: a unique job name of your choice
REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
VERSION: the version of the template that you want to use. You can use the following values: latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/, or the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
TOKEN: Splunk's HTTP Event Collector token
URL: the URL path for Splunk's HTTP Event Collector
DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: the batch size to use for sending multiple events to Splunk
PARALLELISM: the number of parallel requests to use for sending events to Splunk
DISABLE_VALIDATION: true if you want to disable SSL certificate validation
ROOT_CA_CERTIFICATE_PATH: the path to the root CA certificate in Cloud Storage (for example, gs://your-bucket/privateCA.crt)

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST

{
  "jobName": "JOB_NAME",
  "environment": {
    "ipConfiguration": "WORKER_IP_UNSPECIFIED",
    "additionalExperiments": []
  },
  "parameters": {
    "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
    "token": "TOKEN",
    "url": "URL",
    "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
    "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
    "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
    "batchCount": "BATCH_COUNT",
    "parallelism": "PARALLELISM",
    "disableCertificateValidation": "DISABLE_VALIDATION",
    "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
  }
}

Replace the following:
PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
JOB_NAME: a unique job name of your choice
LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
VERSION: the version of the template that you want to use (see the values described above)
STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
INPUT_SUBSCRIPTION_NAME: the Pub/Sub subscription name
TOKEN: Splunk's HTTP Event Collector token
URL: the URL path for Splunk's HTTP Event Collector
DEADLETTER_TOPIC_NAME: the Pub/Sub topic name
JAVASCRIPT_FUNCTION: the name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
PATH_TO_JAVASCRIPT_UDF_FILE: the Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use—for example, gs://my-bucket/my-udfs/my_file.js
BATCH_COUNT: the batch size to use for sending multiple events to Splunk
PARALLELISM: the number of parallel requests to use for sending events to Splunk
Book description: Learn how easy it is to apply sophisticated statistical and machine learning methods to real-world problems when you build using Google Cloud Platform (GCP). This hands-on guide shows data engineers and data scientists how to implement an end-to-end data pipeline with cloud native tools on GCP. Throughout this updated second edition, you'll work through a sample business decision by employing a variety of data science approaches. Follow along by building a data pipeline in your own project on GCP, and discover how to solve data science problems in a transformative and more collaborative way. You'll learn how to:
- Employ best practices in building highly scalable data and ML pipelines on Google Cloud
- Automate and schedule data ingest using Cloud Run
- Create and populate a dashboard in Data Studio
- Build a real-time analytics pipeline using Pub/Sub, Dataflow, and BigQuery
- Conduct interactive data exploration with BigQuery
- Create a Bayesian model with Spark on Cloud Dataproc
- Forecast time series and do anomaly detection with BigQuery ML
- Aggregate within time windows with Dataflow
- Train explainable machine learning models with Vertex AI
- Operationalize ML with Vertex AI Pipelines
gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Replace the following:
REGION: the region ID of the job's regional endpoint
JOB_ID: the ID of the job to update
TARGET_UTILIZATION: a value in the range [0.1, 0.9]

To reset the utilization hint to the default value, use the following gcloud command:

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

Use the projects.locations.jobs.update method:

PUT

{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Replace the following:
PROJECT_ID: the Google Cloud project ID of the Dataflow job.
REGION: the region ID of the job's regional endpoint.
JOB_ID: the ID of the job to update.
TARGET_UTILIZATION: a value in the range [0.1, 0.9]

Streaming autoscaling heuristics

For streaming pipelines, the objective of Horizontal Autoscaling is to minimize backlog while maximizing worker utilization and throughput, and to react quickly to spikes in load. Dataflow takes several factors into account when autoscaling, including:

- Backlog. The estimated backlog time is calculated from the throughput and the backlog bytes still to be processed from the input source. A pipeline is considered backlogged when the estimated backlog time stays above 15 seconds.
- Target CPU utilization. The default target for average CPU utilization is 0.8. You can override this value.
- Available keys. Keys are the fundamental unit of parallelism in Dataflow.

In some cases, Dataflow uses the following factors in autoscaling decisions. If these factors are used for your job, you can see that information in the Autoscaling metrics tab.

- Key-based throttling uses the number of processing keys received by the job to calculate the cap for user workers, because each key can only be processed by one worker at a time.
- Downscale dampening. If Dataflow detects that unstable autoscaling decisions have occurred, it slows the rate of downscaling in order to improve stability.
- CPU-based upscale uses high CPU utilization as an upscaling criterion.

For streaming jobs that don't use Streaming Engine, scaling might be constrained by the number of Persistent Disks. For more information, see Set the autoscaling range.

Upscaling. If a streaming pipeline remains backlogged with sufficient parallelism on the workers for several minutes, Dataflow scales up.
The same input rate resulted in a monthly cost reduction of 37% in Compute Engine, and peak backlog dropped from ~4.5 days to ~5.2 hours. (Note: each color represents a single worker and its assigned workload; this view is not yet available externally.)

Case 3 - Load balancing allowed for +27% increased throughput with a reduced backlog of ~1 day for the same number of workers.

Load balancing at work

When a pipeline starts up, Dataflow doesn't know in advance the amount of data coming in on any particular data source. In fact, it can change throughout the life of the pipeline. Therefore, when there are multiple topics involved, you may end up in the following situation: if worker 1 is unable to keep up with the 30 MB/s load, then you will need to bring up a third worker to handle topic 2. You can achieve a better solution with load balancing: rebalance and let the pipeline keep up with just two workers.

With load balancing enabled, work is automatically and intelligently distributed by looking at the live input rate of each topic, preventing hot workers from bottlenecking the entire pipeline. This extends beyond unbalanced topics; it can also find per-key-level imbalances and redistribute keys among workers*, achieving balance at the core.

On by default

We turned on custom source load balancing in Dataflow's production environment across all regions in July. This is available to all customers by default for all Dataflow Streaming Engine pipelines. Get started with Dataflow and Google Cloud Managed Service for Apache Kafka right from the Google Cloud console.
1. Introduction

Hello everyone, and thank you for coming today. Ready to get familiar with Google Compute Engine? In this codelab, we'll explore Compute Engine using a sample guestbook application. You'll create Compute Engine instances, deploy nginx, and finally put a network load balancer in front. You can create a Compute Engine instance from the graphical console or from the command line; in this lab, you'll learn how to use the command line.

Google Compute Engine offers virtual machines that run in Google's data centers, connected to its worldwide fiber network. The tools and workflow it provides let you scale from single instances to global, load-balanced cloud computing. These VMs boot quickly, come with persistent disk storage, and deliver consistent performance. The machines are available in many configurations, including predefined sizes, and can also be created with custom machine types optimized for your specific needs. Finally, Compute Engine virtual machines are also the technology behind several other Google Cloud products (Kubernetes Engine, Cloud Dataproc, Cloud Dataflow, and so on).

2. Setup

Self-paced environment setup: sign in to the Google Cloud console and create a new project or reuse an existing one. (If you don't already have a Gmail or Google Workspace account, you must create one.) The project name is the display name for the project's participants. It's a character string that isn't used by Google APIs, and you can change it at any time. The project ID must be unique across all Google Cloud projects and is immutable (you can't change it once it's set).
rootCaCertificatePath: the path to the root CA certificate in Cloud Storage. The certificate provided in Cloud Storage must be DER-encoded and can be supplied in binary or printable (Base64) encoding. If the certificate is provided in Base64 encoding, it must be bounded at the beginning by -----BEGIN CERTIFICATE----- and at the end by -----END CERTIFICATE-----. If this parameter is provided, this private CA certificate file is fetched and added to the Dataflow worker's trust store in order to verify the Splunk HEC endpoint's SSL certificate. If this parameter is not provided, the default trust store is used. For example, gs://mybucket/mycerts/privateCA.crt.
enableBatchLogs: Specifies whether logs should be enabled for batches written to Splunk. Default: true.
enableGzipHttpCompression: Specifies whether HTTP requests sent to Splunk HEC should be compressed (gzip content encoded). Default: true.
javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. A further parameter defines the interval at which workers may check for JavaScript UDF changes to reload the files; it defaults to 0.

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:
- Input: the Pub/Sub message data field, serialized as a JSON string.
- Output: the event data to be sent to the Splunk HEC events endpoint. The output must be a string or a stringified JSON object.

Run the template

Console
Go to the Dataflow Create job from template page.
In the Job name field, enter a unique job name.
Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
From the Dataflow template drop-down menu, select the Pub/Sub to Splunk template.
In the provided parameter fields, enter your parameter values.
Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
Click Run job.
gcloud
In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Splunk \
  --region REGION_NAME \
  --staging-location STAGING_LOCATION \
  --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Replace the following:
JOB_NAME: a unique job name of your choice
REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
VERSION: the version of the template that you want to use. You can use the following values: latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/