Spark SQL 表的命名方式是db_name.table_name,只有数据库名称和数据表名称。如果没有指定db_name而直接引用table_name,实际上是引用default 数据库下的表。在Spark SQL中,数据库只是指定表文件存储的路径,每个表都可以使用不同的文件格式来存储数据,从这个角度来看,可以把database看作是Databricks 表的上层目录,用于组织数据表及其文件。
在python语言环境中,可以使用 %sql 切换到SQL命令模式:
%sql
一,管理数据库
常用的数据库命令,切换当前的数据库、显示数据库列表、表列表、视图列表和列信息:
use db_name
show databases
show tables [in db_name]
show views [in db_name]
show columns in db_name.table_name
1,创建数据库
创建数据库,通过LOCATION 指定数据库文件存储的位置:
CREATE { DATABASE | SCHEMA } [ IF NOT EXISTS ] database_name
[ LOCATION database_directory ]
LOCATION database_directory:指定存储数据库文件系统的路径,如果底层的文件系统中不存在该路径,那么需要先创建该目录。如果未指定LOCATION参数,那么使用默认的数据仓库目录来创建数据库,默认的数据仓库目录是由静态配置参数spark.sql.warehouse.dir指定的。
2,查看数据库的描述
{ DESC | DESCRIBE } DATABASE [ EXTENDED ] db_name
extended 选项表示查看数据库的扩展属性。
3,删除数据库
DROP { DATABASE | SCHEMA } [ IF EXISTS ] dbname [ RESTRICT | CASCADE ]
IF EXISTS:该选项表示在数据库不存在时,DROP操作不会引发异常。
RESTRICT:该选项表示不能删除非空数据库,并在默认情况下启用。
CASCADE:该选项表示删除数据库中所有关联的表和函数。
二,创建数据表
表有两种作用域:全局和本地,全局表可以在所有的Cluster中引用,而本地表只能在本地的Cluster中引用,被称作临时视图。用户可以从DBFS中的文件或存储在任何受支持数据源中的数据来填充表。
在创建表时,需要指定存储表数据的文件格式,以及表数据文件存储的位置。
1,使用数据源创建表(标准的CREATE TABLE命令)
创建表的语法,注意:如果数据库中已存在同名的表,则会引发异常。
CREATE TABLE [ IF NOT EXISTS ] [db_name].table_name
[ ( col_name1 col_type1, ... ) ]
USING data_source
[ OPTIONS ( key1=val1, key2=val2, ... ) ]
[ PARTITIONED BY ( col_name1, col_name2, ... ) ]
[ CLUSTERED BY ( col_name3, col_name4, ... )
[ SORTED BY ( col_name [ ASC | DESC ], ... ) ]
INTO num_buckets BUCKETS ]
[ LOCATION path ]
[ AS select_statement ]
参数注释:
2,使用Delta Lake(增量Lake)创建表
用户可以使用标准的CREATE TABLE命令来创建存储在delta lake中的表,除了标准的创建delta table的命令之外,还可以使用以下的语法来创建delta表:
CREATE [OR REPLACE] TABLE table_identifier[(col_name1 col_type1 [NOT NULL], ...)]
USING DELTA
[LOCATION <path-to-delta-files>]
table_identifier 有两种格式:
LOCATION <path-to-delta-files> :如果指定的 LOCATION 已包含增量 lake 中存储的数据,Delta lake 会执行以下操作:
如果仅指定了表名称和位置,例如:
CREATE TABLE events
USING DELTA
LOCATION '/mnt/delta/events'
Hive 元存储中的表会自动继承现有数据的架构、分区和表属性,此功能可用于把数据“导入”到元存储(metastore)中。
如果你指定了任何配置(架构、分区或表属性),那么 Delta Lake 会验证指定的内容是否与现有数据的配置完全匹配。如果指定的配置与数据的配置并非完全匹配,则 Delta Lake 会引发一个描述差异的异常。
3,创建表的示例
--Use data source
CREATE TABLE student (id INT, name STRING, age INT) USING PARQUET;
--Use data from another table
CREATE TABLE student_copy USING PARQUET
AS SELECT * FROM student;
--Omit the USING clause, which uses the default data source (parquet by default)
CREATE TABLE student (id INT, name STRING, age INT);
--Create partitioned and bucketed table
CREATE TABLE student (id INT, name STRING, age INT)
USING PARQUET
PARTITIONED BY (age)
CLUSTERED BY (Id) INTO 4 buckets;
三,和数据源的交互
数据源表的作用类似于指向基础数据源的指针,例如,您可以使用JDBC数据源在Azure Databricks中创建表foo,该表指向MySQL中的表bar。当读写表foo时,实际上就是读写表bar。
通常,CREATE TABLE会创建一个“指针”,并且必须确保它指向的对象是存在的,一个例外是文件源,例如Parquet,JSON,如果您未指定LOCATION选项,那么Azure Databricks会创建一个默认表位置。
对于CREATE TABLE AS SELECT,Azure Databricks使用select查询的输出数据来覆盖(overwrite)底层的数据源,以确保创建的表包含与输入查询完全相同的数据。
四,向表插入数据
用户可以向表中插入数据,也可以向Spark支持的文件中插入数据。
1,向表中插入数据
使用INSERT INTO 命令向表中追加数据,不会影响表中的现有数据;使用INSERT OVERWRITE 命令,会覆盖表中的现有数据。
INSERT INTO [ TABLE ] table_identifier [ partition_spec ]
{ VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query }
INSERT OVERWRITE [ TABLE ] table_identifier [ partition_spec [ IF NOT EXISTS ] ]
{ VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query }
参数注释:
举个例子,创建表之后,通过VALUES子句向表中插入少量的值,也可以通过 SELECT 子句、TABLE和FROM向表中批量插入数据。
CREATE TABLE students (name VARCHAR(64), address VARCHAR(64), student_id INT)
USING PARQUET PARTITIONED BY (student_id);
-- VALUES
INSERT INTO students VALUES
('Bob Brown', '456 Taylor St, Cupertino', 222222),
('Cathy Johnson', '789 Race Ave, Palo Alto', 333333);
-- SELECT
INSERT INTO students PARTITION (student_id = 444444)
SELECT name, address FROM persons WHERE name = "Dora Williams";
-- TABLE
INSERT INTO students TABLE visiting_students;
-- FROM
INSERT INTO students
FROM applicants SELECT name, address, id applicants WHERE qualified = true;
2,向文件中插入数据
使用给定的Spark文件格式用新值覆盖目录中的现有数据,也就是说,向目录中插入数据时,只能用新数据覆盖现有的数据:
INSERT OVERWRITE [ LOCAL ] DIRECTORY [ directory_path ]
USING file_format [ OPTIONS ( key = val [ , ... ] ) ]
{ VALUES ( { value | NULL } [ , ... ] ) [ , ( ... ) ] | query }
参数注释:
示例,使用新数据覆盖目录中的数据:
INSERT OVERWRITE DIRECTORY '/tmp/destination'
USING parquet
OPTIONS (col1 1, col2 2, col3 'test')
SELECT * FROM test_table;
INSERT OVERWRITE DIRECTORY
USING parquet
OPTIONS ('path' '/tmp/destination', col1 1, col2 2, col3 'test')
SELECT * FROM test_table;