自己定义一个分区类,需要自己实现一个Partitioner接口!(泛型最好写上)

    import org.apache.flink.api.common.functions.Partitioner
    public class MyPartition implements Partitioner<Long> {
        public int partition(Long key, int numPartitions) {
            System.out.println("分区总数"+numPartitions);
            if(key % 2== 0){
                return 0;
            }else {
                return 1;
            }
        }
    }

具体实现类:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple1;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class StreamingDemoWithMyPartition {
        /**
         * 使用自定义分区
         * 根据数字的奇偶性来进行分区
         *
         * @param args
         */
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Long> text = env.addSource(new MyNoParalleSource());
            // 对数据进行转换,把long类型的转换成tuple类型
            DataStream<Tuple1<Long>> tupData = text.map(new MapFunction<Long, Tuple1<Long>>() {
                public Tuple1<Long> map(Long aLong) throws Exception {
                    return new Tuple1<Long>(aLong);
                }
            });
            //分区之后的数据
            DataStream<Tuple1<Long>> partData = tupData.partitionCustom(new MyPartition(), 0);
            SingleOutputStreamOperator<Long> maps = partData.map(new MapFunction<Tuple1<Long>, Long>() {
                public Long map(Tuple1<Long> value) throws Exception {
                    System.out.println("获取当前线程id" + Thread.currentThread().getId() + ",value" + value);
                    return value.getField(0);
                }
            });
    
            maps.print().setParallelism(1);
            env.execute("own definite partiotn");
            /**
             * 结果为:
             * 分区总数8
             * 获取当前线程id77,value(1)
             * 1
             * 分区总数8
             * 获取当前线程id81,value(2)
             * 2
             * 分区总数8
             * 获取当前线程id77,value(3)
             * 3
             * 分区总数8
             * 获取当前线程id81,value(4)
             * 4
             * 分区总数8
             * 获取当前线程id77,value(5)
             * 5
             * 分区总数8
             * 获取当前线程id81,value(6)
             * 6
             * 分区总数8
             * 获取当前线程id77,value(7)
             * 7
             * 分区总数8
             * 获取当前线程id81,value(8)
             * 8
             */
        }
    }

Scala代码实现案例:


package streaming

import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamingDemoWithMyPartition {

  /**
   * Demonstrates custom partitioning with the Scala streaming API:
   * even values are routed to partition 0, odd values to partition 1.
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    import org.apache.flink.api.scala._
    val source = env.addSource(new MyNoParallelScala)
    // Wrap each element in a Tuple1 so partitionCustom can reference field 0.
    val wrapped = source.map(value =&gt; Tuple1(value))
    val partitioned = wrapped.partitionCustom(new MyPartition, 0)
    // Unwrap the tuple; the printed thread id shows which subtask got the record.
    val result = partitioned.map { tup =&gt;
      println("当前线程id" + Thread.currentThread().getId + ",value" + tup)
      tup._1
    }
    result.print().setParallelism(1)

    env.execute("StreamingDemoWithMyPartition")
  }

  /** Partitions keys by parity: even keys -&gt; partition 0, odd keys -&gt; partition 1. */
  class MyPartition extends Partitioner[Long] {
    override def partition(key: Long, numpartition: Int): Int = {
      // Print the total partition count on every call (label is in Chinese by design).
      System.out.println("总的分区数"+numpartition)
      if (key % 2 == 0) 0 else 1
    }
  }
}

版权声明:本文为weixin_39150719原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_39150719/article/details/101029901