## Databricks Notebook Commands

| Command | Purpose | Example |
| --- | --- | --- |
| `%config` | Set configuration options for the notebook | |
| `%env` | Set environment variables | |
| `%fs` | Interact with the Databricks file system | `%fs ls dbfs:/repo` |
| `%load` | Load the contents of a file into a cell | |
| `%lsmagic` | List all magic commands | |
| `%jobs` | List all running jobs | |
| `%matplotlib` | Set up the matplotlib backend | |
| `%md` | Write Markdown text | |
| `%pip` | Install Python packages | |
| `%python` | Execute Python code | `%python dbutils.fs.rm("/user/hive/warehouse/test/", True)` |
| `%r` | Execute R code | |
| `%reload` | Reload module contents | |
| `%run` | Execute a Python file or another notebook | |
| `%scala` | Execute Scala code | |
| `%sh` | Execute shell commands on the driver node | `%sh git clone https://github.com/repo/test` |
| `%sql` | Execute SQL queries | |
| `%who` | List all variables in the current scope | |

## Notebook Widgets

```python
# Create widgets
dbutils.widgets.text("param_name", "default_value", "label")
dbutils.widgets.dropdown("param_name", "default", ["option1", "option2"])
dbutils.widgets.multiselect("param_name", "default", ["option1", "option2"])
dbutils.widgets.combobox("param_name", "default", ["option1", "option2"])

# Get widget values
param_value = dbutils.widgets.get("param_name")

# Remove widgets
dbutils.widgets.remove("param_name")
dbutils.widgets.removeAll()
```

## Secrets Management

`dbutils.secrets` can only read and list secrets from a notebook; scopes and secret values are created, updated, and deleted with the Databricks CLI (`databricks secrets ...`) or the REST API, not with `dbutils`.

```python
# Retrieve secret (the value is redacted if displayed in notebook output)
secret_value = dbutils.secrets.get("scope_name", "secret_key")

# List secrets in a scope / list all scopes
dbutils.secrets.list("scope_name")
dbutils.secrets.listScopes()
```

## Accessing Files

- `/path/to/file` — no scheme (resolved against the DBFS root by Spark and `dbutils`, against the driver's local filesystem by `%sh` and plain Python I/O)
- `dbfs:/path/to/file` — DBFS
- `file:/path/to/file` — driver filesystem
- `s3://path/to/file` — S3
- `/Volumes/catalog/schema/volume/path` — Unity Catalog volumes

## Copying Files

```
%fs cp file:/<path> /Volumes/<catalog>/<schema>/<volume>/<path>
%python dbutils.fs.cp("file:/<path>", "/Volumes/<catalog>/<schema>/<volume>/<path>")
%python dbutils.fs.cp("file:/databricks/driver/test", "dbfs:/repo", True)
%sh cp /<path> /Volumes/<catalog>/<schema>/<volume>/<path>
```

## SQL Statements (DDL)

### Create & Use Schema

```sql
CREATE SCHEMA test;
CREATE SCHEMA custom LOCATION 'dbfs:/custom';
USE SCHEMA test;
```

### Unity Catalog (UC)

```sql
-- Create catalog
CREATE CATALOG my_catalog COMMENT 'Production catalog';

-- Create schema in UC
CREATE SCHEMA my_catalog.my_schema;
USE CATALOG my_catalog;
USE SCHEMA my_schema;

-- Create volume (for files)
CREATE VOLUME my_catalog.my_schema.my_volume;
ALTER VOLUME my_catalog.my_schema.my_volume OWNER TO `team@company.com`;

-- List catalogs, schemas, volumes
SHOW CATALOGS;
SHOW SCHEMAS IN my_catalog;
SHOW VOLUMES IN my_catalog.my_schema;

-- Grant permissions
GRANT USE CATALOG ON CATALOG my_catalog TO `user@company.com`;
GRANT READ VOLUME ON VOLUME my_catalog.my_schema.my_volume TO `user@company.com`;
```

### Create Table

```sql
CREATE TABLE test(col1 INT, col2 STRING, col3 STRING, col4 BIGINT, col5 INT, col6 FLOAT);

CREATE TABLE test AS SELECT * EXCEPT (_rescued_data) FROM read_files('/repo/data/test.csv');

CREATE TABLE test USING CSV LOCATION '/repo/data/test.csv';
CREATE TABLE test USING CSV OPTIONS (header = "true") LOCATION '/repo/data/test.csv';

-- General forms
CREATE TABLE test AS ...
CREATE TABLE test USING ...
```
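The CTAS and `USING CSV LOCATION` forms above create different things: CTAS over `read_files` copies the data into a new managed Delta table, while `USING CSV LOCATION` registers an external table that reads the CSV file in place. A minimal way to check which one you got, reusing the `test` table from the examples above:

```sql
-- For the CTAS table expect Provider = delta and Type = MANAGED;
-- for the USING CSV LOCATION table expect Provider = CSV, Type = EXTERNAL,
-- and Location pointing at /repo/data/test.csv.
DESCRIBE EXTENDED test;
```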
More `CREATE TABLE` variants:

```sql
CREATE TABLE test(id INT, title STRING, col1 STRING, publish_time BIGINT, pages INT, price FLOAT)
COMMENT 'This is a comment for the table itself';

CREATE TABLE test AS SELECT * EXCEPT (_rescued_data) FROM read_files('/repo/data/test.json', format => 'json');
CREATE TABLE test_raw AS SELECT * EXCEPT (_rescued_data) FROM read_files('/repo/data/test.csv', sep => ';');
CREATE TABLE custom_table_test LOCATION 'dbfs:/custom-table' AS SELECT * EXCEPT (_rescued_data) FROM read_files('/repo/data/test.csv');
CREATE TABLE test PARTITIONED BY (col1) AS SELECT * EXCEPT (_rescued_data) FROM read_files('/repo/data/test.csv');

CREATE TABLE users(
  firstname STRING,
  lastname STRING,
  full_name STRING GENERATED ALWAYS AS (concat(firstname, ' ', lastname))
);

CREATE OR REPLACE TABLE test AS SELECT * EXCEPT (_rescued_data) FROM read_files('/repo/data/test.csv');
CREATE OR REPLACE TABLE test AS SELECT * FROM json.`/repo/data/test.json`;
CREATE OR REPLACE TABLE test AS SELECT * FROM read_files('/repo/data/test.csv');
```

### Create View

```sql
CREATE VIEW view_test AS SELECT * FROM test WHERE col1 = 'test';
CREATE VIEW view_test AS SELECT test.col1, test2.col1 FROM test JOIN test2 ON test.col2 = test2.col2;

CREATE TEMP VIEW temp_test AS SELECT * FROM test WHERE col1 = 'test';
CREATE TEMP VIEW temp_test AS SELECT * FROM read_files('/repo/data/test.csv');

CREATE GLOBAL TEMP VIEW view_test AS SELECT * FROM test WHERE col1 = 'test';
SELECT * FROM global_temp.view_test;

CREATE TEMP VIEW jdbc_example USING JDBC OPTIONS (
  url '<jdbc-url>',
  dbtable '<table-name>',
  user '<username>',
  password '<password>'
);

CREATE OR REPLACE TEMP VIEW test AS SELECT * FROM delta.`<logpath>`;
CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-id>");

CREATE OR REPLACE TEMP VIEW test_view AS
  SELECT test_table.col1 AS col1
  FROM test_table
  WHERE col1 = 'value1'
  ORDER BY timestamp DESC
  LIMIT 1;
```

### Drop

```sql
DROP TABLE test;
```

### Describe

```sql
SHOW TABLES;
DESCRIBE EXTENDED test;
```

## SQL Statements (DML)

### Select

```sql
SELECT * FROM csv.`/repo/data/test.csv`;
SELECT * FROM read_files('/repo/data/test.csv');
SELECT * FROM read_files('/repo/data/test.csv', format => 'csv', header => 'true', sep => ',');
SELECT * FROM json.`/repo/data/test.json`;
SELECT * FROM json.`/repo/data/*.json`;

SELECT * FROM test WHERE year(from_unixtime(test_time)) > 1900;
SELECT * FROM test WHERE title LIKE '%a%';
SELECT * FROM test WHERE title LIKE 'a%';
SELECT * FROM test WHERE title LIKE '%a';

SELECT * FROM test TIMESTAMP AS OF '2024-01-01T00:00:00.000Z';
SELECT * FROM test VERSION AS OF 2;
SELECT * FROM test@v2;

SELECT * FROM event_log("<pipeline-id>");

SELECT count(*) FROM VALUES (NULL), (10), (10) AS example(col);
SELECT count(col) FROM VALUES (NULL), (10), (10) AS example(col);
SELECT count_if(col1 = 'test') FROM test;

SELECT from_unixtime(test_time) FROM test;
SELECT cast(test_time / 1 AS timestamp) FROM test;
SELECT cast(cast(test_time AS BIGINT) AS timestamp) FROM test;

SELECT element.sub_element FROM test;
SELECT flatten(array(array(1, 2), array(3, 4)));

SELECT * FROM (
  SELECT col1, col2 FROM test
) PIVOT (
  sum(col1) FOR col2 IN ('item1', 'item2')
);

SELECT *, CASE WHEN col1 > 10 THEN 'value1' ELSE 'value2' END FROM test;
SELECT * FROM test ORDER BY (CASE WHEN col1 > 10 THEN col2 ELSE col3 END);

WITH t(col1, col2) AS (SELECT 1, 2) SELECT * FROM t WHERE col1 = 1;

SELECT
  details:flow_definition.output_dataset AS output_dataset,
  details:flow_definition.input_datasets AS input_dataset
FROM event_log_raw, latest_update
WHERE event_type = 'flow_definition' AND origin.update_id = latest_update.id;
```
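To make the `PIVOT` form above concrete, here is a minimal sketch against an inline `VALUES` relation (the `sales`, `item`, and `qty` names are made up for illustration):

```sql
-- Pivot per-item quantities into one column per item value
WITH sales(item, qty) AS (SELECT * FROM VALUES ('item1', 2), ('item2', 5), ('item1', 3))
SELECT * FROM sales
PIVOT (sum(qty) FOR item IN ('item1', 'item2'));
-- Returns a single row: item1 = 5, item2 = 5
```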
### Insert

```sql
INSERT OVERWRITE test SELECT * FROM read_files('/repo/data/test.csv');
INSERT INTO test(col1, col2) VALUES ('value1', 'value2');
```

### Merge Into

```sql
MERGE INTO test USING test_to_delete ON test.col1 = test_to_delete.col1 WHEN MATCHED THEN DELETE;
MERGE INTO test USING test_to_update ON test.col1 = test_to_update.col1 WHEN MATCHED THEN UPDATE SET *;
MERGE INTO test USING test_to_insert ON test.col1 = test_to_insert.col1 WHEN NOT MATCHED THEN INSERT *;
```

### Copy Into

```sql
COPY INTO test
FROM '/repo/data'
FILEFORMAT = CSV
FILES = ('test.csv')
FORMAT_OPTIONS('header' = 'true', 'inferSchema' = 'true');
```

## Spark DataFrame API

### Read Data

```python
# Read CSV
df = spark.read.format("csv").option("header", "true").load("/path/to/file.csv")
df = spark.read.csv("/path/to/file.csv", header=True)

# Read Parquet
df = spark.read.parquet("/path/to/file.parquet")

# Read JSON
df = spark.read.json("/path/to/file.json")

# Read Delta table
df = spark.read.table("my_table")
df = spark.read.format("delta").load("/path/to/delta/table")

# Read from Volumes
df = spark.read.csv("/Volumes/catalog/schema/volume/file.csv", header=True)
```

### Write Data

```python
# Write modes: overwrite, append, ignore, error
df.write.mode("overwrite").format("parquet").save("/path/to/output")
df.write.mode("overwrite").option("mergeSchema", "true").format("delta").save("/path/to/delta")

# Write to table
df.write.mode("overwrite").saveAsTable("my_table")
df.write.mode("overwrite").option("path", "/path").saveAsTable("my_table")

# Write to Volume
df.write.mode("overwrite").parquet("/Volumes/catalog/schema/volume/output")
```

### Common Transformations

```python
from pyspark.sql.functions import sum, count, avg

# Select columns
df.select("col1", "col2")
df.select(df.col1, df.col2)

# Filter/Where
df.filter(df.col1 > 10)
df.where("col1 > 10")

# GroupBy and aggregations
df.groupBy("col1").agg({"col2": "sum", "col3": "count"})
df.groupBy("col1").agg(sum("col2"), count("col3"))

# Joins
df1.join(df2, on="col1", how="inner")
df1.join(df2, (df1.col1 == df2.col1) & (df1.col2 == df2.col2), how="left")

# Distinct/Dedup
df.distinct()
df.dropDuplicates(["col1", "col2"])

# Sort
df.sort("col1", ascending=False)
df.orderBy(df.col1.desc())
```

## Performance Optimization

### Delta Lake Optimization

```sql
-- Optimize table (compacts small files)
OPTIMIZE my_table;
OPTIMIZE my_table ZORDER BY (col1, col2);

-- Compute table stats
ANALYZE TABLE my_table COMPUTE STATISTICS;
ANALYZE TABLE my_table COMPUTE STATISTICS FOR COLUMNS col1, col2;

-- View statistics
DESCRIBE EXTENDED my_table;
```

### Partitioning Strategy

```python
# Write with partitioning
df.write \
    .mode("overwrite") \
    .partitionBy("date", "region") \
    .format("delta") \
    .save("/path/to/table")

# Partition pruning is applied automatically to queries such as:
# SELECT * FROM table WHERE date = '2024-01-01' AND region = 'US'
```

### Query Performance

```python
# Enable adaptive query execution
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Enable Arrow-based columnar transfer between Spark and pandas
# (spark.sql.execution.arrow.enabled is the deprecated name of this option)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Set shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Inspect query plans
df.explain(mode="extended")
```

## Delta Lake Statements

```sql
DESCRIBE HISTORY test;
DESCRIBE HISTORY test LIMIT 1;

INSERT INTO test SELECT * FROM test@v2 WHERE id = 3;

OPTIMIZE test;
OPTIMIZE test ZORDER BY (col1);

RESTORE TABLE test TO VERSION AS OF 0;

SELECT * FROM test TIMESTAMP AS OF '2024-01-01T00:00:00.000Z';
SELECT * FROM test VERSION AS OF 2;
SELECT * FROM test@v2;

VACUUM test;
VACUUM test RETAIN 240 HOURS;
```

```
%fs ls dbfs:/user/hive/warehouse/test/_delta_log
%python spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
```
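The `retentionDurationCheck` setting above matters when vacuuming with a retention window shorter than the default 168 hours (7 days): Delta refuses such a `VACUUM` unless the check is disabled. A minimal sketch of that sequence, assuming the `test` table from the statements above:

```python
# Allow VACUUM below the default 7-day retention window.
# Use with care: files removed this way break time travel to older versions.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM test RETAIN 24 HOURS")

# Re-enable the safety check afterwards
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
```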
"false") Delta Live Table Statements CREATE OR REFRESH LIVE TABLE test_raw AS SELECT * FROM json.`/repo/data/test.json`; CREATE OR REFRESH STREAMING TABLE test AS SELECT * FROM STREAM read_files('/repo/data/test*.json'); CREATE OR REFRESH LIVE TABLE test_cleaned AS SELECT col1, col2, col3, col4 FROM live.test_raw; CREATE OR REFRESH LIVE TABLE recent_test AS SELECT col1, col2 FROM live.test2 ORDER BY creation_time DESC LIMIT 10; Functions CREATE OR REPLACE FUNCTION test_function(temp DOUBLE) RETURNS DOUBLE RETURN (col1 - 10); CREATE OR REPLACE FUNCTION add_numbers(a INT, b INT) RETURNS INT RETURN a + b; Useful dbutils Functions File System Operations # List files dbutils.fs.ls("dbfs:/path") dbutils.fs.ls("/Volumes/catalog/schema/volume") # Get file info dbutils.fs.getStatus("dbfs:/path/file.txt") # Move/Rename dbutils.fs.mv("dbfs:/old/path", "dbfs:/new/path") # Remove files dbutils.fs.rm("dbfs:/path", recurse=True) # Create directory dbutils.fs.mkdirs("dbfs:/new/directory") # Copy files dbutils.fs.cp("dbfs:/source", "dbfs:/dest", recurse=True) # Head (preview file) dbutils.fs.head("dbfs:/path/file.txt", 1000) Notebook Context # Get notebook path dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() # Get current user dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get() # Exit notebook dbutils.notebook.exit("Exit message") # Run another notebook dbutils.notebook.run("./other_notebook", timeout_seconds=3600, arguments={"param1": "value1"}) Auto Loader %python spark.readStream.format("cloudFiles")\ .option("cloudFiles.format", "json")\ .option("cloudFiles.schemaLocation", "/autoloader-schema")\ .option("pathGlobFilter", "test*.json")\ .load("/repo/data")\ .writeStream\ .option("mergeSchema", "true")\ .option("checkpointLocation", "/autoloader-checkpoint")\ .start("demo") %fs head /autoloader-schema/_schemas/0 CREATE OR REFRESH STREAMING TABLE test AS SELECT * FROM cloud_files( '/repo/data', 'json', map("cloudFiles.inferColumnTypes", "true", "pathGlobFilter", "test*.json") ); CONSTRAINT positive_timestamp EXPECT (creation_time > 0) CONSTRAINT positive_timestamp EXPECT (creation_time > 0) ON VIOLATION DROP ROW CONSTRAINT positive_timestamp EXPECT (creation_time > 0) ON VIOLATION FAIL UPDATE CDC Statements APPLY CHANGES INTO live.target FROM stream(live.cdc_source) KEYS (col1) APPLY AS DELETE WHEN col2 = "DELETE" SEQUENCE BY col3 COLUMNS * EXCEPT (col); Security Statements GRANT <privilege> ON <object_type> <object_name> TO <user_or_group>; GRANT SELECT ON TABLE test TO `databricks@degols.net`; REVOKE <privilege> ON <object_type> <object_name> FROM `test@gmail.com`; -- UC Specific GRANT USAGE ON CATALOG my_catalog TO `user@company.com`; GRANT CREATE ON SCHEMA my_catalog.my_schema TO `team@company.com`; GRANT READ_VOLUME ON VOLUME my_catalog.my_schema.my_volume TO `user@company.com`; GRANT WRITE_VOLUME ON VOLUME my_catalog.my_schema.my_volume TO `user@company.com`; Jobs and Workflows # List running jobs %jobs # Submit job via API from databricks.sdk import WorkspaceClient w = WorkspaceClient() job = w.jobs.create( name="my_job", tasks=[{ "task_key": "task1", "notebook_task": {"notebook_path": "/Users/me/notebook"}, "new_cluster": {"spark_version": "14.3.x-scala2.12", "num_workers": 2, "node_type_id": "i3.xlarge"} }] ) Links Official Databricks Documentation
## Links

- [Official Databricks Documentation](https://docs.databricks.com/)

...