中文字幕av专区_日韩电影在线播放_精品国产精品久久一区免费式_av在线免费观看网站

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

kafka-Storm中如何將日志文件打印到local

發布時間:2021-11-25 17:35:01 來源:億速云 閱讀:178 作者:柒染 欄目:云計算

這篇文章給大家介紹kafka-Storm中如何將日志文件打印到local,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。

閱讀前提:

        1 : 您可能需要對  logback 日志系統有所了解

        2 :您可能需要對于 kafka 有初步的了解

        3:請代碼查看之前,請您仔細參考系統的業務圖解

    由于kafka本身自帶了和『Hadoop』的接口,如果需要將kafka中的文件直接遷移到HDFS,請參看本ID的另外一篇博文:

        業務系統-kafka-Storm【日志本地化】 - 2 :直接通過kafka將日志傳遞到HDFS

    1: 一個正式環境系統的系統設計圖解:

                kafka-Storm中如何將日志文件打印到local

              通過kafka集群,在2個相同的topic之下,通過kafka-storm, he kafka-hadoop,2 個Consumer,針對同樣的一份數據,我們分流了2個管道:

            其一: 實時通道

            其二:離線通道

       在日志本地化的過程之中,前期,由于日志的清洗,過濾的工作是放在Storm集群之中,也就是說,留存到本地locla的日志。是我們在Storm集群之中進行了清洗的數據。

      也就是:

            如下圖所示:

kafka-Storm中如何將日志文件打印到local

      在kafka之中,通常而言,有如下的 代碼 用來處理:

         在這里我們針對了2種日志,有兩個Consumer用來處理

package com.mixbox.kafka.consumer;

public class logSave {

	public static void main(String[] args) throws Exception {

		Consumer_Thread visitlog = new Consumer_Thread(KafkaProperties.visit);
		visitlog.start();

		Consumer_Thread orderlog = new Consumer_Thread(KafkaProperties.order);
		orderlog.start();

	}
}

     在這里,我們依據不同的原始字段,將不同的數據保存到不同的文件之中。

package com.mixbox.kafka.consumer;

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**
 * @author Yin Shuai
 */
public class Consumer_Thread extends Thread {

	// 在事實上我們會依據傳遞的topic名稱,來生成不桐的記錄機器
	// private Logger _log_order = LoggerFactory.getLogger("order");
	// private Logger _log_visit = LoggerFactory.getLogger("visit");

	private Logger _log = null;

	private final ConsumerConnector _consumer;
	private final String _topic;

	public Consumer_Thread(String topic) {

		_consumer = kafka.consumer.Consumer
				.createJavaConsumerConnector(createConsumerConfig());
		this._topic = topic;

		_log = LoggerFactory.getLogger(_topic);

		System.err.println("log的名稱" + _topic);

	}

	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", KafkaProperties.zkConnect);
		// 在這里我們的組ID為logSave
		props.put("group.id", KafkaProperties.logSave);
		props.put("zookeeper.session.timeout.ms", "100000");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);

	}

	public void run() {

		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(_topic, new Integer(1));

		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = _consumer
				.createMessageStreams(topicCountMap);

		for (KafkaStream<byte[], byte[]> kafkaStream : consumerMap.get(_topic)) {
			ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
			while (iterator.hasNext()) {
				MessageAndMetadata<byte[], byte[]> next = iterator.next();
				try {

					// 在這里我們分拆了一個Consumer 來處理visit日志
					logFile(next);
					System.out.println("message:"
							+ new String(next.message(), "utf-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			}
		}
	}

	private void logFile(MessageAndMetadata<byte[], byte[]> next)
			throws UnsupportedEncodingException {
		_log.info(new String(next.message(), "utf-8"));
	}

}

    一個簡單的小tips:

        logback.xml  ,提醒您注意,這里的配置文件太過粗淺。如有需要,請自行填充。

<?xml version="1.0" encoding="UTF-8" ?>
<configuration>

	<jmxConfigurator />
	<!-- 控制臺輸出日志 -->
	<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">

		<!-- 過濾掉 TRACE 和 DEBUG 級別的日志 -->
		<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter"> -->
		<!-- <level>INFO</level> -->
		<!-- </filter> -->

		<!-- 按天來回滾,如果需要按小時來回滾,則設置為{yyyy-MM-dd_HH} -->
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>f:/opt/log/test.%d{yyyy-MM-dd}.log</fileNamePattern>
			<!-- 如果按天來回滾,則最大保存時間為1天,1天之前的都將被清理掉 -->
		</rollingPolicy>

		<!-- 日志輸出格式 -->
		<layout class="ch.qos.logback.classic.PatternLayout">
			<pattern>
				%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n</pattern>
		</layout>
	</appender>



	<!-- 記錄到日志 文件的滾動日志 -->
	<appender name="ERROR"
		class="ch.qos.logback.core.rolling.RollingFileAppender">

		<file>
			e:/logs/error/error.log
		</file>
		<filter class="ch.qos.logback.classic.filter.LevelFilter">
			<level>
				ERROR
			</level>
			<onMatch>ACCEPT</onMatch>
			<onMismatch>DENY</onMismatch>
		</filter>
		<!-- 定義每天生成一個日志文件 -->
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>e:/logs/yuanshi-%d{yyyy-MM-dd}.log</fileNamePattern>
			<MaxHistory>10</MaxHistory>
		</rollingPolicy>

		<!-- 日志樣式 -->
		<layout class="ch.qos.logback.classic.PatternLayout">
			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n</pattern>
		</layout>
	</appender>


	<!-- 記錄到日志 文件的滾動日志 -->
	<appender name="FILE"
		class="ch.qos.logback.core.rolling.RollingFileAppender">

		<file>E:\logs\file\file.log</file>

		<filter class="ch.qos.logback.classic.filter.LevelFilter">
			<level>INFO</level>
			<onMatch>ACCEPT</onMatch>
			<onMismatch>DENY</onMismatch>
		</filter>

		<!-- 定義每天生成一個日志文件 -->
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>e:/logs/venality-%d{yyyy-MM-dd}.log
			</fileNamePattern>
			<MaxHistory>10</MaxHistory>
		</rollingPolicy>

		<!-- 日志樣式 -->
		<layout class="ch.qos.logback.classic.PatternLayout">
			<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level
				%logger{36}-%msg%n</pattern>
		</layout>
	</appender>


	<appender name="visit"
	class="ch.qos.logback.core.rolling.RollingFileAppender">
		<File>
			E:\logs\visitlog\visit.log
		</File>
		<encoder>
			<pattern>%msg%n</pattern>
		</encoder>

		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
			<level>INFO</level>
		</filter>
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>E:\logs\visit.log.%d{yyyy-MM-dd}
			</fileNamePattern>
		</rollingPolicy>
	</appender>
	<logger name="visit" additivity="false" level="INFO">
		<appender-ref ref="visit" />
	</logger>


	<appender name="order"
		class="ch.qos.logback.core.rolling.RollingFileAppender">
		<File>
			E:\logs\orderlog\order.log
		</File>
		<encoder>
			<pattern>%msg%n
			</pattern>
		</encoder>

		<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
			<level>INFO</level>
		</filter>
		<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
			<fileNamePattern>E:\logs\order.log.%d{yyyy-MM-dd}
			</fileNamePattern>
		</rollingPolicy>
	</appender>
	<logger name="order" additivity="false" level="INFO">
		<appender-ref ref="order" />
	</logger>


	<root level="DEBUG">
		<appender-ref ref="FILE" />
	</root>
</configuration>

關于kafka-Storm中如何將日志文件打印到local就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

吉林市| 金川县| 中卫市| 内丘县| 涟水县| 西乌珠穆沁旗| 临江市| 浙江省| 宁陕县| 原阳县| 滦平县| 筠连县| 农安县| 青神县| 合作市| 杭锦后旗| 华亭县| 大化| 清河县| 东阿县| 壤塘县| 巧家县| 河池市| 六安市| 饶河县| 渭源县| 屯昌县| 武威市| 新余市| 大庆市| 那坡县| 永济市| 普格县| 洪洞县| 洪雅县| 东丰县| 陆丰市| 青河县| 汶上县| 元阳县| 孝义市|