Ignite实战

乎语百科 111 0

1.概述

本篇博客将对Ignite的基础环境、集群快照、分布式计算、SQL查询与处理、机器学习等内容进行介绍。

2.内容

2.1 什么是Ignite?

在学习Ignite之前,我们先来了解一下什么是Ignite?首先,Ignite是Apache开源的顶级项目之一。Ignite 内存数组组织框架是一个高性能、集成和分布式的内存计算和事务平台,用于大规模的数据集处理,比传统的基于磁盘或闪存的技术具有更高的性能,同时他还为应用和不同的数据源之间提供高性能、分布式内存中数据组织管理的功能。

2.2 安装环境要求

Apache Ignite官方在如下环境中进行了测试:

  • JDK:Oracle JDK8、11或17,Open JDK8、11或17,IBM JDK8、11或17;
  • OS:Linux(任何版本),Mac OS X(10.6及以上),Windows(XP及以上),Windows Server(2008及以上),Oracle Solaris;
  • 网络:没有限制(建议10G甚至更快的网络带宽);
  • 架构:x86,x64,SPARC,PowerPC。

支持Docker、DEB/RPM、Kubernetes、虚拟机等安装模式。

2.3 Ignite启动和停止

节点的类型有两种:服务端节点和客户端节点。服务端节点参与缓存、计算的执行、流数据处理等。客户端节点提供远程接入服务端的能力,有完整的Ignite API支持,包括近缓存、事务、计算、流处理、服务等。所有的节点默认都以服务端模式启动,客户端模式需要显式指定。

1.启动服务端节点

执行如下所示命令:

ignite.sh path/to/configuration.xml

2.启动客户端节点

执行如下Java代码片段:

IgniteConfiguration cfg = new IgniteConfiguration();

// 开启客户端模式
cfg.setClientMode(true);

// 启动客户端
Ignite ignite = Ignition.start(cfg);

3.停止服务节点

强制停止某个节点时,可能会导致数据丢失或数据不一致,甚至会使节点无法重启。当节点没有响应且无法正常关闭时,应将强制停止作为最后的手段。正常停止可以使节点完成关键操作并正确完成其生命周期,执行正常停止的正确过程如下:

  • 使用以下方法之一停止节点

    • 以编程方式调用Ignite.close();
    • 以编程方式调用System.exit();
    • 发送用户中断信号。Ignite使用JVM关闭钩子在JVM停止之前执行自定义逻辑。如果通过运行ignite.sh来启动节点并且不将其与终端分离,则可以通过按下Ctrl+C来停止节点。
  • 从基线拓扑中删除该节点。如果启用了基线自动调整,则可以不执行此步骤。

从基准拓扑中删除节点将在其余节点上开始再平衡过程。如果计划在停止后立即重启该节点,则不必进行再平衡。在这种情况下,请勿从基准拓扑中删除该节点。

2.4 集群快照

Ignite 提供了使用Ignite Persistence为部署创建完整集群快照的能力 。Ignite 快照包括持久在磁盘上的所有数据记录的一致的集群范围副本以及恢复过程所需的一些其他文件。快照结构类似于 Ignite Persistence 存储目录的布局,但有几个例外。让我们以这个快照为例来回顾一下结构:

work
└── snapshots
    └── backup23012020
        └── db
            ├── binary_meta
            │         ├── node1
            │         ├── node2
            │         └── node3
            ├── marshaller
            │         ├── node1
            │         ├── node2
            │         └── node3
            ├── node1
            │    └── my-sample-cache
            │        ├── cache_data.dat
            │        ├── part-3.bin
            │        ├── part-4.bin
            │        └── part-6.bin
            ├── node2
            │    └── my-sample-cache
            │        ├── cache_data.dat
            │        ├── part-1.bin
            │        ├── part-5.bin
            │        └── part-7.bin
            └── node3
                └── my-sample-cache
                    ├── cache_data.dat
                    ├── part-0.bin
                    └── part-2.bin
  • 快照位于该目录下,并work\snapshots命名为Ignite 的工作目录。backup23012020work
  • 快照是为 3 节点集群创建的,所有节点都在同一台机器上运行。在此示例中,节点被命名为node1、node2和node3,而在实践中,名称等于节点的 一致 ID。
  • my-sample-cache快照保留缓存的副本。
  • 该文件夹将数据记录的db副本保存在文件中。只要当前还原过程不需要预写和检查点,就不会将其添加到快照中。part-N.bincache_data.dat
  • binary_meta和目录存储元数据和特定于marshaller编组器的信息。
注意:通常快照分布在整个集群中
前面的示例显示了为在同一台物理机上运行的集群创建的快照。因此,整个快照位于一个位置。在实践中,所有节点都将运行在不同的机器上,快照数据分布在集群中。每个节点保存一段快照,其中包含属于该特定节点的数据。恢复过程解释了如何在恢复过程中将所有段连接在一起。

2.4.1 配置

1.快照目录

默认情况下,快照的一部分存储在各个 Ignite 节点的工作目录中,并使用 Ignite Persistence 保存数据、索引、WAL 和其他文件的相同存储介质。由于快照可以消耗与持久性文件已经占用的空间一样多的空间,并且可以通过与 Ignite Persistence 例程共享磁盘 I/O 来影响应用程序的性能,因此建议将快照和持久性文件存储在不同的媒体上。

2.快照执行池

默认情况下,快照线程池大小的值为4。减少快照创建过程中涉及的线程数会增加拍摄快照的总时间。但是,这会将磁盘负载保持在合理的范围内。

2.4.2 创建快照

Ignite 提供了几个用于创建快照的 API。

1.使用控制脚本

Ignite 提供了支持以下列出的与快照相关的命令的控制脚本:

# Create a cluster snapshot named "snapshot_09062021" in the background:
control.(sh|bat) --snapshot create snapshot_09062021

# Create a cluster snapshot named "snapshot_09062021" and wait for the entire operation to complete:
control.(sh|bat) --snapshot create snapshot_09062021 --sync

# Create a cluster snapshot named "snapshot_09062021" in the "/tmp/ignite/snapshots" folder (the full path to the snapshot files will be /tmp/ignite/snapshots/snapshot_09062021):
control.(sh|bat) --snapshot create snapshot_09062021 -dest /tmp/ignite/snapshots

# Cancel a running snapshot named "snapshot_09062021":
control.(sh|bat) --snapshot cancel snapshot_09062021

# Kill a running snapshot named "snapshot_09062021":
control.(sh|bat) --kill SNAPSHOT snapshot_09062021

2.使用JMX

使用该SnapshotMXBean接口通过 JMX 执行特定于快照的过程:

方法 描述
createSnapshot(String snpName) 创建快照

cancelSnapshot(String snpName)

取消节点上的快照已启动其创建

3.使用Java API

此外,还可以在 Java 中以编程方式创建快照:

CacheConfiguration<Integer, String> ccfg = new CacheConfiguration<>("snapshot-cache");

try (IgniteCache<Integer, String> cache = ignite.getOrCreateCache(ccfg)) {
    cache.put(1, "Maxim");

    // Start snapshot operation.
    ignite.snapshot().createSnapshot("snapshot_02092020").get();
}
finally {
    ignite.destroyCache(ccfg.getName());
}

2.4.3 检查快照一致性

通常所有集群节点都运行在不同的机器上,并且快照数据分布在整个集群中。每个节点都存储自己的快照段,因此在某些情况下,可能需要在从快照恢复之前检查快照的数据完整性和整个集群的数据一致性。

对于这种情况,Apache Ignite 提供了内置的快照一致性检查命令,使您能够验证内部数据一致性,计算数据分区哈希和页面校验和,并在发现问题时打印结果。check 命令还将主分区的哈希值与相应的备份分区进行比较,并报告任何差异。

2.4.4 从快照恢复

快照可以在停止的集群上手动恢复,也可以在活动的集群上自动恢复。下面描述了这两个过程,但是,最好只使用控制脚本中的恢复命令。

1.手动快照恢复过程

快照结构类似于 Ignite Native Persistence 的布局,因此对于手动快照还原,您必须仅在具有相同节点的相同集群consistentId和拍摄快照的相同拓扑上执行快照还原。如果您需要在不同的集群或不同的集群拓扑上恢复快照,请使用 自动快照恢复过程。

一般来说,停止集群,然后用快照中的数据替换持久化数据和其他文件,然后重新启动节点。

详细过程如下所示:

  • 停止要恢复的集群
  • 从检查点$IGNITE_HOME/work/cp目录中删除所有文件
  • 在每个节点上执行以下操作:
    • {nodeId}从目录中删除与 相关的文件$IGNITE_HOME/work/db/binary_meta。
    • {nodeId}从目录中删除与 相关的文件$IGNITE_HOME/work/db/marshaller。
    • {nodeId}删除与您的目录下相关的文件和子目录$IGNITE_HOME/work/db。db/{node_id}如果目录不在 Ignite 目录下,请单独清理该目录work。
    • 将属于具有{node_id}快照的节点的文件复制到$IGNITE_HOME/work/目录中。如果该db/{node_id}目录不在 Ignitework目录下,那么您需要将数据文件复制到那里。
  • 重启集群

2.自动快照恢复过程

自动恢复过程允许用户使用 Java API 或命令行脚本从活动集群上的快照恢复缓存组。

目前,此过程有几个限制,将在未来的版本中解决:

  • 仅当快照的所有部分都存在于集群中时,才能进行恢复。每个节点通过给定的快照名称和一致的节点 ID 在配置的快照路径中查找本地快照数据。
  • 恢复过程只能应用于用户创建的缓存组。
  • 要从快照恢复的缓存组不得存在于集群中。如果它们存在,则用户必须在开始此操作之前将它们销毁。
  • 不允许并发还原操作。因此,如果一个操作已经开始,则只有在第一个操作完成后才能启动另一个操作。

以下代码片段演示了如何从快照恢复单个缓存组。

// Restore cache named "snapshot-cache" from the snapshot "snapshot_02092020".
ignite.snapshot().restoreSnapshot("snapshot_02092020", Collections.singleton("snapshot-cache")).get();

3.使用 CLI 控制还原操作

该control.sh|bat脚本提供了启动和停止恢复操作的能力。

# Start restoring all user-created cache groups from the snapshot "snapshot_09062021" in the background.
control.(sh|bat) --snapshot restore snapshot_09062021 --start

# Start restoring all user-created cache groups from the snapshot "snapshot_09062021" and wait for the entire operation to complete.
control.(sh|bat) --snapshot restore snapshot_09062021 --start --sync

# Start restoring all user-created cache groups from the snapshot "snapshot_09062021" located in the "/tmp/ignite/snapshots" folder (the full path to the snapshot files should be /tmp/ignite/snapshots/snapshot_09062021):
control.(sh|bat) --snapshot restore snapshot_09062021 --src /tmp/ignite/snapshots

# Start restoring only "cache-group1" and "cache-group2" from the snapshot "snapshot_09062021" in the background.
control.(sh|bat) --snapshot restore snapshot_09062021 --start --groups cache-group1,cache-group2

# Cancel the restore operation for "snapshot_09062021".
control.(sh|bat) --snapshot restore snapshot_09062021 --cancel

2.4.5 一致性保证

在集群范围内的并发操作以及与 Ignite 的持续更改方面,所有快照都是完全一致的。持久化数据、索引、模式、二进制元数据、编组器和节点上的其他文件。

集群范围的快照一致性是通过触发Partition-Map-Exchange 过程来实现的。通过这样做,集群最终将到达所有先前启动的事务都完成并暂停新事务的时间点。一旦发生这种情况,集群将启动快照创建过程。PME 过程确保快照包括处于一致状态的主备份和备份。

Ignite Persistence 文件与其快照副本之间的一致性是通过将原始文件复制到目标快照目录并跟踪所有并发正在进行的更改来实现的。跟踪更改可能需要 Ignite Persistence 存储介质上的额外空间(最多为存储介质的 1 倍大小)。

2.5 分布式计算

Ignite 提供了一个 API,用于以平衡和容错的方式在集群节点之间分配计算。您可以提交单个任务以供执行,也可以通过自动任务拆分来实现 MapReduce 模式。API 提供对作业分配策略的细粒度控制。

2.5.1 获取计算接口

运行分布式计算的主要入口点是计算接口,它可以从Ignite.

Ignite ignite = Ignition.start();

IgniteCompute compute = ignite.compute();

2.5.2 指定计算的节点集

计算接口的每个实例都与执行任务的一组节点相关联。不带参数调用时,ignite.compute()返回与所有服务器节点关联的计算接口。要获取特定节点子集的实例,请使用Ignite.compute(ClusterGroup group). 在以下示例中,计算接口仅绑定到远程节点,即除运行此代码的节点之外的所有节点。

Ignite ignite = Ignition.start();

IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());

2.5.3 执行任务

Ignite 提供了三个接口,可以实现代表一个任务并通过计算接口执行:

  • IgniteRunnable— 其扩展java.lang.Runnable可用于实现没有输入参数且不返回结果的计算。
  • IgniteCallablejava.util.concurrent.Callable—返回特定值的扩展。
  • IgniteClosure— 接受参数并返回值的功能接口。

您可以执行一次任务(在其中一个节点上)或将其广播到所有节点。

2.5.4 执行一个可运行的任务

要执行可运行的任务,请使用run(…​)计算接口的方法。任务被发送到与计算实例关联的节点之一。

IgniteCompute compute = ignite.compute();

// Iterate through all words and print
// each word on a different cluster node.
for (String word : "Print words on different cluster nodes".split(" ")) {
    compute.run(() -> System.out.println(word));
}

2.5.5 执行可调用任务

要执行可调用任务,请使用call(…​)计算接口的方法。

Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

// Iterate through all words in the sentence and create callable jobs.
for (String word : "How many characters".split(" "))
    calls.add(word::length);

// Execute the collection of callables on the cluster.
Collection<Integer> res = ignite.compute().call(calls);

// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();

2.5.6 执行IgniteClosure

要执行IgniteClosure,请使用apply(…​)计算接口的方法。该方法接受任务和任务的输入参数。IgniteClosure参数在执行时传递给给定的。

IgniteCompute compute = ignite.compute();

// Execute closure on all cluster nodes.
Collection<Integer> res = compute.apply(String::length, Arrays.asList("How many characters".split(" ")));

// Add all the word lengths received from cluster nodes.
int total = res.stream().mapToInt(Integer::intValue).sum();

2.5.7 广播任务

该方法在与计算实例关联的所有节点broadcast()上执行任务。

// Limit broadcast to remote nodes only.
IgniteCompute compute = ignite.compute(ignite.cluster().forRemotes());

// Print out hello message on remote nodes in the cluster group.
compute.broadcast(() -> System.out.println("Hello Node: " + ignite.cluster().localNode().id()));

2.5.8 异步执行

前几节中描述的所有方法都有异步对应物:

  • callAsync(…​)
  • runAsync(…​)
  • applyAsync(…​)
  • broadcastAsync(…​)

异步方法返回一个IgniteFuture表示操作结果的值。在以下示例中,异步执行一组可调用任务。

IgniteCompute compute = ignite.compute();

Collection<IgniteCallable<Integer>> calls = new ArrayList<>();

// Iterate through all words in the sentence and create callable jobs.
for (String word : "Count characters using a callable".split(" "))
    calls.add(word::length);

IgniteFuture<Collection<Integer>> future = compute.callAsync(calls);

future.listen(fut -> {
    // Total number of characters.
    int total = fut.get().stream().mapToInt(Integer::intValue).sum();

    System.out.println("Total number of characters: " + total);
});

2.5.9 执行超时任务

您可以设置任务执行的超时时间。如果任务没有在给定的时间范围内完成,它会被停止并取消该任务产生的所有作业。

要执行超时任务,请使用withTimeout(…​)计算接口的方法。该方法返回一个计算接口,该接口以时间限制的方式执行给它的第一个任务。后续任务没有超时:您需要调用withTimeout(…​)每个应该有超时的任务。

IgniteCompute compute = ignite.compute();

compute.withTimeout(300_000).run(() -> {
    // your computation
    // ...
});

2.5.10 在本地节点上的作业之间共享状态

在一个节点上执行的不同计算作业之间共享状态通常很有用。为此,每个节点上都有一个共享的并发本地映射。

IgniteCluster cluster = ignite.cluster();

ConcurrentMap<String, Integer> nodeLocalMap = cluster.nodeLocalMap();

节点局部值类似于线程局部变量,因为这些值不分布并且仅保留在本地节点上。节点本地数据可用于在计算作业之间共享状态。它也可以被部署的服务使用。

在以下示例中,作业每次在某个节点上执行时都会增加一个节点本地计数器。结果,每个节点上的节点本地计数器告诉我们作业在该节点上执行了多少次。

IgniteCallable<Long> job = new IgniteCallable<Long>() {
    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public Long call() {
        // Get a reference to node local.
        ConcurrentMap<String, AtomicLong> nodeLocalMap = ignite.cluster().nodeLocalMap();

        AtomicLong cntr = nodeLocalMap.get("counter");

        if (cntr == null) {
            AtomicLong old = nodeLocalMap.putIfAbsent("counter", cntr = new AtomicLong());

            if (old != null)
                cntr = old;
        }

        return cntr.incrementAndGet();
    }
};

2.5.11 从计算任务访问数据

如果您的计算任务需要访问存储在缓存中的数据,您可以通过以下实例来完成Ignite:

public class MyCallableTask implements IgniteCallable<Integer> {

    @IgniteInstanceResource
    private Ignite ignite;

    @Override
    public Integer call() throws Exception {

        IgniteCache<Long, Person> cache = ignite.cache("person");

        // Get the data you need
        Person person = cache.get(1L);

        // do with the data what you need to do

        return 1;
    }
}

请注意,上面显示的示例可能不是最有效的方法。原因是key对应的person对象1可能位于与执行任务的节点不同的节点上。在这种情况下,对象是通过网络获取的。这可以通过将任务与数据放在一起来避免。

注意:
如果要在IgniteCallable和IgniteRunnable任务中使用键和值对象,请确保键和值类部署在所有集群节点上。

2.6 SQL查询与处理

Ignite 带有符合 ANSI-99、水平可扩展和容错的分布式 SQL 数据库。根据用例,通过跨集群节点对数据进行分区或完全复制来提供分布。

作为 SQL 数据库,Ignite 支持所有 DML 命令,包括 SELECT、UPDATE、INSERT 和 DELETE 查询,并且还实现了与分布式系统相关的 DDL 命令子集。

您可以通过连接来自外部工具和应用程序的JDBC或ODBC驱动程序与 Ignite 进行交互,就像与任何其他启用了 SQL 的存储一样。Java、.NET 和 C++ 开发人员可以利用本机 SQL API。

在内部,SQL 表与键值缓存具有相同的数据结构。这意味着您可以更改数据的分区分布并利用亲和力托管技术来获得更好的性能。

Ignite 的默认 SQL 引擎使用 H2 数据库来解析和优化查询并生成执行计划,但也可以启用基于 Apache Calcite 的 SQL 引擎来执行查询。

2.6.1 分布式查询

针对分区表的查询以分布式方式执行:

  • 查询被解析并拆分为多个“map”查询和一个“reduce”查询。
  • 所有地图查询都在所需数据所在的所有节点上执行。
  • 所有节点都向查询发起者提供本地执行的结果集,查询发起者反过来会将提供的结果集合并到最终结果中。

您可以强制在本地处理查询,即在存储在执行查询的节点上的数据子集上。

2.6.2 本地查询

如果对复制表执行查询,它将针对本地数据运行。

对分区表的查询以分布式方式执行。但是,您可以强制对分区表执行本地查询。

2.6.3 SQL架构

Ignite 有许多默认模式并支持创建自定义模式。

默认情况下有两种可用的模式:

  • SYS 模式,其中包含许多带有集群节点信息的系统视图。您不能在此架构中创建表。有关详细信息,请参阅系统视图页面。
  • PUBLIC 架构,在未指定架构时默认使用。

在以下情况下会创建自定义模式:

  • 您可以在集群配置中指定自定义模式。
  • Ignite 为通过其中一个编程接口或 XML 配置创建的每个缓存创建一个模式

1.公共模式

每当需要并且未指定模式时,默认使用 PUBLIC 模式。例如,当您通过 JDBC 连接到集群而不显式设置模式时,您将连接到 PUBLIC 模式。

2.自定义模式

可以通过 的sqlSchemas属性设置自定义模式IgniteConfiguration。您可以在启动集群之前在配置中指定模式列表,然后在运行时在这些模式中创建对象。

下面是一个带有两个自定义模式的配置示例。

IgniteConfiguration cfg = new IgniteConfiguration();

SqlConfiguration sqlCfg = new SqlConfiguration();

sqlCfg.setSqlSchemas("MY_SCHEMA", "MY_SECOND_SCHEMA" );

cfg.setSqlConfiguration(sqlCfg);

要通过例如 JDBC 驱动程序连接到特定模式,请在连接字符串中提供模式名称:

jdbc:ignite:thin://127.0.0.1/MY_SCHEMA

3.缓存和架构名称

当您使用可查询字段创建缓存时,您可以使用SQL API操作缓存的数据。在 SQL 术语中,每个这样的缓存对应于一个单独的模式,其名称等于缓存的名称。

同样,当您通过 DDL 语句创建表时,您可以通过 Ignite 支持的编程接口将其作为键值缓存进行访问。可以通过在语句部分提供CACHE_NAME参数来指定相应缓存的名称。WITHCREATE TABLE

CREATE TABLE City (
  ID INT(11),
  Name CHAR(35),
  CountryCode CHAR(3),
  District CHAR(20),
  Population INT(11),
  PRIMARY KEY (ID, CountryCode)
) WITH "backups=1, CACHE_NAME=City";

2.6.4 SQL索引

Ignite 自动为每个主键和亲和键字段创建索引。当您在值对象中的字段上定义索引时,Ignite 会创建一个由索引字段和缓存的主键组成的复合索引。在 SQL 术语中,这意味着索引将由两列组成:要索引的列和主键列。

1.使用注解配置索引

@QuerySqlField可以通过注释从代码中配置索引以及可查询字段。在下面的示例中,Ignite SQL 引擎将为id和salary字段创建索引。

public class Person implements Serializable {
    /** Indexed field. Will be visible to the SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible to the SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible to the SQL engine. */
    private int age;

    /**
     * Indexed field sorted in descending order. Will be visible to the SQL engine.
     */
    @QuerySqlField(index = true, descending = true)
    private float salary;
}

类型名用作 SQL 查询中的表名。在这种情况下,我们的表名将是Person(模式名称的使用和定义在模式部分中解释)。

id和都是salary索引字段。id将按升序(默认)和salary降序排序。

如果你不想索引一个字段,但你仍然需要在 SQL 查询中使用它,那么该字段必须在没有index = true参数的情况下进行注释。这样的字段称为可查询字段。在上面的示例中,name被定义为可查询字段。

该age字段既不可查询也不是索引字段,因此无法从 SQL 查询中访问。

定义索引字段时,需要注册索引类型。

2.索引嵌套对象

嵌套对象的字段也可以使用注释进行索引和查询。例如,考虑一个Person将Address对象作为字段的对象:

public class Person {
    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible for SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible for SQL engine. */
    private int age;

    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private Address address;
}

类的结构Address可能如下所示:

public class Address {
    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField (index = true)
    private String street;

    /** Indexed field. Will be visible for SQL engine. */
    @QuerySqlField(index = true)
    private int zip;
}

在上面的示例中,@QuerySqlField(index = true)在类的所有字段以及Address类中的Address对象上都指定了注解Person。

这使得执行如下 SQL 查询成为可能:

QueryCursor<List<?>> cursor = personCache.query(new SqlFieldsQuery( "select * from Person where street = 'street1'"));

请注意,您不需要address.street在 SQL 查询的 WHERE 子句中指定。这是因为Address类的字段在表中被展平,Person这仅允许我们直接访问Address查询中的字段。

3.注册索引类型

定义索引和可查询字段后,必须在 SQL 引擎中注册它们以及它们所属的对象类型。

要指定应该索引哪些类型,请在方法中传递相应的键值对,CacheConfiguration.setIndexedTypes()如下例所示。

// Preparing configuration.
CacheConfiguration<Long, Person> ccfg = new CacheConfiguration<>();

// Registering indexed type.
ccfg.setIndexedTypes(Long.class, Person.class);

此方法只接受成对的类型:一个用于键类,另一个用于值类。基元作为盒装类型传递。

4.组索引

要设置可以加速复杂条件查询的多字段索引,可以使用@QuerySqlField.Group注解。如果您希望一个字段成为多个组的一部分,您可以添加多个@QuerySqlField.Group注释。orderedGroups

例如,在Person下面的类中,我们有一个age属于索引组的字段,该age_salary_idx组以“0”的组顺序和降序排序。此外,在同一个组中,我们有salary一个组顺序为“3”和升序排序的字段。此外,该字段salary本身是一个单列索引(index = true除了orderedGroups声明之外还指定了参数)。组order不必是特定的数字。只需要对特定组内的字段进行排序。

public class Person implements Serializable {
    /** Indexed in a group index with "salary". */
    @QuerySqlField(orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 0, descending = true) })

    private int age;

    /** Indexed separately and in a group index with "age". */
    @QuerySqlField(index = true, orderedGroups = { @QuerySqlField.Group(name = "age_salary_idx", order = 3) })
    private double salary;
}

5.使用查询实体配置索引

索引和可查询字段也可以通过org.apache.ignite.cache.QueryEntity便于基于 Spring XML 的配置的类进行配置。

作为上面基于注释的配置的一部分讨论的所有概念也适用于QueryEntity基于方法。此外,其字段配置了@QuerySqlField注解并注册到CacheConfiguration.setIndexedTypes()方法的类型在内部转换为查询实体。

下面的示例展示了如何定义单个字段索引、组索引和可查询字段。

CacheConfiguration<Long, Person> cache = new CacheConfiguration<Long, Person>("myCache");

QueryEntity queryEntity = new QueryEntity();

queryEntity.setKeyFieldName("id").setKeyType(Long.class.getName()).setValueType(Person.class.getName());

LinkedHashMap<String, String> fields = new LinkedHashMap<>();
fields.put("id", "java.lang.Long");
fields.put("name", "java.lang.String");
fields.put("salary", "java.lang.Long");

queryEntity.setFields(fields);

queryEntity.setIndexes(Arrays.asList(new QueryIndex("name"),
        new QueryIndex(Arrays.asList("id", "salary"), QueryIndexType.SORTED)));

cache.setQueryEntities(Arrays.asList(queryEntity));

在这种情况下,我们的表名将是Person(模式名称的使用和定义在Schemas页面上进行了解释)。

定义好之后,就QueryEntity可以按如下方式执行 SQL 查询:

SqlFieldsQuery qry = new SqlFieldsQuery("SELECT id, name FROM Person" + "WHERE id > 1500 LIMIT 10");

6.配置索引内联大小

适当的索引内联大小可以帮助加快对索引字段的查询。有关如何选择合适的内联大小的信息,请参阅SQL 调优指南中的专门部分。

在大多数情况下,您只需为可变长度字段(例如字符串或数组)上的索引设置内联大小。默认值为 10。

您可以通过设置来更改默认值

  • 每个索引单独的内联大小,或
  • CacheConfiguration.sqlIndexMaxInlineSize给定缓存中所有索引的属性,或
  • IGNITE_MAX_INDEX_PAYLOAD_SIZE集群中所有索引的系统属性

设置按上面列出的顺序应用。

您还可以单独为每个索引配置内联大小,这将覆盖默认值。要为用户定义的索引设置索引内联大小,请使用以下方法之一。在所有情况下,该值都以字节为单位。

  • 使用注解时:
@QuerySqlField(index = true, inlineSize = 13)
private String country;
  • 使用时QueryEntity:
QueryIndex idx = new QueryIndex("country");
idx.setInlineSize(13);
queryEntity.setIndexes(Arrays.asList(idx));
  • 如果您使用该CREATE INDEX命令创建索引,则可以使用该INLINE_SIZE选项设置内联大小:
create index country_idx on Person (country) INLINE_SIZE 13;

7.自定义键

如果您只对主键使用预定义的 SQL 数据类型,那么您不需要对 SQL 模式配置执行额外的操作。这些数据类型由GridQueryProcessor.SQL_TYPES常量定义,如下所示。

预定义的 SQL 数据类型包括:

  • 所有原语及其包装器,除了char和Character
  • String
  • BigDecimal
  • byte[]
  • java.util.Date, java.sql.Date,java.sql.Timestamp
  • java.util.UUID

但是,一旦您决定引入自定义复杂键并从 DML 语句中引用其字段,您需要:

  • QueryEntity以与为值对象设置字段相同的方式定义这些字段。
  • 使用新的配置参数QueryEntity.setKeyFields(..)来区分键字段和值字段。

下面的示例显示了如何执行此操作。

// Preparing cache configuration.
CacheConfiguration<Long, Person> cacheCfg = new CacheConfiguration<Long, Person>("personCache");

// Creating the query entity.
QueryEntity entity = new QueryEntity("CustomKey", "Person");

// Listing all the queryable fields.
LinkedHashMap<String, String> fields = new LinkedHashMap<>();

fields.put("intKeyField", Integer.class.getName());
fields.put("strKeyField", String.class.getName());

fields.put("firstName", String.class.getName());
fields.put("lastName", String.class.getName());

entity.setFields(fields);

// Listing a subset of the fields that belong to the key.
Set<String> keyFlds = new HashSet<>();

keyFlds.add("intKeyField");
keyFlds.add("strKeyField");

entity.setKeyFields(keyFlds);

// End of new settings, nothing else here is DML related

entity.setIndexes(Collections.<QueryIndex>emptyList());

cacheCfg.setQueryEntities(Collections.singletonList(entity));

ignite.createCache(cacheCfg);

2.6.5 SQL API

除了使用 JDBC 驱动程序之外,Java 开发人员还可以使用 Ignite 的 SQL API 来查询和修改存储在 Ignite 中的数据。

该类SqlFieldsQuery是用于执行 SQL 语句和浏览结果的接口。SqlFieldsQuery通过IgniteCache.query(SqlFieldsQuery)返回查询游标的方法执行。

1.配置可查询字段

如果要使用 SQL 语句查询缓存,则需要定义值对象的哪些字段是可查询的。可查询字段是 SQL 引擎可以“看到”和查询的数据模型的字段。

在 Java 中,可以通过两种方式配置可查询字段:

  • 使用注释
  • 通过定义查询实体

要使特定字段可查询,​​请在值类定义中使用@QuerySqlField注解和调用来注解字段CacheConfiguration.setIndexedTypes(…​)

class Person implements Serializable {
    /** Indexed field. Will be visible to the SQL engine. */
    @QuerySqlField(index = true)
    private long id;

    /** Queryable field. Will be visible to the SQL engine. */
    @QuerySqlField
    private String name;

    /** Will NOT be visible to the SQL engine. */
    private int age;

    /**
     * Indexed field sorted in descending order. Will be visible to the SQL engine.
     */
    @QuerySqlField(index = true, descending = true)
    private float salary;
}

public static void main(String[] args) {
    Ignite ignite = Ignition.start();
    CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
    personCacheCfg.setName("Person");

    personCacheCfg.setIndexedTypes(Long.class, Person.class);
    IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}

确保调用CacheConfiguration.setIndexedTypes(…​)以让 SQL 引擎知道带注释的字段。

2.查询实体

QueryEntity您可以使用该类定义可查询字段。查询实体可以通过 XML 配置进行配置。

class Person implements Serializable {
    private long id;

    private String name;

    private int age;

    private float salary;
}

public static void main(String[] args) {
    Ignite ignite = Ignition.start();
    CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<Long, Person>();
    personCacheCfg.setName("Person");

    QueryEntity queryEntity = new QueryEntity(Long.class, Person.class)
            .addQueryField("id", Long.class.getName(), null).addQueryField("age", Integer.class.getName(), null)
            .addQueryField("salary", Float.class.getName(), null)
            .addQueryField("name", String.class.getName(), null);

    queryEntity.setIndexes(Arrays.asList(new QueryIndex("id"), new QueryIndex("salary", false)));

    personCacheCfg.setQueryEntities(Arrays.asList(queryEntity));

    IgniteCache<Long, Person> cache = ignite.createCache(personCacheCfg);
}

3.查询

要在缓存上执行选择查询,只需创建一个对象,SqlFieldsQuery将查询字符串提供给构造函数并运行cache.query(…​)。请注意,在以下示例中,必须将 Person 缓存配置为对 SQL 引擎可见。

IgniteCache<Long, Person> cache = ignite.cache("Person");

SqlFieldsQuery sql = new SqlFieldsQuery(
        "select concat(firstName, ' ', lastName) from Person");

// Iterate over the result set.
try (QueryCursor<List<?>> cursor = cache.query(sql)) {
    for (List<?> row : cursor)
        System.out.println("personName=" + row.get(0));
}

SqlFieldsQuery返回一个游标,该游标遍历与 SQL 查询匹配的结果。

4.本地执行

要强制本地执行查询,请使用SqlFieldsQuery.setLocal(true). 在这种情况下,查询是针对存储在运行查询的节点上的数据执行的。这意味着查询的结果几乎总是不完整的。仅当您确信自己了解此限制时才使用本地模式。

5.WHERE子句中的子查询

SELECT在INSERTandMERGE语句中使用的查询以及SELECT由UPDATEandDELETE操作生成的查询以colocated 或 non-colocated 分布式模式分布和执行。

但是,如果有一个子查询作为WHERE子句的一部分执行,则它只能在 colocated 模式下执行。

例如,让我们考虑以下查询:

DELETE FROM Person WHERE id IN
    (SELECT personId FROM Salary s WHERE s.amount > 2000);

SQL 引擎生成SELECT查询以获取要删除的条目列表。该查询在整个集群中分布和执行,如下所示:

SELECT _key, _val FROM Person WHERE id IN
    (SELECT personId FROM Salary s WHERE s.amount > 2000);

但是,IN子句 ( SELECT personId FROM Salary …​) 中的子查询不会进一步分布,而是在节点上可用的本地数据集上执行。

6.插入、更新、删除和合并

SqlFieldsQuery您可以执行其他 DML 命令以修改数据:

// 插入
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(
        new SqlFieldsQuery("INSERT INTO Person(id, firstName, lastName) VALUES(?, ?, ?)")
                .setArgs(1L, "John", "Smith"))
        .getAll();
// 更新
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("UPDATE Person set lastName = ? " + "WHERE id >= ?")
        .setArgs("Jones", 2L)).getAll();
// 删除
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("DELETE FROM Person " + "WHERE id >= ?").setArgs(2L))
        .getAll();
// 合并
IgniteCache<Long, Person> cache = ignite.cache("personCache");

cache.query(new SqlFieldsQuery("MERGE INTO Person(id, firstName, lastName)"
        + " values (1, 'John', 'Smith'), (5, 'Mary', 'Jones')")).getAll();

用于SqlFieldsQuery执行 DDL 语句时,必须调用getAll()从query(…​)方法返回的游标。

7.指定架构

默认情况下,执行的任何 SELECT 语句SqlFieldsQuery都将针对 PUBLIC 模式进行解析。但是,如果您要查询的表在不同的架构中,您可以通过调用来指定架构SqlFieldsQuery.setSchema(…​)。在这种情况下,语句在给定的模式中执行。

SqlFieldsQuery sql = new SqlFieldsQuery("select name from City").setSchema("PERSON");

或者,您可以在语句中定义架构:

SqlFieldsQuery sql = new SqlFieldsQuery("select name from Person.City");

8.创建表

您可以将任何受支持的 DDL 语句传递到SqlFieldsQuery缓存并在缓存上执行,如下所示。

IgniteCache<Long, Person> cache = ignite
        .getOrCreateCache(new CacheConfiguration<Long, Person>().setName("Person"));

// Creating City table.
cache.query(new SqlFieldsQuery(
        "CREATE TABLE City (id int primary key, name varchar, region varchar)")).getAll();

在 SQL 模式方面,执行代码会创建以下表:

  • “Person”模式中的表“Person”(如果之前没有创建过)。
  • “人员”模式中的表“城市”。

要查询“City”表,请使用select * from Person.Cityor之类new SqlFieldsQuery("select * from City").setSchema("PERSON")的语句(注意大写)。

9.取消查询

有两种方法可以取消长时间运行的查询。

第一种方法是通过设置查询执行超时来防止查询失控。

SqlFieldsQuery query = new SqlFieldsQuery("SELECT * from Person");

// Setting query execution timeout
query.setTimeout(10_000, TimeUnit.SECONDS);

第二种方法是通过使用来停止查询QueryCursor.close()。

SqlFieldsQuery query = new SqlFieldsQuery("SELECT * FROM Person");

// Executing the query
QueryCursor<List<?>> cursor = cache.query(query);

// Halting the query that might be still in progress.
cursor.close();

2.6.6 机器学习

Apache Ignite 机器学习 (ML) 是一组简单、可扩展且高效的工具,无需昂贵的数据传输即可构建预测机器学习模型。

将机器和深度学习 (DL) 添加到 Apache Ignite 的基本原理非常简单。今天的数据科学家必须处理阻碍 ML 被主流采用的两个主要因素:

  • 首先,在不同的系统中训练和部署模型(训练结束后)。数据科学家必须等待 ETL 或其他一些数据传输过程才能将数据移动到 Apache Mahout 或 Apache Spark 等系统中以进行培训。然后他们必须等待此过程完成并在生产环境中重新部署模型。整个过程可能需要数小时才能将数 TB 的数据从一个系统转移到另一个系统。此外,训练部分通常发生在旧数据集上。
  • 第二个因素与可扩展性有关。必须处理不再适合单个服务器单元的数据集的 ML 和 DL 算法正在不断增长。这促使数据科学家提出复杂的解决方案,或者转向分布式计算平台,如 Apache Spark 和 TensorFlow。然而,这些平台大多只解决了模型训练的一部分难题,这使得开发人员决定以后如何在生产中部署模型成为负担。

Ignite实战

1.零 ETL 和大规模可扩展性

Ignite 机器学习依赖于 Ignite 以内存为中心的存储,它为 ML 和 DL 任务带来了巨大的可扩展性,并消除了 ETL 在不同系统之间施加的等待。例如,它允许用户直接在 Ignite 集群中跨内存和磁盘存储的数据上运行 ML/DL 训练和推理。接下来,Ignite 提供了大量针对 Ignite 的并置分布式处理进行优化的 ML 和 DL 算法。当针对大量数据集或增量针对传入数据流运行时,这些实现提供内存速度和无限的水平可扩展性,而无需将数据移动到另一个存储中。通过消除数据移动和较长的处理等待时间

2.容错和持续学习

Apache Ignite 机器学习可以容忍节点故障。这意味着在学习过程中出现节点故障的情况下,所有的恢复过程对用户都是透明的,学习过程不会中断,我们会在类似于所有节点都正常工作的情况下得到结果。

3.算法和适用性

3.1 分类

根据训练集识别新观察属于哪个类别。

  • 适用性:垃圾邮件检测、图像识别、信用评分、疾病识别。
  • 算法: 逻辑回归、线性 SVM(支持向量机)、k-NN 分类、朴素贝叶斯、决策树、随机森林、多层感知器、梯度提升、ANN(近似最近邻)

3.2 回归

对标量因变量 (y) 与一个或多个解释变量或自变量 (x) 之间的关系进行建模。

  • 适用性:药物反应、股票价格、超市收入。
  • 算法:线性回归、决策树回归、k-NN 回归。

3.3 聚类

以这样一种方式对一组对象进行分组,即同一组(称为集群)中的对象彼此之间(在某种意义上)比其他组(集群)中的对象更相似。

  • 适用性:客户细分、实验结果分组、购物项目分组。
  • 算法: K-Means 聚类、高斯混合 (GMM)。

3.4 推荐

构建推荐系统,它是信息过滤系统的子类,旨在预测用户对项目的“评分”或“偏好”。

  • 适用性: 视频和音乐服务的播放列表生成器,服务的产品推荐器
  • 算法: 矩阵分解。

3.5 预处理

特征提取和归一化。

  • 适用性:转换输入数据(例如文本)以用于机器学习算法,以提取我们需要适应的特征,对输入数据进行规范化。
  • 算法: Apache Ignite ML 支持使用基于分区的数据集功能进行自定义预处理,并具有默认预处理器,例如规范化预处理器、one-hot-encoder、min-max 缩放器等。

3.总结

Ignite和Hadoop解决的是不同业务场景的问题,即使在一定程度上可能应用了类似的底层基础技术。Ignite是一种多用途,和OLAP/ OLTP内存中数据结构相关的,而Hadoop仅仅是Ignite原生支持的诸多数据来源之一。

Spark是一个和Ignite类似的项目。但是Spark聚焦于OLAP,而Ignite凭借强大的事务处理能力在混合型的OLTP/ OLAP场景中表现能力更好。特别是针对Hadoop,Ignite将为现有的MapReduce框架,Hive作业提供即插即用模式的加速,避免了推倒重来的做法,而Spark需要先做数据ETL,更适合开发新的分析应用。

4.结束语

这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

另外,博主出书了《Kafka并不难学》和《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同学, 可以在公告栏那里点击购买链接购买博主的书进行学习,在此感谢大家的支持。关注下面公众号,根据提示,可免费获取书籍的教学视频。

标签:

留言评论

  • 这篇文章还没有收到评论,赶紧来抢沙发吧~