分库分表之第五篇
分库分表之第五篇
电商平台商品列表展示,每个列表项中除了包含商品基本信息、商品描述信息之外,还包括了商品所属的店铺信息,如下 :
本案例实现功能如下:
1、添加商品
2、商品分页查询
3、商品统计
数据库设计如下,其中商品与店铺信息之间进行了垂直分库,分为了product_db(商品库)和store_db(店铺库);商品信息还进行了垂直分表,分为了商品基本信息(product_info)和商品描述信息(product_descript),地理区域信息(region)作为公共表,冗余在两库中 :
考虑到商品信息的数据增长性,对product_db(商品库)进行了水平分库,分片键使用店铺id,分片策略为店铺 id%2 + 1,因此商品描述信息对所属店铺id进行了冗余;
对商品基本信息(product_info)和商品描述信息(product_descript)进行水平分表,分片键使用商品id,分片策略为 商品id%2 + 1,并将为这两个表设置为绑定表,避免笛卡尔积join;
为避免主键冲突,id生成策略采用雪花算法来生成全局唯一id,最终数据库设计为下图:
要求使用读写分离来提升性能,可用性。
- 操作系统 :win10
- 数据库 :mysql-5.7.25
- jdk :64位 jdk1.8.0_201
- 应用框架 :spring-bbot-2.1.3.release,mybatis3.5.0
- sharding-jdbc :sharding-jdbc-spring-boot-starter-4.0.0-rc1
参考读写分离章节,对以下库进行主从同步配置 :
# 设置需要同步的数据库 binlog‐do‐db=store_db binlog‐do‐db=product_db_1 binlog‐do‐db=product_db_2
创建store_db数据库,并执行以下脚本创建表 :
drop table if exists `region`; create table `region` ( `id` bigint(20) not null comment 'id', `region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '地理区域编码', `region_name` varchar(100) character set utf8 collate utf8_general_ci null default null comment '地理区域名称', `level` tinyint(1) null default null comment '地理区域级别(省、市、县)', `parent_region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '上级地理区域编码', primary key (`id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; insert into `region` values (1, '110000', '北京', 0, null); insert into `region` values (2, '410000', '河南省', 0, null); insert into `region` values (3, '110100', '北京市', 1, '110000'); insert into `region` values (4, '410100', '郑州市', 1, '410000'); drop table if exists `store_info`; create table `store_info` ( `id` bigint(20) not null comment 'id', `store_name` varchar(100) character set utf8 collate utf8_general_ci null default null comment '店铺名称', `reputation` int(11) null default null comment '信誉等级', `region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '店铺所在地', primary key (`id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; insert into `store_info` values (1, 'xx零食店', 4, '110100'); insert into `store_info` values (2, 'xx饮品店', 3, '410100');
创建product_db_1、product_db_2数据库,并分别对两库执行以下脚本创建表:
drop table if exists `product_descript_1`; create table `product_descript_1` ( `id` bigint(20) not null comment 'id', `product_info_id` bigint(20) null default null comment '所属商品id', `descript` longtext character set utf8 collate utf8_general_ci null comment '商品描述',`store_info_id` bigint(20) null default null comment '所属店铺id', primary key (`id`) using btree, index `fk_reference_2`(`product_info_id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; drop table if exists `product_descript_2`; create table `product_descript_2` ( `id` bigint(20) not null comment 'id', `product_info_id` bigint(20) null default null comment '所属商品id', `descript` longtext character set utf8 collate utf8_general_ci null comment '商品描述', `store_info_id` bigint(20) null default null comment '所属店铺id', primary key (`id`) using btree, index `fk_reference_2`(`product_info_id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; drop table if exists `product_info_1`; create table `product_info_1` ( `product_info_id` bigint(20) not null comment 'id', `store_info_id` bigint(20) null default null comment '所属店铺id', `product_name` varchar(100) character set utf8 collate utf8_general_ci null default null comment '商品名称', `spec` varchar(50) character set utf8 collate utf8_general_ci null default null comment '规 格', `region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '产地', `price` decimal(10, 0) null default null comment '商品价格', `image_url` varchar(100) character set utf8 collate utf8_general_ci null default null comment '商品图片', primary key (`product_info_id`) using btree, index `fk_reference_1`(`store_info_id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; drop table if exists `product_info_2`; create table `product_info_2` ( `product_info_id` bigint(20) not null comment 'id', `store_info_id` bigint(20) null default null comment '所属店铺id', `product_name` varchar(100) character set utf8 collate utf8_general_ci null default null comment '商品名称', `spec` varchar(50) character set utf8 collate utf8_general_ci null default null comment '规 格', `region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '产地', `price` decimal(10, 0) null default null comment '商品价格', `image_url` varchar(100) character set utf8 collate utf8_general_ci null default null comment '商品图片', primary key (`product_info_id`) using btree, index `fk_reference_1`(`store_info_id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; drop table if exists `region`; create table `region` ( `id` bigint(20) not null comment 'id', `region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '地理区域编码', `region_name` varchar(100) character set utf8 collate utf8_general_ci null default null comment '地理区域名称', `level` tinyint(1) null default null comment '地理区域级别(省、市、县)', `parent_region_code` varchar(50) character set utf8 collate utf8_general_ci null default null comment '上级地理区域编码', primary key (`id`) using btree ) engine = innodb character set = utf8 collate = utf8_general_ci row_format = dynamic; insert into `region` values (1, '110000', '北京', 0, null); insert into `region` values (2, '410000', '河南省', 0, null); insert into `region` values (3, '110100', '北京市', 1, '110000'); insert into `region` values (4, '410100', '郑州市', 1, '410000');
(1)搭建工程maven工程shopping,并做好spring boot相关配置。
(2)引入maven依赖
<dependency> <groupid>org.apache.shardingsphere</groupid> <artifactid>sharding‐jdbc‐spring‐boot‐starter</artifactid> <version>4.0.0‐rc1</version> </dependency>
既然是分库分表,那么就需要定义多个真实数据源,每一个数据库链接信息就是一个数据源定义,如 :
spring.shardingsphere.datasource.m0.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.m0.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.m0.url = jdbc:mysql://localhost:3306/store_db?useunicode=true spring.shardingsphere.datasource.m0.username = root spring.shardingsphere.datasource.m0.password = root
m0,就是这个真实数据源的名称,然后需要告诉sharding-jdbc,咋们有那些真实数据源,如 :
spring.shardingsphere.datasource.names = m0,m1,m2,s0,s1,s2
如果需要配置读写分离,还需要告诉sharding-jdbc,这么多真实数据源,那么有几个是一套读写分离?也就是定义主从逻辑数据源 :
spring.shardingsphere.sharding.master‐slave‐rules.ds0.master‐data‐source‐name=m0 spring.shardingsphere.sharding.master‐slave‐rules.ds0.slave‐data‐source‐names=s0
若我们已经对m0和s0做了mysql主从同步,那我们需要告诉sharding-jdbc,m0、s0为一组主从同步数据源,其 中m0为主,s0为从,并且定义名称为ds0,这个ds0就是主从逻辑数据源。
最终配置如下,具体的分库分表策略参考注释内容:
# 真实数据源定义 m为主库 s为从库 spring.shardingsphere.datasource.names = m0,m1,m2,s0,s1,s2 spring.shardingsphere.datasource.m0.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.m0.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.m0.url = jdbc:mysql://localhost:3306/store_db?useunicode=true spring.shardingsphere.datasource.m0.username = root spring.shardingsphere.datasource.m0.password = root spring.shardingsphere.datasource.m1.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.m1.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.m1.url = jdbc:mysql://localhost:3306/product_db_1? useunicode=true spring.shardingsphere.datasource.m1.username = root spring.shardingsphere.datasource.m1.password = root spring.shardingsphere.datasource.m2.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.m2.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.m2.url = jdbc:mysql://localhost:3306/product_db_2? useunicode=true spring.shardingsphere.datasource.m2.username = root spring.shardingsphere.datasource.m2.password = root spring.shardingsphere.datasource.s0.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.s0.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.s0.url = jdbc:mysql://localhost:3307/store_db?useunicode=true spring.shardingsphere.datasource.s0.username = root spring.shardingsphere.datasource.s0.password = root spring.shardingsphere.datasource.s1.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.s1.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.s1.url = jdbc:mysql://localhost:3307/product_db_1? useunicode=true spring.shardingsphere.datasource.s1.username = root spring.shardingsphere.datasource.s1.password = root spring.shardingsphere.datasource.s2.type = com.alibaba.druid.pool.druiddatasource spring.shardingsphere.datasource.s2.driver‐class‐name = com.mysql.jdbc.driver spring.shardingsphere.datasource.s2.url = jdbc:mysql://localhost:3307/product_db_2? useunicode=true spring.shardingsphere.datasource.s2.username = root spring.shardingsphere.datasource.s2.password = root # 主库从库逻辑数据源定义 ds0为store_db ds1为product_db_1 ds2为product_db_2 spring.shardingsphere.sharding.master‐slave‐rules.ds0.master‐data‐source‐name=m0 spring.shardingsphere.sharding.master‐slave‐rules.ds0.slave‐data‐source‐names=s0 spring.shardingsphere.sharding.master‐slave‐rules.ds1.master‐data‐source‐name=m1 spring.shardingsphere.sharding.master‐slave‐rules.ds1.slave‐data‐source‐names=s1 spring.shardingsphere.sharding.master‐slave‐rules.ds2.master‐data‐source‐name=m2 spring.shardingsphere.sharding.master‐slave‐rules.ds2.slave‐data‐source‐names=s2 # 默认分库策略,以store_info_id为分片键,分片策略为store_info_id % 2 + 1,也就是store_info_id为双数的 数据进入ds1,为单数的进入ds2 spring.shardingsphere.sharding.default‐database‐strategy.inline.sharding‐column = store_info_id spring.shardingsphere.sharding.default‐database‐strategy.inline.algorithm‐expression = ds$‐> {store_info_id % 2 + 1} # store_info分表策略,固定分配至ds0的store_info真实表 spring.shardingsphere.sharding.tables.store_info.actual‐data‐nodes = ds$‐>{0}.store_info spring.shardingsphere.sharding.tables.store_info.table‐strategy.inline.sharding‐column = id spring.shardingsphere.sharding.tables.store_info.table‐strategy.inline.algorithm‐expression = store_info # product_info分表策略,分布在ds1,ds2的product_info_1 product_info_2表 ,分片策略为product_info_id % 2 + 1,product_info_id生成为雪花算法,为双数的数据进入product_info_1表,为单数的进入product_info_2 表 spring.shardingsphere.sharding.tables.product_info.actual‐data‐nodes = ds$‐> {1..2}.product_info_$‐>{1..2} spring.shardingsphere.sharding.tables.product_info.table‐strategy.inline.sharding‐column = product_info_id spring.shardingsphere.sharding.tables.product_info.table‐strategy.inline.algorithm‐expression = product_info_$‐>{product_info_id % 2 + 1} spring.shardingsphere.sharding.tables.product_info.key‐generator.column=product_info_id spring.shardingsphere.sharding.tables.product_info.key‐generator.type=snowflake # product_descript分表策略,分布在ds1,ds2的product_descript_1 product_descript_2表 ,分片策略为 product_info_id % 2 + 1,id生成为雪花算法,product_info_id为双数的数据进入product_descript_1表,为单 数的进入product_descript_2 spring.shardingsphere.sharding.tables.product_descript.actual‐data‐nodes = ds$‐> {1..2}.product_descript_$‐>{1..2} spring.shardingsphere.sharding.tables.product_descript.table‐strategy.inline.sharding‐column = product_info_id spring.shardingsphere.sharding.tables.product_descript.table‐strategy.inline.algorithm‐ expression = product_descript_$‐>{product_info_id % 2 + 1} spring.shardingsphere.sharding.tables.product_descript.key‐generator.column=id spring.shardingsphere.sharding.tables.product_descript.key‐generator.type=snowflake # 设置product_info,product_descript为绑定表 spring.shardingsphere.sharding.binding‐tables[0] = product_info,product_descript # 设置region为广播表(公共表),每次更新操作会发送至所有数据源 spring.shardingsphere.sharding.broadcast‐tables=region # 打开sql输出日志 spring.shardingsphere.props.sql.show = true
实体类 :
dao实现
@mapper @component public interface productdao { //添加商品基本信息 @insert("insert into product_info(store_info_id,product_name,spec,region_code,price) value(#{storeinfoid},#{productname},#{spec},#{regioncode},#{price})") @options(usegeneratedkeys = true,keyproperty = "productinfoid",keycolumn = "id") int insertproductinfo(productinfo productinfo); //添加商品描述信息 @insert("insert into product_descript(product_info_id,descript,store_info_id) value(# {productinfoid},#{descript},#{storeinfoid})") @options(usegeneratedkeys = true,keyproperty = "id",keycolumn = "id") int insertproductdescript(productdescript productdescript); }
service实现,针对垂直分库的两个库,分别实现店铺服务、商品服务
@service public class productserviceimpl implements productservice { @autowired private productdao productdao; @override @transactional public void createproduct(productinfo product) { productdescript productdescript = new productdescript(); productdescript.setdescript(product.getdescript()); productdao.insertproductinfo(product);//新增商品基本信息 productdescript.setproductinfoid(product.getproductinfoid()); productdescript.setstoreinfoid(product.getstoreinfoid()); //冗余店铺信息 productdao.insertproductdescript(productdescript);//新增商品描述信息 } }
controller实现:
/** * 卖家商品展示 */ @restcontroller public class sellercontroller { @autowired private productservice productservice; @postmapping("/products") public string createproject(@requestbody productinfo productinfo) { productservice.createproduct(productinfo); return "创建成功!"; }
单元测试:
@runwith(springrunner.class) @springboottest(classes = shardingjdbcdemobootstrap.class) public class shardingtest { @autowired productservice productservice; @test public void testcreateproduct(){ for(long i=1;i<10;i++){ //store_info_id,product_name,spec,region_code,price,image_url productinfo productinfo = new productinfo(); productinfo.setproductname("java编程思想"+i); productinfo.setdescript("java编程思想是一本非常好的java教程"+i); productinfo.setregioncode("110000"); productinfo.setstoreinfoid(1); productinfo.setprice(new bigdecimal(i)); productservice.createproduct(productinfo); } }
这是使用了sharding-jdbc所提供的全局主键生成方式之一雪花算法,来生成全局业务唯一主键。通过添加商品接口新增商品进行分库验证,store_info_id为偶数的数据在product_db_1,为奇数的数据在product_db_2。
通过添加商品接口新增商品进行分表验证,product_id为偶数的数据在product_info_1、product_descript_1,为奇数的数据在product_info_2、produt_descript_2。
dao实现 :
在productdao中定义商品查询方法 :
@select("select i.*, d.descript, r.region_name placeoforigin " + "from product_info i join product_descript d on i.id = d.product_info_id " + "join region r on r.region_code = i.region_code order by i.id desc limit #{start},# {pagesize}") list<productinfo> selectproductlist(@param("start")int start,@param("pagesize") int pagesize);
service实现 :
在productserviceimpl定义商品查询方法 :
@override public list<productinfo> queryproduct(int page,int pagesize) { int start = (page‐1)*pagesize; return productdao.selectproductlist(start,pagesize); }
controller实现 :
@getmapping(value = "/products/{page}/{pagesize}") public list<productinfo> queryproduct(@pathvariable("page")int page,@pathvariable("pagesize")int pagesize){ return productservice.queryproduct(page,pagesize); }
单元测试 :
@test public void testselectproductlist(){ list<productinfo> productinfos = productservice.queryproduct(1,10); system.out.println(productinfos); }
通过查询商品列表接口,能够查询到所有分片的商品信息,关联的地理区域,店铺信息正确。
总结 :
分页查询是业务中最常见的场景,sharding-jdbc支持常用关系数据库的分页查询,不过sharding-jdbc的分页功能比较容易让使用者误解,用户通常认为分页归并会占用大量内存。在分布式的场景中,将limit 10000000,10改写为limit 0,10000000,才能保证其数据的正确性。用户非常容易产生shardingsphere会将大量无意义的数据加载至内存中,造成内存溢出风险的错觉。其实大部分情况都通过流式归并获取数据结果集,因此sharding-sphere会通过结果集的next方法将无需取出的数据全部跳过,并不会将其存入内存。
但同时需要注意的是,由于排序的需要,大量的数据仍然需要传输到sharding-jdbc的内存空间。因此,采用limit这种方式分页,并非最佳实践。由于limit并不能通过索引查询数据,因此如果可以保证id的连续性,通过id进行分页是比较好的解决方案,例如 :
select * from t_order where id > 100000 and id <= 100010 order by id;
或通过记录上次查询结果的最后一条记录的id进行下一页的查询,例如 :
select * from t_order where id > 10000000 limit 10;
排序功能是由sharding-jdbc的排序归并来完成,由于在sql中存在order by语句,因此每个数据结果集自身是有序的,因此只需要将数据结果集当前游标指向的数据值进行排序即可。这相当于多个有序的数组进行排序,归并排序是最适合此场景的排序算法。
本小节实现商品总数统计,商品分组统计
dao实现,在productdao中定义 :
//总数统计 @select("select count(1) from product_info") int selectcount(); //分组统计 @select("select count(1) as num from product_info group by region_code having num>1 order by region_code asc") list<map> selectproductgrouplist();
单元测试 :
@test public void testselectcount(){ int i = productdao.selectcount(); system.out.println(i); } @test public void testselectgrouplist(){ list<map> maps = productdao.selectproductgrouplist(); system.out.println(maps); }
总结 :
分组统计
分组统计也是业务中常见的场景,分组功能的实现由sharding-jdbc分组归并完成。分组归并的情况最为复杂,它分为流式分组归并和内存分组归并。流式分组归并要求sql的排序项与分组项的字段必须保存一致,否则只能通过内存归并才能保证其数据的正确性。
举例说明,假设根据科目分片,表结构中包含考生的姓名(为了简单起见,不考虑重名的情况)和分数。通过sql获取每位考生的总分,可通过如下sql :
select name, sum(score) from t_score group by name order by name;
在分组项与排序项完全一致的情况下,取得的数据是连续的,分组所需的数据全数存在于各个数据结果集的当前游标所指向的数据值,因此可以采用流式归并。如下图所示。
进行归并时,逻辑与排序归并类似。 下图展现了进行next调用的时候,流式分组归并是如何进行的。
通过图中我们可以看到,当进行第一次next调用时,排在队列首位的t_score_java将会被弹出队列,并且将分组值 同为“jetty”的其他结果集中的数据一同弹出队列。 在获取了所有的姓名为“jetty”的同学的分数之后,进行累加操 作,那么,在第一次next调用结束后,取出的结果集是“jetty”的分数总和。 与此同时,所有的数据结果集中的游标 都将下移至数据值“jetty”的下一个不同的数据值,并且根据数据结果集当前游标指向的值进行重排序。 因此,包含 名字顺着第二位的“john”的相关数据结果集则排在的队列的前列。
为什么分库分表?分库分表就是为了解决由于数据量过大而导致数据库性能降低的问题,将原来独立的数据库拆分成若干数据库组成 ,将数据大表拆分成若干数据表组成,使得单一数据库、单一数据表的数据量变小,从而达到提升数据库性能的目的。
分库分表方式:垂直分表、垂直分库、水平分库、水平分表
分库分表带来问题:由于数据分散在多个数据库,服务器导致了事务一致性问题、跨节点join问题、跨节点分页、 排序、函数,主键需要全局唯一,公共表。
sharding-jdbc基础概念:逻辑表,真实表,数据节点,绑定表,广播表,分片键,分片算法,分片策略,主键生 成策略
sharding-jdbc核心功能:数据分片,读写分离
sharding-jdbc执行流程: sql解析 => 查询优化 => sql路由 => sql改写 => sql执行 => 结果归并
最佳实践:
系统在设计之初就应该对业务数据的耦合松紧进行考量,从而进行垂直分库、垂直分表,使数据层架构清晰明了。
若非必要,无需进行水平切分,应先从缓存技术着手降低对数据库的访问压力。如果缓存使用过后,数据库访问量 还是非常大,可以考虑数据库读、写分离原则。若当前数据库压力依然大,且业务数据持续增长无法估量,最后可 考虑水平分库、分表,单表拆分数据控制在1000万以内。
附 sql支持说明
详细参考:https://shardingsphere.apache.org/document/current/cn/features/sharding/use-norms/sql/ 说明:以下为官方显示内容,具体是否适用以实际测试为准 。
支持的sql
sql | 必要条件 |
---|---|
select * from tbl_name | |
select * from tbl_name where (col1 = ? or col2 = ?) and col3 = ? | |
select * from tbl_name where col1 = ? order by col2 desc limit ? | |
select count(*), sum(col1), min(col1), max(col1), avg(col1) from tbl_name where col1 =? | |
select count(col1) from tbl_name where col2 = ? group by col1 order by col3 desc limit ?, ? | |
insert into tbl_name (col1, col2,…) values (?, ?, …) | |
insert into tbl_name values (?, ?,…) | |
insert into tbl_name (col1, col2, …) values (?, ?, …), (?, ?, …) | |
update tbl_name set col1 = ? where col2 = ? | |
delete from tbl_name where col1 = ? | |
create table tbl_name (col1 int, …) | |
alter table tbl_name add col1 varchar(10) | |
drop table tbl_name | |
truncate table tbl_name | |
create index idx_name on tbl_name | |
drop index idx_name on tbl_name | |
drop index idx_name | |
select distinct * from tbl_name where col1 = ? | |
select count(distinct col1) from tbl_name |
不支持的sql
sql | 不支持原因 |
---|---|
insert into tbl_name (col1, col2, …) values(1+2, ?, …) | values语句不支持运算 表达式 |
insert into tbl_name (col1, col2, …) select col1, col2, … from tbl_name where col3 = ? | insert … select |
select count(col1) as count_alias from tbl_name group by col1 having count_alias > ? | having |
select * from tbl_name1 union select * from tbl_name2 | union |
select * from tbl_name1 union all select * from tbl_name2 | union all |
select * from ds.tbl_name1 | 包含schema |
select sum(distinct col1), sum(col1) from tbl_name | 详见distinct支持情况详 细说明 |
distinct支持情况详细说明
支持的sql
select distinct * from tbl_name where col1 = ? select distinct col1 from tbl_name select distinct col1, col2, col3 from tbl_name select distinct col1 from tbl_name order by col1 select distinct col1 from tbl_name order by col2 select distinct(col1) from tbl_name select avg(distinct col1) from tbl_name select sum(distinct col1) from tbl_name select count(distinct col1) from tbl_name select count(distinct col1) from tbl_name group by col1 select count(distinct col1 + col2) from tbl_name select count(distinct col1), sum(distinct col1) from tbl_name select count(distinct col1), col1 from tbl_name group by col1 select col1, count(distinct col1) from tbl_name group by col1
不支持的sql
sql | 不支持原因 |
---|---|
select sum(distinct col1), sum(col1) from tbl_name | 同时使用普通聚合函数和distinct聚合函数 |