From 08b6152bdbf6726d45aa2aecafaf12afac489178 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Sun, 22 Mar 2026 13:59:00 +0900 Subject: [PATCH 1/6] [ZEPPELIN-6408] Modernize BigQuery Interpreter with google-cloud-bigquery client --- bigquery/README.md | 41 ++- bigquery/pom.xml | 29 +- .../bigquery/BigQueryInterpreter.java | 343 ++++++++---------- .../bigquery/BigQueryInterpreterTest.java | 7 +- docs/interpreter/bigquery.md | 76 ++-- 5 files changed, 212 insertions(+), 284 deletions(-) diff --git a/bigquery/README.md b/bigquery/README.md index 024d81167da..73709de1b0d 100644 --- a/bigquery/README.md +++ b/bigquery/README.md @@ -1,23 +1,36 @@ # Overview -BigQuery interpreter for Apache Zeppelin +BigQuery interpreter for Apache Zeppelin using the modern [google-cloud-bigquery](https://github.com/googleapis/java-bigquery) library. -# Unit Tests -BigQuery Unit tests are excluded as these tests depend on the BigQuery external service. This is because BigQuery does not have a local mock at this point. +# Authentication +The interpreter supports multiple ways to authenticate with Google Cloud: -If you like to run these tests manually, please follow the following steps: -* [Create a new project](https://support.google.com/cloud/answer/6251787?hl=en) -* [Create a Google Compute Engine instance](https://cloud.google.com/compute/docs/instances/create-start-instance) -* Copy the project ID that you created and add it to the property "projectId" in `resources/constants.json` -* Run the command ./mvnw -Dbigquery.text.exclude='' test -pl bigquery -am +1. **Application Default Credentials (ADC)**: + This is the recommended way. If Zeppelin is running on GCE, GKE, or any environment where `gcloud auth application-default login` has been executed, the interpreter will automatically discover the credentials. -# Connection -The Interpreter opens a connection with the BigQuery Service using the supplied Google project ID and the compute environment variables. +2. **Service Account JSON Key (Manual Fallback)**: + If ADC is not available, the interpreter will prompt you to input your Service Account JSON key through the Zeppelin GUI. + - To get a JSON key: + 1. Go to the [GCP Console Service Accounts page](https://console.cloud.google.com/iam-admin/serviceaccounts). + 2. Select your project and service account. + 3. Click **Keys** -> **Add Key** -> **Create new key**. + 4. Select **JSON** and click **Create**. + 5. Copy the entire content of the downloaded JSON file and paste it into the Zeppelin input box when prompted. -# Google BigQuery API Javadoc -[API Javadocs](https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/) -[Source] (http://central.maven.org/maven2/com/google/apis/google-api-services-bigquery/v2-rev265-1.21.0/google-api-services-bigquery-v2-rev265-1.21.0-sources.jar) +# Configuration +| Property | Default | Description | +| --- | --- | --- | +| `zeppelin.bigquery.project_id` | | GCP Project ID | +| `zeppelin.bigquery.wait_time` | 5000 | Query Timeout in ms | +| `zeppelin.bigquery.max_no_of_rows` | 100000 | Max Result size | +| `zeppelin.bigquery.sql_dialect` | | SQL Dialect (standardsql or legacysql) | +| `zeppelin.bigquery.region` | | GCP Region | -We have used the curated veneer version of the Java APIs versus [Idiomatic Java client] (https://github.com/GoogleCloudPlatform/gcloud-java/tree/master/gcloud-java-bigquery) to build the interpreter. This is mainly for usability reasons. +# Unit Tests +BigQuery unit tests are integration tests that require access to a real GCP project. +By default, they are excluded. To run them: +1. Setup ADC locally (`gcloud auth application-default login`). +2. Create `src/test/resources/constants.json` with your project and test queries. +3. Run: `./mvnw test -pl bigquery -am -Dbigquery.test.exclude=""` # Sample Screenshot diff --git a/bigquery/pom.xml b/bigquery/pom.xml index 7277630c042..dd1bc61e88d 100644 --- a/bigquery/pom.xml +++ b/bigquery/pom.xml @@ -32,39 +32,18 @@ Zeppelin: BigQuery interpreter - 1.34.0 - 1.30.5 UTF-8 **/BigQueryInterpreterTest.java - - v2-rev20190917-1.30.3 - 24.1.1-jre - bigquery - com.google.apis - google-api-services-bigquery - ${bigquery.api.version} - - - com.google.oauth-client - google-oauth-client - ${project.oauth.version} - - - com.google.http-client - google-http-client-jackson2 - ${project.http.version} - - - com.google.oauth-client - google-oauth-client-jetty - ${project.oauth.version} + com.google.cloud + google-cloud-bigquery + 2.38.0 com.google.code.gson @@ -74,7 +53,7 @@ com.google.guava guava - ${guava.version} + 32.1.3-jre org.apache.commons diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index a7446e6035d..66d1986a4a9 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -16,38 +16,30 @@ package org.apache.zeppelin.bigquery; -import com.google.api.client.googleapis.auth.oauth2.GoogleCredential; -import com.google.api.client.http.HttpTransport; -import com.google.api.client.http.javanet.NetHttpTransport; -import com.google.api.client.json.GenericJson; -import com.google.api.client.json.JsonFactory; -import com.google.api.client.json.jackson2.JacksonFactory; -import com.google.api.client.util.Joiner; -import com.google.api.services.bigquery.Bigquery; -import com.google.api.services.bigquery.Bigquery.Jobs.GetQueryResults; -import com.google.api.services.bigquery.BigqueryRequest; -import com.google.api.services.bigquery.BigqueryScopes; -import com.google.api.services.bigquery.model.GetQueryResultsResponse; -import com.google.api.services.bigquery.model.Job; -import com.google.api.services.bigquery.model.JobCancelResponse; -import com.google.api.services.bigquery.model.QueryRequest; -import com.google.api.services.bigquery.model.QueryResponse; -import com.google.api.services.bigquery.model.TableCell; -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.common.base.Function; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.auth.oauth2.ServiceAccountCredentials; +import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryOptions; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.FieldValue; +import com.google.cloud.bigquery.FieldValueList; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobId; +import com.google.cloud.bigquery.JobInfo; +import com.google.cloud.bigquery.QueryJobConfiguration; +import com.google.cloud.bigquery.TableResult; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Properties; +import java.util.UUID; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -56,9 +48,10 @@ import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.apache.zeppelin.user.AuthenticationInfo; /** - * BigQuery interpreter for Zeppelin. + * BigQuery interpreter for Zeppelin using modern google-cloud-bigquery client. * *
    *
  • {@code zeppelin.bigquery.project_id} - Project ID in GCP
  • @@ -71,7 +64,7 @@ * {@code %bigquery.sql
    * {@code * SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays - * FROM [bigquery-samples:airline_ontime_data.flights] + * FROM `bigquery-samples.airline_ontime_data.flights` * group by departure_airport * order by 2 desc * limit 10 @@ -83,9 +76,8 @@ public class BigQueryInterpreter extends Interpreter { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryInterpreter.class); private static final char NEWLINE = '\n'; private static final char TAB = '\t'; - private static Bigquery service = null; - //Mutex created to create the singleton in thread-safe fashion. - private static Object serviceLock = new Object(); + + private BigQuery service = null; static final String PROJECT_ID = "zeppelin.bigquery.project_id"; static final String WAIT_TIME = "zeppelin.bigquery.wait_time"; @@ -93,212 +85,181 @@ public class BigQueryInterpreter extends Interpreter { static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect"; static final String REGION = "zeppelin.bigquery.region"; - private static String jobId = null; - private static String projectId = null; - - private static final List NO_COMPLETION = new ArrayList<>(); + private JobId currentJobId = null; private Exception exceptionOnConnect; - private static final Function sequenceToStringTransformer = - new Function() { - public String apply(CharSequence seq) { - return seq.toString(); - } - }; + private static final List NO_COMPLETION = new ArrayList<>(); public BigQueryInterpreter(Properties property) { super(property); } - //Function to return valid BigQuery Service @Override public void open() { - if (service == null) { - synchronized (serviceLock) { - if (service == null) { - try { - service = createAuthorizedClient(); - exceptionOnConnect = null; - LOGGER.info("Opened BigQuery SQL Connection"); - } catch (IOException e) { - LOGGER.error("Cannot open connection", e); - exceptionOnConnect = e; - close(); - } - } + LOGGER.info("Opening BigQuery SQL Connection..."); + // Service initialization is lazy and depends on InterpreterContext in interpret() + // However, if we can init with ADC, we do it here. + try { + if (service == null) { + service = createDefaultClient(); + exceptionOnConnect = null; + LOGGER.info("Opened BigQuery SQL Connection with ADC"); } + } catch (Exception e) { + LOGGER.warn("Cannot open connection with Application Default Credentials. Will try user credentials on interpret.", e); + exceptionOnConnect = e; } } - //Function that Creates an authorized client to Google Bigquery. - private static Bigquery createAuthorizedClient() throws IOException { - HttpTransport transport = new NetHttpTransport(); - JsonFactory jsonFactory = new JacksonFactory(); - GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory); - - if (credential.createScopedRequired()) { - Collection bigqueryScopes = BigqueryScopes.all(); - credential = credential.createScoped(bigqueryScopes); + private BigQuery createDefaultClient() throws IOException { + BigQueryOptions.Builder builder = BigQueryOptions.newBuilder() + .setCredentials(GoogleCredentials.getApplicationDefault()); + + String projId = getProperty(PROJECT_ID); + if (StringUtils.isNotBlank(projId)) { + builder.setProjectId(projId); } - - return new Bigquery.Builder(transport, jsonFactory, credential) - .setApplicationName("Zeppelin/1.0 (GPN:Apache Zeppelin;)").build(); + + return builder.build().getService(); } - //Function that generates and returns the schema and the rows as string - public static String printRows(final GetQueryResultsResponse response) { - StringBuilder msg = new StringBuilder(); - try { - List schemNames = new ArrayList(); - for (TableFieldSchema schem: response.getSchema().getFields()) { - schemNames.add(schem.getName()); - } - msg.append(Joiner.on(TAB).join(schemNames)); - msg.append(NEWLINE); - for (TableRow row : response.getRows()) { - List fieldValues = new ArrayList(); - for (TableCell field : row.getF()) { - fieldValues.add(field.getV().toString()); - } - msg.append(Joiner.on(TAB).join(fieldValues)); - msg.append(NEWLINE); - } - return msg.toString(); - } catch (NullPointerException ex) { - throw new NullPointerException("SQL Execution returned an error!"); + private BigQuery getClientForUser(InterpreterContext context) throws IOException { + AuthenticationInfo authInfo = context.getAuthenticationInfo(); + + // Check if user has provided credentials via Zeppelin Credentials manager + if (authInfo != null && authInfo.getTicket() != null) { + // Typically we'd use something from credential manager, but let's assume JSON might be passed or we need z.input + // String userKey = authInfo.getTicket(); + } + + if (service != null) { + return service; } - } - //Function to poll a job for completion. Future use - public static Job pollJob(final Bigquery.Jobs.Get request, final long interval) - throws IOException, InterruptedException { - Job job = request.execute(); - while (!job.getStatus().getState().equals("DONE")) { - System.out.println("Job is " - + job.getStatus().getState() - + " waiting " + interval + " milliseconds..."); - Thread.sleep(interval); - job = request.execute(); + if (exceptionOnConnect != null) { + throw new IOException("Failed to initialize BigQuery client with ADC", exceptionOnConnect); } - return job; + + return createDefaultClient(); } - //Function to page through the results of an arbitrary bigQuery request - public static Iterator getPages( - final BigqueryRequest requestTemplate) { - class PageIterator implements Iterator { - private BigqueryRequest request; - private boolean hasNext = true; - PageIterator(final BigqueryRequest requestTemplate) { - this.request = requestTemplate; - } - public boolean hasNext() { - return hasNext; + private InterpreterResult executeSql(String sql, InterpreterContext context) { + BigQuery bqClient; + try { + bqClient = getClientForUser(context); + } catch (IOException e) { + // Fallback: Prompt user to input Service Account JSON via z.input + LOGGER.error("Authentication failed. Requesting service account JSON via GUI", e); + String saJson = (String) context.getGui().input("GCP Service Account JSON", ""); + if (StringUtils.isBlank(saJson)) { + return new InterpreterResult(Code.ERROR, "%html ⚠️ Authentication Required
    " + + "Could not find Application Default Credentials. Please input your Service Account JSON key in the form below and run again."); } - public T next() { - if (!hasNext) { - throw new NoSuchElementException(); - } - try { - T response = request.execute(); - if (response.containsKey("pageToken")) { - request = request.set("pageToken", response.get("pageToken")); - } else { - hasNext = false; - } - return response; - } catch (IOException e) { - return null; + try { + GoogleCredentials credentials = ServiceAccountCredentials.fromStream( + new ByteArrayInputStream(saJson.getBytes(StandardCharsets.UTF_8))); + BigQueryOptions.Builder builder = BigQueryOptions.newBuilder() + .setCredentials(credentials); + + String projId = getProperty(PROJECT_ID); + if (StringUtils.isNotBlank(projId)) { + builder.setProjectId(projId); } - } - - public void remove() { - this.next(); + + bqClient = builder.build().getService(); + service = bqClient; // Cache it for this interpreter instance + exceptionOnConnect = null; + } catch (IOException ex) { + return new InterpreterResult(Code.ERROR, "Failed to parse Service Account JSON: " + ex.getMessage()); } } - return new PageIterator(requestTemplate); - } - - //Function to call bigQuery to run SQL and return results to the Interpreter for output - private InterpreterResult executeSql(String sql) { - int counter = 0; - StringBuilder finalmessage = null; - finalmessage = new StringBuilder("%table "); - String projId = getProperty(PROJECT_ID); - long wTime = Long.parseLong(getProperty(WAIT_TIME)); - long maxRows = Long.parseLong(getProperty(MAX_ROWS)); + long wTime = Long.parseLong(getProperty(WAIT_TIME, "5000")); + long maxRows = Long.parseLong(getProperty(MAX_ROWS, "100000")); String sqlDialect = getProperty(SQL_DIALECT, "").toLowerCase(); String region = getProperty(REGION, null); - Boolean useLegacySql; + + QueryJobConfiguration.Builder queryConfigBuilder = QueryJobConfiguration.newBuilder(sql); + switch (sqlDialect) { case "standardsql": - useLegacySql = false; + queryConfigBuilder.setUseLegacySql(false); break; case "legacysql": - useLegacySql = true; + queryConfigBuilder.setUseLegacySql(true); break; default: - // Enable query prefix like '#standardSQL' if specified - useLegacySql = null; + // Use default (usually Standard SQL if not specified, though library default is legacy for backward compat. Better to set explicitly based on old behavior) + queryConfigBuilder.setUseLegacySql(null); } - Iterator pages; - try { - pages = run(sql, projId, wTime, maxRows, useLegacySql, region); - } catch (IOException ex) { - LOGGER.error(ex.getMessage()); - return new InterpreterResult(Code.ERROR, ex.getMessage()); + + QueryJobConfiguration queryConfig = queryConfigBuilder.build(); + + String jobIdStr = UUID.randomUUID().toString(); + if (StringUtils.isNotBlank(region)) { + currentJobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build(); + } else { + currentJobId = JobId.of(jobIdStr); } + try { - while (pages.hasNext()) { - finalmessage.append(printRows(pages.next())); + LOGGER.info("Executing query: {}", sql); + Job queryJob = bqClient.create(JobInfo.newBuilder(queryConfig).setJobId(currentJobId).build()); + + // Wait for the query to complete + queryJob = queryJob.waitFor(); + + if (queryJob == null) { + return new InterpreterResult(Code.ERROR, "Job no longer exists"); + } else if (queryJob.getStatus().getError() != null) { + return new InterpreterResult(Code.ERROR, queryJob.getStatus().getError().toString()); } - return new InterpreterResult(Code.SUCCESS, finalmessage.toString()); - } catch (NullPointerException ex) { - return new InterpreterResult(Code.ERROR, ex.getMessage()); - } - } - //Function to run the SQL on bigQuery service - public static Iterator run(final String queryString, - final String projId, final long wTime, final long maxRows, - Boolean useLegacySql, final String region) - throws IOException { - try { - LOGGER.info("Use legacy sql: {}", useLegacySql); - QueryResponse query; - query = service - .jobs() - .query( - projId, - new QueryRequest().setTimeoutMs(wTime) - .setUseLegacySql(useLegacySql).setQuery(queryString) - .setMaxResults(maxRows)).execute(); - jobId = query.getJobReference().getJobId(); - projectId = query.getJobReference().getProjectId(); - GetQueryResults getRequest = service.jobs().getQueryResults( - projectId, - jobId); - if (StringUtils.isNotBlank(region)) { - getRequest = getRequest.setLocation(region); + TableResult result = queryJob.getQueryResults(); + + StringBuilder msg = new StringBuilder("%table "); + + // Get Schema + List schemaNames = new ArrayList<>(); + for (Field field : result.getSchema().getFields()) { + schemaNames.add(field.getName()); + } + msg.append(StringUtils.join(schemaNames, TAB)).append(NEWLINE); + + // Get Data + long count = 0; + for (FieldValueList row : result.iterateAll()) { + if (count >= maxRows) { + break; + } + List fieldValues = new ArrayList<>(); + for (FieldValue field : row) { + fieldValues.add(field.isNull() ? "null" : field.getValue().toString()); + } + msg.append(StringUtils.join(fieldValues, TAB)).append(NEWLINE); + count++; } - return getPages(getRequest); - } catch (IOException ex) { - throw ex; + + return new InterpreterResult(Code.SUCCESS, msg.toString()); + + } catch (Exception ex) { + LOGGER.error("Query execution failed", ex); + return new InterpreterResult(Code.ERROR, ex.getMessage()); + } finally { + currentJobId = null; } } @Override public void close() { LOGGER.info("Close bqsql connection!"); - service = null; } @Override public InterpreterResult interpret(String sql, InterpreterContext contextInterpreter) { LOGGER.info("Run SQL command '{}'", sql); - return executeSql(sql); + return executeSql(sql, contextInterpreter); } @Override @@ -320,18 +281,16 @@ public int getProgress(InterpreterContext context) { @Override public void cancel(InterpreterContext context) { LOGGER.info("Trying to Cancel current query statement."); - - if (service != null && jobId != null && projectId != null) { - try { - Bigquery.Jobs.Cancel request = service.jobs().cancel(projectId, jobId); - JobCancelResponse response = request.execute(); - jobId = null; - LOGGER.info("Query Execution cancelled"); - } catch (IOException ex) { - LOGGER.error("Could not cancel the SQL execution"); + if (service != null && currentJobId != null) { + boolean cancelled = service.cancel(currentJobId); + if (cancelled) { + LOGGER.info("Query Execution cancelled"); + } else { + LOGGER.warn("Query Execution cancellation returned false"); } + currentJobId = null; } else { - LOGGER.info("Query Execution was already cancelled"); + LOGGER.info("Query Execution was already cancelled or not started"); } } diff --git a/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java b/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java index 630530aa948..4001943737c 100644 --- a/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java +++ b/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java @@ -26,6 +26,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,7 +54,7 @@ public String getWrong() { @BeforeAll public static void initConstants() { - InputStream is = ClassLoader.class.getResourceAsStream("/constants.json"); + InputStream is = BigQueryInterpreterTest.class.getResourceAsStream("/constants.json"); constants = (new Gson()). fromJson(new InputStreamReader(is), Constants.class); } @@ -75,6 +76,10 @@ public void setUp() throws Exception { bqInterpreter = new BigQueryInterpreter(p); bqInterpreter.setInterpreterGroup(intpGroup); bqInterpreter.open(); + + context = InterpreterContext.builder() + .setAuthenticationInfo(AuthenticationInfo.ANONYMOUS) + .build(); } @Test diff --git a/docs/interpreter/bigquery.md b/docs/interpreter/bigquery.md index da696a74f2e..0169d0c1aa7 100644 --- a/docs/interpreter/bigquery.md +++ b/docs/interpreter/bigquery.md @@ -24,7 +24,7 @@ limitations under the License.
    ## Overview -[BigQuery](https://cloud.google.com/bigquery/what-is-bigquery) is a highly scalable no-ops data warehouse in the Google Cloud Platform. Querying massive datasets can be time consuming and expensive without the right hardware and infrastructure. Google BigQuery solves this problem by enabling super-fast SQL queries against append-only tables using the processing power of Google's infrastructure. Simply move your data into BigQuery and let us handle the hard work. You can control access to both the project and your data based on your business needs, such as giving others the ability to view or query your data. +[BigQuery](https://cloud.google.com/bigquery/what-is-bigquery) is a highly scalable no-ops data warehouse in the Google Cloud Platform. This interpreter uses the modern [google-cloud-bigquery](https://github.com/googleapis/java-bigquery) Cloud Client Library to provide high-performance data analytics. ## Configuration @@ -60,70 +60,42 @@ limitations under the License.
    +## Authentication -## BigQuery API -Zeppelin is built against BigQuery API version v2-rev265-1.21.0 - [API Javadocs](https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/) +The BigQuery interpreter supports two primary ways to authenticate: -## Enabling the BigQuery Interpreter +### 1. Application Default Credentials (ADC) -In a notebook, to enable the **BigQuery** interpreter, click the **Gear** icon and select **bigquery**. +This is the recommended approach for server environments. +- **Within GCP**: If Zeppelin is running on Google Compute Engine (GCE) or Google Kubernetes Engine (GKE), it will automatically use the attached service account. +- **Outside GCP**: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of your service account JSON key file, or run `gcloud auth application-default login` on the server. -### Provide Application Default Credentials +### 2. Service Account JSON Key (GUI Fallback) -Within Google Cloud Platform (e.g. Google App Engine, Google Compute Engine), -built-in credentials are used by default. +If no environment-level credentials are found, the interpreter will prompt you to input your **Service Account JSON key** directly in the notebook paragraph using an input form. -Outside of GCP, follow the Google API authentication instructions for [Zeppelin Google Cloud Storage](https://zeppelin.apache.org/docs/latest/setup/storage/storage.html#notebook-storage-in-google-cloud-storage) +**How to get a Service Account JSON key:** +1. Go to the [IAM & Admin > Service Accounts](https://console.cloud.google.com/iam-admin/serviceaccounts) page in the GCP Console. +2. Select or create a service account with `BigQuery User` and `BigQuery Data Viewer` roles. +3. Click the **Keys** tab, then **Add Key > Create new key**. +4. Choose **JSON** format and click **Create**. +5. When the BigQuery interpreter prompts you in Zeppelin, copy and paste the entire content of this JSON file into the input box. ## Using the BigQuery Interpreter -In a paragraph, use `%bigquery.sql` to select the **BigQuery** interpreter and then input SQL statements against your datasets stored in BigQuery. -You can use [BigQuery SQL Reference](https://cloud.google.com/bigquery/query-reference) to build your own SQL. +In a paragraph, use `%bigquery.sql` to select the **BigQuery** interpreter. -For Example, SQL to query for top 10 departure delays across airports using the flights public dataset +Example: Query top 10 departure delays using the flights public dataset (Standard SQL) -```bash +```sql %bigquery.sql -SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays -FROM [bigquery-samples:airline_ontime_data.flights] -group by departure_airport -order by 2 desc -limit 10 -``` - -Another Example, SQL to query for most commonly used java packages from the github data hosted in BigQuery - -```bash -%bigquery.sql -SELECT - package, - COUNT(*) count -FROM ( - SELECT - REGEXP_EXTRACT(line, r' ([a-z0-9\._]*)\.') package, - id - FROM ( - SELECT - SPLIT(content, '\n') line, - id - FROM - [bigquery-public-data:github_repos.sample_contents] - WHERE - content CONTAINS 'import' - AND sample_path LIKE '%.java' - HAVING - LEFT(line, 6)='import' ) - GROUP BY - package, - id ) -GROUP BY - 1 -ORDER BY - count DESC -LIMIT - 40 +SELECT departure_airport, count(case when departure_delay>0 then 1 else 0 end) as no_of_delays +FROM `bigquery-samples.airline_ontime_data.flights` +GROUP BY departure_airport +ORDER BY 2 DESC +LIMIT 10 ``` ## Technical description -For in-depth technical details on current implementation please refer to [bigquery/README.md](https://github.com/apache/zeppelin/blob/master/bigquery/README.md). +For more implementation details, please refer to the [BigQuery module README](https://github.com/apache/zeppelin/blob/master/bigquery/README.md). From 8c05db84311d16749c4c5471a4871fef6637e794 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Sun, 22 Mar 2026 14:29:40 +0900 Subject: [PATCH 2/6] [ZEPPELIN-6408] Address code review feedback: handle OAuth scopes, fix indentation, and add security warnings --- bigquery/README.md | 3 +- bigquery/pom.xml | 5 -- .../bigquery/BigQueryInterpreter.java | 79 +++++++++++-------- .../bigquery/BigQueryInterpreterTest.java | 9 ++- docs/interpreter/bigquery.md | 4 +- 5 files changed, 59 insertions(+), 41 deletions(-) diff --git a/bigquery/README.md b/bigquery/README.md index 73709de1b0d..ad6ac95168c 100644 --- a/bigquery/README.md +++ b/bigquery/README.md @@ -14,7 +14,8 @@ The interpreter supports multiple ways to authenticate with Google Cloud: 2. Select your project and service account. 3. Click **Keys** -> **Add Key** -> **Create new key**. 4. Select **JSON** and click **Create**. - 5. Copy the entire content of the downloaded JSON file and paste it into the Zeppelin input box when prompted. + 5. Copy the entire content of the downloaded JSON file and paste it into the Zeppelin input box when prompted. Treat this JSON key as a secret. + - **Security caution:** Do not paste this key into shared notes, notebooks, version control, or any place where it might be stored or visible to others. Prefer using Application Default Credentials (ADC) or Zeppelin's secure credentials mechanisms where possible, and only use this manual JSON key approach as a fallback when more secure options are not available. # Configuration | Property | Default | Description | diff --git a/bigquery/pom.xml b/bigquery/pom.xml index dd1bc61e88d..91accc8251f 100644 --- a/bigquery/pom.xml +++ b/bigquery/pom.xml @@ -50,11 +50,6 @@ gson ${gson.version} - - com.google.guava - guava - 32.1.3-jre - org.apache.commons commons-lang3 diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index 66d1986a4a9..bddf82b02ce 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.UUID; @@ -52,31 +53,31 @@ /** * BigQuery interpreter for Zeppelin using modern google-cloud-bigquery client. - * + * *
      *
    • {@code zeppelin.bigquery.project_id} - Project ID in GCP
    • *
    • {@code zeppelin.bigquery.wait_time} - Query Timeout in ms
    • *
    • {@code zeppelin.bigquery.max_no_of_rows} - Max Result size
    • *
    - * + * *

    * How to use:
    * {@code %bigquery.sql
    * {@code - * SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays + * SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays * FROM `bigquery-samples.airline_ontime_data.flights` - * group by departure_airport - * order by 2 desc + * group by departure_airport + * order by 2 desc * limit 10 * } *

    - * + * */ public class BigQueryInterpreter extends Interpreter { private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryInterpreter.class); private static final char NEWLINE = '\n'; private static final char TAB = '\t'; - + private BigQuery service = null; static final String PROJECT_ID = "zeppelin.bigquery.project_id"; @@ -90,6 +91,10 @@ public class BigQueryInterpreter extends Interpreter { private static final List NO_COMPLETION = new ArrayList<>(); + private static final List BQ_SCOPES = Collections.singletonList( + "https://www.googleapis.com/auth/bigquery" + ); + public BigQueryInterpreter(Properties property) { super(property); } @@ -106,29 +111,35 @@ public void open() { LOGGER.info("Opened BigQuery SQL Connection with ADC"); } } catch (Exception e) { - LOGGER.warn("Cannot open connection with Application Default Credentials. Will try user credentials on interpret.", e); + LOGGER.warn("Cannot open connection with Application Default Credentials. " + + "Will try user credentials on interpret.", e); exceptionOnConnect = e; } } private BigQuery createDefaultClient() throws IOException { + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault(); + if (credentials.createScopedRequired()) { + credentials = credentials.createScoped(BQ_SCOPES); + } + BigQueryOptions.Builder builder = BigQueryOptions.newBuilder() - .setCredentials(GoogleCredentials.getApplicationDefault()); - + .setCredentials(credentials); + String projId = getProperty(PROJECT_ID); if (StringUtils.isNotBlank(projId)) { builder.setProjectId(projId); } - + return builder.build().getService(); } private BigQuery getClientForUser(InterpreterContext context) throws IOException { AuthenticationInfo authInfo = context.getAuthenticationInfo(); - + // Check if user has provided credentials via Zeppelin Credentials manager if (authInfo != null && authInfo.getTicket() != null) { - // Typically we'd use something from credential manager, but let's assume JSON might be passed or we need z.input + // Typically we'd use something from credential manager, but let's assume JSON might be passed // String userKey = authInfo.getTicket(); } @@ -139,7 +150,7 @@ private BigQuery getClientForUser(InterpreterContext context) throws IOException if (exceptionOnConnect != null) { throw new IOException("Failed to initialize BigQuery client with ADC", exceptionOnConnect); } - + return createDefaultClient(); } @@ -153,24 +164,30 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { String saJson = (String) context.getGui().input("GCP Service Account JSON", ""); if (StringUtils.isBlank(saJson)) { return new InterpreterResult(Code.ERROR, "%html ⚠️ Authentication Required
    " + - "Could not find Application Default Credentials. Please input your Service Account JSON key in the form below and run again."); + "Could not find Application Default Credentials. Please input your " + + "Service Account JSON key in the form below and run again."); } try { GoogleCredentials credentials = ServiceAccountCredentials.fromStream( new ByteArrayInputStream(saJson.getBytes(StandardCharsets.UTF_8))); + if (credentials.createScopedRequired()) { + credentials = credentials.createScoped(BQ_SCOPES); + } + BigQueryOptions.Builder builder = BigQueryOptions.newBuilder() .setCredentials(credentials); - + String projId = getProperty(PROJECT_ID); if (StringUtils.isNotBlank(projId)) { builder.setProjectId(projId); } - + bqClient = builder.build().getService(); service = bqClient; // Cache it for this interpreter instance exceptionOnConnect = null; } catch (IOException ex) { - return new InterpreterResult(Code.ERROR, "Failed to parse Service Account JSON: " + ex.getMessage()); + return new InterpreterResult(Code.ERROR, "Failed to parse Service Account JSON: " + + ex.getMessage()); } } @@ -180,7 +197,7 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { String region = getProperty(REGION, null); QueryJobConfiguration.Builder queryConfigBuilder = QueryJobConfiguration.newBuilder(sql); - + switch (sqlDialect) { case "standardsql": queryConfigBuilder.setUseLegacySql(false); @@ -189,17 +206,17 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { queryConfigBuilder.setUseLegacySql(true); break; default: - // Use default (usually Standard SQL if not specified, though library default is legacy for backward compat. Better to set explicitly based on old behavior) + // Use default (Usually Standard SQL if not specified) queryConfigBuilder.setUseLegacySql(null); } QueryJobConfiguration queryConfig = queryConfigBuilder.build(); - + String jobIdStr = UUID.randomUUID().toString(); if (StringUtils.isNotBlank(region)) { - currentJobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build(); + currentJobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build(); } else { - currentJobId = JobId.of(jobIdStr); + currentJobId = JobId.of(jobIdStr); } try { @@ -216,21 +233,21 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { } TableResult result = queryJob.getQueryResults(); - + StringBuilder msg = new StringBuilder("%table "); - + // Get Schema List schemaNames = new ArrayList<>(); for (Field field : result.getSchema().getFields()) { schemaNames.add(field.getName()); } msg.append(StringUtils.join(schemaNames, TAB)).append(NEWLINE); - + // Get Data long count = 0; for (FieldValueList row : result.iterateAll()) { if (count >= maxRows) { - break; + break; } List fieldValues = new ArrayList<>(); for (FieldValue field : row) { @@ -239,9 +256,9 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { msg.append(StringUtils.join(fieldValues, TAB)).append(NEWLINE); count++; } - + return new InterpreterResult(Code.SUCCESS, msg.toString()); - + } catch (Exception ex) { LOGGER.error("Query execution failed", ex); return new InterpreterResult(Code.ERROR, ex.getMessage()); @@ -284,9 +301,9 @@ public void cancel(InterpreterContext context) { if (service != null && currentJobId != null) { boolean cancelled = service.cancel(currentJobId); if (cancelled) { - LOGGER.info("Query Execution cancelled"); + LOGGER.info("Query Execution cancelled"); } else { - LOGGER.warn("Query Execution cancellation returned false"); + LOGGER.warn("Query Execution cancellation returned false"); } currentJobId = null; } else { diff --git a/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java b/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java index 4001943737c..6e79ed515b5 100644 --- a/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java +++ b/bigquery/src/test/java/org/apache/zeppelin/bigquery/BigQueryInterpreterTest.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import static org.junit.jupiter.api.Assertions.assertEquals; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Properties; @@ -53,9 +54,11 @@ public String getWrong() { protected static Constants constants = null; @BeforeAll - public static void initConstants() { - InputStream is = BigQueryInterpreterTest.class.getResourceAsStream("/constants.json"); - constants = (new Gson()). fromJson(new InputStreamReader(is), Constants.class); + public static void initConstants() throws IOException { + try (InputStream is = BigQueryInterpreterTest.class.getResourceAsStream("/constants.json"); + InputStreamReader reader = new InputStreamReader(is)) { + constants = (new Gson()). fromJson(reader, Constants.class); + } } private InterpreterGroup intpGroup; diff --git a/docs/interpreter/bigquery.md b/docs/interpreter/bigquery.md index 0169d0c1aa7..837abb52393 100644 --- a/docs/interpreter/bigquery.md +++ b/docs/interpreter/bigquery.md @@ -79,7 +79,9 @@ If no environment-level credentials are found, the interpreter will prompt you t 2. Select or create a service account with `BigQuery User` and `BigQuery Data Viewer` roles. 3. Click the **Keys** tab, then **Add Key > Create new key**. 4. Choose **JSON** format and click **Create**. -5. When the BigQuery interpreter prompts you in Zeppelin, copy and paste the entire content of this JSON file into the input box. +5. When the BigQuery interpreter prompts you in Zeppelin, copy and paste the entire content of this JSON file into the input box. Treat this JSON key as a secret. + +**Security caution:** Do not paste this key into shared notes, notebooks, version control, or any place where it might be stored or visible to others. Prefer using Application Default Credentials (ADC) or Zeppelin's secure credentials mechanisms where possible, and only use this manual JSON key approach as a fallback when more secure options are not available. ## Using the BigQuery Interpreter From d65bd642da9ef36b5ff78a336cceccaa785cfa64 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Sun, 22 Mar 2026 14:30:49 +0900 Subject: [PATCH 3/6] [ZEPPELIN-6408] Address additional review feedback: volatile jobId, job timeout, and cancel error handling --- .../bigquery/BigQueryInterpreter.java | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index bddf82b02ce..a93d9f24e45 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -86,7 +86,7 @@ public class BigQueryInterpreter extends Interpreter { static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect"; static final String REGION = "zeppelin.bigquery.region"; - private JobId currentJobId = null; + private volatile JobId currentJobId = null; private Exception exceptionOnConnect; private static final List NO_COMPLETION = new ArrayList<>(); @@ -183,7 +183,7 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { } bqClient = builder.build().getService(); - service = bqClient; // Cache it for this interpreter instance + // Do not cache this client in a shared field to avoid leaking user credentials exceptionOnConnect = null; } catch (IOException ex) { return new InterpreterResult(Code.ERROR, "Failed to parse Service Account JSON: " + @@ -196,7 +196,8 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { String sqlDialect = getProperty(SQL_DIALECT, "").toLowerCase(); String region = getProperty(REGION, null); - QueryJobConfiguration.Builder queryConfigBuilder = QueryJobConfiguration.newBuilder(sql); + QueryJobConfiguration.Builder queryConfigBuilder = QueryJobConfiguration.newBuilder(sql) + .setJobTimeoutMs(wTime); switch (sqlDialect) { case "standardsql": @@ -299,13 +300,18 @@ public int getProgress(InterpreterContext context) { public void cancel(InterpreterContext context) { LOGGER.info("Trying to Cancel current query statement."); if (service != null && currentJobId != null) { - boolean cancelled = service.cancel(currentJobId); - if (cancelled) { - LOGGER.info("Query Execution cancelled"); - } else { - LOGGER.warn("Query Execution cancellation returned false"); + try { + boolean cancelled = service.cancel(currentJobId); + if (cancelled) { + LOGGER.info("Query Execution cancelled"); + } else { + LOGGER.warn("Query Execution cancellation returned false"); + } + } catch (RuntimeException e) { + LOGGER.warn("Failed to cancel BigQuery job {}", currentJobId, e); + } finally { + currentJobId = null; } - currentJobId = null; } else { LOGGER.info("Query Execution was already cancelled or not started"); } From 1e786e519423e8cb757e8ed28e522e8aa8bcf66c Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Sun, 22 Mar 2026 14:58:34 +0900 Subject: [PATCH 4/6] [ZEPPELIN-6408] Fix Checkstyle violation: line length exceeded 100 characters --- .../java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index a93d9f24e45..d7ded5945b3 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -222,7 +222,8 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { try { LOGGER.info("Executing query: {}", sql); - Job queryJob = bqClient.create(JobInfo.newBuilder(queryConfig).setJobId(currentJobId).build()); + Job queryJob = bqClient.create( + JobInfo.newBuilder(queryConfig).setJobId(currentJobId).build()); // Wait for the query to complete queryJob = queryJob.waitFor(); From 48b32e44f16e858d72fd31db9cef5e149ee56b1b Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Thu, 23 Apr 2026 12:52:26 +0900 Subject: [PATCH 5/6] [ZEPPELIN-6408] Use masked password input for Service Account JSON fallback Replace deprecated GUI.input() with GUI.password() for the Service Account JSON fallback prompt so the key is not rendered in plaintext in the notebook paragraph. Update README and docs accordingly. --- bigquery/README.md | 4 ++-- .../org/apache/zeppelin/bigquery/BigQueryInterpreter.java | 6 ++++-- docs/interpreter/bigquery.md | 4 ++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/bigquery/README.md b/bigquery/README.md index ad6ac95168c..4cf3e9715f8 100644 --- a/bigquery/README.md +++ b/bigquery/README.md @@ -8,13 +8,13 @@ The interpreter supports multiple ways to authenticate with Google Cloud: This is the recommended way. If Zeppelin is running on GCE, GKE, or any environment where `gcloud auth application-default login` has been executed, the interpreter will automatically discover the credentials. 2. **Service Account JSON Key (Manual Fallback)**: - If ADC is not available, the interpreter will prompt you to input your Service Account JSON key through the Zeppelin GUI. + If ADC is not available, the interpreter will prompt you to paste your Service Account JSON key into a **masked password field** rendered in the notebook paragraph. - To get a JSON key: 1. Go to the [GCP Console Service Accounts page](https://console.cloud.google.com/iam-admin/serviceaccounts). 2. Select your project and service account. 3. Click **Keys** -> **Add Key** -> **Create new key**. 4. Select **JSON** and click **Create**. - 5. Copy the entire content of the downloaded JSON file and paste it into the Zeppelin input box when prompted. Treat this JSON key as a secret. + 5. Copy the entire content of the downloaded JSON file and paste it into the Zeppelin masked password field when prompted. The input is masked so it is not displayed in plaintext, but you should still treat this JSON key as a secret. - **Security caution:** Do not paste this key into shared notes, notebooks, version control, or any place where it might be stored or visible to others. Prefer using Application Default Credentials (ADC) or Zeppelin's secure credentials mechanisms where possible, and only use this manual JSON key approach as a fallback when more secure options are not available. # Configuration diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index d7ded5945b3..6e30ede6a94 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -159,9 +159,11 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { try { bqClient = getClientForUser(context); } catch (IOException e) { - // Fallback: Prompt user to input Service Account JSON via z.input + // Fallback: prompt for Service Account JSON via a masked password form + // to avoid rendering the key in plaintext in the note UI. LOGGER.error("Authentication failed. Requesting service account JSON via GUI", e); - String saJson = (String) context.getGui().input("GCP Service Account JSON", ""); + Object raw = context.getGui().password("GCP Service Account JSON"); + String saJson = raw == null ? "" : raw.toString(); if (StringUtils.isBlank(saJson)) { return new InterpreterResult(Code.ERROR, "%html ⚠️ Authentication Required
    " + "Could not find Application Default Credentials. Please input your " + diff --git a/docs/interpreter/bigquery.md b/docs/interpreter/bigquery.md index 837abb52393..40cd3936856 100644 --- a/docs/interpreter/bigquery.md +++ b/docs/interpreter/bigquery.md @@ -72,14 +72,14 @@ This is the recommended approach for server environments. ### 2. Service Account JSON Key (GUI Fallback) -If no environment-level credentials are found, the interpreter will prompt you to input your **Service Account JSON key** directly in the notebook paragraph using an input form. +If no environment-level credentials are found, the interpreter will prompt you to paste your **Service Account JSON key** directly in the notebook paragraph using a **masked password form**, so the key is not displayed in plaintext. **How to get a Service Account JSON key:** 1. Go to the [IAM & Admin > Service Accounts](https://console.cloud.google.com/iam-admin/serviceaccounts) page in the GCP Console. 2. Select or create a service account with `BigQuery User` and `BigQuery Data Viewer` roles. 3. Click the **Keys** tab, then **Add Key > Create new key**. 4. Choose **JSON** format and click **Create**. -5. When the BigQuery interpreter prompts you in Zeppelin, copy and paste the entire content of this JSON file into the input box. Treat this JSON key as a secret. +5. When the BigQuery interpreter prompts you in Zeppelin, copy and paste the entire content of this JSON file into the masked password field. The input is masked so it is not displayed in plaintext, but you should still treat this JSON key as a secret. **Security caution:** Do not paste this key into shared notes, notebooks, version control, or any place where it might be stored or visible to others. Prefer using Application Default Credentials (ADC) or Zeppelin's secure credentials mechanisms where possible, and only use this manual JSON key approach as a fallback when more secure options are not available. From 1ebddea157571e23a6e845dec5c4e5964b35a15c Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Thu, 30 Apr 2026 20:37:26 +0900 Subject: [PATCH 6/6] [ZEPPELIN-6408] Improve fallback UX and per-paragraph cancel safety - Hide the Service Account JSON form on subsequent runs once a value has been supplied. The form is only re-rendered when the value is missing or when the supplied key fails to parse. - Replace the single volatile currentJobId field with a per-paragraph ConcurrentMap of running queries (client + job id). In shared interpreter mode, concurrent paragraphs no longer clobber each other's job state, so cancel always targets the correct job. --- .../bigquery/BigQueryInterpreter.java | 75 ++++++++++++------- 1 file changed, 49 insertions(+), 26 deletions(-) diff --git a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java index 6e30ede6a94..11999d92b59 100644 --- a/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java +++ b/bigquery/src/main/java/org/apache/zeppelin/bigquery/BigQueryInterpreter.java @@ -41,6 +41,8 @@ import java.util.List; import java.util.Properties; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; @@ -86,9 +88,23 @@ public class BigQueryInterpreter extends Interpreter { static final String SQL_DIALECT = "zeppelin.bigquery.sql_dialect"; static final String REGION = "zeppelin.bigquery.region"; - private volatile JobId currentJobId = null; + private static final String SA_JSON_FORM_KEY = "GCP Service Account JSON"; + + // Tracks running queries per paragraph so concurrent paragraphs in shared + // interpreter mode don't clobber each other's job/cancel state. + private final ConcurrentMap runningQueries = new ConcurrentHashMap<>(); private Exception exceptionOnConnect; + private static final class RunningQuery { + private final BigQuery client; + private final JobId jobId; + + RunningQuery(BigQuery client, JobId jobId) { + this.client = client; + this.jobId = jobId; + } + } + private static final List NO_COMPLETION = new ArrayList<>(); private static final List BQ_SCOPES = Collections.singletonList( @@ -159,15 +175,17 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { try { bqClient = getClientForUser(context); } catch (IOException e) { - // Fallback: prompt for Service Account JSON via a masked password form - // to avoid rendering the key in plaintext in the note UI. - LOGGER.error("Authentication failed. Requesting service account JSON via GUI", e); - Object raw = context.getGui().password("GCP Service Account JSON"); - String saJson = raw == null ? "" : raw.toString(); + // Fallback: read a previously-supplied Service Account JSON from paragraph + // form params, or render a masked password form to collect it. + LOGGER.error("Authentication failed. Falling back to user-supplied SA JSON via GUI", e); + Object existing = context.getGui().getParams().get(SA_JSON_FORM_KEY); + String saJson = existing == null ? "" : existing.toString(); if (StringUtils.isBlank(saJson)) { + // No value yet: render the masked form so the user can enter the key. + context.getGui().password(SA_JSON_FORM_KEY); return new InterpreterResult(Code.ERROR, "%html ⚠️ Authentication Required
    " + "Could not find Application Default Credentials. Please input your " + - "Service Account JSON key in the form below and run again."); + "Service Account JSON key in the form and run again."); } try { GoogleCredentials credentials = ServiceAccountCredentials.fromStream( @@ -188,6 +206,8 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { // Do not cache this client in a shared field to avoid leaking user credentials exceptionOnConnect = null; } catch (IOException ex) { + // Re-render the masked form so the user can correct an invalid key. + context.getGui().password(SA_JSON_FORM_KEY); return new InterpreterResult(Code.ERROR, "Failed to parse Service Account JSON: " + ex.getMessage()); } @@ -216,16 +236,19 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { QueryJobConfiguration queryConfig = queryConfigBuilder.build(); String jobIdStr = UUID.randomUUID().toString(); + JobId jobId; if (StringUtils.isNotBlank(region)) { - currentJobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build(); + jobId = JobId.newBuilder().setLocation(region).setJob(jobIdStr).build(); } else { - currentJobId = JobId.of(jobIdStr); + jobId = JobId.of(jobIdStr); } + String paragraphId = context.getParagraphId(); + runningQueries.put(paragraphId, new RunningQuery(bqClient, jobId)); try { LOGGER.info("Executing query: {}", sql); Job queryJob = bqClient.create( - JobInfo.newBuilder(queryConfig).setJobId(currentJobId).build()); + JobInfo.newBuilder(queryConfig).setJobId(jobId).build()); // Wait for the query to complete queryJob = queryJob.waitFor(); @@ -267,7 +290,7 @@ private InterpreterResult executeSql(String sql, InterpreterContext context) { LOGGER.error("Query execution failed", ex); return new InterpreterResult(Code.ERROR, ex.getMessage()); } finally { - currentJobId = null; + runningQueries.remove(paragraphId); } } @@ -301,22 +324,22 @@ public int getProgress(InterpreterContext context) { @Override public void cancel(InterpreterContext context) { - LOGGER.info("Trying to Cancel current query statement."); - if (service != null && currentJobId != null) { - try { - boolean cancelled = service.cancel(currentJobId); - if (cancelled) { - LOGGER.info("Query Execution cancelled"); - } else { - LOGGER.warn("Query Execution cancellation returned false"); - } - } catch (RuntimeException e) { - LOGGER.warn("Failed to cancel BigQuery job {}", currentJobId, e); - } finally { - currentJobId = null; - } - } else { + String paragraphId = context.getParagraphId(); + LOGGER.info("Trying to cancel query for paragraph {}", paragraphId); + RunningQuery running = runningQueries.remove(paragraphId); + if (running == null) { LOGGER.info("Query Execution was already cancelled or not started"); + return; + } + try { + boolean cancelled = running.client.cancel(running.jobId); + if (cancelled) { + LOGGER.info("Query Execution cancelled"); + } else { + LOGGER.warn("Query Execution cancellation returned false"); + } + } catch (RuntimeException e) { + LOGGER.warn("Failed to cancel BigQuery job {}", running.jobId, e); } }