blog

Elastic Jobシリーズの入門デモを熟練して使用する。

Elastic-Job-LiteとElastic-Job-Cloudは統一されたジョブインターフェースを提供します。開発者はジョブインターフェースの実装を行うだけで、異なるコンフィギュレーションとデプ...

Jan 19, 2021 · 9 min. read
シェア

シリーズ・ナビゲーション

Elastic-Job-LiteとElastic-Job-Cloudは統一されたジョブ・インターフェースを提供し、開発者はジョブ・インターフェースを独自に実装するだけで、様々な設定やデプロイメントを分散ジョブに適用することができます。

Java設定の起動

  1. 最初にmaven依存性を導入します。
<dependency>
 <groupId>com.dangdang</groupId>
 <artifactId>elastic-job-lite-core</artifactId>
 <version>2.1.5</version>
</dependency>
  1. 次に、業務用の統一されたジョブ・インターフェースを実装します。
@Slf4j public class MyElasticJob implements SimpleJob { @Override public void execute(ShardingContext context) { switch (context.getShardingItem()) { case 0: log.info("0000"); break; case 1: log.info("1111"); break; case 2: log.info("2222"); break; } } }
  1. ジョブの設定作業
// ジョブコンテンツの基本構成
private static LiteJobConfiguration createJobConfiguration() {
 // ジョブのコアコンフィギュレーションを定義する
 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
 // SIMPLE型構成を定義する
 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
 // Liteジョブルート構成を定義する
 return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
}
// zookeeperレジストリ構成
private static CoordinatorRegistryCenter createRegistryCenter() {
 CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("zk_host:2181", "elastic-job-demo"));
 regCenter.init();
 return regCenter;
}
  1. ジョブの実行開始
public class JavaMain { public static void main(String[] args) { new JobScheduler(createRegistryCenter(), createJobConfiguration()).init(); } }

コンソールのプリントアウトから、Demoが正常に書き込みと実行を行っていることがわかります!

ほとんどの人がスプリングかスプリングブーツを使っているので、この2つの方法も次に紹介します。

Springコンフィギュレーションの使用開始

  1. 次のmaven依存関係を追加します。
<dependency>
 <groupId>com.dangdang</groupId>
 <artifactId>elastic-job-lite-spring</artifactId>
 <version>2.1.5</version>
</dependency>
 <dependency>
 <groupId>org.springframework</groupId>
 <artifactId>spring-context</artifactId>
</dependency>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://..org/schema/beans"
 xmlns:xsi="http://..org/2100"/XMLSchema-instance"
 xmlns:reg="http://..com/schema/ddframe/reg"
 xmlns:job="http://..com/schema/ddframe/job"
 xsi:schemaLocation="http://..org/schema/beans
 http://..org/schema/beans/spring-.xsd
 http://..com/schema/ddframe/reg
 http://..com/schema/ddframe/reg/.xsd
 http://..com/schema/ddframe/job
 http://..com/schema/ddframe/job/.xsd
 ">
 <!--ジョブレジストリを設定する>
 <reg:zookeeper id="regCenter" server-lists="192.168.104.102:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000"
 max-retries="3"/>
 <!-- ジョブを構成する>
 <job:simple id="demoSimpleSpringJob" class="example.MyElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3"
 sharding-item-parameters="0=A,1=B,2=C"/>
</beans>
  1. ジョブの開始 プログラムはスプリング設定ファイルの読み込みを開始し、ジョブは自動的にロードされます。
public class JavaMain { public static void main(String[] args) { new ClassPathXmlApplicationContext("classpath:applicationContext.xml"); } }

ジョブの種類の紹介

Elastic-Jobはシンプル、データフロー、スクリプトの3種類のジョブを提供します。

Simpleジョブの種類

単純に実装された、カプセル化されていない型を意味します。SimpleJob インターフェースを実装する必要があります。このインターフェースは、オーバーライドするための単一のメソッドのみを提供し、このメソッドは定期的に実行されます。Quartz ネイティブインターフェースに似ていますが、エラスティックなスケーリングやスライシングなどの機能を提供します。

public class MyElasticJob implements SimpleJob {
 
 @Override
 public void execute(ShardingContext context) {
 switch (context.getShardingItem()) {
 case 0: 
 // do something by sharding item 0
 break;
 case 1: 
 // do something by sharding item 1
 break;
 case 2: 
 // do something by sharding item 2
 break;
 // case n: ...
 }
 }
}

Dataflowジョブの種類

Dataflowタイプはデータストリームを処理するために使用され、DataflowJobインターフェースを実装する必要があります。このインターフェースはオーバーライドする2つのメソッドを提供し、それぞれデータの取得と処理に使われます。

ストリーミングするかどうかは、DataflowJobConfigurationを介して設定することができます。

ストリーミングデータ処理は、fetchDataメソッドの戻り値がNULL、またはコレクションの長さが空の場合のみ、ジョブはクロールを停止し、それ以外のジョブは実行し続けます。非ストリーミングデータ処理は、fetchDataメソッドとprocessDataメソッドの各ジョブの実行中に1回のみ実行され、その後、ジョブを完了します。

ストリーミングジョブ処理を使用する場合、fetchDataが再びデータを取得するのを避けるために、processDataがデータ処理後にその状態を更新し、ジョブが停止しないようにすることをお勧めします。 ストリーミングデータ処理はTbScheduleを参考に設計されており、非断続的なデータ処理に適しています。

public class MyElasticJob implements DataflowJob<Foo> {
 
 @Override
 public List<Foo> fetchData(ShardingContext context) {
 switch (context.getShardingItem()) {
 case 0: 
 List<Foo> data = // get data from database by sharding item 0
 return data;
 case 1: 
 List<Foo> data = // get data from database by sharding item 1
 return data;
 case 2: 
 List<Foo> data = // get data from database by sharding item 2
 return data;
 // case n: ...
 }
 }
 
 @Override
 public void processData(ShardingContext shardingContext, List<Foo> data) {
 // process data
 // ...
 }
}

Scriptジョブの種類

スクリプトタイプジョブとは、スクリプトタイプのジョブを意味し、シェル、パイソン、perlなどのあらゆるタイプのスクリプトをサポートします。コンソールやコードから scriptCommandLine を設定するだけで、コーディングは必要ありません。スクリプトを実行するパスはパラメーターを含むことができ、パラメーターが渡された後、ジョブフレームワークは自動的に最後のパラメーターをジョブの実行時情報に追加します。

#!/bin/bash
echo sharding execution context is $*

ジョブが実行されると

sharding execution context is {「jobName”:"scriptElasticDemoJob”,"shardingTotalCount”:10,「jobParameter”: ”,「shardingItem”:0,「shardingParameter”: ”}

上記のメソッドのパラメータshardingContextには、ジョブ構成、スライス、ランタイム情報が含まれます。getShardingTotalCount()スライスの総数や、このジョブサーバ上で実行されているスライスのシーケンス番号などは、それぞれ , getShardingItem() などで取得できます。

コンフィギュレーション

コンフィギュレーション

Elastic-Jobのコンフィギュレーションは、Core、Type、Rootの3つの階層に分かれており、それぞれの階層はデコレーター・パターンと同様の方法で組み立てられています。

Core は JobCoreConfiguration に対応し、ジョブ名、総スライス数、CRON 式などのジョブコア設定情報を提供するために使用されます。

Type は JobTypeConfiguration に対応し、SIMPLE、DATAFLOW、SCRIPT の 3 つのサブクラスがあります。

RootはJobRootConfigurationに対応し、LiteとCloudのデプロイメントタイプに対応する2つのサブクラスがあり、Liteタイプがローカルコンフィギュレーションをオーバーライドする必要があるか、CloudがCPUやメモリの数を占有するかなど、異なるデプロイメントタイプで必要なコンフィギュレーションを提供します。

次のようなコードです。

// ジョブのコア構成を定義する
JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
// SIMPLE型構成を定義する
SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, SimpleDemoJob.class.getCanonicalName());
// Liteジョブルート構成を定義する
JobRootConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();

SpringJavaの設定メモ: Lite

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://..org/schema/beans"
 xmlns:xsi="http://..org/2100"/XMLSchema-instance"
 xmlns:reg="http://..com/schema/ddframe/reg"
 xmlns:job="http://..com/schema/ddframe/job"
 xsi:schemaLocation="http://..org/schema/beans 
 http://..org/schema/beans/spring-.xsd 
 http://..com/schema/ddframe/reg 
 http://..com/schema/ddframe/reg/.xsd 
 http://..com/schema/ddframe/job 
 http://..com/schema/ddframe/job/.xsd 
 ">
 <!--ジョブレジストリを設定する>
 <reg:zookeeper id="regCenter" server-lists="yourhost:2181" namespace="dd-job" base-sleep-time-milliseconds="1000" max-sleep-time-milliseconds="3000" max-retries="3" />
 
 <!-- シンプルなジョブの設定>
 <job:simple id="simpleElasticJob" class="xxx.MySimpleElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
 
 <bean id="yourRefJobBeanId" class="xxx.MySimpleRefElasticJob">
 <property name="fooService" ref="xxx.FooService"/>
 </bean>
 
 <!-- 関連するBeanジョブを設定する>
 <job:simple id="simpleRefElasticJob" job-ref="yourRefJobBeanId" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
 
 <!-- データフロークジョブを設定する>
 <job:dataflow id="throughputDataflow" class="xxx.MyThroughputDataflowElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" />
 
 <!-- 設定スクリプトジョブ>
 <job:script id="scriptElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" script-command-line="/your/file/path/demo.sh" />
 
 <!-- リスナーを使ったシンプルなジョブの設定>
 <job:simple id="listenerElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C">
 <job:listener class="xx.MySimpleJobListener"/>
 <job:distributed-listener class="xx.MyOnceSimpleJobListener" started-timeout-milliseconds="1000" completed-timeout-milliseconds="2000" />
 </job:simple>
 
 <!-- ジョブデータベースのイベントトラッキングでシンプルなジョブを設定する>
 <job:simple id="eventTraceElasticJob" class="xxx.MySimpleListenerElasticJob" registry-center-ref="regCenter" cron="0/10 * * * * ?" sharding-total-count="3" sharding-item-parameters="0=A,1=B,2=C" event-trace-rdb-data-source="yourDataSource">
 </job:simple>
</beans>

JavaJavaの設定メモ

public final class JavaMain {
 public static void main(String[] args) {
 new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
 }
 private static CoordinatorRegistryCenter createRegistryCenter() {
 CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(
 new ZookeeperConfiguration("192.168.104.102:2181", "elastic-job-demo-quick-demo"));
 regCenter.init();
 return regCenter;
 }
 private static LiteJobConfiguration createJobConfiguration() {
 // ジョブのコア構成を定義する
 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob", "0/15 * * * * ?", 10).build();
 // SIMPLE型構成を定義する
 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MyElasticJob.class.getCanonicalName());
 // Liteジョブルート構成を定義する
 return LiteJobConfiguration.newBuilder(simpleJobConfig).build();
 }
}
Read next

Android 6.0 動的パーミッションの仕組み

動的パーミッションのメカニズムの導入により、いくつかのシステムパーミッションはアプリのランタイムで指定されるのではなく、アプリのランタイムで割り当てられる必要があります。 この記事では、動的パーミッションの基本的な割り当てプロセスを分析します。 Android M ...

Jan 19, 2021 · 22 min read