BigData LogManager для 1С

Сделал установку метки времени, на основании имени файла лога и строки записи. Добавил вывод полей описания события, текста запроса, плана запроса. Получается вот такой текст для установки pipeline’а:

[details=Summary]curl -XPUT “http://192.168.56.10:9200/_ingest/pipeline/onec_pipeline” -d’
{
“description”: “onec pipeline”,
“processors”: [
{
“grok”: {
“field”: “message”,
“patterns”: [
"%{INT:_ingest.tempmm}:%{BASE10NUM:_ingest.tempss}-%{INT:duration},%{WORD:event},%{INT:level}"
]
}
},
{
“grok”: {
“field”: “source”,
“patterns”: [
"%{INT:_ingest.tempyymmddhh}.log"
]
}
},
{
“set”: {
“field”: “_ingest.tempdate”,
“value”: “{{_ingest.tempyymmddhh}}{{_ingest.tempmm}}{{_ingest.tempss}}”
}
},
{
“date”: {
“field”: “_ingest.tempdate”,
“target_field”: “@timestamp”,
“formats”: [
“yyMMddHHmmss.SSSSSS”
]
}
},
{
“grok”: {
“field”: “message”,
“patterns”: [
“Descr=%{QS:Descr}”
],
“on_failure”: [
{
“set”: {
“field”: “Descr”,
“value”: “”
}
}
]
}
},
{
“grok”: {
“field”: “message”,
“patterns”: [
“Sql=%{QS:Sql}”
],
“on_failure”: [
{
“set”: {
“field”: “Sql”,
“value”: “”
}
}
]
}
},
{
“grok”: {
“field”: “message”,
“patterns”: [
“planSQLText=%{QS:planSQLText}”
],
“on_failure”: [
{
“set”: {
“field”: “planSQLText”,
“value”: “”
}
}
]
}
}
]
}’[/details]

Так как поля описания события, текста запроса или плана запроса могут присутствовать не во всех записях лога, то в grok-процессор нужно добавить параметр “on_failure”, который будет записывать в поле пустое значение (или любое другое, которое можно указать) если по шаблону ничего не найдено.
Интересная возможность использовать “локальные” переменные в pipeline. Такие поля можно писать в _ingest.ИмяПоля, например с помощью этой возможности собираю дату записи по кусочкам. Важно, что эти поля в индекс не попадают, а используются только в предобработке.
Обращение к полям возможно через {{ИмяПоля}}, т.е. используя двойные фигурные скобки. Это понадобится, чтобы конкатенировать значения полей в процессоре set.
И еще, оказывается pipeline по умолчанию сам создает в индексе новые поля, это можно явно запретить, но это удобно - нет необходимости создавать свой шаблон индекса в настройках filebeat.
В итоге, можно полноценно парсить логи ТЖ.
(текст команды для создания pipeline при вставке его в сообщение форма почему-то местами обрезал символ _ в обращениях к ingest - правильно нужно указывать _ingest, а не просто ingest).

2 Симпатий

Если 1С генерирует в минуту событий больше, чем успевает отправить в ES, то что можно предпринять?
Размазывать ES горизонтально и как-то разными потоками отправлять данные из 1С?

А как борешься, с тем то время увеличивается на разницу в часовом поясе?

Точно не могу сказать, но решение будет скорее всего в горизонтальном масштабировании ES. Пока возможностей Filebeat и ES хватает, так как в ТЖ выводится только самая необходимая информация (исключения, длительные запросы (без планов выполнения), таймауты и взаимоблокировки на управляемых блокировках). В день, примерно 200 Мб логов.

А вот здесь, не борюсь, смирился с этим) Но решение проблемы нужно будет попробовать найти.

А если попробовать через mutate и gsub преобразовать к виду 2017-01-31T15:25:17.234+0200? Вроде в таком виде должно подхватить.

Вот мой вариант с часовым поясом

{
    "description": "onec pipeline",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "%{INT:_ingest.tempmm}:%{BASE10NUM:_ingest.tempss}-%{INT:duration},%{WORD:event},%{INT:level}"
                ]
            }
        },
        {
            "grok": {
                "field": "source",
                "patterns": [
                    "%{INT:_ingest.tempyymmddhh}.log"
                ]
            }
        },
        {
            "set": {
                "field": "_ingest.tempdate",
                "value": "{{_ingest.tempyymmddhh}}{{_ingest.tempmm}}{{_ingest.tempss}}"
            }
        },
        {
            "date": {
                "field": "_ingest.tempdate",
                "target_field": "@timestamp",
                "formats": [
                    "yyMMddHHmmss.SSSSSS"
                ],
                 "timezone" : "Europe/Kiev"
            }
        },
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "Descr=%{QS:Descr}"
                ],
                "on_failure": [
                    {
                        "set": {
                            "field": "Descr",
                            "value": ""
                        }
                    }
                ]
            }
        },
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "Sql=%{QS:Sql}"
                ],
                "on_failure": [
                    {
                        "set": {
                            "field": "Sql",
                            "value": ""
                        }
                    }
                ]
            }
        },
        {
            "grok": {
                "field": "message",
                "patterns": [
                    "planSQLText=%{QS:planSQLText}"
                ],
                "on_failure": [
                    {
                        "set": {
                            "field": "planSQLText",
                            "value": ""
                        }
                    }
                ]
            }
        }
    ]
}

У меня подобная проблема была, BigData LogManager для 1С

Да, спасибо за решение вопроса по часовому поясу. Все работает!

Вот мой вариант, добавил выгребание контекста, имени пользователя и ИБ в отдельные поля. Работает без template индекса. Контекст пришлось забирать до конца сообщения в многострочном режиме, поскольку платформа любит его по-разному записывать - то внутри кавычек, то совсем без них.

{
  "description": "tjournal",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{INT:_ingest.tempmm}:%{BASE10NUM:_ingest.tempss}-%{INT:duration},%{WORD:event},%{INT:level}"
        ]
      }
    },
    {
      "grok": {
        "field": "source",
        "patterns": [
          "%{INT:_ingest.tempyymmddhh}.log"
        ]
      }
    },
    {
      "set": {
        "field": "_ingest.tempdate",
        "value": "{{_ingest.tempyymmddhh}}{{_ingest.tempmm}}{{_ingest.tempss}}"
      }
    },
    {
      "date": {
        "field": "_ingest.tempdate",
        "target_field": "@timestamp",
        "formats": [
          "yyMMddHHmmss.SSSSSS"
        ],
        "timezone": "Asia/Yekaterinburg"
      }
    },
    {
      "date_index_name": {
        "field": "@timestamp",
        "index_name_prefix": "tjournal-",
        "date_rounding": "d",
        "timezone": "Asia/Yekaterinburg"
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "Descr=%{QS:Descr}"
        ],
        "on_failure": [
          {
            "set": {
              "field": "Descr",
              "value": ""
            }
          }
        ]
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "Sql=%{QS:Sql}"
        ],
        "on_failure": [
          {
            "set": {
              "field": "Sql",
              "value": ""
            }
          }
        ]
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "planSQLText=%{QS:planSQLText}"
        ],
        "on_failure": [
          {
            "set": {
              "field": "planSQLText",
              "value": ""
            }
          }
        ]
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "p:processName=%{WORD:infobase},"
        ],
        "on_failure": [
          {
            "set": {
              "field": "infobase",
              "value": ""
            }
          }
        ]
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "Usr=%{WORD:username},"
        ],
        "on_failure": [
          {
            "set": {
              "field": "username",
              "value": ""
            }
          }
        ]
      }
    },
    {
      "grok": {
        "field": "message",
        "patterns": [
          "Context=%{CONT:Context}"
        ],
        "pattern_definitions": {
            "CONT": "(?m)(.*)"
        },
        "on_failure": [
          {
            "set": {
              "field": "Context",
              "value": ""
            }
          }
        ]
      }
    },
    {
      "convert": {
        "field": "duration",
        "type": "integer"
      }
    },
    {
      "convert": {
        "field": "level",
        "type": "integer"
      }
    }
  ]
}

По результатам работы сбора ТЖ и использования Ingest ноды на рабочем кластере серверов выявил одну неприятную особенность. Filebeat при большом количестве записей лог файлов начинает дублировать отправку сообщений на Ingest ноду, так как elastic не справляется с быстрой обработкой поступающих данных (ресурсы сервера с elastic у меня ограничены). Очень неприятная особенность, так как статистика по логам получается кривая.
Обойти это можно так - в процессоре устанавливать поле _id как:

"{{_ingest.tempdate}}-{{_source.offset}}"

Это всегда (ну почти) уникальное значение из даты и смещения в файле логов ТЖ.
Таким способом при повторной отправке документы в elastic просто обновляются. Да, это создает нагрузку на удаление старых версий, запись новых и индексацию, но мне важно, чтобы сообщения не дублировались.
При большой нагрузке, когда приходится собирать события TLOCK, версии некоторых документов доходили до 38. Т.е. 38 раз Filebeat слал в ingest ноду одну запись. Побороть такую особенность настройками Filebeat у меня не получилось.

По поводу процессора, все поля можно получать по такому универсальному шаблону:

{
  "grok": {
    "field": "message",
    "patterns": [
      "TechLogPropertyName=(%{DATA:TechLogPropertyName},|%{GREEDYDATA:TechLogPropertyName})"
    ],
    "on_failure": [
      {
        "set": {
          "field": "TechLogPropertyName",
          "value": ""
        }
      }
    ]
  }
}

В процессоре прописываем все необходимые поля по такому шаблону, заменяя TechLogPropertyName на наименование свойства (process, sql и т.д.) и собираем их как конструктор.
Только для поля Descr пришлось немного исправить паттерн:

"Descr=(%{QS:Descr}|%{DATA:Descr},|%{GREEDYDATA:Descr})"

Еще, в конце процессора добавил блок on_failure, чтобы при ошибке разбора сообщение все равно записывалось, а в поле error выводилась информация об ошибке. Вот что получилось:

"on_failure": [
{
  "set": {
    "field": "error",
    "value": "{{_ingest.on_failure_message}}"
  }
}

]

Мониторинг работает отлично. Прикрутил еще X-Pack от Elastic. Самые полезные плагины в нем, это плагин для ограничения доступа к серверу elastic и алерты. Но, X-Pack платный, по бесплатной лицензии можно использовать только плагин мониторинга сервера. В итоге, после окончания триального периода, пришлось искать другие решения.
Для алертов есть Elastalert (с трудом настроил), для ограничения доступа тоже есть свободные решения.

Вот итоговый pipeline который я использую:

"onec_techlog_pipeline": {
"description": "onec tech log pipeline",
"processors": [
  {
    "set": {
      "field": "LogRowsID",
      "value": [
        "{{source}}",
        "{{_source.offset}}"
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "%{INT:_ingest.tempmm}:%{BASE10NUM:_ingest.tempss}-%{INT:duration},(%{WORD:event}|%{SPACE:event}),%{INT:level}"
      ]
    }
  },
  {
    "grok": {
      "field": "source",
      "patterns": [
        "%{INT:_ingest.tempyymmddhh}.log"
      ]
    }
  },
  {
    "set": {
      "field": "_ingest.tempdate",
      "value": "{{_ingest.tempyymmddhh}}{{_ingest.tempmm}}{{_ingest.tempss}}"
    }
  },
  {
    "date": {
      "field": "_ingest.tempdate",
      "target_field": "@timestamp",
      "formats": [
        "yyMMddHHmmss.SSSSSS"
      ],
      "timezone": "Europe/Moscow"
    }
  },
  {
    "set": {
      "field": "_id",
      "value": "{{_ingest.tempdate}}-{{_source.offset}}"
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "process=(%{DATA:process},|%{GREEDYDATA:process})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "process",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "p:processName=(%{DATA:processName},|%{GREEDYDATA:processName})",
        "ProcessName=(%{DATA:processName},|%{GREEDYDATA:processName})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "processName",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "t:clientID=(%{DATA:clientID},|%{GREEDYDATA:clientID})",
        "ClientID=(%{DATA:clientID},|%{GREEDYDATA:clientID})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "clientID",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "t:applicationName=(%{DATA:applicationName},|%{GREEDYDATA:applicationName})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "applicationName",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "t:computerName=(%{DATA:computerName},|%{GREEDYDATA:computerName})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "computerName",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "t:connectID=(%{DATA:connectID},|%{GREEDYDATA:connectID})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "connectID",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "SessionID=(%{DATA:SessionID},|%{GREEDYDATA:SessionID})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "SessionID",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Usr=(%{DATA:Usr},|%{GREEDYDATA:Usr})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Usr",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Descr=(%{QS:Descr}|%{DATA:Descr},|%{GREEDYDATA:Descr})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Descr",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "AppID=(%{DATA:AppID},|%{GREEDYDATA:AppID})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "AppID",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Trans=(%{DATA:Trans},|%{GREEDYDATA:Trans})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Trans",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "WaitConnections=(%{DATA:WaitConnections},|%{GREEDYDATA:WaitConnections})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "WaitConnections",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Regions=(%{DATA:Regions},|%{GREEDYDATA:Regions})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Regions",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Locks=(%{DATA:Locks},|%{GREEDYDATA:Locks})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Locks",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "DeadlockConnectionIntersections=(%{DATA:DeadlockConnectionIntersections},|%{GREEDYDATA:DeadlockConnectionIntersections})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "DeadlockConnectionIntersections",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Exception=(%{DATA:Exception},|%{GREEDYDATA:Exception})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Exception",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Sql=(%{QS:Sql}|%{DATA:Sql},|%{GREEDYDATA:Sql})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Sql",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "planSQLText=(%{QS:planSQLText}|%{DATA:planSQLText},|%{GREEDYDATA:planSQLText})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "planSQLText",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Rows=(%{DATA:Rows},|%{GREEDYDATA:Rows})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Rows",
            "value": 0
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "RowsAffected=(%{DATA:RowsAffected},|%{GREEDYDATA:RowsAffected})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "RowsAffected",
            "value": 0
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "dbpid=(%{DATA:dbpid},|%{GREEDYDATA:dbpid})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "dbpid",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Func=(%{QS:Func}|%{DATA:Func},|%{GREEDYDATA:Func})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Func",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Sdbl=(%{QS:Sdbl}|%{DATA:Sdbl},|%{GREEDYDATA:Sdbl})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Sdbl",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "SrcName=(%{DATA:SrcName},|%{GREEDYDATA:SrcName})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "SrcName",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "OSThread=(%{DATA:OSThread},|%{GREEDYDATA:OSThread})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "OSThread",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Prm=(%{QS:Prm}|%{DATA:Prm},|%{GREEDYDATA:Prm})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Prm",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Context=(%{QS:Context}|%{DATA:Context},|%{GREEDYDATA:Context})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Context",
            "value": ""
          }
        }
      ]
    }
  },
  {
    "grok": {
      "field": "message",
      "patterns": [
        "Txt=(%{QS:Txt}|%{DATA:Txt},|%{GREEDYDATA:Txt})"
      ],
      "on_failure": [
        {
          "set": {
            "field": "Txt",
            "value": ""
          }
        }
      ]
    }
  }
],
"on_failure": [
  {
    "set": {
      "field": "error",
      "value": "{{_ingest.on_failure_message}}"
    }
  }
]

}

По полю Context - многострочный контекст такая конструкция обработает? У меня ни QS, ни DATA, ни GREEDYDATA с этим не справились, пришлось городить свой огород.

Про настройки filebeat - пробовали менять bulk_max_size?

Да, поле Context получается без проблем:

Из такого сообщения:

По filebeat - настройки bulk_max_size менял, не сильно помогло. Вообще, эта особенность с дублированием сообщений большей частью из-за слабого оборудования на сервере elastic. Дублируются сообщения только при большой нагрузке. В будущем будем переезжать на более хорошее.

Спасибо, попробую у себя применить этот вариант.

Про elastic - у меня сейчас он крутится на весьма слабой машинке под моим столом, в принципе, хватает просасывать логи ТЖ + ЖР. Тоже планируем переезд, сразу в виртуалку.

Пожалуйста! Соглашусь с Вашим паттерном для строк, только немного добавлю:

{
    "grok": {
      "field": "message",
      "patterns": [
        "Descr=(%{onecstr:Descr},|%{onecstr:Descr}|%{DATA:Descr},|%{GREEDYDATA:Descr})"
      ],
      "pattern_definitions": {
        "onecstr": "('|\")(?m)(.*)('|\")"
      },
      "on_failure": [
        {
          "set": {
            "field": "Descr",
            "value": ""
          }
        }
      ]
    }
  }

Обнаружил, что 1С в логах ТЖ допускает использование двух идущих подряд одинарных кавычек. Выглядят они как двойные. Вот такую строку нашел в ТЖ:

Ошибка при выполнении обработчика - ''ОбработкаЗаполнения''

Само свойство Descr начинается и заканчивается с одинарных кавычек, а вот ОбработкаЗаполнения окружена двумя идущими подряд одинарными кавычками. Хотя в мануалах к ТЖ указано, что если строка обрамлена одинарными кавычками, то в ней могут быть только двойные кавычки. Оказывается это не так :slight_smile:

Очередная подлость, на которые так щедра 1С :slight_smile:

Развернул ELK для сбора ТЖ. Версия 5.6.2

Сработало указание pipeline через

properties.pipeline: “test-pipeline”

а не как в документации https://www.elastic.co/guide/en/beats/filebeat/current/configuring-ingest-node.html

Кто-нибудь может подсказать, если ли возможность асинхронной загрузки данных в elastic, не используя фоновые задания 1С? Что-то вроде нативной очереди.