Spring Cloud Stream knowledge points inventory

Spring Cloud Stream knowledge points inventory

Click on "IT Ranch" above and select "Set as Star" to deliver technical dry goods daily!

Previously, we have discussed:

Spring Cloud Stream implements message filtering and consumptionSpring Cloud Stream error handling in detail

This article makes an inventory and summary of Spring Cloud Stream, including:

• Concepts • Stream annotations • Spring Cloud Integration (the bottom layer of Spring Cloud Stream) annotations • Spring Messaging (Spring messaging programming model) annotations • Spring Cloud Stream API

concept

group

There is only 1 instance consumption in the group. If no group is set, stream will automatically create an anonymous and independent group for each instance-so each instance will consume.

There is only 1 instance consumption in the group at a time, and load balancing will be polled. In general, when binding an application to a given target, it is best to always specify a consumer group.

destination binder

Components communicate with external messaging systems configured to Bindingprovide two methods, respectively, bindConsumerand bindProducerwhich are respectively configured for producers and consumers. Binder enables Spring Cloud Stream applications to be flexibly connected to middleware. Currently, spring provides binders for kafka and rabbitmq.

destination binding

Binding It is a bridge connecting applications and message middleware, used for message consumption and production, created by binder.

partition

TIPS is not strictly a concept, but a way for Stream to improve scalability and throughput. But I don’t want to make a new title, so write it here.

One or more producers send data to multiple consumers and ensure that data with common characteristics is processed by the same consumer. The default is to hashCode the message, and then take the remainder based on the number of partitions, so the same message will always fall on the same consumer.

annotation

Input(Stream)

Example:

public interface Barista {
    @Input("inboundOrders")
    SubscribableChannel orders();
}

effect:

•Used to receive messages•Generate channel instances for each binding•Specify the channel name•Generate a bean named inboundOrders and type SubscribableChannel in the spring container•Generate a class in the spring container to implement the Barista interface.

Output(Stream)

Example:

public interface Source {
    @Output
    MessageChannel output();
}

effect:

Similar to Input, but used to produce messages.

StreamListener(Stream)

Example:

@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void handle(String body) {
    System.out.println("Received: "+ body);
}

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))
public MessageSource<String> test() {
    return () -> {
        Map<String, Object> map = new HashMap<>(1);
        map.put("type", "dog");
        return new GenericMessage<>("abcdef", map);
    };
}

effect:

Used to consume messages

The role of condition: Only when the condition is met, the processing method is entered.

The two conditions for condition to work:

• The annotated method does not return a value. • The method is an independent method and does not support the Reactive API

SendTo(messaging)

Example:

//Receive the message of the INPUT channel and send the return value to the OUTPUT channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
   return "handle...";
}

effect:

Used to send messages

InboundChannelAdapter(Integration)

Example:

@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
        poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> test() {
    return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}

effect:

Means to let the defined method produce the message.

Note: Use InboundChannelAdaptereven with useless parameters on the annotated method. That is, the following test method should not have parameters.

• fixedDelay: how many milliseconds to send once • maxMessagesPerPoll: send several messages at a time.

ServiceActivator(Integration)

Example:

@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
    return payload.toUpperCase();
}

effect:

Represents that the method can process the message or the effective content of the message, monitor the input message, process it with the code of the method body, and then output it to the output.

Transformer(Integration)

Example:

@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
  return message.toUpperCase();
}

effect:

And ServiceActivatorthe like, can be converted representation message, message header, or the content of the message is valid

PollableMessageSource(Stream)

Sample code:

@SpringBootApplication
@EnableBinding({ConsumerApplication.PolledProcessor.class})
@EnableScheduling
public class ConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Autowired
    private PolledProcessor polledProcessor;

    @Scheduled(fixedDelay = 5_000)
    public void poll() {
        polledProcessor.input().poll(message -> {
            byte[] bytes = (byte[]) message.getPayload();
            String payload = new String(bytes);
            System.out.println(payload);
        });
    }

    public interface PolledProcessor {
        @Input
        PollableMessageSource input();

        @Output
        MessageChannel output();
    }

    @Bean
    @InboundChannelAdapter(value = "output",
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> {
            Map<String, Object> map = new HashMap<>(1);
            map.put("type", "dog");
            return new GenericMessage<>("adfdfdsafdsfa", map);
        };
    }
}

If you don't want to do byte array conversion yourself, you can add configuration:

spring:
  cloud:
    stream:
      bindings:
        output:
          # Specify content-type
          content-type: text/plain

effect:

Allow consumers to control the rate of consumption.

related articles:

https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers[1]

Dry goods sharing

Recently, I organize personal study notes into a book and share them in PDF. Follow me, reply to the following code, you can get the Baidu disk address, no routine to receive! •001: "Java Concurrency and High Concurrency Solutions" study notes; •002: "Deep JVM Kernel-Principles, Diagnosis and Optimization" study notes; •003: "Java Interview Collection" • 004: "Docker Open Source Book" • 005: "Kubernetes Open Source Book" • 006: "DDD Crash (Domain Driven Design Crash)" • 007: All • 008: Add technical discussion group

Recent hot articles

Amazon's practice field-driven design approachSummary of several strategies and combination analysis of advantages and disadvantages in the process of caching useUnderstand QPS, TPS, PV, UV, GMV, IP, RPS in seconds! Spring Cloud Stream error handling detailed explanationMulti-account unified login to achieve the whole processSpring Cloud Stream to achieve message filtering consumption

References

[1]: https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers

Reference: https://cloud.tencent.com/developer/article/1486309 Spring Cloud Stream Knowledge Point Inventory-Cloud + Community-Tencent Cloud