Introducció
Google Cloud Dataflow és un servei de processament de dades en temps real i per lots que permet crear i gestionar canals de dades. Integrar BigQuery amb Dataflow permet processar grans volums de dades de manera eficient i flexible. En aquest tema, aprendrem com utilitzar Dataflow per llegir dades de BigQuery, processar-les i escriure els resultats de nou a BigQuery.
Objectius
- Comprendre què és Google Cloud Dataflow i com es pot utilitzar amb BigQuery.
- Aprendre a configurar un pipeline de Dataflow per llegir dades de BigQuery.
- Processar dades utilitzant Dataflow.
- Escriure els resultats processats de nou a BigQuery.
Requisits previs
- Coneixements bàsics de BigQuery i SQL.
- Familiaritat amb Google Cloud Platform (GCP).
- Coneixements bàsics de programació en Python o Java (Dataflow suporta ambdós llenguatges).
Configuració de l'entorn
Passos previs
- Crear un projecte a Google Cloud Platform (GCP): Si encara no tens un projecte a GCP, crea'n un des del Google Cloud Console.
- Activar l'API de Dataflow i BigQuery: Assegura't que les APIs de Dataflow i BigQuery estiguin activades al teu projecte.
- Instal·lar el SDK de Dataflow: Pots instal·lar el SDK de Dataflow utilitzant pip (per Python) o Maven (per Java).
Instal·lació del SDK de Dataflow (Python)
Instal·lació del SDK de Dataflow (Java)
Afegeix la següent dependència al teu fitxer pom.xml:
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>2.34.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>2.34.0</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>2.34.0</version> </dependency>
Creació d'un pipeline de Dataflow
Exemple en Python
A continuació, es mostra un exemple de com crear un pipeline de Dataflow en Python que llegeix dades de BigQuery, les processa i escriu els resultats de nou a BigQuery.
Pas 1: Importar les biblioteques necessàries
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions
Pas 2: Configurar les opcions del pipeline
options = PipelineOptions() google_cloud_options = options.view_as(GoogleCloudOptions) google_cloud_options.project = 'el-teu-projecte' google_cloud_options.job_name = 'bigquery-dataflow-job' google_cloud_options.staging_location = 'gs://el-teu-bucket/staging' google_cloud_options.temp_location = 'gs://el-teu-bucket/temp' options.view_as(StandardOptions).runner = 'DataflowRunner'
Pas 3: Definir el pipeline
p = beam.Pipeline(options=options)
query = 'SELECT * FROM `el-teu-projecte.el-teu-dataset.la-teva-taula`'
(p
| 'ReadFromBigQuery' >> beam.io.Read(beam.io.BigQuerySource(query=query))
| 'ProcessData' >> beam.Map(lambda record: {'field1': record['field1'], 'field2': record['field2'] * 2})
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
'el-teu-projecte:el-teu-dataset.resultats',
schema='field1:STRING, field2:INTEGER',
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
)
p.run().wait_until_finish()Exemple en Java
A continuació, es mostra un exemple de com crear un pipeline de Dataflow en Java que llegeix dades de BigQuery, les processa i escriu els resultats de nou a BigQuery.
Pas 1: Importar les biblioteques necessàries
import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.TypeDescriptor;
Pas 2: Configurar les opcions del pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setJobName("bigquery-dataflow-job");
options.setProject("el-teu-projecte");
options.setTempLocation("gs://el-teu-bucket/temp");Pas 3: Definir el pipeline
Pipeline p = Pipeline.create(options);
String query = "SELECT * FROM `el-teu-projecte.el-teu-dataset.la-teva-taula`";
p.apply("ReadFromBigQuery", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
.apply("ProcessData", MapElements.into(TypeDescriptor.of(TableRow.class))
.via((TableRow row) -> {
TableRow result = new TableRow();
result.set("field1", row.get("field1"));
result.set("field2", (Long) row.get("field2") * 2);
return result;
}))
.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
.to("el-teu-projecte:el-teu-dataset.resultats")
.withSchema(new TableSchema().setFields(Arrays.asList(
new TableFieldSchema().setName("field1").setType("STRING"),
new TableFieldSchema().setName("field2").setType("INTEGER")
)))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
p.run().waitUntilFinish();Exercici pràctic
Descripció
Crea un pipeline de Dataflow que llegeixi dades d'una taula de BigQuery, realitzi una transformació simple (per exemple, multiplicar un camp numèric per 2) i escrigui els resultats en una nova taula de BigQuery.
Solució
La solució es troba en els exemples de codi proporcionats anteriorment. Pots adaptar-los segons les teves necessitats específiques.
Errors comuns i consells
- Error de permisos: Assegura't que el compte de servei que utilitza Dataflow tingui els permisos necessaris per accedir a BigQuery i als buckets de Google Cloud Storage.
- Especificació incorrecta del runner: Si estàs provant el pipeline localment, utilitza
DirectRunneren lloc deDataflowRunner. - Esquema incorrecte: Assegura't que l'esquema de la taula de destinació a BigQuery coincideixi amb les dades que estàs escrivint.
Conclusió
Integrar BigQuery amb Dataflow permet processar grans volums de dades de manera eficient i flexible. En aquest tema, hem après a configurar un pipeline de Dataflow per llegir dades de BigQuery, processar-les i escriure els resultats de nou a BigQuery. Aquesta habilitat és essencial per a qualsevol professional que treballi amb grans volums de dades i necessiti processar-les de manera eficient.
Curs de BigQuery
Mòdul 1: Introducció a BigQuery
- Què és BigQuery?
- Configurar el teu entorn de BigQuery
- Comprendre l'arquitectura de BigQuery
- Visió general de la consola de BigQuery
Mòdul 2: SQL bàsic a BigQuery
Mòdul 3: SQL intermedi a BigQuery
Mòdul 4: SQL avançat a BigQuery
- Unions avançades
- Camps niats i repetits
- Funcions definides per l'usuari (UDFs)
- Particionament i agrupament
Mòdul 5: Gestió de dades a BigQuery
- Carregar dades a BigQuery
- Exportar dades de BigQuery
- Transformació i neteja de dades
- Gestió de conjunts de dades i taules
Mòdul 6: Optimització del rendiment de BigQuery
- Tècniques d'optimització de consultes
- Comprendre els plans d'execució de consultes
- Ús de vistes materialitzades
- Optimització de l'emmagatzematge
Mòdul 7: Seguretat i compliment de BigQuery
Mòdul 8: Integració i automatització de BigQuery
- Integració amb serveis de Google Cloud
- Ús de BigQuery amb Dataflow
- Automatització de fluxos de treball amb Cloud Functions
- Programació de consultes amb Cloud Scheduler
Mòdul 9: Aprenentatge automàtic a BigQuery (BQML)
- Introducció a BigQuery ML
- Creació i entrenament de models
- Avaluació i predicció amb models
- Funcions avançades de BQML
